According to the 2.4.7 docs, using a "partition" column in your DataFrame to choose the target Kafka partition is only available with version 3.x, not earlier. However, the option kafka.partitioner.class will still work: Spark Structured Streaming passes any option prefixed with kafka. straight through to the underlying Kafka producer configuration, so this approach also works on version 2.4.4.
The code below runs fine with Spark 3.0.1 and Confluent Community Edition 5.5.0. On Spark 2.4.4 the "partition" column has no effect, but the custom partitioner class is applied.
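Any other producer setting can be passed the same way. As a sketch (assuming a DataFrame df with the required value column; the option values here are only illustrative):

```scala
// Every "kafka."-prefixed option has the prefix stripped and is handed
// to the underlying KafkaProducer as a plain producer config.
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.acks", "all")                // producer config "acks"
  .option("kafka.compression.type", "snappy") // producer config "compression.type"
  .option("topic", "test")
  .save()
```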
import org.apache.spark.sql.SparkSession

case class KafkaRecord(partition: Int, value: String)

val spark = SparkSession.builder()
  .appName("test")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// create a Dataset whose "partition" column selects the target Kafka partition (Spark 3.x only)
val df = Seq((0, "Alice"), (1, "Bob")).toDF("partition", "value").as[KafkaRecord]

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()
What you then see in the console-consumer:
# partition 0
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 0
Alice
and
# partition 1
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 1
Bob
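Instead of the console consumer, you can also verify the placement from Spark itself, since the Kafka source exposes each record's partition as a metadata column (a sketch, reusing the session and topic from above):

```scala
// read the topic back; the Kafka source attaches a "partition" column to every row
val result = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .selectExpr("partition", "CAST(value AS STRING) AS value")

result.show()
```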
I also get the same results when using a custom Partitioner
.option("kafka.partitioner.class", "org.test.CustomPartitioner")
where my custom Partitioner is defined as
package org.test

import java.nio.charset.StandardCharsets
import java.util.{Map => JMap}

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

class CustomPartitioner extends Partitioner {
  // send "Bob" to partition 1 and everything else to partition 0
  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    if (valueBytes != null && new String(valueBytes, StandardCharsets.UTF_8) == "Bob") 1 else 0
  }

  override def configure(configs: JMap[String, _]): Unit = ()

  override def close(): Unit = ()
}
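Because the partition method only inspects the value bytes, you can sanity-check the class without a running broker by calling it directly (Cluster.empty() is a helper shipped with kafka-clients; the topic name here is arbitrary):

```scala
import org.apache.kafka.common.Cluster

val p = new org.test.CustomPartitioner
val cluster = Cluster.empty()

// which partition each value would be routed to
p.partition("test", null, null, "Alice", "Alice".getBytes, cluster)
p.partition("test", null, null, "Bob", "Bob".getBytes, cluster)
```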