Spark Streaming - read and write on Kafka topic
I am using Spark Streaming to process data between two Kafka queues, but I cannot seem to find a good way to write to Kafka from Spark. I have tried this:
input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String =>
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)

        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
    }
  )
)
and it works as intended, but instantiating a new KafkaProducer for every message is clearly infeasible in a real context and I'm trying to work around it.
I would like to keep a reference to a single instance for every process and access it when I need to send a message. How can I write to Kafka from Spark Streaming?
scala apache-spark streaming spark-streaming apache-kafka
Spark 2.2 and above - Both read and write operations on Kafka possible
– mrsrinivas
Nov 11 at 15:30
7 Answers
My first advice would be to try to create a new instance in foreachPartition and measure if that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
Another option is to use an object pool as illustrated in this example:
https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala
However, I found it hard to implement when using checkpointing.
Another version that is working well for me is a factory as described in the following blog post; you just have to check whether it provides enough parallelism for your needs (see the comments section):
http://allegro.tech/2015/08/spark-kafka-integration.html
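As a rough sketch of the first suggestion (this is not code from the linked posts; it reuses the input stream, brokers value and String serializers from the question), the producer is created once per partition instead of once per record, and closed when the partition has been written:

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // One producer per partition, amortised over all records in that partition
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { x =>
      producer.send(new ProducerRecord[String, String]("output", null, x))
    }
    producer.close() // flush and release the connection when the partition is done
  }
}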
What was the issue you ran into with regards to checkpointing?
– Michael G. Noll
Sep 16 '16 at 19:39
foreachPartition will be good if we are working with a fixed number of RDDs, but in Spark Streaming (where we have micro-batches) RDDs are created continually, and so are their partitions. How do we circumvent this in Spark Streaming?
– CᴴᴀZ
Feb 10 '17 at 8:39
Please include the content of the link(s) so that when they break your answer still has value.
– Danny Varod
Apr 17 at 8:49
Yes, unfortunately Spark (1.x, 2.x) doesn't make it straightforward to write to Kafka in an efficient manner.
I'd suggest the following approach:
- Use (and re-use) one KafkaProducer instance per executor process/JVM.
Here's the high-level setup for this approach:
- First, you must "wrap" Kafka's
KafkaProducer
because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea here is to use alazy val
so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry aboutKafkaProducer
not being serializable. - You "ship" the wrapped producer to each executor by using a broadcast variable.
- Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.
The code snippets below work with Spark Streaming as of Spark 2.0.
Step 1: Wrapping KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}
object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}
Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}
Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.spark.streaming.dstream.DStream

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] =
      partitionOfRecords.map { record =>
        kafkaProducer.value.send("my-output-topic", record)
      }.toStream
    metadata.foreach(metadata => metadata.get())
  }
}
Hope this helps.
If I may ask, how would one implement this idea in Python, especially the lazy part?
– avocado
Sep 27 '17 at 6:36
done, here is the link, stackoverflow.com/q/46464003/2235936
– avocado
Sep 28 '17 at 8:05
There is a Streaming Kafka Writer maintained by Cloudera (actually spun off from a Spark JIRA [1]). It basically creates a producer per partition, which amortizes the time spent to create 'heavy' objects over a (hopefully large) collection of elements.
The Writer can be found here: https://github.com/cloudera/spark-kafka-writer
404 Not found for that project, removed? github.com/cloudera/spark-kafka-writer
– Mekal
Sep 6 '16 at 3:36
Nowadays there's github.com/BenFradet/spark-kafka-writer (same name, but not sure whether it's the same code)
– Michael G. Noll
Sep 16 '16 at 19:59
I was having the same issue and found this post.
The author solves the problem by creating one producer per executor. Instead of sending the producer itself, he sends only a "recipe" for how to create a producer on an executor, by broadcasting it:
val kafkaSink = sparkContext.broadcast(KafkaSink(conf))
He uses a wrapper that lazily creates the producer:
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}

object KafkaSink {
  def apply(config: Map[String, Object]): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)

      sys.addShutdownHook {
        producer.close()
      }

      producer
    }
    new KafkaSink(f)
  }
}
The wrapper is serializable because the Kafka producer is initialized just before first use on an executor. The driver keeps the reference to the wrapper and the wrapper sends the messages using each executor's producer:
dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    kafkaSink.value.send("topicName", message)
  }
}
What prevents me from having a singleton class in my JARs that has the Kafka producer in it? This way, I don't need a broadcast variable. Just having a singleton KafkaSink will ensure one KafkaSink per executor, as a singleton will be initialized once per JVM (aka executor).
– Ra41P
Jun 1 at 6:29
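For reference, a minimal sketch of the singleton idea raised in this comment (the object name and configuration values are hypothetical, not taken from any of the linked posts): a Scala object is initialised once per JVM, i.e. once per executor, so every task on that executor reuses the same producer without a broadcast variable.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ExecutorKafkaSink { // hypothetical name
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker1:9092") // assumed broker address
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val p = new KafkaProducer[String, String](props)
    sys.addShutdownHook(p.close()) // flush buffered records on executor shutdown
    p
  }

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

Tasks would then call ExecutorKafkaSink.send("topicName", message) directly inside rdd.foreach; the trade-off versus the broadcast approach is that the configuration is baked into the JAR rather than supplied from the driver.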
Why is it infeasible? Fundamentally, each partition of each RDD is going to run independently (and may well run on a different cluster node), so you have to redo the connection (and any synchronization) at the start of each partition's task. If the overhead of that is too high, then you should increase the batch size in your StreamingContext until it becomes acceptable (obviously there's a latency cost to doing this).
(If you're not handling thousands of messages in each partition, are you sure you need spark-streaming at all? Would you do better with a standalone application?)
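For illustration (a sketch only; the application name and the 10-second interval are arbitrary placeholder values), the batch size is controlled by the batch interval passed to the StreamingContext:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("kafka-output-example")
// A larger batch interval means more records per partition in each batch,
// so the per-partition connection cost is amortised over more messages
// (at the price of higher end-to-end latency).
val ssc = new StreamingContext(sparkConf, Seconds(10))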
With Spark >= 2.2
Both read and write operations on Kafka are possible using the Structured Streaming API.
Build stream from Kafka topic
// Subscribe to a topic and read messages starting from the earliest offsets
val ds = spark
  .readStream // use `read` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  // .option("endingOffsets", "latest") // endingOffsets applies to batch queries only, not readStream
  .load()
Read the key and value and apply a schema to both; for simplicity, we are converting both of them to String type.
val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Since dsStruc has a schema, it accepts all SQL-style operations such as filter, agg, select, etc.
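For example (a sketch; the filter predicate and column usage are just illustrations):

val errors = dsStruc.filter(_._2.contains("error")) // typed Dataset operation on the (key, value) pairs
val countsPerKey = dsStruc.groupBy("key").count()   // untyped aggregation on the "key" column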
Write stream to Kafka topic
dsStruc
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .option("checkpointLocation", "/path/to/checkpoint") // required for streaming writes to Kafka; placeholder path
  .start()
More configuration for Kafka integration to read or write
Key artifacts to add in the application
"org.apache.spark" % "spark-core_2.11" % 2.2.0,
"org.apache.spark" % "spark-streaming_2.11" % 2.2.0,
"org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,
This might be what you want to do. You basically create one producer for each partition of records.
input.foreachRDD(rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partitionOfRecords.foreach {
      case x: String =>
        println(x)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
    }
  }
)
Hope that helps