Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Stream from Kafka to Kafka with Spark Structured Streaming in Scala

I am trying to run simple variations of examples from the official Spark tutorial and the book "spark streaming in action".
The exception message is strange. What is wrong with my code?

First I start the Kafka ZooKeeper, server, producer, and 2 consumers. Then I run the following code:

// Read from Kafka
val df = sparkService.sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic1)
    .load()

// Write to Kafka
import org.apache.spark.sql.streaming.OutputMode // needed for OutputMode.Append()
import sparkService.sparkSession.implicits._

val query = df.selectExpr("CAST(key as STRING)", "CAST(value as STRING)")
    .writeStream
    .outputMode(OutputMode.Append())
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", topic2)
    .option("checkpointLocation", "/home/pt/Dokumenty/tmp/")
    .option("failOnDataLoss", "false") // only when testing
    .start()

query.awaitTermination(30000)

The error occurs when writing to Kafka:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got 1 1609627750463
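
A possible reading, stated as an assumption rather than a confirmed diagnosis: the "Expected e.g. {...}" text shows the JSON shape in which Spark's Kafka source serializes per-partition offsets, so the query may be reading offsets from the checkpoint directory that were stored in a different format, for example by an earlier query with another source that used the same checkpointLocation. Under that assumption, pointing the query at a new, empty checkpoint directory is a cheap test. The sketch below uses only the JDK; freshCheckpointDir and the query name "kafka-to-kafka" are hypothetical names, not part of Spark.

```scala
import java.nio.file.{Files, Path}

// Hypothetical helper (not a Spark API): creates a new, empty directory so
// the query cannot pick up offsets written earlier by a different query.
def freshCheckpointDir(queryName: String): Path =
  Files.createTempDirectory(s"checkpoint-$queryName-")

val checkpointDir: Path = freshCheckpointDir("kafka-to-kafka")

// In the write stage from the question, this path would replace the fixed one:
//   .option("checkpointLocation", checkpointDir.toString)
println(s"Using checkpoint directory: $checkpointDir")
```

A temporary directory is only suitable for testing, since a restarted query will not find its previous offsets there.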

Question from: https://stackoverflow.com/questions/65646194/stream-from-kafka-to-kafka-with-spark-structured-streaming-in-scala


1 Reply

Waiting for answers
