Is there a Python API for event-driven Kafka consumer?










0















I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want have a Kafka consumer that is triggered when there is new message in the stream of the concerned topic and respond by pushing messages back to the Kafka stream.



I have been looking for something like the Spring implementation:



@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message)
System.out.println("Received Messasge in group mygroup: " + message);



I have looked at:



  1. kafka-python

  2. pykafka

  3. confluent-kafka

But I couldn't find anything related to event-driven style of implementation in Python.










share|improve this question

















  • 2





    A consumer is already "event driven". What else do you need?

    – cricket_007
    Nov 12 '18 at 13:57











  • If you have Flask, then HTTP is also an interface, not only Kafka

    – cricket_007
    Nov 12 '18 at 13:58















0















I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want have a Kafka consumer that is triggered when there is new message in the stream of the concerned topic and respond by pushing messages back to the Kafka stream.



I have been looking for something like the Spring implementation:



@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message)
System.out.println("Received Messasge in group mygroup: " + message);



I have looked at:



  1. kafka-python

  2. pykafka

  3. confluent-kafka

But I couldn't find anything related to event-driven style of implementation in Python.










share|improve this question

















  • 2





    A consumer is already "event driven". What else do you need?

    – cricket_007
    Nov 12 '18 at 13:57











  • If you have Flask, then HTTP is also an interface, not only Kafka

    – cricket_007
    Nov 12 '18 at 13:58













0












0








0








I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want have a Kafka consumer that is triggered when there is new message in the stream of the concerned topic and respond by pushing messages back to the Kafka stream.



I have been looking for something like the Spring implementation:



@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message)
System.out.println("Received Messasge in group mygroup: " + message);



I have looked at:



  1. kafka-python

  2. pykafka

  3. confluent-kafka

But I couldn't find anything related to event-driven style of implementation in Python.










share|improve this question














I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want have a Kafka consumer that is triggered when there is new message in the stream of the concerned topic and respond by pushing messages back to the Kafka stream.



I have been looking for something like the Spring implementation:



@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message)
System.out.println("Received Messasge in group mygroup: " + message);



I have looked at:



  1. kafka-python

  2. pykafka

  3. confluent-kafka

But I couldn't find anything related to event-driven style of implementation in Python.







python events flask apache-kafka listener






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked Nov 12 '18 at 13:38









ShashankShashank

407




407







  • 2





    A consumer is already "event driven". What else do you need?

    – cricket_007
    Nov 12 '18 at 13:57











  • If you have Flask, then HTTP is also an interface, not only Kafka

    – cricket_007
    Nov 12 '18 at 13:58












  • 2





    A consumer is already "event driven". What else do you need?

    – cricket_007
    Nov 12 '18 at 13:57











  • If you have Flask, then HTTP is also an interface, not only Kafka

    – cricket_007
    Nov 12 '18 at 13:58







2




2





A consumer is already "event driven". What else do you need?

– cricket_007
Nov 12 '18 at 13:57





A consumer is already "event driven". What else do you need?

– cricket_007
Nov 12 '18 at 13:57













If you have Flask, then HTTP is also an interface, not only Kafka

– cricket_007
Nov 12 '18 at 13:58





If you have Flask, then HTTP is also an interface, not only Kafka

– cricket_007
Nov 12 '18 at 13:58












2 Answers
2






active

oldest

votes


















0














Kafka Consumer have to continuously poll to retrieve data from brokers.



Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.



You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.






share|improve this answer























  • Thanks for the idea. I came up with an implementation.

    – Shashank
    Nov 14 '18 at 6:16


















0














Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.



from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loopnKey: ",msg.key," Value:", msg.value)
listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")

