Receiving empty data from Kafka - Spark Streaming [SOLVED]
Why am I getting empty messages when reading from a Kafka topic with Spark Streaming? Is the problem in the decoder?
Note: there is no error or exception.
Thank you.
The code:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("Queue Status")
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  ssc.checkpoint("/tmp/")

  val kafkaConfig = Map(
    "zookeeper.connect" -> "ip.internal:2181",
    "group.id" -> "queue-status")
  val kafkaTopics = Map("queue_status" -> 1)

  // Receiver-based stream: String keys, custom-decoded protobuf values
  val kafkaStream = KafkaUtils.createStream[String, QueueStatusMessage, StringDecoder, QueueStatusMessageKafkaDeserializer](
    ssc, kafkaConfig, kafkaTopics, StorageLevel.MEMORY_AND_DISK)

  // Print a 1-minute window of messages, sliding every 10 seconds
  kafkaStream.window(Minutes(1), Seconds(10)).print()

  ssc.start()
  ssc.awaitTermination()
}
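For context, this uses the old receiver-based Kafka integration from Spark 1.x. The build dependencies would look roughly like the sketch below; the exact version is an assumption (guessed from the era of the post), so adjust it to match your cluster:

// build.sbt (sketch; versions are assumptions, not from the original post)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.1"
)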
The Kafka decoder:
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

class QueueStatusMessageKafkaDeserializer(props: VerifiableProperties = null)
    extends Decoder[QueueStatusMessage] {

  // Turn the raw Kafka payload back into the protobuf-generated message type
  override def fromBytes(bytes: Array[Byte]): QueueStatusMessage =
    QueueStatusMessage.parseFrom(bytes)
}
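Not part of the original post, but one way to rule the decoder in or out is to round-trip a message through it outside of Spark and Kafka entirely. This sketch assumes QueueStatusMessage is a protobuf-generated class exposing toByteArray (which the parseFrom call suggests); fill in a real sample message where indicated:

// Sanity-check sketch: exercise the decoder in isolation
val decoder = new QueueStatusMessageKafkaDeserializer()
val message: QueueStatusMessage = ???  // construct a sample message here
val decoded = decoder.fromBytes(message.toByteArray)
assert(decoded == message, "decoder did not round-trip the message")

If the round trip succeeds, the deserialization logic itself is fine and the problem lies in how the stream is wired up.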
The (empty) result:
-------------------------------------------
Time: 1440010266000 ms
-------------------------------------------
(null,QueueStatusMessage(,,0,None,None))
(null,QueueStatusMessage(,,0,None,None))
(null,QueueStatusMessage(,,0,None,None))
(null,QueueStatusMessage(,,0,None,None))
Solution:
I just had to explicitly specify the types in the Kafka topic map:
val kafkaTopics = Map[String, Int]("queue_status" -> 1)
I still don't know the root cause of the problem, but the code is working fine now.
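If anyone hits something similar, one debugging step worth trying (a sketch of my own, not part of the original fix) is temporary logging inside the decoder, to confirm it is actually being invoked and receiving non-empty payloads:

class QueueStatusMessageKafkaDeserializer(props: VerifiableProperties = null)
    extends Decoder[QueueStatusMessage] {
  override def fromBytes(bytes: Array[Byte]): QueueStatusMessage = {
    // Temporary debug output: if this never prints, the custom decoder
    // is not being used at all and a default decoder may be in play.
    println(s"QueueStatusMessage decoder called with ${bytes.length} bytes")
    QueueStatusMessage.parseFrom(bytes)
  }
}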