Gracefully shut down a Spring Integration flow


















I have a Spring Integration application that I want to shut down gracefully.
The application runs in a Docker container, and the flow I want to shut down transfers a fairly large number of XML files from one external system to another.



The requirement: when the application has to shut down, the current file transfer should complete and no further files should be touched afterwards.



What I've learned and done so far:
- docker stop sends a SIGTERM to the container's main process, followed by a SIGKILL after 10 seconds (configurable with the --time=x option).
- I implemented an ApplicationListener and registered it as a @Bean, so it is registered with the application context.
- The flow uses a poller with a transactionManager, so the ApplicationListener can determine whether the poller has an open transaction and, if so, wait for a while.



My problem now:
With this solution I can wait until the current file transfer has finished, but I cannot tell the flow to stop reading inbound files. If the transfer completes and another file arrives while the ApplicationListener is waiting, the flow grabs that file and starts another transfer, which will probably be cut off when the SIGKILL arrives.
Injecting the flow as a Lifecycle and calling stop() doesn't seem to work as I expected.



My question: is there a way to tell the flow to finish its current work but not pick up any newly arriving messages?



Here's my code so far:
OutboundFlow:



    @Bean
    public PseudoTransactionManager transactionManager() {
        return new PseudoTransactionManager();
    }

    @Bean
    public TransactionSynchronizationFactory transactionSynchronizationFactory() {
        final ExpressionEvaluatingTransactionSynchronizationProcessor processor =
                new ExpressionEvaluatingTransactionSynchronizationProcessor();
        processor.setBeanFactory(beanFactory);
        return new DefaultTransactionSynchronizationFactory(processor);
    }

    @Bean
    public PollerSpec orderOutboundFlowTempFileInPoller() {
        return Pollers
                .fixedDelay(pollerDelay)
                .maxMessagesPerPoll(100)
                .transactional(transactionManager())
                .transactionSynchronizationFactory(transactionSynchronizationFactory());
    }

    @Bean
    public IntegrationFlow orderOutboundFlowTempFileIn() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File(temporaryPath + '/' + OrderUtils.SUBDIR_TMP_ORDER))
                                .filterFunction(
                                        f -> OrderUtils.fileInputFilter(f, partnerConfigRepo, "orderOutboundFlowTempFileIn")),
                        e -> e.poller(orderOutboundFlowTempFileInPoller())) ...


Application:



    public static void main(final String[] args) throws Exception {
        SpringApplication.run(App.class, args);
    }

    @Bean
    public GracefulShutdown gracefulShutdown() {
        return new GracefulShutdown();
    }

    private static class GracefulShutdown implements ApplicationListener<ContextClosedEvent> {
        private static final Logger LOG = LoggerFactory.getLogger(GracefulShutdown.class);

        @Autowired
        private Lifecycle orderOutboundFlowTempFileIn;

        @Override
        public void onApplicationEvent(ContextClosedEvent event) {
            LOG.info("Trying to gracefully shutdown App");
            ApplicationContext context = event.getApplicationContext();
            PollerSpec outboundFlowTempFileInPoller = context.getBean(PollerSpec.class, "orderOutboundFlowTempFileInPoller");

            orderOutboundFlowTempFileIn.stop();
            TransactionInterceptor transactionManager = (TransactionInterceptor) (outboundFlowTempFileInPoller.get()
                    .getAdviceChain()
                    .iterator().next());
            if (transactionManager.getTransactionManager() instanceof AbstractPlatformTransactionManager) {
                final TransactionStatus transaction = transactionManager.getTransactionManager().getTransaction(null);
                LOG.info("This is the transaction: " + transaction.toString() + ", isActive? " + !transaction.isCompleted());
                while (!transaction.isCompleted()) {
                    try {
                        LOG.info("Still active, waiting 30 more seconds");
                        Thread.sleep(30000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                LOG.info("Transaction completed");
            }
        }
    }
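
A side note on the wait loop above: it sleeps in 30-second steps, while docker stop allows only 10 seconds by default before the SIGKILL. A deadline-bounded wait with a short poll interval may fit that window better. The following is a plain-JDK sketch, not taken from the original code; the class name BoundedWait, the method awaitIdle and its parameters are hypothetical, and the "busy" check stands in for whatever signal tells you a transfer is still running.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class BoundedWait {

    /**
     * Polls isBusy until it returns false or the time budget is used up.
     * Returns true if the work finished within the budget, false otherwise.
     */
    public static boolean awaitIdle(BooleanSupplier isBusy, Duration budget, Duration pollInterval) {
        long deadline = System.nanoTime() + budget.toNanos();
        while (isBusy.getAsBoolean()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // budget exhausted while still busy
            }
            long sleepMillis = Math.max(1, Math.min(pollInterval.toMillis(), remainingNanos / 1_000_000));
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // stop waiting instead of spinning on the interrupt flag
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Simulated transfer that stays busy for roughly 200 ms.
        BooleanSupplier busy = () -> System.nanoTime() - start < 200_000_000L;
        // Budget of 8 s, chosen here to stay below Docker's default 10 s grace period.
        System.out.println(awaitIdle(busy, Duration.ofSeconds(8), Duration.ofMillis(50)));
    }
}
```

Returning on interrupt also avoids a tight loop: once the interrupt flag has been restored, every later Thread.sleep call throws immediately.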












      spring-integration






asked Nov 15 '18 at 15:25 by Matthias Hanske






















          1 Answer
You only need to stop the SourcePollingChannelAdapter behind Files.inboundAdapter(). To do that, add an .id() to the endpoint configuration in the e lambda, and use that id to retrieve the SourcePollingChannelAdapter when you need to stop it.

This way you stop receiving new files immediately, and those already in flight are finished properly.

There is no reason to stop the whole flow here, since all downstream components are passive.
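
A minimal sketch of this suggestion, assuming the Spring Integration 5.x Java DSL used in the question (IntegrationFlows was replaced in later versions). The class name StoppableInboundConfig, the id "orderTempFileInAdapter", the directory "/tmp/orders", the one-second poll delay and the empty handler are placeholders introduced for illustration only; they do not come from the original code.

```java
import java.io.File;

import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.file.dsl.Files;

@Configuration
public class StoppableInboundConfig {

    // Hypothetical bean name for the inbound endpoint, chosen for this sketch.
    static final String ADAPTER_ID = "orderTempFileInAdapter";

    @Bean
    public IntegrationFlow orderOutboundFlowTempFileIn() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("/tmp/orders")), // placeholder directory
                        e -> e.id(ADAPTER_ID)                        // registers the endpoint under this bean name
                                .poller(Pollers.fixedDelay(1000)))   // placeholder poll delay
                .handle(m -> { /* placeholder for the real downstream steps */ })
                .get();
    }

    @Bean
    public ApplicationListener<ContextClosedEvent> stopInboundOnClose() {
        // Look the endpoint up by its id and stop it, so that no further polls are started.
        return event -> event.getApplicationContext()
                .getBean(ADAPTER_ID, SourcePollingChannelAdapter.class)
                .stop();
    }
}
```

Only the polling endpoint is stopped here; the downstream handlers are left running so that a message already in the flow can still be processed.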







answered Nov 15 '18 at 16:13 by Artem Bilan












• This prevents the inbound adapter from reading files, but the outbound file transfer to the remote host is killed instantly, so it leaves unfinished and therefore corrupted files. In this case the send() method of the RemoteFileTemplate threw a MessagingException caused by an InterruptedIOException. Is there a way to achieve both: finishing the current file and preventing new incoming messages? The messages can be large and take a while to upload to the remote host.

  – Matthias Hanske
  Nov 16 '18 at 10:55
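
The InterruptedIOException mentioned in this comment suggests that the thread doing the upload was interrupted during shutdown. As a plain-JDK illustration only (it does not show how the scheduler in this application is configured), the sketch below contrasts the two shutdown styles: ScheduledExecutorService.shutdown() followed by awaitTermination() lets the running task finish and schedules no further polls, whereas shutdownNow() would interrupt it. The class name GracefulPollerSketch and the 300 ms simulated upload are assumptions made for this example. In Spring, ThreadPoolTaskScheduler offers setWaitForTasksToCompleteOnShutdown and setAwaitTerminationSeconds for a similar effect; whether that applies here depends on the taskScheduler bean, which the thread does not show.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulPollerSketch {

    /** Returns {polls started, polls completed, 1 if the executor drained in time else 0}. */
    public static int[] run() {
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger started = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch firstStarted = new CountDownLatch(1);

        poller.scheduleWithFixedDelay(() -> {
            started.incrementAndGet();
            firstStarted.countDown();
            try {
                Thread.sleep(300);               // stands in for a long upload
                completed.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // upload would be cut short here
            }
        }, 0, 10, TimeUnit.MILLISECONDS);

        boolean drained = false;
        try {
            firstStarted.await();                // a transfer is now in flight
            poller.shutdown();                   // no new polls; the running task is not interrupted
            drained = poller.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            poller.shutdownNow();
        }
        return new int[] {started.get(), completed.get(), drained ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("started=" + r[0] + " completed=" + r[1] + " drained=" + r[2]);
    }
}
```

The wait passed to awaitTermination still has to fit inside the grace period that docker stop allows before it sends the SIGKILL.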

















