Spark Streaming - read and write on Kafka topic

I am using Spark Streaming to process data between two Kafka queues, but I cannot seem to find a good way to write to Kafka from Spark. I have tried this:



import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        // A new KafkaProducer is instantiated for every single message.
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)


It works as intended, but instantiating a new KafkaProducer for every message is clearly infeasible in a real-world setting, and I'm trying to work around it.



I would like to keep a reference to a single instance for every process and access it when I need to send a message. How can I write to Kafka from Spark Streaming?

scala apache-kafka spark-streaming spark-streaming-kafka

asked Jul 23 '15 at 14:39 by Chobeat, edited Jan 23 at 8:44 by user3190018


  • Spark 2.2 and above - both read and write operations on Kafka are possible.

    – mrsrinivas
    Nov 11 '18 at 15:30

7 Answers

18 votes, answered Jul 23 '15 at 15:19 by Marius Soutier (edited Mar 1 '16 at 15:47)

My first advice would be to try creating a new instance in foreachPartition and measure whether that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
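
For reference, a minimal sketch of that per-partition approach might look like the following; the broker address, serializer settings, and output topic are placeholders, and the producer is closed (which also flushes it) once the partition has been processed:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One producer per partition of each micro-batch, not one per message.
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    partition.foreach { record =>
      producer.send(new ProducerRecord[String, String]("output", record))
    }

    // Flush buffered messages and release resources before the task ends.
    producer.close()
  }
}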



Another option is to use an object pool as illustrated in this example:



https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala



However, I found it hard to implement when using checkpointing.



Another version that works well for me is a factory, as described in the following blog post; you just have to check whether it provides enough parallelism for your needs (see the comments section):



http://allegro.tech/2015/08/spark-kafka-integration.html







  • What was the issue you ran into with regards to checkpointing?

    – Michael G. Noll
    Sep 16 '16 at 19:39

  • foreachPartition is fine if we are working with a fixed number of RDDs, but in Spark Streaming (where we have micro-batches) RDDs are created continually, and so are their partitions. How do we get around this in Spark Streaming?

    – CᴴᴀZ
    Feb 10 '17 at 8:39

  • Please include the content of the link(s) so that when they break your answer still has value.

    – Danny Varod
    Apr 17 '18 at 8:49

26 votes, answered Sep 16 '16 at 19:56 by Michael G. Noll

Yes, unfortunately Spark (1.x, 2.x) doesn't make it straightforward to write to Kafka efficiently.



I'd suggest the following approach:




  • Use (and re-use) one KafkaProducer instance per executor process/JVM.


Here's the high-level setup for this approach:




  1. First, you must "wrap" Kafka's KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea here is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry about KafkaProducer not being serializable.

  2. You "ship" the wrapped producer to each executor by using a broadcast variable.

  3. Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.


The code snippets below work with Spark Streaming as of Spark 2.0.



Step 1: Wrapping KafkaProducer



import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}


Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance



import java.util.Properties

import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}


Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)



import java.util.concurrent.Future

import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.spark.streaming.dstream.DStream

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    // Force the sends and surface any exceptions before the task finishes.
    metadata.foreach { metadata => metadata.get() }
  }
}


Hope this helps.







  • If I may ask, how would one implement this idea in Python, especially the lazy part?

    – avocado
    Sep 27 '17 at 6:36

  • Done, here is the link: stackoverflow.com/q/46464003/2235936

    – avocado
    Sep 28 '17 at 8:05

8 votes, answered Jul 23 '15 at 23:31 by maasg

There is a Streaming Kafka Writer maintained by Cloudera (actually spun off from a Spark JIRA [1]). It basically creates a producer per partition, which amortizes the time spent to create 'heavy' objects over a (hopefully large) collection of elements.



The Writer can be found here: https://github.com/cloudera/spark-kafka-writer







  • 404 Not found for that project, removed? github.com/cloudera/spark-kafka-writer

    – Mekal
    Sep 6 '16 at 3:36

  • Nowadays there's github.com/BenFradet/spark-kafka-writer (same name, but not sure whether it's the same code).

    – Michael G. Noll
    Sep 16 '16 at 19:59

7 votes, answered Sep 7 '16 at 8:50 by gcaliari (edited Jan 31 '17 at 20:52 by cricket_007)

I was having the same issue and found this post.



The author solves the problem by creating one producer per executor. Instead of sending the producer itself, he sends only a “recipe” for creating a producer on an executor, by broadcasting it.



    val kafkaSink = sparkContext.broadcast(KafkaSink(conf))


He uses a wrapper that lazily creates the producer:



    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
    }

    object KafkaSink {

      import scala.collection.JavaConversions._

      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }


The wrapper is serializable because the Kafka producer is initialized just before first use on an executor. The driver keeps the reference to the wrapper and the wrapper sends the messages using each executor's producer:



    dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }
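
For illustration, this is one way the broadcast in the first snippet could be set up; the broker address and serializer choices below are placeholders rather than part of the original post:

    import org.apache.kafka.common.serialization.StringSerializer

    val kafkaSink = sparkContext.broadcast(KafkaSink(Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.serializer"    -> classOf[StringSerializer].getName,
      "value.serializer"  -> classOf[StringSerializer].getName
    )))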






  • What prevents me from having a singleton class in my JARs that has the Kafka producer in it? This way, I don't need a broadcast variable. Just having a singleton KafkaSink will ensure one KafkaSink per executor, as a singleton will be initialized once per JVM (aka executor).

    – Ra41P
    Jun 1 '18 at 6:29

3 votes, answered Jul 23 '15 at 15:53 by lmm

Why is it infeasible? Fundamentally, each partition of each RDD is going to run independently (and may well run on a different cluster node), so you have to redo the connection (and any synchronization) at the start of each partition's task. If the overhead of that is too high, then you should increase the batch size in your StreamingContext until it becomes acceptable (obviously there's a latency cost to doing this).
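
As a rough illustration (the ten-second interval below is an arbitrary example, not a recommendation), the batch interval is fixed when the StreamingContext is created:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A larger batch interval means fewer, larger partitions per interval, so the
// per-partition connection setup is amortized over more records (at a latency cost).
val sparkConf = new SparkConf().setAppName("kafka-writer-example")
val ssc = new StreamingContext(sparkConf, Seconds(10))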



(If you're not handling thousands of messages in each partition, are you sure you need spark-streaming at all? Would you do better with a standalone application?)







3 votes, by mrsrinivas

With Spark >= 2.2

Both read and write operations on Kafka are possible using the Structured Streaming API.


Build a stream from a Kafka topic

    // Subscribe to a topic and read messages from the earliest to latest offsets
    val ds = spark
      .readStream   // use `read` for batch, like DataFrame
      .format("kafka")
      .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
      .option("subscribe", "source-topic1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")   // note: endingOffsets applies to batch queries only
      .load()

Read the key and value and apply a schema to both; for simplicity, we convert both of them to String.

    val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

Since dsStruc has a schema, it accepts all SQL-style operations such as filter, agg, and select.

Write the stream to a Kafka topic



    dsStruc
      .writeStream   // use `write` for batch, like DataFrame
      .format("kafka")
      .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
      .option("topic", "target-topic1")
      .start()
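
One practical note, not part of the original answer: the Kafka sink expects the payload in key/value columns, and a streaming query needs a checkpoint location. A hedged variant of the write above (the checkpoint path is a placeholder) would be:

    ds.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
      .option("topic", "target-topic1")
      .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
      .start()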


More configuration options for the Kafka integration (read or write) are listed in the Structured Streaming + Kafka integration guide.

Key artifacts to add to the application:

    "org.apache.spark" % "spark-core_2.11" % "2.2.0",
    "org.apache.spark" % "spark-streaming_2.11" % "2.2.0",
    "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0",




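If you are just experimenting in the shell, the same connector can be pulled in via --packages instead of a build file (the version here is only an example):

    spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0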

  • Any reason for the downvote?

    – mrsrinivas
    Jan 23 at 15:27

2 votes

This might be what you want to do. You basically create one producer for each partition of records.

    input.foreachRDD(rdd =>
      rdd.foreachPartition(partitionOfRecords => {
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        // One producer per partition instead of one per message
        val producer = new KafkaProducer[String, String](props)

        partitionOfRecords.foreach {
          case x: String => {
            println(x)
            val message = new ProducerRecord[String, String]("output", null, x)
            producer.send(message)
          }
        }
      })
    )

Hope that helps.






    share|improve this answer























      Your Answer






      StackExchange.ifUsing("editor", function () {
      StackExchange.using("externalEditor", function () {
      StackExchange.using("snippets", function () {
      StackExchange.snippets.init();
      });
      });
      }, "code-snippets");

      StackExchange.ready(function() {
      var channelOptions = {
      tags: "".split(" "),
      id: "1"
      };
      initTagRenderer("".split(" "), "".split(" "), channelOptions);

      StackExchange.using("externalEditor", function() {
      // Have to fire editor after snippets, if snippets enabled
      if (StackExchange.settings.snippets.snippetsEnabled) {
      StackExchange.using("snippets", function() {
      createEditor();
      });
      }
      else {
      createEditor();
      }
      });

      function createEditor() {
      StackExchange.prepareEditor({
      heartbeatType: 'answer',
      autoActivateHeartbeat: false,
      convertImagesToLinks: true,
      noModals: true,
      showLowRepImageUploadWarning: true,
      reputationToPostImages: 10,
      bindNavPrevention: true,
      postfix: "",
      imageUploader: {
      brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
      contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
      allowUrls: true
      },
      onDemand: true,
      discardSelector: ".discard-answer"
      ,immediatelyShowMarkdownHelp:true
      });


      }
      });














      draft saved

      draft discarded


















      StackExchange.ready(
      function () {
      StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f31590592%2fspark-streaming-read-and-write-on-kafka-topic%23new-answer', 'question_page');
      }
      );

      Post as a guest















      Required, but never shown

























      7 Answers
      7






      active

      oldest

      votes








      7 Answers
      7






      active

      oldest

      votes









      active

      oldest

      votes






      active

      oldest

      votes









      18














      My first advice would be to try to create a new instance in foreachPartition and measure if that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).



      Another option is to use an object pool as illustrated in this example:



      https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala



      I however found it hard to implement when using checkpointing.



      Another version that is working well for me is a factory as described in the following blog post, you just have to check if it provides enough parallelism for your needs (check the comments section):



      http://allegro.tech/2015/08/spark-kafka-integration.html






      share|improve this answer


























      • What was the issue you ran into with regards to checkpointing?

        – Michael G. Noll
        Sep 16 '16 at 19:39






      • 3





        foreachPartition will be good if we are working with fixed number of RDDs, but in Spark Streaming (where we have micro-batches) RDDs are created eternally and so does partitions. How to circumvent this in Spark Streaming?

        – CᴴᴀZ
        Feb 10 '17 at 8:39













      • Please include the content of the link(s) so that when they break your answer still has value.

        – Danny Varod
        Apr 17 '18 at 8:49
















      18














      My first advice would be to try to create a new instance in foreachPartition and measure if that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).



      Another option is to use an object pool as illustrated in this example:



      https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala



      I however found it hard to implement when using checkpointing.



      Another version that is working well for me is a factory as described in the following blog post, you just have to check if it provides enough parallelism for your needs (check the comments section):



      http://allegro.tech/2015/08/spark-kafka-integration.html






      share|improve this answer


























      • What was the issue you ran into with regards to checkpointing?

        – Michael G. Noll
        Sep 16 '16 at 19:39






      • 3





        foreachPartition will be good if we are working with fixed number of RDDs, but in Spark Streaming (where we have micro-batches) RDDs are created eternally and so does partitions. How to circumvent this in Spark Streaming?

        – CᴴᴀZ
        Feb 10 '17 at 8:39













      • Please include the content of the link(s) so that when they break your answer still has value.

        – Danny Varod
        Apr 17 '18 at 8:49














      18












      18








      18







      My first advice would be to try to create a new instance in foreachPartition and measure if that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).



      Another option is to use an object pool as illustrated in this example:



      https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala



      I however found it hard to implement when using checkpointing.



      Another version that is working well for me is a factory as described in the following blog post, you just have to check if it provides enough parallelism for your needs (check the comments section):



      http://allegro.tech/2015/08/spark-kafka-integration.html






      share|improve this answer















      My first advice would be to try to create a new instance in foreachPartition and measure if that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).



      Another option is to use an object pool as illustrated in this example:



      https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala



      I however found it hard to implement when using checkpointing.



      Another version that is working well for me is a factory as described in the following blog post, you just have to check if it provides enough parallelism for your needs (check the comments section):



      http://allegro.tech/2015/08/spark-kafka-integration.html







      share|improve this answer














      share|improve this answer



      share|improve this answer








      edited Mar 1 '16 at 15:47

























      answered Jul 23 '15 at 15:19









      Marius SoutierMarius Soutier

      10k13045




      10k13045













      • What was the issue you ran into with regards to checkpointing?

        – Michael G. Noll
        Sep 16 '16 at 19:39






      • 3





        foreachPartition will be good if we are working with fixed number of RDDs, but in Spark Streaming (where we have micro-batches) RDDs are created eternally and so does partitions. How to circumvent this in Spark Streaming?

        – CᴴᴀZ
        Feb 10 '17 at 8:39













      • Please include the content of the link(s) so that when they break your answer still has value.

        – Danny Varod
        Apr 17 '18 at 8:49



















      • What was the issue you ran into with regards to checkpointing?

        – Michael G. Noll
        Sep 16 '16 at 19:39






      • 3





        foreachPartition will be good if we are working with fixed number of RDDs, but in Spark Streaming (where we have micro-batches) RDDs are created eternally and so does partitions. How to circumvent this in Spark Streaming?

        – CᴴᴀZ
        Feb 10 '17 at 8:39













      • Please include the content of the link(s) so that when they break your answer still has value.

        – Danny Varod
        Apr 17 '18 at 8:49

















      What was the issue you ran into with regards to checkpointing?

      – Michael G. Noll
      Sep 16 '16 at 19:39





      What was the issue you ran into with regards to checkpointing?

      – Michael G. Noll
      Sep 16 '16 at 19:39




      3




      3





      foreachPartition will be good if we are working with fixed number of RDDs, but in Spark Streaming (where we have micro-batches) RDDs are created eternally and so does partitions. How to circumvent this in Spark Streaming?

      – CᴴᴀZ
      Feb 10 '17 at 8:39







      foreachPartition will be good if we are working with fixed number of RDDs, but in Spark Streaming (where we have micro-batches) RDDs are created eternally and so does partitions. How to circumvent this in Spark Streaming?

      – CᴴᴀZ
      Feb 10 '17 at 8:39















      Please include the content of the link(s) so that when they break your answer still has value.

      – Danny Varod
      Apr 17 '18 at 8:49





      Please include the content of the link(s) so that when they break your answer still has value.

      – Danny Varod
      Apr 17 '18 at 8:49













      26














      Yes, unfortunately Spark (1.x, 2.x) doesn't make it straight-forward how to write to Kafka in an efficient manner.



      I'd suggest the following approach:




      • Use (and re-use) one KafkaProducer instance per executor process/JVM.


      Here's the high-level setup for this approach:




      1. First, you must "wrap" Kafka's KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea here is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry about KafkaProducer not being serializable.

      2. You "ship" the wrapped producer to each executor by using a broadcast variable.

      3. Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.


      The code snippets below work with Spark Streaming as of Spark 2.0.



      Step 1: Wrapping KafkaProducer



      import java.util.concurrent.Future

      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

      class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

      /* This is the key idea that allows us to work around running into
      NotSerializableExceptions. */
      lazy val producer = createProducer()

      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, key, value))

      def send(topic: String, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, value))

      }

      object MySparkKafkaProducer {

      import scala.collection.JavaConversions._

      def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
      val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
      // Ensure that, on executor JVM shutdown, the Kafka producer sends
      // any buffered messages to Kafka before shutting down.
      producer.close()
      }

      producer
      }
      new MySparkKafkaProducer(createProducerFunc)
      }

      def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

      }


      Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance



      import org.apache.kafka.clients.producer.ProducerConfig

      val ssc: StreamingContext = {
      val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
      new StreamingContext(sparkConf, Seconds(1))
      }

      ssc.checkpoint("checkpoint-directory")

      val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
      val kafkaProducerConfig = {
      val p = new Properties()
      p.setProperty("bootstrap.servers", "broker1:9092")
      p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
      p.setProperty("value.serializer", classOf[StringSerializer].getName)
      p
      }
      ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
      }


      Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)



      import java.util.concurrent.Future
      import org.apache.kafka.clients.producer.RecordMetadata

      val stream: DStream[String] = ???
      stream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
      val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
      }.toStream
      metadata.foreach { metadata => metadata.get() }
      }
      }


      Hope this helps.






      share|improve this answer
























      • If I may ask, how to implement this idea in Python, especially the lazy part?

        – avocado
        Sep 27 '17 at 6:36











      • done, here is the link, stackoverflow.com/q/46464003/2235936

        – avocado
        Sep 28 '17 at 8:05
















      26














      Yes, unfortunately Spark (1.x, 2.x) doesn't make it straight-forward how to write to Kafka in an efficient manner.



      I'd suggest the following approach:




      • Use (and re-use) one KafkaProducer instance per executor process/JVM.


      Here's the high-level setup for this approach:




      1. First, you must "wrap" Kafka's KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea here is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry about KafkaProducer not being serializable.

      2. You "ship" the wrapped producer to each executor by using a broadcast variable.

      3. Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.


      The code snippets below work with Spark Streaming as of Spark 2.0.



      Step 1: Wrapping KafkaProducer



      import java.util.concurrent.Future

      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

      class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

      /* This is the key idea that allows us to work around running into
      NotSerializableExceptions. */
      lazy val producer = createProducer()

      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, key, value))

      def send(topic: String, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, value))

      }

      object MySparkKafkaProducer {

      import scala.collection.JavaConversions._

      def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
      val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
      // Ensure that, on executor JVM shutdown, the Kafka producer sends
      // any buffered messages to Kafka before shutting down.
      producer.close()
      }

      producer
      }
      new MySparkKafkaProducer(createProducerFunc)
      }

      def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

      }


      Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance



      import org.apache.kafka.clients.producer.ProducerConfig

      val ssc: StreamingContext = {
      val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
      new StreamingContext(sparkConf, Seconds(1))
      }

      ssc.checkpoint("checkpoint-directory")

      val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
      val kafkaProducerConfig = {
      val p = new Properties()
      p.setProperty("bootstrap.servers", "broker1:9092")
      p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
      p.setProperty("value.serializer", classOf[StringSerializer].getName)
      p
      }
      ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
      }


      Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)



      import java.util.concurrent.Future
      import org.apache.kafka.clients.producer.RecordMetadata

      val stream: DStream[String] = ???
      stream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
      val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
      }.toStream
      metadata.foreach { metadata => metadata.get() }
      }
      }


      Hope this helps.






      share|improve this answer
























      • If I may ask, how to implement this idea in Python, especially the lazy part?

        – avocado
        Sep 27 '17 at 6:36











      • done, here is the link, stackoverflow.com/q/46464003/2235936

        – avocado
        Sep 28 '17 at 8:05














      26












      26








      26







      Yes, unfortunately Spark (1.x, 2.x) doesn't make it straight-forward how to write to Kafka in an efficient manner.



      I'd suggest the following approach:




      • Use (and re-use) one KafkaProducer instance per executor process/JVM.


      Here's the high-level setup for this approach:




      1. First, you must "wrap" Kafka's KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea here is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry about KafkaProducer not being serializable.

      2. You "ship" the wrapped producer to each executor by using a broadcast variable.

      3. Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.


      The code snippets below work with Spark Streaming as of Spark 2.0.



      Step 1: Wrapping KafkaProducer



      import java.util.concurrent.Future

      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

      class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

      /* This is the key idea that allows us to work around running into
      NotSerializableExceptions. */
      lazy val producer = createProducer()

      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, key, value))

      def send(topic: String, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, value))

      }

      object MySparkKafkaProducer {

      import scala.collection.JavaConversions._

      def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
      val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
      // Ensure that, on executor JVM shutdown, the Kafka producer sends
      // any buffered messages to Kafka before shutting down.
      producer.close()
      }

      producer
      }
      new MySparkKafkaProducer(createProducerFunc)
      }

      def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

      }


      Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance



      import org.apache.kafka.clients.producer.ProducerConfig

      val ssc: StreamingContext = {
      val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
      new StreamingContext(sparkConf, Seconds(1))
      }

      ssc.checkpoint("checkpoint-directory")

      val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
      val kafkaProducerConfig = {
      val p = new Properties()
      p.setProperty("bootstrap.servers", "broker1:9092")
      p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
      p.setProperty("value.serializer", classOf[StringSerializer].getName)
      p
      }
      ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
      }


      Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)



      import java.util.concurrent.Future
      import org.apache.kafka.clients.producer.RecordMetadata

      val stream: DStream[String] = ???
      stream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
      val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
      }.toStream
      metadata.foreach { metadata => metadata.get() }
      }
      }


      Hope this helps.






      share|improve this answer













      Yes, unfortunately Spark (1.x, 2.x) doesn't make it straight-forward how to write to Kafka in an efficient manner.



      I'd suggest the following approach:




      • Use (and re-use) one KafkaProducer instance per executor process/JVM.


      Here's the high-level setup for this approach:




      1. First, you must "wrap" Kafka's KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea here is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry about KafkaProducer not being serializable.

      2. You "ship" the wrapped producer to each executor by using a broadcast variable.

      3. Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.


      The code snippets below work with Spark Streaming as of Spark 2.0.



      Step 1: Wrapping KafkaProducer



      import java.util.concurrent.Future

      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

      class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

      /* This is the key idea that allows us to work around running into
      NotSerializableExceptions. */
      lazy val producer = createProducer()

      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, key, value))

      def send(topic: String, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, value))

      }

      object MySparkKafkaProducer {

      import scala.collection.JavaConversions._

      def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
      val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
      // Ensure that, on executor JVM shutdown, the Kafka producer sends
      // any buffered messages to Kafka before shutting down.
      producer.close()
      }

      producer
      }
      new MySparkKafkaProducer(createProducerFunc)
      }

      def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

      }


      Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance



      import org.apache.kafka.clients.producer.ProducerConfig

      val ssc: StreamingContext = {
      val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
      new StreamingContext(sparkConf, Seconds(1))
      }

      ssc.checkpoint("checkpoint-directory")

      val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
      val kafkaProducerConfig = {
      val p = new Properties()
      p.setProperty("bootstrap.servers", "broker1:9092")
      p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
      p.setProperty("value.serializer", classOf[StringSerializer].getName)
      p
      }
      ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
      }


      Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)



      import java.util.concurrent.Future
      import org.apache.kafka.clients.producer.RecordMetadata

      val stream: DStream[String] = ???
      stream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
      val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
      }.toStream
      metadata.foreach { metadata => metadata.get() }
      }
      }


      Hope this helps.







      share|improve this answer












      share|improve this answer



      share|improve this answer










      answered Sep 16 '16 at 19:56









      Michael G. NollMichael G. Noll

      8,0302843




      8,0302843













      • If I may ask, how to implement this idea in Python, especially the lazy part?

        – avocado
        Sep 27 '17 at 6:36











      • done, here is the link, stackoverflow.com/q/46464003/2235936

        – avocado
        Sep 28 '17 at 8:05



















      • If I may ask, how to implement this idea in Python, especially the lazy part?

        – avocado
        Sep 27 '17 at 6:36











      • done, here is the link, stackoverflow.com/q/46464003/2235936

        – avocado
        Sep 28 '17 at 8:05

















      If I may ask, how to implement this idea in Python, especially the lazy part?

      – avocado
      Sep 27 '17 at 6:36





      If I may ask, how to implement this idea in Python, especially the lazy part?

      – avocado
      Sep 27 '17 at 6:36













      done, here is the link, stackoverflow.com/q/46464003/2235936

      – avocado
      Sep 28 '17 at 8:05





      done, here is the link, stackoverflow.com/q/46464003/2235936

      – avocado
      Sep 28 '17 at 8:05











      8














      There is a Streaming Kafka Writer maintained by Cloudera (actually spun off from a Spark JIRA [1]). It basically creates a producer per partition, which amortizes the time spent to create 'heavy' objects over a (hopefully large) collection of elements.



      The Writer can be found here: https://github.com/cloudera/spark-kafka-writer






      share|improve this answer



















      • 2





        404 Not found for that project, removed? github.com/cloudera/spark-kafka-writer

        – Mekal
        Sep 6 '16 at 3:36








      • 1





        Nowadays there's github.com/BenFradet/spark-kafka-writer (same name, but not sure whether it's the same code)

        – Michael G. Noll
        Sep 16 '16 at 19:59
















      8














      There is a Streaming Kafka Writer maintained by Cloudera (actually spun off from a Spark JIRA [1]). It basically creates a producer per partition, which amortizes the time spent to create 'heavy' objects over a (hopefully large) collection of elements.



      The Writer can be found here: https://github.com/cloudera/spark-kafka-writer






      share|improve this answer



















      • 2





        404 Not found for that project, removed? github.com/cloudera/spark-kafka-writer

        – Mekal
        Sep 6 '16 at 3:36








      • 1





        Nowadays there's github.com/BenFradet/spark-kafka-writer (same name, but not sure whether it's the same code)

        – Michael G. Noll
        Sep 16 '16 at 19:59














      8












      8








      8







      There is a Streaming Kafka Writer maintained by Cloudera (actually spun off from a Spark JIRA [1]). It basically creates a producer per partition, which amortizes the time spent to create 'heavy' objects over a (hopefully large) collection of elements.



      The Writer can be found here: https://github.com/cloudera/spark-kafka-writer






      share|improve this answer













      There is a Streaming Kafka Writer maintained by Cloudera (actually spun off from a Spark JIRA [1]). It basically creates a producer per partition, which amortizes the time spent to create 'heavy' objects over a (hopefully large) collection of elements.



      The Writer can be found here: https://github.com/cloudera/spark-kafka-writer







      share|improve this answer












      share|improve this answer



      share|improve this answer










      answered Jul 23 '15 at 23:31









      maasgmaasg

      30.9k971103




      30.9k971103








      • 2





        404 Not found for that project, removed? github.com/cloudera/spark-kafka-writer

        – Mekal
        Sep 6 '16 at 3:36








      • 1





        Nowadays there's github.com/BenFradet/spark-kafka-writer (same name, but not sure whether it's the same code)

        – Michael G. Noll
        Sep 16 '16 at 19:59














      • 2





        404 Not found for that project, removed? github.com/cloudera/spark-kafka-writer

        – Mekal
        Sep 6 '16 at 3:36








      • 1





        Nowadays there's github.com/BenFradet/spark-kafka-writer (same name, but not sure whether it's the same code)

        – Michael G. Noll
        Sep 16 '16 at 19:59








      2




      2





      404 Not found for that project, removed? github.com/cloudera/spark-kafka-writer

      – Mekal
      Sep 6 '16 at 3:36







      404 Not found for that project, removed? github.com/cloudera/spark-kafka-writer

      – Mekal
      Sep 6 '16 at 3:36






      1




      1





      Nowadays there's github.com/BenFradet/spark-kafka-writer (same name, but not sure whether it's the same code)

      – Michael G. Noll
      Sep 16 '16 at 19:59





      Nowadays there's github.com/BenFradet/spark-kafka-writer (same name, but not sure whether it's the same code)

      – Michael G. Noll
      Sep 16 '16 at 19:59











      7














      I was having the same issue and found this post.



      The author solves the problem by creating 1 producer per executor. Instead of sending the producer itself, he sends only a “recipe” how to create a producer in an executor by broadcasting it.



          val kafkaSink = sparkContext.broadcast(KafkaSink(conf))


      He uses a wrapper that lazily creates the producer:



          class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
      }


      object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
      val f = () => {
      val producer = new KafkaProducer[String, String](config)

      sys.addShutdownHook {
      producer.close()
      }

      producer
      }
      new KafkaSink(f)
      }
      }


      The wrapper is serializable because the Kafka producer is initialized just before first use on an executor. The driver keeps the reference to the wrapper and the wrapper sends the messages using each executor's producer:



          dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
      kafkaSink.value.send("topicName", message)
      }
      }





      share|improve this answer


























      • What prevents me from having a singleton class in my JARs, that has the kafka producer in it. This way, I don't need a broadcast variable. Just having a singleton KafkaSink will ensure one KafkaSink per executor, as a singleton will be initialized once per JVM (aka executor).

        – Ra41P
        Jun 1 '18 at 6:29
















      7














      I was having the same issue and found this post.



      The author solves the problem by creating 1 producer per executor. Instead of sending the producer itself, he sends only a “recipe” how to create a producer in an executor by broadcasting it.



          val kafkaSink = sparkContext.broadcast(KafkaSink(conf))


      He uses a wrapper that lazily creates the producer:



          class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
      }


      object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
      val f = () => {
      val producer = new KafkaProducer[String, String](config)

      sys.addShutdownHook {
      producer.close()
      }

      producer
      }
      new KafkaSink(f)
      }
      }


      The wrapper is serializable because the Kafka producer is initialized just before first use on an executor. The driver keeps the reference to the wrapper and the wrapper sends the messages using each executor's producer:



          dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
      kafkaSink.value.send("topicName", message)
      }
      }





      share|improve this answer


























      • What prevents me from having a singleton class in my JARs, that has the kafka producer in it. This way, I don't need a broadcast variable. Just having a singleton KafkaSink will ensure one KafkaSink per executor, as a singleton will be initialized once per JVM (aka executor).

        – Ra41P
        Jun 1 '18 at 6:29














      7












      7








      7







      I was having the same issue and found this post.



      The author solves the problem by creating 1 producer per executor. Instead of sending the producer itself, he sends only a “recipe” how to create a producer in an executor by broadcasting it.



          val kafkaSink = sparkContext.broadcast(KafkaSink(conf))


      He uses a wrapper that lazily creates the producer:



          class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
      }


      object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
      val f = () => {
      val producer = new KafkaProducer[String, String](config)

      sys.addShutdownHook {
      producer.close()
      }

      producer
      }
      new KafkaSink(f)
      }
      }


      The wrapper is serializable because the Kafka producer is initialized just before first use on an executor. The driver keeps the reference to the wrapper and the wrapper sends the messages using each executor's producer:



          dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
      kafkaSink.value.send("topicName", message)
      }
      }





      share|improve this answer















      I was having the same issue and found this post.



      The author solves the problem by creating 1 producer per executor. Instead of sending the producer itself, he sends only a “recipe” how to create a producer in an executor by broadcasting it.



          val kafkaSink = sparkContext.broadcast(KafkaSink(conf))


      He uses a wrapper that lazily creates the producer:



          class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
      }


      object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
      val f = () => {
      val producer = new KafkaProducer[String, String](config)

      sys.addShutdownHook {
      producer.close()
      }

      producer
      }
      new KafkaSink(f)
      }
      }


      The wrapper is serializable because the Kafka producer is initialized just before first use on an executor. The driver keeps the reference to the wrapper and the wrapper sends the messages using each executor's producer:



          dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
      kafkaSink.value.send("topicName", message)
      }
      }






      share|improve this answer














      share|improve this answer



      share|improve this answer








      edited Jan 31 '17 at 20:52









      cricket_007

      82.9k1145112




      82.9k1145112










      answered Sep 7 '16 at 8:50









      gcaliarigcaliari

      9615




      9615













      • What prevents me from having a singleton class in my JARs, that has the kafka producer in it. This way, I don't need a broadcast variable. Just having a singleton KafkaSink will ensure one KafkaSink per executor, as a singleton will be initialized once per JVM (aka executor).

        – Ra41P
        Jun 1 '18 at 6:29



















      • What prevents me from having a singleton class in my JARs, that has the kafka producer in it. This way, I don't need a broadcast variable. Just having a singleton KafkaSink will ensure one KafkaSink per executor, as a singleton will be initialized once per JVM (aka executor).

        – Ra41P
        Jun 1 '18 at 6:29

















      What prevents me from having a singleton class in my JARs, that has the kafka producer in it. This way, I don't need a broadcast variable. Just having a singleton KafkaSink will ensure one KafkaSink per executor, as a singleton will be initialized once per JVM (aka executor).

      – Ra41P
      Jun 1 '18 at 6:29





      What prevents me from having a singleton class in my JARs, that has the kafka producer in it. This way, I don't need a broadcast variable. Just having a singleton KafkaSink will ensure one KafkaSink per executor, as a singleton will be initialized once per JVM (aka executor).

      – Ra41P
      Jun 1 '18 at 6:29











      3














      Why is it infeasible? Fundamentally each partition of each RDD is going to run independently (and may well run on a different cluster node), so you have to redo the connection (and any synchronization) at the start of each partition's task. If the overhead of that is too high then you should increase the batch size in your StreamingContext until it becomes acceptable (obv. there's a latency cost to doing this).



      (If you're not handling thousands of messages in each partition, are you sure you need spark-streaming at all? Would you do better with a standalone application?)






answered Jul 23 '15 at 15:53 by lmm















              With Spark >= 2.2



Both read and write operations on Kafka are possible using the Structured Streaming API.


Build a stream from a Kafka topic



// Subscribe to a topic and read messages starting from the earliest offsets.
// ("endingOffsets" applies to batch queries only, so it is omitted here.)
val ds = spark
  .readStream // use `read` for a batch DataFrame instead
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .load()


Read the key and value and apply a schema to both; for simplicity, both are cast to String.



val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]


Since dsStruc has a schema, it accepts all SQL-style operations such as filter, agg, and select.
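For example (illustrative only; it assumes `import spark.implicits._` is in scope for the `$"col"` syntax), a simple transformation before writing back out might be:

import org.apache.spark.sql.functions.upper
// assumes: import spark.implicits._

// Hypothetical transformation: drop empty values and upper-case the rest,
// keeping the key/value column names the Kafka sink expects.
val cleaned = dsStruc
  .filter($"value".isNotNull && $"value" =!= "")
  .select($"key", upper($"value").as("value"))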



              Write stream to Kafka topic



dsStruc
  .writeStream // use `write` for a batch DataFrame instead
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  // a checkpoint location is required for the Kafka sink; the path is a placeholder
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()


More configuration options for Kafka integration (read and write) are available in the Spark Structured Streaming + Kafka integration guide.



Key artifacts to add to the application:


"org.apache.spark" % "spark-core_2.11"           % "2.2.0",
"org.apache.spark" % "spark-streaming_2.11"      % "2.2.0",
"org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0",





answered Nov 11 '18 at 15:26 by mrsrinivas (edited Dec 10 '18 at 8:58)














This might be what you want to do. It creates one producer for each partition of records rather than one per message.



input.foreachRDD(rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    // One producer per partition task, reused for every record in the partition.
    val producer = new KafkaProducer[String, String](props)

    partitionOfRecords.foreach { case x: String =>
      println(x)
      val message = new ProducerRecord[String, String]("output", null, x)
      producer.send(message)
    }

    // send() is asynchronous; close() flushes buffered records before the task ends.
    producer.close()
  }
)


              Hope that helps






answered Jul 24 '15 at 3:59 by sainath reddy