Create new Producer from Kafka consumer?
How do I create a new Kafka Producer from an existing Consumer with Java?
apache-kafka kafka-consumer-api kafka-producer-api
asked Nov 14 '18 at 6:27 by Prince, edited Nov 14 '18 at 15:29 by cricket_007
What do you mean by new Kafka Producer? Do you just want to consume data and publish into another topic with some processing done?
– Nishu Tayal
Nov 14 '18 at 8:41
No, actually I just want to take the data from the consumer and pass it into a new topic.
– Prince
Nov 14 '18 at 9:56
Pass it into the new topic with the help of a new producer.
– Prince
Nov 14 '18 at 9:57
Please edit your question to better explain what you actually want, based on the comments below
– cricket_007
Nov 14 '18 at 15:29
2 Answers
You can't create a KafkaProducer from a KafkaConsumer instance.
You have to explicitly create a KafkaProducer, using the same connection settings as your consumer.
Considering the use case you mentioned (copying data from one topic to another), I'd recommend using Kafka Streams. There's actually an example in Kafka that does exactly that: https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
answered Nov 14 '18 at 10:08 by Mickael Maison
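For reference, a minimal sketch of the pipe topology that the linked PipeDemo demonstrates: copying every record from one topic to another, unchanged. The broker address and topic names below are placeholders, and String serdes are assumed; adjust them to your setup.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class PipeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe-sketch");   // any unique application id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder broker address
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            // Read from the source topic and write every record to the destination topic.
            builder.stream("source-topic").to("destination-topic");

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }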
Actually my task is to print the data from a particular offset to a particular offset, for example print the data from offset 151 to offset 351.
– Prince
Nov 14 '18 at 10:34
For this I use the consumer.seek and poll methods; however, it's not working properly.
– Prince
Nov 14 '18 at 10:36
    ConsumerRecords<String, AccountHolder> records = consumer.poll(Duration.ofSeconds(1000));
    if (flag) {
        // consumer.seek(new TopicPartition("sudotest", 0), 29061);
        consumer.seek(new TopicPartition("sudotest", 0), 2854);
        flag = false;
    }
    for (ConsumerRecord<String, AccountHolder> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
– Prince
Nov 14 '18 at 10:37
In the question you asked about a Producer, and now you're asking about consuming data at specific offsets. I'm not sure I understand what you're doing.
– Mickael Maison
Nov 14 '18 at 12:01
I use the consumer.seek method to start printing the data from a specific offset, but the -1 and 0 offset values are printed by default. That is why I'm trying to use a new producer and send the data to a new topic after applying an if condition. That's my agenda; if I'm wrong, then please give me a solution.
– Prince
Nov 14 '18 at 12:37
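To address the offset-range requirement discussed in the comments above (print records from offset 151 to 351), here is a sketch of one way to do it with assign() and seek(). The "sudotest" topic and partition 0 are taken from the snippet above; String deserializers are assumed in place of the AccountHolder type, and the broker address is a placeholder.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetRangePrinter {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-range-printer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            TopicPartition tp = new TopicPartition("sudotest", 0);  // topic/partition from the comment above
            long startOffset = 151L;                                 // first offset to print
            long endOffset = 351L;                                   // last offset to print (inclusive)

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // assign() instead of subscribe() so seek() can be called immediately,
                // without waiting for a group rebalance to hand out partitions.
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, startOffset);

                boolean done = false;
                while (!done) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        if (record.offset() > endOffset) {
                            done = true;
                            break;
                        }
                        System.out.printf("offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    }
                }
            }
        }
    }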
I'd recommend using the Kafka Streams library. It reads data from Kafka topics, does some processing, and writes back to other topics.
That could be a simpler approach for you.
https://kafka.apache.org/documentation/streams/
A current limitation is that the source and destination cluster have to be the same with Kafka Streams.
Otherwise you need to use the Processor API to define another destination cluster.
Another approach is to simply define a producer in the consumer program. Wherever your rule matches (based on offset or any other condition), call the producer.send() method.
edited Nov 14 '18 at 12:47, answered Nov 14 '18 at 12:42 by Nishu Tayal
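A sketch of that last approach: a plain consumer and producer in the same program, forwarding records whose offsets fall in a chosen range. The broker addresses, topic names, and the offset condition are placeholders; String serializers/deserializers are assumed.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ConsumeAndForward {
        public static void main(String[] args) {
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
            consumerProps.put("group.id", "forwarder");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");   // may point at a different cluster
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                consumer.subscribe(Collections.singletonList("source-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        // "Wherever your rule matches": here, a placeholder offset-range condition.
                        if (record.offset() >= 151 && record.offset() <= 351) {
                            producer.send(new ProducerRecord<>("destination-topic", record.key(), record.value()));
                        }
                    }
                }
            }
        }
    }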
That third approach is the one I was trying; it's almost done. Thanks!
– Prince
Nov 14 '18 at 13:01