Gracefully shut down a Spring Integration flow
I have a Spring Integration application that I want to shut down gracefully.
The application runs in a Docker container, and the flow I want to shut down transfers a fair number of XML files from one external system to another.
The requirement is: if the application has to shut down, the current file transfer should complete and no further files should be touched afterwards.
What I've learned and done so far:
- docker stop sends a SIGTERM to the container's main process, followed by a SIGKILL after 10 seconds (configurable with the --time=x option).
- I implemented an ApplicationListener and registered it as a @Bean, so it is registered with the application context.
- The flow uses a poller with a transactionManager, so the ApplicationListener can determine whether the poller has an open transaction and, if so, wait for a while.
My problem now is:
With this solution I can wait until the current file transfer has finished, but I cannot tell the flow to stop reading inbound files. If the transfer completes and another file arrives while the ApplicationListener is waiting, the flow will grab the file and start another transfer, which will probably be aborted when the SIGKILL arrives.
Injecting the flow as a Lifecycle and calling stop() doesn't seem to work as I expected.
My question is: is there a way to tell the flow that it should finish its current work but not pick up any newly arriving messages?
Here's my code so far:
OutboundFlow:
    @Bean
    public PseudoTransactionManager transactionManager() {
        return new PseudoTransactionManager();
    }

    @Bean
    public TransactionSynchronizationFactory transactionSynchronizationFactory() {
        final ExpressionEvaluatingTransactionSynchronizationProcessor processor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
        processor.setBeanFactory(beanFactory);
        return new DefaultTransactionSynchronizationFactory(processor);
    }

    @Bean
    public PollerSpec orderOutboundFlowTempFileInPoller() {
        return Pollers
                .fixedDelay(pollerDelay)
                .maxMessagesPerPoll(100)
                .transactional(transactionManager())
                .transactionSynchronizationFactory(transactionSynchronizationFactory());
    }

    @Bean
    public IntegrationFlow orderOutboundFlowTempFileIn() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File(temporaryPath + '/' + OrderUtils.SUBDIR_TMP_ORDER))
                                .filterFunction(
                                        f -> OrderUtils.fileInputFilter(f, partnerConfigRepo, "orderOutboundFlowTempFileIn")),
                        e -> e.poller(orderOutboundFlowTempFileInPoller())) ...
Application:
    public static void main(final String[] args) throws Exception {
        SpringApplication.run(App.class, args);
    }

    @Bean
    public GracefulShutdown gracefulShutdown() {
        return new GracefulShutdown();
    }

    private static class GracefulShutdown implements ApplicationListener<ContextClosedEvent> {

        private static final Logger LOG = LoggerFactory.getLogger(GracefulShutdown.class);

        @Autowired
        private Lifecycle orderOutboundFlowTempFileIn;

        @Override
        public void onApplicationEvent(ContextClosedEvent event) {
            LOG.info("Trying to gracefully shutdown App");
            ApplicationContext context = event.getApplicationContext();
            PollerSpec outboundFlowTempFileInPoller = context.getBean(PollerSpec.class, "orderOutboundFlowTempFileInPoller");
            orderOutboundFlowTempFileIn.stop();
            TransactionInterceptor transactionManager = (TransactionInterceptor) (outboundFlowTempFileInPoller.get()
                    .getAdviceChain()
                    .iterator().next());
            if (transactionManager.getTransactionManager() instanceof AbstractPlatformTransactionManager) {
                final TransactionStatus transaction = transactionManager.getTransactionManager().getTransaction(null);
                LOG.info("This is the transaction: " + transaction.toString() + ", isActive? " + !transaction.isCompleted());
                while (!transaction.isCompleted()) {
                    try {
                        LOG.info("Still active, waiting 30 more seconds");
                        Thread.sleep(30000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                LOG.info("Transaction completed");
            }
        }
    }
spring-integration
asked Nov 15 '18 at 15:25
Matthias Hanske
1 Answer
You actually just have to stop the SourcePollingChannelAdapter for the Files.inboundAdapter(). For that purpose, add an .id() in the e lambda.
Then use that id to retrieve the SourcePollingChannelAdapter when you need to stop it.
This way you stop receiving new files immediately, and the ones already in flight will be finished properly.
There is no reason to stop the whole flow from here, since all downstream components are passive.
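A minimal sketch of what this could look like, assuming the Spring Integration 5.x Java DSL used in the question. The id orderTempFileInAdapter, the class name, and the /tmp/orders directory are hypothetical placeholders, and the filter, the transactional poller, and the downstream transfer from the question are left out for brevity:

```java
import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.file.dsl.Files;

@Configuration
public class InboundAdapterShutdownSketch {

    // Hypothetical id; the polling endpoint is registered as a bean under this name.
    static final String ADAPTER_ID = "orderTempFileInAdapter";

    @Bean
    public IntegrationFlow orderOutboundFlowTempFileIn() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("/tmp/orders")), // placeholder directory
                        e -> e.id(ADAPTER_ID)
                                .poller(Pollers.fixedDelay(1000)))
                .handle(message -> { /* downstream transfer elided */ })
                .get();
    }

    // Stops only the polling endpoint when the context starts closing;
    // the downstream components are left alone.
    @EventListener
    public void stopInboundAdapter(ContextClosedEvent event) {
        event.getApplicationContext()
                .getBean(ADAPTER_ID, SourcePollingChannelAdapter.class)
                .stop();
    }
}
```

Looking the adapter up by id and type avoids injecting the whole flow as a Lifecycle, which is what the question tried.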
answered Nov 15 '18 at 16:13
Artem Bilan
This works to prevent the inbound adapter from reading files, but the outbound file transfer to the remote host is killed instantly, so there will be unfinished and therefore corrupted files. In this case the send() method of the RemoteFileTemplate threw a MessagingException caused by an InterruptedIOException. Is there a way to achieve both: finishing the current file and preventing new incoming messages? The messages can be large and take a while to upload to the remote host.
– Matthias Hanske
Nov 16 '18 at 10:55
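An InterruptedIOException generally means the thread doing the I/O was interrupted. One possible cause, which is an assumption and not something confirmed in this thread, is that the executor running the poll is shut down with interruption while the context closes. The plain-Java sketch below (class and method names are hypothetical, and Thread.sleep stands in for the upload) only illustrates the general difference between shutdown(), which lets a running task finish, and shutdownNow(), which interrupts it:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulExecutorShutdown {

    /** Runs one simulated transfer, shuts the pool down, and reports whether the transfer finished. */
    public static boolean transferCompletes(boolean interruptOnShutdown) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean completed = new AtomicBoolean(false);

        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(300); // stands in for a long upload
                completed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // transfer aborted part-way
            }
        });

        try {
            started.await(); // make sure the transfer is in flight first
            if (interruptOnShutdown) {
                pool.shutdownNow(); // interrupts the running task
            } else {
                pool.shutdown(); // accepts no new tasks, lets the running one finish
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for shutdown", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("shutdown():    completed = " + transferCompletes(false));
        System.out.println("shutdownNow(): completed = " + transferCompletes(true));
    }
}
```

In Spring, ThreadPoolTaskScheduler exposes setWaitForTasksToCompleteOnShutdown(true) and setAwaitTerminationSeconds(...) for comparable behaviour. Whether that resolves the interruption described here is untested, and the wait still has to fit within the docker stop timeout.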