def kafka_listener(data):
print("Image Ratings:n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)


The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.






share|improve this answer






















    Your Answer






    StackExchange.ifUsing("editor", function ()
    StackExchange.using("externalEditor", function ()
    StackExchange.using("snippets", function ()
    StackExchange.snippets.init();
    );
    );
    , "code-snippets");

    StackExchange.ready(function()
    var channelOptions =
    tags: "".split(" "),
    id: "1"
    ;
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function()
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled)
    StackExchange.using("snippets", function()
    createEditor();
    );

    else
    createEditor();

    );

    function createEditor()
    StackExchange.prepareEditor(
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader:
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    ,
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    );



    );













    draft saved

    draft discarded


















    StackExchange.ready(
    function ()
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53263393%2fis-there-a-python-api-for-event-driven-kafka-consumer%23new-answer', 'question_page');

    );

    Post as a guest















    Required, but never shown

























    2 Answers
    2






    active

    oldest

    votes








    2 Answers
    2






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    0














    Kafka Consumer have to continuously poll to retrieve data from brokers.



    Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.



    You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.






    share|improve this answer























    • Thanks for the idea. I came up with an implementation.

      – Shashank
      Nov 14 '18 at 6:16















    0














    Kafka Consumer have to continuously poll to retrieve data from brokers.



    Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.



    You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.






    share|improve this answer























    • Thanks for the idea. I came up with an implementation.

      – Shashank
      Nov 14 '18 at 6:16













    0












    0








    0







    Kafka Consumer have to continuously poll to retrieve data from brokers.



    Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.



    You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.






    share|improve this answer













    Kafka Consumer have to continuously poll to retrieve data from brokers.



    Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.



    You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.







    share|improve this answer












    share|improve this answer



    share|improve this answer










    answered Nov 12 '18 at 18:00









    Mickael MaisonMickael Maison

    7,39942629




    7,39942629












    • Thanks for the idea. I came up with an implementation.

      – Shashank
      Nov 14 '18 at 6:16

















    • Thanks for the idea. I came up with an implementation.

      – Shashank
      Nov 14 '18 at 6:16
















    Thanks for the idea. I came up with an implementation.

    – Shashank
    Nov 14 '18 at 6:16





    Thanks for the idea. I came up with an implementation.

    – Shashank
    Nov 14 '18 at 6:16













    0














    Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.



    from kafka import KafkaConsumer
    import threading

    BOOTSTRAP_SERVERS = ['localhost:9092']

    def register_kafka_listener(topic, listener):
    # Poll kafka
    def poll():
    # Initialize consumer Instance
    consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

    print("About to start polling for topic:", topic)
    consumer.poll(timeout_ms=6000)
    print("Started Polling for topic:", topic)
    for msg in consumer:
    print("Entered the loopnKey: ",msg.key," Value:", msg.value)
    listener(msg)
    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("started a background thread")

    def kafka_listener(data):
    print("Image Ratings:n", data.value.decode("utf-8"))

    register_kafka_listener('topic1', kafka_listener)


    The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.






    share|improve this answer



























      0














      Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.



      from kafka import KafkaConsumer
      import threading

      BOOTSTRAP_SERVERS = ['localhost:9092']

      def register_kafka_listener(topic, listener):
      # Poll kafka
      def poll():
      # Initialize consumer Instance
      consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

      print("About to start polling for topic:", topic)
      consumer.poll(timeout_ms=6000)
      print("Started Polling for topic:", topic)
      for msg in consumer:
      print("Entered the loopnKey: ",msg.key," Value:", msg.value)
      listener(msg)
      print("About to register listener to topic:", topic)
      t1 = threading.Thread(target=poll)
      t1.start()
      print("started a background thread")

      def kafka_listener(data):
      print("Image Ratings:n", data.value.decode("utf-8"))

      register_kafka_listener('topic1', kafka_listener)


      The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.






      share|improve this answer

























        0












        0








        0







        Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.



        from kafka import KafkaConsumer
        import threading

        BOOTSTRAP_SERVERS = ['localhost:9092']

        def register_kafka_listener(topic, listener):
        # Poll kafka
        def poll():
        # Initialize consumer Instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        consumer.poll(timeout_ms=6000)
        print("Started Polling for topic:", topic)
        for msg in consumer:
        print("Entered the loopnKey: ",msg.key," Value:", msg.value)
        listener(msg)
        print("About to register listener to topic:", topic)
        t1 = threading.Thread(target=poll)
        t1.start()
        print("started a background thread")

        def kafka_listener(data):
        print("Image Ratings:n", data.value.decode("utf-8"))

        register_kafka_listener('topic1', kafka_listener)


        The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.






        share|improve this answer













        Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.



        from kafka import KafkaConsumer
        import threading

        BOOTSTRAP_SERVERS = ['localhost:9092']

        def register_kafka_listener(topic, listener):
        # Poll kafka
        def poll():
        # Initialize consumer Instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        consumer.poll(timeout_ms=6000)
        print("Started Polling for topic:", topic)
        for msg in consumer:
        print("Entered the loopnKey: ",msg.key," Value:", msg.value)
        listener(msg)
        print("About to register listener to topic:", topic)
        t1 = threading.Thread(target=poll)
        t1.start()
        print("started a background thread")

        def kafka_listener(data):
        print("Image Ratings:n", data.value.decode("utf-8"))

        register_kafka_listener('topic1', kafka_listener)


        The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.







        share|improve this answer












        share|improve this answer



        share|improve this answer










        answered Nov 14 '18 at 6:31









        ShashankShashank

        407




        407



























            draft saved

            draft discarded
















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid


            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.

            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53263393%2fis-there-a-python-api-for-event-driven-kafka-consumer%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            Use pre created SQLite database for Android project in kotlin

            Darth Vader #20

            Ondo