Spring Integration : retry configuration with multi-instances
I'm running 4 instances of a Spring Boot app based on Spring Integration, on 4 different servers.
The process is :
- Read XML files one by one in a shared folder.
- Process the file (check structure, content...), transform the data and send email.
- Write a report about this file in another shared folder.
- Delete successfully processed file.
I'm looking for a non-blocking and safe solution to process these files.
Use cases :
- If an instance crashes while reading or processing a file (i.e. before the integration chain completes) : another instance must process the file, or the same instance must process it after it restarts.
- If an instance is processing a file, the other instances must not process it.
I have built this Spring Integration XML configuration file (it includes a JDBC metadata store backed by a shared H2 database) :
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/file
           http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
           http://www.springframework.org/schema/task
           http://www.springframework.org/schema/task/spring-task.xsd">

    <int:poller default="true" fixed-rate="1000"/>

    <int:channel id="inputFilesChannel">
        <int:queue/>
    </int:channel>

    <!-- Input : one file per poll, oldest first -->
    <int-file:inbound-channel-adapter
            id="inputFilesAdapter"
            channel="inputFilesChannel"
            directory="file:${input.files.path}"
            ignore-hidden="true"
            comparator="lastModifiedFileComparator"
            filter="compositeFilter">
        <int:poller fixed-rate="10000" max-messages-per-poll="1" task-executor="taskExecutor"/>
    </int-file:inbound-channel-adapter>

    <task:executor id="taskExecutor" pool-size="1"/>

    <!-- Metadatastore : shared H2 database -->
    <bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
        <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
        <property name="driverClassName" value="org.h2.Driver"/>
        <property name="username" value="${database.username}"/>
        <property name="password" value="${database.password}"/>
        <property name="maxIdle" value="4"/>
    </bean>

    <bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
        <constructor-arg ref="jdbcDataSource"/>
    </bean>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="jdbcDataSource"/>
    </bean>

    <!-- Accept each file only once, tracked in the shared metadata store -->
    <bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
        <constructor-arg>
            <list>
                <bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
                    <constructor-arg index="0" ref="jdbcMetadataStore"/>
                    <constructor-arg index="1" value="files"/>
                </bean>
            </list>
        </constructor-arg>
    </bean>

    <!-- Workflow -->
    <int:chain input-channel="inputFilesChannel" output-channel="outputFilesChannel">
        <int:service-activator ref="fileActivator" method="fileRead"/>
        <int:service-activator ref="fileActivator" method="fileProcess"/>
        <int:service-activator ref="fileActivator" method="fileAudit"/>
    </int:chain>

    <bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>

    <!-- Output : write the report, then delete the source file on success -->
    <int-file:outbound-channel-adapter
            id="outputFilesChannel"
            directory="file:${output.files.path}"
            filename-generator-expression="payload.name">
        <int-file:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
            </bean>
        </int-file:request-handler-advice-chain>
    </int-file:outbound-channel-adapter>

</beans>
Problem :
With multiple files, when 1 file is successfully processed, the transaction also commits the other existing files to the metadata store (table INT_METADATA_STORE). So if the app is restarted, the other files will never be processed (it works fine if the app crashes while the first file is being processed).
It seems the transaction only applies to reading the files, not to processing them in the integration chain... How can I get a transaction rollback on JVM crash, file by file ?
Any help is much appreciated. It's driving me crazy :(
Thanks !
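One possible reading of this behaviour, sketched as a plain-Java model rather than actual Spring Integration code: if the accept-once filter is applied to the whole directory listing at scan time, every pending file is recorded before it is processed, and the first commit persists all of those entries. The class, the method names and the in-memory `committed` set are hypothetical stand-ins for the adapter, the poller transaction and the INT_METADATA_STORE table.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified in-memory model; NOT Spring Integration code. All names are hypothetical.
class BatchFilterModel {

    // Stand-in for the committed content of INT_METADATA_STORE.
    static final Set<String> committed = new HashSet<>();

    // Assumption: the accept-once filter sees the whole directory listing in one call
    // and records every file it accepts, before any of them has been processed.
    static List<String> acceptOnce(List<String> listing, Set<String> pendingTx) {
        List<String> accepted = new ArrayList<>();
        for (String name : listing) {
            if (!committed.contains(name) && pendingTx.add(name)) {
                accepted.add(name);
            }
        }
        return accepted;
    }

    // One poll: scan, process only the first accepted file, then commit the transaction.
    static String pollOnce(List<String> listing) {
        Set<String> pendingTx = new HashSet<>();
        List<String> accepted = acceptOnce(listing, pendingTx);
        if (accepted.isEmpty()) {
            return null; // nothing left that the store does not already know
        }
        String processed = accepted.get(0); // models max-messages-per-poll="1"
        committed.addAll(pendingTx);        // the commit records ALL accepted files
        return processed;
    }

    public static void main(String[] args) {
        List<String> dir = List.of("a.xml", "b.xml", "c.xml");
        System.out.println("processed: " + pollOnce(dir));
        System.out.println("committed entries: " + committed.size());
        // Simulated restart: the poller's in-memory state is lost, the store is not.
        System.out.println("after restart: " + pollOnce(dir));
    }
}
```

In this model the poll after the simulated restart finds nothing to do, which matches the symptom described above. Whether the real adapter behaves exactly this way should be checked against its documentation.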
Edits / Notes :
Inspired by https://github.com/caoimhindenais/spring-integration-files/blob/master/src/main/resources/context.xml
I have updated my configuration with the answer from Artem Bilan, and removed the transactional block in the poller block : I had transaction conflicts between instances (ugly table lock exceptions), although the behaviour was the same.
I have unsuccessfully tested this configuration in the poller block (same behaviour) :
<int:advice-chain>
    <tx:advice id="txAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="file*" timeout="30000" propagation="REQUIRED"/>
        </tx:attributes>
    </tx:advice>
</int:advice-chain>
Maybe a solution based on the Idempotent Receiver Enterprise Integration Pattern could work, but I didn't manage to configure it... I couldn't find precise documentation.
java spring spring-boot spring-integration
asked Nov 12 '18 at 10:44 by cactuschibre (edited Nov 15 '18 at 11:50)
2 Answers
You shouldn't use a PseudoTransactionManager, but a DataSourceTransactionManager instead.
Since you use a JdbcMetadataStore, it is going to participate in the transaction, and if the downstream flow fails, the entry in the metadata store is going to be rolled back as well.
Good to know, but the problem persists :( – cactuschibre, Nov 14 '18 at 10:36
Ok. I found a working solution. Maybe not the cleanest one, but it works :
- Multiple instances on separate servers share the same H2 database (network folder mount). I think it should also work via remote TCP. MVCC has been activated on H2 (check its doc).
- The inbound-channel-adapter has the scan-each-poll option activated, to permit re-polling files that could have been ignored previously (if processing had already been started by another instance). So, if another instance crashes, the file can be polled and processed again without restarting this very instance.
- Option defaultAutoCommit is set to false on the DB.
- I didn't use the FileSystemPersistentAcceptOnceFileListFilter because it was adding all read files to the metadata store as soon as one file got successfully processed. I didn't manage to use it in my context... I wrote my own conditions and actions as expressions, through a filter and a transaction synchronization.
<!-- Input -->
<bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>

<int-file:inbound-channel-adapter
        id="inputAdapter"
        channel="inputChannel"
        directory="file:${input.files.path}"
        comparator="lastModifiedFileComparator"
        scan-each-poll="true">
    <int:poller max-messages-per-poll="1" fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" isolation="READ_COMMITTED" propagation="REQUIRED" timeout="60000" synchronization-factory="syncFactory"/>
    </int:poller>
</int-file:inbound-channel-adapter>

<!-- Continue only if the metadata store does not already contain the file; putIfAbsent inserts it in that case -->
<int:filter input-channel="inputChannel" output-channel="processChannel" discard-channel="nullChannel" throw-exception-on-rejection="false" expression="@jdbcMetadataStore.putIfAbsent(headers[file_name], headers[timestamp]) == null"/>

<!-- On rollback, remove the file from the metadata store -->
<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="@jdbcMetadataStore.remove(headers[file_name])"/>
</int:transaction-synchronization-factory>

<!-- Metadatastore configuration -->
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
    <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
    <property name="driverClassName" value="org.h2.Driver"/>
    <property name="username" value="${database.username}"/>
    <property name="password" value="${database.password}"/>
    <property name="maxIdle" value="4"/>
    <property name="defaultAutoCommit" value="false"/>
</bean>

<bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
    <constructor-arg ref="jdbcDataSource"/>
</bean>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="jdbcDataSource"/>
</bean>

<!-- Workflow -->
<int:chain input-channel="processChannel" output-channel="outputChannel">
    <int:service-activator ref="fileActivator" method="fileRead"/>
    <int:service-activator ref="fileActivator" method="fileProcess"/>
    <int:service-activator ref="fileActivator" method="fileAudit"/>
</int:chain>

<!-- Output -->
<int-file:outbound-channel-adapter
        id="outputChannel"
        directory="file:${output.files.path}"
        filename-generator-expression="payload.name">
    <!-- Delete the source file -->
    <int-file:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
            <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
        </bean>
    </int-file:request-handler-advice-chain>
</int-file:outbound-channel-adapter>
Any improvement or other solution is welcome.
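The claim / release idea behind the filter and the after-rollback expression can be sketched in plain Java. This is only an in-memory illustration, not Spring Integration code: the `ConcurrentHashMap` is a hypothetical stand-in for the shared JdbcMetadataStore, and `FileClaimSketch`, `tryClaim`, `release` and `handle` are made-up names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// In-memory sketch of the claim / release idea; NOT Spring Integration code.
// The map is a hypothetical stand-in for the shared JdbcMetadataStore.
class FileClaimSketch {

    final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Mirrors the filter expression: proceed only if putIfAbsent(...) returned null,
    // i.e. no instance had claimed this file name before.
    boolean tryClaim(String fileName, String instanceId) {
        return store.putIfAbsent(fileName, instanceId) == null;
    }

    // Mirrors the after-rollback expression: forget the claim so the file can be retried.
    void release(String fileName) {
        store.remove(fileName);
    }

    // Returns true if this instance processed the file, false if it skipped it or failed.
    boolean handle(String fileName, String instanceId, Consumer<String> processor) {
        if (!tryClaim(fileName, instanceId)) {
            return false; // already claimed by another instance
        }
        try {
            processor.accept(fileName);
            return true;
        } catch (RuntimeException e) {
            release(fileName); // "rollback": make the file eligible again
            return false;
        }
    }

    public static void main(String[] args) {
        FileClaimSketch shared = new FileClaimSketch();
        // Two instances compete for the same file: only the first claim wins.
        System.out.println(shared.handle("a.xml", "instance-1", f -> { }));
        System.out.println(shared.handle("a.xml", "instance-2", f -> { }));
        // A failed run releases its claim, so another instance can take over.
        System.out.println(shared.handle("b.xml", "instance-1",
                f -> { throw new IllegalStateException("boom"); }));
        System.out.println(shared.handle("b.xml", "instance-2", f -> { }));
    }
}
```

The sketch only covers failures that surface as exceptions. It does not model a hard JVM crash, where no rollback callback gets a chance to run.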
First answer (DataSourceTransactionManager instead of PseudoTransactionManager) : answered Nov 12 '18 at 14:32 by Artem Bilan
Good to know, but the problem persists :(
– cactuschibre
Nov 14 '18 at 10:36
add a comment |
Good to know, but the problem persists :(
– cactuschibre
Nov 14 '18 at 10:36
Good to know, but the problem persists :(
– cactuschibre
Nov 14 '18 at 10:36
Good to know, but the problem persists :(
– cactuschibre
Nov 14 '18 at 10:36
add a comment |
Ok. I found a working solution. Maybe not the cleanest one but it works :
- Multi-instances on separate servers, sharing the same H2 database (network folder mount). I think it should work via remote TCP. MVCC has been activated on H2 (check its doc).
inbound-channel-adapter
hasscan-each-poll
option activated to permit repolling files that could be previously ignored (if the process already begun by another instance). So, if another instance crashes, the file can be polled and processed again without restart for this very instance.- Option
defaultAutoCommit
is set tofalse
on the DB. - I didn't use the
FileSystemPersistentAcceptOnceFileListFilter
because it was aggregating all read files in the metadatastore when one file get successfully processed. I didn't manage to use it in my context ... I wrote my own conditions and actions in expressions through filter and transaction synchronization.
<!-- Input -->
<bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>
<int-file:inbound-channel-adapter
id="inputAdapter"
channel="inputChannel"
directory="file:$input.files.path"
comparator="lastModifiedFileComparator"
scan-each-poll="true">
<int:poller max-messages-per-poll="1" fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" isolation="READ_COMMITTED" propagation="REQUIRED" timeout="60000" synchronization-factory="syncFactory"/>
</int:poller>
</int-file:inbound-channel-adapter>
<!-- Continue only if the concurrentmetadatastore doesn't contain the file. If if is not the case : insert it in the metadatastore -->
<int:filter input-channel="inputChannel" output-channel="processChannel" discard-channel="nullChannel" throw-exception-on-rejection="false" expression="@jdbcMetadataStore.putIfAbsent(headers[file_name], headers[timestamp]) == null"/>
<!-- Rollback by removing the file from the metadatastore -->
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="@jdbcMetadataStore.remove(headers[file_name])" />
</int:transaction-synchronization-factory>
<!-- Metadatastore configuration -->
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
<property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
<property name="driverClassName" value="org.h2.Driver"/>
<property name="username" value="${database.username}"/>
<property name="password" value="${database.password}"/>
<property name="maxIdle" value="4"/>
<property name="defaultAutoCommit" value="false"/>
</bean>
<bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
<constructor-arg ref="jdbcDataSource"/>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="jdbcDataSource"/>
</bean>
<!-- Workflow -->
<int:chain input-channel="processChannel" output-channel="outputChannel">
<int:service-activator ref="fileActivator" method="fileRead"/>
<int:service-activator ref="fileActivator" method="fileProcess"/>
<int:service-activator ref="fileActivator" method="fileAudit"/>
</int:chain>
<!-- Output -->
<int-file:outbound-channel-adapter
id="outputChannel"
directory="file:${output.files.path}"
filename-generator-expression="payload.name">
<!-- Delete the source file -->
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:outbound-channel-adapter>
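To make the claim/release idea behind the filter and the after-rollback expression easier to follow, here is a minimal Java sketch. It is only an illustration under assumptions: the ClaimSketch class and its tryClaim/release methods are hypothetical names, and an in-memory ConcurrentHashMap stands in for the shared JdbcMetadataStore, so it shows the putIfAbsent/remove logic only, not the transactional or multi-server behaviour.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ClaimSketch {

    // Hypothetical in-memory stand-in for the shared metadata store.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Same test as the filter expression: true only for the caller that inserted the key. */
    public boolean tryClaim(String fileName, String timestamp) {
        return store.putIfAbsent(fileName, timestamp) == null;
    }

    /** Same action as the after-rollback expression: drop the entry so the file can be retried. */
    public void release(String fileName) {
        store.remove(fileName);
    }

    public static void main(String[] args) {
        ClaimSketch shared = new ClaimSketch();
        System.out.println(shared.tryClaim("a.xml", "1")); // first instance claims the file
        System.out.println(shared.tryClaim("a.xml", "2")); // second instance is rejected
        shared.release("a.xml");                           // processing failed, claim released
        System.out.println(shared.tryClaim("a.xml", "3")); // the file can be claimed again
    }
}
```

With scan-each-poll enabled, a rejected file is simply seen again on a later poll, so it gets another chance once the entry has been removed.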
Any improvement or other solution is welcome.
answered Nov 15 '18 at 15:41
cactuschibre