KafkaConsumer resume partition cannot continue to receive uncommitted messages
I'm using one topic, one partition, one consumer, Kafka client version is 0.10.
I got two different results:
If I paused partition first, then to produce a message and to invoke resume method. KafkaConsumer can poll the uncommitted message successfully.
But If I produced message first and didn't commit its offset, then to pause the partition, after several seconds, to invoke the resume method. KafkaConsumer would not receive the uncommitted message. I checked it on Kafka server using
kafka-consumer-groups.sh, it showsLOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.
I have been trying to figure out it for two days, I repeated such tests a lot of times, the results are always like so. I need some suggestion or someone can tell me its Kafka's original mechanism.
apache-kafka kafka-consumer-api
add a comment |
I'm using one topic, one partition, one consumer, Kafka client version is 0.10.
I got two different results:
If I paused partition first, then to produce a message and to invoke resume method. KafkaConsumer can poll the uncommitted message successfully.
But If I produced message first and didn't commit its offset, then to pause the partition, after several seconds, to invoke the resume method. KafkaConsumer would not receive the uncommitted message. I checked it on Kafka server using
kafka-consumer-groups.sh, it showsLOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.
I have been trying to figure out it for two days, I repeated such tests a lot of times, the results are always like so. I need some suggestion or someone can tell me its Kafka's original mechanism.
apache-kafka kafka-consumer-api
0.10 is now pretty old, have you tried with a more recent version?
– Mickael Maison
Nov 12 '18 at 18:05
add a comment |
I'm using one topic, one partition, one consumer, Kafka client version is 0.10.
I got two different results:
If I paused partition first, then to produce a message and to invoke resume method. KafkaConsumer can poll the uncommitted message successfully.
But If I produced message first and didn't commit its offset, then to pause the partition, after several seconds, to invoke the resume method. KafkaConsumer would not receive the uncommitted message. I checked it on Kafka server using
kafka-consumer-groups.sh, it showsLOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.
I have been trying to figure out it for two days, I repeated such tests a lot of times, the results are always like so. I need some suggestion or someone can tell me its Kafka's original mechanism.
apache-kafka kafka-consumer-api
I'm using one topic, one partition, one consumer, Kafka client version is 0.10.
I got two different results:
If I paused partition first, then to produce a message and to invoke resume method. KafkaConsumer can poll the uncommitted message successfully.
But If I produced message first and didn't commit its offset, then to pause the partition, after several seconds, to invoke the resume method. KafkaConsumer would not receive the uncommitted message. I checked it on Kafka server using
kafka-consumer-groups.sh, it showsLOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.
I have been trying to figure out it for two days, I repeated such tests a lot of times, the results are always like so. I need some suggestion or someone can tell me its Kafka's original mechanism.
apache-kafka kafka-consumer-api
apache-kafka kafka-consumer-api
asked Nov 12 '18 at 16:05
RickRick
56111
56111
0.10 is now pretty old, have you tried with a more recent version?
– Mickael Maison
Nov 12 '18 at 18:05
add a comment |
0.10 is now pretty old, have you tried with a more recent version?
– Mickael Maison
Nov 12 '18 at 18:05
0.10 is now pretty old, have you tried with a more recent version?
– Mickael Maison
Nov 12 '18 at 18:05
0.10 is now pretty old, have you tried with a more recent version?
– Mickael Maison
Nov 12 '18 at 18:05
add a comment |
1 Answer
1
active
oldest
votes
For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.
Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.
KafkaConsumer
The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).
Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
– Rick
Nov 13 '18 at 3:02
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53265928%2fkafkaconsumer-resume-partition-cannot-continue-to-receive-uncommitted-messages%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.
Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.
KafkaConsumer
The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).
Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
– Rick
Nov 13 '18 at 3:02
add a comment |
For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.
Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.
KafkaConsumer
The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).
Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
– Rick
Nov 13 '18 at 3:02
add a comment |
For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.
Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.
KafkaConsumer
The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).
For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.
Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.
KafkaConsumer
The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).
answered Nov 12 '18 at 23:34
AbhishekNAbhishekN
23616
23616
Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
– Rick
Nov 13 '18 at 3:02
add a comment |
Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
– Rick
Nov 13 '18 at 3:02
Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
– Rick
Nov 13 '18 at 3:02
Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
– Rick
Nov 13 '18 at 3:02
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53265928%2fkafkaconsumer-resume-partition-cannot-continue-to-receive-uncommitted-messages%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
0.10 is now pretty old, have you tried with a more recent version?
– Mickael Maison
Nov 12 '18 at 18:05