
Writing Spark DataFrame to Kafka is ignoring the partition column and kafka.partitioner.class

I am trying to write a Spark DataFrame (a batch DataFrame) to Kafka, and I need to write the data to specific partitions.

I tried the following code:

myDF.write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaProps.getBootstrapServers)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", kafkaProps.getTrustStoreLocation)
  .option("kafka.ssl.truststore.password", kafkaProps.getTrustStorePassword)
  .option("kafka.ssl.keystore.location", kafkaProps.getKeyStoreLocation)
  .option("kafka.ssl.keystore.password", kafkaProps.getKeyStorePassword)
  .option("kafka.partitioner.class", "util.MyCustomPartitioner")
  .option("topic", kafkaProps.getTopicName)
  .save()

The schema of the DataFrame I am writing is:

+---+---------+-----+
|key|partition|value|
+---+---------+-----+
+---+---------+-----+

I had to repartition "myDF" down to a single partition, since I need to order the data based on a date column (a quick sketch of that step is below).
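
A minimal sketch of that reordering step, assuming the ordering column is called "date" and a hypothetical source DataFrame named sourceDF (both names are illustrative, not from the original post):

// Collapse to one Spark partition so a global ordering can be applied,
// then sort the rows inside that partition by the date column.
val myDF = sourceDF
  .repartition(1)
  .sortWithinPartitions("date")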

It is writing the data to a single Kafka partition, but not to the one given in the DataFrame's "partition" column, nor to the one returned by the custom partitioner (which is the same as the value in the partition column).

Thanks, Sateesh

question from: https://stackoverflow.com/questions/65921675/writing-spark-dataframe-to-kafka-is-ignoring-the-partition-column-and-kafka-part


1 Reply


The feature to use the column "partition" in your DataFrame is only available in Spark 3.x and not earlier, according to the Spark 2.4.7 docs.

However, using the option kafka.partitioner.class will still work. When writing, Spark lets you pass plain KafkaProducer configuration through options prefixed with kafka., so this also works on version 2.4.4 (see the sketch below).
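
As a minimal sketch of that mechanism (reusing the org.test.CustomPartitioner class defined further down): Spark strips the kafka. prefix and hands the remaining key to the underlying producer, so this option sets the producer property partitioner.class.

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  // the "kafka." prefix is stripped; this becomes the producer's partitioner.class
  .option("kafka.partitioner.class", "org.test.CustomPartitioner")
  .option("topic", "test")
  .save()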

The code below runs fine with Spark 3.0.1 and Confluent Community Edition 5.5.0. On Spark 2.4.4, the "partition" column has no impact, but the custom partitioner class does apply.

import org.apache.spark.sql.SparkSession

case class KafkaRecord(partition: Int, value: String)

val spark = SparkSession.builder()
  .appName("test")
  .master("local[*]")
  .getOrCreate()

// create a typed Dataset with "partition" and "value" columns
import spark.implicits._
val df = Seq((0, "Alice"), (1, "Bob")).toDF("partition", "value").as[KafkaRecord]

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()

What you then see in the console-consumer:

# partition 0
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 0
Alice

and

# partition 1
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 1
Bob

You get the same results when using a custom partitioner:

.option("kafka.partitioner.class", "org.test.CustomPartitioner")

where the custom partitioner is defined as:

package org.test

import java.util

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

class CustomPartitioner extends Partitioner {

  // Route records whose value is "Bob" to partition 0, everything else to partition 1
  override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    if (valueBytes != null && valueBytes.nonEmpty && new String(valueBytes) == "Bob") 0 else 1
  }

  // Required by the Partitioner interface; nothing to configure or clean up here
  override def configure(configs: util.Map[String, _]): Unit = {}

  override def close(): Unit = {}
}
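
One deployment note: the Kafka producer that instantiates the partitioner runs inside the Spark executors, so the class must be available on the executor classpath (for example, packaged into the application jar or shipped via spark-submit's --jars option); otherwise producer creation will fail.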
