Is there a Python API for event-driven Kafka consumer?
I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want have a Kafka consumer that is triggered when there is new message in the stream of the concerned topic and respond by pushing messages back to the Kafka stream.
I have been looking for something like the Spring implementation:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message)
System.out.println("Received Messasge in group mygroup: " + message);
I have looked at:
- kafka-python
- pykafka
- confluent-kafka
But I couldn't find anything related to event-driven style of implementation in Python.
python events flask apache-kafka listener
add a comment |
I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want have a Kafka consumer that is triggered when there is new message in the stream of the concerned topic and respond by pushing messages back to the Kafka stream.
I have been looking for something like the Spring implementation:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message)
System.out.println("Received Messasge in group mygroup: " + message);
I have looked at:
- kafka-python
- pykafka
- confluent-kafka
But I couldn't find anything related to event-driven style of implementation in Python.
python events flask apache-kafka listener
2
A consumer is already "event driven". What else do you need?
– cricket_007
Nov 12 '18 at 13:57
If you have Flask, then HTTP is also an interface, not only Kafka
– cricket_007
Nov 12 '18 at 13:58
add a comment |
I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want have a Kafka consumer that is triggered when there is new message in the stream of the concerned topic and respond by pushing messages back to the Kafka stream.
I have been looking for something like the Spring implementation:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message)
System.out.println("Received Messasge in group mygroup: " + message);
I have looked at:
- kafka-python
- pykafka
- confluent-kafka
But I couldn't find anything related to event-driven style of implementation in Python.
python events flask apache-kafka listener
I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want have a Kafka consumer that is triggered when there is new message in the stream of the concerned topic and respond by pushing messages back to the Kafka stream.
I have been looking for something like the Spring implementation:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message)
System.out.println("Received Messasge in group mygroup: " + message);
I have looked at:
- kafka-python
- pykafka
- confluent-kafka
But I couldn't find anything related to event-driven style of implementation in Python.
python events flask apache-kafka listener
python events flask apache-kafka listener
asked Nov 12 '18 at 13:38
ShashankShashank
407
407
2
A consumer is already "event driven". What else do you need?
– cricket_007
Nov 12 '18 at 13:57
If you have Flask, then HTTP is also an interface, not only Kafka
– cricket_007
Nov 12 '18 at 13:58
add a comment |
2
A consumer is already "event driven". What else do you need?
– cricket_007
Nov 12 '18 at 13:57
If you have Flask, then HTTP is also an interface, not only Kafka
– cricket_007
Nov 12 '18 at 13:58
2
2
A consumer is already "event driven". What else do you need?
– cricket_007
Nov 12 '18 at 13:57
A consumer is already "event driven". What else do you need?
– cricket_007
Nov 12 '18 at 13:57
If you have Flask, then HTTP is also an interface, not only Kafka
– cricket_007
Nov 12 '18 at 13:58
If you have Flask, then HTTP is also an interface, not only Kafka
– cricket_007
Nov 12 '18 at 13:58
add a comment |
2 Answers
2
active
oldest
votes
Kafka Consumer have to continuously poll to retrieve data from brokers.
Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.
You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.
Thanks for the idea. I came up with an implementation.
– Shashank
Nov 14 '18 at 6:16
add a comment |
Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.
from kafka import KafkaConsumer
import threading
BOOTSTRAP_SERVERS = ['localhost:9092']
def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loopnKey: ",msg.key," Value:", msg.value)
listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")
def kafka_listener(data):
print("Image Ratings:n", data.value.decode("utf-8"))
register_kafka_listener('topic1', kafka_listener)
The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53263393%2fis-there-a-python-api-for-event-driven-kafka-consumer%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
Kafka Consumer have to continuously poll to retrieve data from brokers.
Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.
You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.
Thanks for the idea. I came up with an implementation.
– Shashank
Nov 14 '18 at 6:16
add a comment |
Kafka Consumer have to continuously poll to retrieve data from brokers.
Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.
You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.
Thanks for the idea. I came up with an implementation.
– Shashank
Nov 14 '18 at 6:16
add a comment |
Kafka Consumer have to continuously poll to retrieve data from brokers.
Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.
You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.
Kafka Consumer have to continuously poll to retrieve data from brokers.
Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.
You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.
answered Nov 12 '18 at 18:00
Mickael MaisonMickael Maison
7,39942629
7,39942629
Thanks for the idea. I came up with an implementation.
– Shashank
Nov 14 '18 at 6:16
add a comment |
Thanks for the idea. I came up with an implementation.
– Shashank
Nov 14 '18 at 6:16
Thanks for the idea. I came up with an implementation.
– Shashank
Nov 14 '18 at 6:16
Thanks for the idea. I came up with an implementation.
– Shashank
Nov 14 '18 at 6:16
add a comment |
Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.
from kafka import KafkaConsumer
import threading
BOOTSTRAP_SERVERS = ['localhost:9092']
def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loopnKey: ",msg.key," Value:", msg.value)
listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")
def kafka_listener(data):
print("Image Ratings:n", data.value.decode("utf-8"))
register_kafka_listener('topic1', kafka_listener)
The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.
add a comment |
Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.
from kafka import KafkaConsumer
import threading
BOOTSTRAP_SERVERS = ['localhost:9092']
def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loopnKey: ",msg.key," Value:", msg.value)
listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")
def kafka_listener(data):
print("Image Ratings:n", data.value.decode("utf-8"))
register_kafka_listener('topic1', kafka_listener)
The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.
add a comment |
Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.
from kafka import KafkaConsumer
import threading
BOOTSTRAP_SERVERS = ['localhost:9092']
def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loopnKey: ",msg.key," Value:", msg.value)
listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")
def kafka_listener(data):
print("Image Ratings:n", data.value.decode("utf-8"))
register_kafka_listener('topic1', kafka_listener)
The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.
Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.
from kafka import KafkaConsumer
import threading
BOOTSTRAP_SERVERS = ['localhost:9092']
def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loopnKey: ",msg.key," Value:", msg.value)
listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")
def kafka_listener(data):
print("Image Ratings:n", data.value.decode("utf-8"))
register_kafka_listener('topic1', kafka_listener)
The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.
answered Nov 14 '18 at 6:31
ShashankShashank
407
407
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53263393%2fis-there-a-python-api-for-event-driven-kafka-consumer%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
2
A consumer is already "event driven". What else do you need?
– cricket_007
Nov 12 '18 at 13:57
If you have Flask, then HTTP is also an interface, not only Kafka
– cricket_007
Nov 12 '18 at 13:58