spring integration dsl buffer
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty height:90px;width:728px;box-sizing:border-box;
I have a requirement where I need to hold/buffer the messages that are received on a channel and persist in database based on number of messages or timeout mean no messages received for 1min.
Is there a way to achieve this in spring integration
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(sourceQueue))
.transform(someTransform, "transform")
.handle(someService, "save")
.get();
spring-integration spring-integration-dsl
add a comment |
I have a requirement where I need to hold/buffer the messages that are received on a channel and persist in database based on number of messages or timeout mean no messages received for 1min.
Is there a way to achieve this in spring integration
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(sourceQueue))
.transform(someTransform, "transform")
.handle(someService, "save")
.get();
spring-integration spring-integration-dsl
add a comment |
I have a requirement where I need to hold/buffer the messages that are received on a channel and persist in database based on number of messages or timeout mean no messages received for 1min.
Is there a way to achieve this in spring integration
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(sourceQueue))
.transform(someTransform, "transform")
.handle(someService, "save")
.get();
spring-integration spring-integration-dsl
I have a requirement where I need to hold/buffer the messages that are received on a channel and persist in database based on number of messages or timeout mean no messages received for 1min.
Is there a way to achieve this in spring integration
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(sourceQueue))
.transform(someTransform, "transform")
.handle(someService, "save")
.get();
spring-integration spring-integration-dsl
spring-integration spring-integration-dsl
asked Nov 15 '18 at 14:49
nagendranagendra
243519
243519
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
There is an .aggregate()
operator based on the Aggregator
EI-pattern implementation.
That one you can configure with the JdbcMessageStore
to buffer messages and store them into DB.
You can hold them there until some condition via ReleaseStrategy
(based on each message arriving) or release them due to group timeout
.
If you are not interested to have them all afterward as a single aggregated message, you can consider to use a SimpleMessageGroupProcessor
which just produces a Collection<Message<?>>
and iterates over them to send to the output one by one.
See more info about aggregator in the Reference Manual: https://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator
We want to accumulate message without correlationId, we are seeing the following errorCaused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
with this change.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2))
– nagendra
Nov 15 '18 at 15:49
1
You can do a static correlationKey:.correlationStrategy(m -> 1)
. Also you need to ensure anexpireGroupsUponCompletion(true)
.
– Artem Bilan
Nov 15 '18 at 15:51
1
Thanks.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100)))
worked
– nagendra
Nov 15 '18 at 16:16
the above didnt work without.expireGroupsUponTimeout(true).groupTimeout(2000)
Do we really need to again callgroupTimeout
even if we are usingTimeoutCountSequenceSizeReleaseStrategy
– nagendra
Nov 16 '18 at 16:57
1
TheTimeoutCountSequenceSizeReleaseStrategy
does its logic only when message arrives to the aggregator. The point ofgroupTimeout()
to have some background task when there is no incoming message for the specific time.
– Artem Bilan
Nov 16 '18 at 17:03
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53322049%2fspring-integration-dsl-buffer%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
There is an .aggregate()
operator based on the Aggregator
EI-pattern implementation.
That one you can configure with the JdbcMessageStore
to buffer messages and store them into DB.
You can hold them there until some condition via ReleaseStrategy
(based on each message arriving) or release them due to group timeout
.
If you are not interested to have them all afterward as a single aggregated message, you can consider to use a SimpleMessageGroupProcessor
which just produces a Collection<Message<?>>
and iterates over them to send to the output one by one.
See more info about aggregator in the Reference Manual: https://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator
We want to accumulate message without correlationId, we are seeing the following errorCaused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
with this change.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2))
– nagendra
Nov 15 '18 at 15:49
1
You can do a static correlationKey:.correlationStrategy(m -> 1)
. Also you need to ensure anexpireGroupsUponCompletion(true)
.
– Artem Bilan
Nov 15 '18 at 15:51
1
Thanks.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100)))
worked
– nagendra
Nov 15 '18 at 16:16
the above didnt work without.expireGroupsUponTimeout(true).groupTimeout(2000)
Do we really need to again callgroupTimeout
even if we are usingTimeoutCountSequenceSizeReleaseStrategy
– nagendra
Nov 16 '18 at 16:57
1
TheTimeoutCountSequenceSizeReleaseStrategy
does its logic only when message arrives to the aggregator. The point ofgroupTimeout()
to have some background task when there is no incoming message for the specific time.
– Artem Bilan
Nov 16 '18 at 17:03
add a comment |
There is an .aggregate()
operator based on the Aggregator
EI-pattern implementation.
That one you can configure with the JdbcMessageStore
to buffer messages and store them into DB.
You can hold them there until some condition via ReleaseStrategy
(based on each message arriving) or release them due to group timeout
.
If you are not interested to have them all afterward as a single aggregated message, you can consider to use a SimpleMessageGroupProcessor
which just produces a Collection<Message<?>>
and iterates over them to send to the output one by one.
See more info about aggregator in the Reference Manual: https://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator
We want to accumulate message without correlationId, we are seeing the following errorCaused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
with this change.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2))
– nagendra
Nov 15 '18 at 15:49
1
You can do a static correlationKey:.correlationStrategy(m -> 1)
. Also you need to ensure anexpireGroupsUponCompletion(true)
.
– Artem Bilan
Nov 15 '18 at 15:51
1
Thanks.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100)))
worked
– nagendra
Nov 15 '18 at 16:16
the above didnt work without.expireGroupsUponTimeout(true).groupTimeout(2000)
Do we really need to again callgroupTimeout
even if we are usingTimeoutCountSequenceSizeReleaseStrategy
– nagendra
Nov 16 '18 at 16:57
1
TheTimeoutCountSequenceSizeReleaseStrategy
does its logic only when message arrives to the aggregator. The point ofgroupTimeout()
to have some background task when there is no incoming message for the specific time.
– Artem Bilan
Nov 16 '18 at 17:03
add a comment |
There is an .aggregate()
operator based on the Aggregator
EI-pattern implementation.
That one you can configure with the JdbcMessageStore
to buffer messages and store them into DB.
You can hold them there until some condition via ReleaseStrategy
(based on each message arriving) or release them due to group timeout
.
If you are not interested to have them all afterward as a single aggregated message, you can consider to use a SimpleMessageGroupProcessor
which just produces a Collection<Message<?>>
and iterates over them to send to the output one by one.
See more info about aggregator in the Reference Manual: https://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator
There is an .aggregate()
operator based on the Aggregator
EI-pattern implementation.
That one you can configure with the JdbcMessageStore
to buffer messages and store them into DB.
You can hold them there until some condition via ReleaseStrategy
(based on each message arriving) or release them due to group timeout
.
If you are not interested to have them all afterward as a single aggregated message, you can consider to use a SimpleMessageGroupProcessor
which just produces a Collection<Message<?>>
and iterates over them to send to the output one by one.
See more info about aggregator in the Reference Manual: https://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator
answered Nov 15 '18 at 15:01
Artem BilanArtem Bilan
68.3k84973
68.3k84973
We want to accumulate message without correlationId, we are seeing the following errorCaused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
with this change.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2))
– nagendra
Nov 15 '18 at 15:49
1
You can do a static correlationKey:.correlationStrategy(m -> 1)
. Also you need to ensure anexpireGroupsUponCompletion(true)
.
– Artem Bilan
Nov 15 '18 at 15:51
1
Thanks.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100)))
worked
– nagendra
Nov 15 '18 at 16:16
the above didnt work without.expireGroupsUponTimeout(true).groupTimeout(2000)
Do we really need to again callgroupTimeout
even if we are usingTimeoutCountSequenceSizeReleaseStrategy
– nagendra
Nov 16 '18 at 16:57
1
TheTimeoutCountSequenceSizeReleaseStrategy
does its logic only when message arrives to the aggregator. The point ofgroupTimeout()
to have some background task when there is no incoming message for the specific time.
– Artem Bilan
Nov 16 '18 at 17:03
add a comment |
We want to accumulate message without correlationId, we are seeing the following errorCaused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
with this change.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2))
– nagendra
Nov 15 '18 at 15:49
1
You can do a static correlationKey:.correlationStrategy(m -> 1)
. Also you need to ensure anexpireGroupsUponCompletion(true)
.
– Artem Bilan
Nov 15 '18 at 15:51
1
Thanks.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100)))
worked
– nagendra
Nov 15 '18 at 16:16
the above didnt work without.expireGroupsUponTimeout(true).groupTimeout(2000)
Do we really need to again callgroupTimeout
even if we are usingTimeoutCountSequenceSizeReleaseStrategy
– nagendra
Nov 16 '18 at 16:57
1
TheTimeoutCountSequenceSizeReleaseStrategy
does its logic only when message arrives to the aggregator. The point ofgroupTimeout()
to have some background task when there is no incoming message for the specific time.
– Artem Bilan
Nov 16 '18 at 17:03
We want to accumulate message without correlationId, we are seeing the following error
Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
with this change .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2))
– nagendra
Nov 15 '18 at 15:49
We want to accumulate message without correlationId, we are seeing the following error
Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
with this change .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2))
– nagendra
Nov 15 '18 at 15:49
1
1
You can do a static correlationKey:
.correlationStrategy(m -> 1)
. Also you need to ensure an expireGroupsUponCompletion(true)
.– Artem Bilan
Nov 15 '18 at 15:51
You can do a static correlationKey:
.correlationStrategy(m -> 1)
. Also you need to ensure an expireGroupsUponCompletion(true)
.– Artem Bilan
Nov 15 '18 at 15:51
1
1
Thanks
.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100)))
worked– nagendra
Nov 15 '18 at 16:16
Thanks
.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100)))
worked– nagendra
Nov 15 '18 at 16:16
the above didnt work without
.expireGroupsUponTimeout(true).groupTimeout(2000)
Do we really need to again call groupTimeout
even if we are using TimeoutCountSequenceSizeReleaseStrategy
– nagendra
Nov 16 '18 at 16:57
the above didnt work without
.expireGroupsUponTimeout(true).groupTimeout(2000)
Do we really need to again call groupTimeout
even if we are using TimeoutCountSequenceSizeReleaseStrategy
– nagendra
Nov 16 '18 at 16:57
1
1
The
TimeoutCountSequenceSizeReleaseStrategy
does its logic only when message arrives to the aggregator. The point of groupTimeout()
to have some background task when there is no incoming message for the specific time.– Artem Bilan
Nov 16 '18 at 17:03
The
TimeoutCountSequenceSizeReleaseStrategy
does its logic only when message arrives to the aggregator. The point of groupTimeout()
to have some background task when there is no incoming message for the specific time.– Artem Bilan
Nov 16 '18 at 17:03
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53322049%2fspring-integration-dsl-buffer%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown