1353 lines
44 KiB
Plaintext
1353 lines
44 KiB
Plaintext
|
:batch-asciidoc: ./
|
||
|
:toc: left
|
||
|
:toclevels: 4
|
||
|
|
||
|
[[springBatchIntegration]]
|
||
|
|
||
|
== Spring Batch Integration
|
||
|
|
||
|
ifndef::onlyonetoggle[]
|
||
|
include::toggle.adoc[]
|
||
|
endif::onlyonetoggle[]
|
||
|
|
||
|
[[spring-batch-integration-introduction]]
|
||
|
|
||
|
=== Spring Batch Integration Introduction
|
||
|
|
||
|
Many users of Spring Batch may encounter requirements that are
|
||
|
outside the scope of Spring Batch but that may be efficiently and
|
||
|
concisely implemented by using Spring Integration. Conversely, Spring
|
||
|
Integration users may encounter Spring Batch requirements and need a way
|
||
|
to efficiently integrate both frameworks. In this context, several
|
||
|
patterns and use-cases emerge, and Spring Batch Integration
|
||
|
addresses those requirements.
|
||
|
|
||
|
The line between Spring Batch and Spring Integration is not always
|
||
|
clear, but two pieces of advice can
|
||
|
help: Think about granularity, and apply common patterns. Some
|
||
|
of those common patterns are described in this reference manual
|
||
|
section.
|
||
|
|
||
|
Adding messaging to a batch process enables automation of
|
||
|
operations and also separation and strategizing of key concerns.
|
||
|
For example, a message might trigger a job to execute, and then the
|
||
|
sending of the message can be exposed in a variety of ways. Alternatively, when
|
||
|
a job completes or fails, that event might trigger a message to be sent,
|
||
|
and the consumers of those messages might have operational concerns
|
||
|
that have nothing to do with the application itself. Messaging can
|
||
|
also be embedded in a job (for example reading or writing items for
|
||
|
processing via channels). Remote partitioning and remote chunking
|
||
|
provide methods to distribute workloads over a number of workers.
|
||
|
|
||
|
|
||
|
This section covers the following key concepts:
|
||
|
|
||
|
[role="xmlContent"]
|
||
|
* <<spring-batch-integration.adoc#namespace-support,Namespace Support>>
|
||
|
|
||
|
[[continue-section-list]]
|
||
|
* <<spring-batch-integration.adoc#launching-batch-jobs-through-messages,Launching Batch Jobs through Messages>>
|
||
|
|
||
|
|
||
|
|
||
|
* <<spring-batch-integration.adoc#providing-feedback-with-informational-messages,Providing Feedback with Informational Messages>>
|
||
|
|
||
|
|
||
|
|
||
|
* <<spring-batch-integration.adoc#asynchronous-processors,Asynchronous Processors>>
|
||
|
|
||
|
|
||
|
|
||
|
* <<spring-batch-integration.adoc#externalizing-batch-process-execution,Externalizing
|
||
|
Batch Process Execution>>
|
||
|
|
||
|
|
||
|
[[namespace-support]]
|
||
|
[role="xmlContent"]
|
||
|
==== Namespace Support
|
||
|
|
||
|
Since Spring Batch Integration 1.3, dedicated XML Namespace
|
||
|
support was added, with the aim to provide an easier configuration
|
||
|
experience. In order to activate the namespace, add the following
|
||
|
namespace declarations to your Spring XML Application Context
|
||
|
file:
|
||
|
|
||
|
[source, xml]
|
||
|
----
|
||
|
<beans xmlns="http://www.springframework.org/schema/beans"
|
||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||
|
xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
|
||
|
xsi:schemaLocation="
|
||
|
http://www.springframework.org/schema/batch-integration
|
||
|
https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">
|
||
|
|
||
|
...
|
||
|
|
||
|
</beans>
|
||
|
----
|
||
|
|
||
|
A fully configured Spring XML Application Context file for Spring
|
||
|
Batch Integration may look like the following:
|
||
|
|
||
|
[source, xml]
|
||
|
----
|
||
|
<beans xmlns="http://www.springframework.org/schema/beans"
|
||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||
|
xmlns:int="http://www.springframework.org/schema/integration"
|
||
|
xmlns:batch="http://www.springframework.org/schema/batch"
|
||
|
xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
|
||
|
xsi:schemaLocation="
|
||
|
http://www.springframework.org/schema/batch-integration
|
||
|
https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
|
||
|
http://www.springframework.org/schema/batch
|
||
|
https://www.springframework.org/schema/batch/spring-batch.xsd
|
||
|
http://www.springframework.org/schema/beans
|
||
|
https://www.springframework.org/schema/beans/spring-beans.xsd
|
||
|
http://www.springframework.org/schema/integration
|
||
|
https://www.springframework.org/schema/integration/spring-integration.xsd">
|
||
|
|
||
|
...
|
||
|
|
||
|
</beans>
|
||
|
----
|
||
|
|
||
|
Appending version numbers to the referenced XSD file is also
|
||
|
allowed, but, as a version-less declaration always uses the
|
||
|
latest schema, we generally do not recommend appending the version
|
||
|
number to the XSD name. Adding a version number
|
||
|
could possibly create issues when updating the Spring Batch
|
||
|
Integration dependencies, as they may require more recent versions
|
||
|
of the XML schema.
|
||
|
|
||
|
|
||
|
[[launching-batch-jobs-through-messages]]
|
||
|
|
||
|
==== Launching Batch Jobs through Messages
|
||
|
|
||
|
|
||
|
When starting batch jobs by using the core Spring Batch API, you
|
||
|
basically have 2 options:
|
||
|
|
||
|
* From the command line, with the `CommandLineJobRunner`
|
||
|
* Programmatically, with either `JobOperator.start()` or `JobLauncher.run()`
|
||
|
|
||
|
|
||
|
|
||
|
For example, you may want to use the
|
||
|
`CommandLineJobRunner` when invoking Batch Jobs by
|
||
|
using a shell script. Alternatively, you may use the
|
||
|
`JobOperator` directly (for example, when using
|
||
|
Spring Batch as part of a web application). However, what about
|
||
|
more complex use cases? Maybe you need to poll a remote (S)FTP
|
||
|
server to retrieve the data for the Batch Job or your application
|
||
|
has to support multiple different data sources simultaneously. For
|
||
|
example, you may receive data files not only from the web, but also from
|
||
|
FTP and other sources. Maybe additional transformation of the input files is
|
||
|
needed before invoking Spring Batch.
|
||
|
|
||
|
|
||
|
|
||
|
Therefore, it would be much more powerful to execute the batch job
|
||
|
using Spring Integration and its numerous adapters. For example,
|
||
|
you can use a __File Inbound Channel Adapter__ to
|
||
|
monitor a directory in the file-system and start the Batch Job as
|
||
|
soon as the input file arrives. Additionally, you can create Spring
|
||
|
Integration flows that use multiple different adapters to easily
|
||
|
ingest data for your batch jobs from multiple sources
|
||
|
simultaneously using only configuration. Implementing all these
|
||
|
scenarios with Spring Integration is easy, as it allows for
|
||
|
decoupled, event-driven execution of the
|
||
|
`JobLauncher`.
|
||
|
|
||
|
|
||
|
|
||
|
Spring Batch Integration provides the
|
||
|
`JobLaunchingMessageHandler` class that you can
|
||
|
use to launch batch jobs. The input for the
|
||
|
`JobLaunchingMessageHandler` is provided by a
|
||
|
Spring Integration message, which has a payload of type
|
||
|
`JobLaunchRequest`. This class is a wrapper around the `Job`
|
||
|
that needs to be launched and around the `JobParameters`
|
||
|
necessary to launch the Batch job.
|
||
|
|
||
|
|
||
|
|
||
|
The following image illustrates the typical Spring Integration
|
||
|
message flow in order to start a Batch job. The
|
||
|
link:$$https://www.enterpriseintegrationpatterns.com/toc.html$$[EIP (Enterprise Integration Patterns) website]
|
||
|
provides a full overview of messaging icons and their descriptions.
|
||
|
|
||
|
.Launch Batch Job
|
||
|
image::{batch-asciidoc}images/launch-batch-job.png[Launch Batch Job, scaledwidth="60%"]
|
||
|
|
||
|
|
||
|
[[transforming-a-file-into-a-joblaunchrequest]]
|
||
|
|
||
|
===== Transforming a file into a JobLaunchRequest
|
||
|
|
||
|
|
||
|
[source, java]
|
||
|
----
|
||
|
package io.spring.sbi;
|
||
|
|
||
|
import org.springframework.batch.core.Job;
|
||
|
import org.springframework.batch.core.JobParametersBuilder;
|
||
|
import org.springframework.batch.integration.launch.JobLaunchRequest;
|
||
|
import org.springframework.integration.annotation.Transformer;
|
||
|
import org.springframework.messaging.Message;
|
||
|
|
||
|
import java.io.File;
|
||
|
|
||
|
public class FileMessageToJobRequest {
|
||
|
private Job job;
|
||
|
private String fileParameterName;
|
||
|
|
||
|
public void setFileParameterName(String fileParameterName) {
|
||
|
this.fileParameterName = fileParameterName;
|
||
|
}
|
||
|
|
||
|
public void setJob(Job job) {
|
||
|
this.job = job;
|
||
|
}
|
||
|
|
||
|
@Transformer
|
||
|
public JobLaunchRequest toRequest(Message<File> message) {
|
||
|
JobParametersBuilder jobParametersBuilder =
|
||
|
new JobParametersBuilder();
|
||
|
|
||
|
jobParametersBuilder.addString(fileParameterName,
|
||
|
message.getPayload().getAbsolutePath());
|
||
|
|
||
|
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
|
||
|
}
|
||
|
}
|
||
|
----
|
||
|
|
||
|
[[the-jobexecution-response]]
|
||
|
|
||
|
===== The `JobExecution` Response
|
||
|
|
||
|
When a batch job is being executed, a
|
||
|
`JobExecution` instance is returned. This
|
||
|
instance can be used to determine the status of an execution. If
|
||
|
a `JobExecution` is able to be created
|
||
|
successfully, it is always returned, regardless of whether
|
||
|
or not the actual execution is successful.
|
||
|
|
||
|
|
||
|
|
||
|
The exact behavior on how the `JobExecution`
|
||
|
instance is returned depends on the provided
|
||
|
`TaskExecutor`. If a
|
||
|
`synchronous` (single-threaded)
|
||
|
`TaskExecutor` implementation is used, the
|
||
|
`JobExecution` response is returned only
|
||
|
`after` the job completes. When using an
|
||
|
`asynchronous`
|
||
|
`TaskExecutor`, the
|
||
|
`JobExecution` instance is returned
|
||
|
immediately. Users can then take the `id` of
|
||
|
`JobExecution` instance
|
||
|
(with `JobExecution.getJobId()`) and query the
|
||
|
`JobRepository` for the job's updated status
|
||
|
using the `JobExplorer`. For more
|
||
|
information, please refer to the Spring
|
||
|
Batch reference documentation on
|
||
|
<<job.adoc#queryingRepository,Querying the Repository>>.
|
||
|
|
||
|
|
||
|
|
||
|
[[spring-batch-integration-configuration]]
|
||
|
|
||
|
===== Spring Batch Integration Configuration
|
||
|
|
||
|
The following configuration creates a file
|
||
|
`inbound-channel-adapter` to listen for CSV
|
||
|
files in the provided directory, hand them off to our
|
||
|
transformer (`FileMessageToJobRequest`),
|
||
|
launch the job via the __Job Launching Gateway__, and then log the output of the
|
||
|
`JobExecution` with the
|
||
|
`logging-channel-adapter`.
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<int:channel id="inboundFileChannel"/>
|
||
|
<int:channel id="outboundJobRequestChannel"/>
|
||
|
<int:channel id="jobLaunchReplyChannel"/>
|
||
|
|
||
|
<int-file:inbound-channel-adapter id="filePoller"
|
||
|
channel="inboundFileChannel"
|
||
|
directory="file:/tmp/myfiles/"
|
||
|
filename-pattern="*.csv">
|
||
|
<int:poller fixed-rate="1000"/>
|
||
|
</int-file:inbound-channel-adapter>
|
||
|
|
||
|
<int:transformer input-channel="inboundFileChannel"
|
||
|
output-channel="outboundJobRequestChannel">
|
||
|
<bean class="io.spring.sbi.FileMessageToJobRequest">
|
||
|
<property name="job" ref="personJob"/>
|
||
|
<property name="fileParameterName" value="input.file.name"/>
|
||
|
</bean>
|
||
|
</int:transformer>
|
||
|
|
||
|
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
|
||
|
reply-channel="jobLaunchReplyChannel"/>
|
||
|
|
||
|
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
@Bean
|
||
|
public FileMessageToJobRequest fileMessageToJobRequest() {
|
||
|
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
|
||
|
fileMessageToJobRequest.setFileParameterName("input.file.name");
|
||
|
fileMessageToJobRequest.setJob(personJob());
|
||
|
return fileMessageToJobRequest;
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public JobLaunchingGateway jobLaunchingGateway() {
|
||
|
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
|
||
|
simpleJobLauncher.setJobRepository(jobRepository);
|
||
|
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
|
||
|
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
|
||
|
|
||
|
return jobLaunchingGateway;
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
|
||
|
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
|
||
|
filter(new SimplePatternFileListFilter("*.csv")),
|
||
|
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
|
||
|
handle(fileMessageToJobRequest()).
|
||
|
handle(jobLaunchingGateway).
|
||
|
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
|
||
|
get();
|
||
|
}
|
||
|
----
|
||
|
|
||
|
|
||
|
[[example-itemreader-configuration]]
|
||
|
|
||
|
===== Example ItemReader Configuration
|
||
|
|
||
|
Now that we are polling for files and launching jobs, we need to
|
||
|
configure our Spring Batch
|
||
|
`ItemReader` (for example) to use the files found at the location defined
|
||
|
by the job parameter called "input.file.name", as shown in the following bean configuration:
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
|
||
|
scope="step">
|
||
|
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
|
||
|
...
|
||
|
</bean>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
@Bean
|
||
|
@StepScope
|
||
|
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
|
||
|
...
|
||
|
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
|
||
|
flatFileItemReader.setResource(new FileSystemResource(resource));
|
||
|
...
|
||
|
return flatFileItemReader;
|
||
|
}
|
||
|
----
|
||
|
|
||
|
The main points of interest in the preceding example are injecting the value of
|
||
|
`#{jobParameters['input.file.name']}`
|
||
|
as the Resource property value and setting the `ItemReader` bean
|
||
|
to have __Step scope__. Setting the bean to have Step scope takes advantage of
|
||
|
the late binding support, which allows access to the
|
||
|
`jobParameters` variable.
|
||
|
|
||
|
|
||
|
[[availableAttributesOfTheJobLaunchingGateway]]
|
||
|
=== Available Attributes of the Job-Launching Gateway
|
||
|
|
||
|
The job-launching gateway has the following attributes that you can set to control a job:
|
||
|
|
||
|
* `id`: Identifies the underlying Spring bean definition, which is an instance of either:
|
||
|
** `EventDrivenConsumer`
|
||
|
** `PollingConsumer`
|
||
|
(The exact implementation depends on whether the component's input channel is a
|
||
|
`SubscribableChannel` or `PollableChannel`.)
|
||
|
* `auto-startup`: Boolean flag to indicate that the endpoint should start automatically on
|
||
|
startup. The default is __true__.
|
||
|
* `request-channel`: The input `MessageChannel` of this endpoint.
|
||
|
* `reply-channel`: `MessageChannel` to which the resulting `JobExecution` payload is sent.
|
||
|
* `reply-timeout`: Lets you specify how long (in milliseconds) this gateway waits for the reply message
|
||
|
to be sent successfully to the reply channel before throwing
|
||
|
an exception. This attribute only applies when the channel
|
||
|
might block (for example, when using a bounded queue channel
|
||
|
that is currently full). Also, keep in mind that, when sending to a
|
||
|
`DirectChannel`, the invocation occurs
|
||
|
in the sender's thread. Therefore, the failing of the send
|
||
|
operation may be caused by other components further downstream.
|
||
|
The `reply-timeout` attribute maps to the
|
||
|
`sendTimeout` property of the underlying
|
||
|
`MessagingTemplate` instance. If not specified, the attribute
|
||
|
defaults to<emphasis>-1</emphasis>,
|
||
|
meaning that, by default, the `Gateway` waits indefinitely.
|
||
|
* `job-launcher`: Optional. Accepts a
|
||
|
custom
|
||
|
`JobLauncher`
|
||
|
bean reference.
|
||
|
If not specified the adapter
|
||
|
re-uses the instance that is registered under the `id` of
|
||
|
`jobLauncher`. If no default instance
|
||
|
exists, an exception is thrown.
|
||
|
* `order`: Specifies the order of invocation when this endpoint is connected as a subscriber
|
||
|
to a `SubscribableChannel`.
|
||
|
|
||
|
=== Sub-Elements
|
||
|
When this `Gateway` is receiving messages from a
|
||
|
`PollableChannel`, you must either provide
|
||
|
a global default `Poller` or provide a `Poller` sub-element to the
|
||
|
`Job Launching Gateway`, as shown in the following example:
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<batch-int:job-launching-gateway request-channel="queueChannel"
|
||
|
reply-channel="replyChannel" job-launcher="jobLauncher">
|
||
|
<int:poller fixed-rate="1000">
|
||
|
</batch-int:job-launching-gateway>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
@Bean
|
||
|
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
|
||
|
public JobLaunchingGateway sampleJobLaunchingGateway() {
|
||
|
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
|
||
|
jobLaunchingGateway.setOutputChannel(replyChannel());
|
||
|
return jobLaunchingGateway;
|
||
|
}
|
||
|
----
|
||
|
|
||
|
[[providing-feedback-with-informational-messages]]
|
||
|
|
||
|
==== Providing Feedback with Informational Messages
|
||
|
|
||
|
|
||
|
As Spring Batch jobs can run for long times, providing progress
|
||
|
information is often critical. For example, stake-holders may want
|
||
|
to be notified if some or all parts of a batch job have failed.
|
||
|
Spring Batch provides support for this information being gathered
|
||
|
through:
|
||
|
|
||
|
|
||
|
|
||
|
* Active polling
|
||
|
|
||
|
* Event-driven listeners
|
||
|
|
||
|
|
||
|
When starting a Spring Batch job asynchronously (for example, by using the
|
||
|
`Job Launching Gateway`), a
|
||
|
`JobExecution` instance is returned. Thus,
|
||
|
`JobExecution.getJobId()` can be used to
|
||
|
continuously poll for status updates by retrieving updated
|
||
|
instances of the `JobExecution` from the
|
||
|
`JobRepository` by using the
|
||
|
`JobExplorer`. However, this is considered
|
||
|
sub-optimal, and an event-driven approach should be preferred.
|
||
|
|
||
|
|
||
|
Therefore, Spring Batch provides listeners, including the three most commonly used listeners:
|
||
|
|
||
|
* StepListener
|
||
|
* ChunkListener
|
||
|
* JobExecutionListener
|
||
|
|
||
|
In the example shown in the following image, a Spring Batch job has been configured with a
|
||
|
`StepExecutionListener`. Thus, Spring
|
||
|
Integration receives and processes any step before or after
|
||
|
events. For example, the received
|
||
|
`StepExecution` can be inspected by using a
|
||
|
`Router`. Based on the results of that
|
||
|
inspection, various things can occur (such as routing a message
|
||
|
to a Mail Outbound Channel Adapter), so that an Email notification
|
||
|
can be sent out based on some condition.
|
||
|
|
||
|
.Handling Informational Messages
|
||
|
image::{batch-asciidoc}images/handling-informational-messages.png[Handling Informational Messages, scaledwidth="60%"]
|
||
|
|
||
|
|
||
|
The following two-part example shows how a listener is configured to send a
|
||
|
message to a `Gateway` for a
|
||
|
`StepExecution` events and log its output to a
|
||
|
`logging-channel-adapter`.
|
||
|
|
||
|
First, create the notification integration beans:
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<int:channel id="stepExecutionsChannel"/>
|
||
|
|
||
|
<int:gateway id="notificationExecutionsListener"
|
||
|
service-interface="org.springframework.batch.core.StepExecutionListener"
|
||
|
default-request-channel="stepExecutionsChannel"/>
|
||
|
|
||
|
<int:logging-channel-adapter channel="stepExecutionsChannel"/>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
@Bean
|
||
|
@ServiceActivator(inputChannel = "stepExecutionsChannel")
|
||
|
public LoggingHandler loggingHandler() {
|
||
|
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
|
||
|
adapter.setLoggerName("TEST_LOGGER");
|
||
|
adapter.setLogExpressionString("headers.id + ': ' + payload");
|
||
|
return adapter;
|
||
|
}
|
||
|
|
||
|
@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
|
||
|
public interface NotificationExecutionListener extends StepExecutionListener {}
|
||
|
----
|
||
|
|
||
|
[role="javaContent"]
|
||
|
NOTE: You will need to add the `@IntegrationComponentScan` annotation to your configuration.
|
||
|
|
||
|
[[message-gateway-entry-list]]
|
||
|
|
||
|
Second, modify your job to add a step-level listener:
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<job id="importPayments">
|
||
|
<step id="step1">
|
||
|
<tasklet ../>
|
||
|
<chunk ../>
|
||
|
<listeners>
|
||
|
<listener ref="notificationExecutionsListener"/>
|
||
|
</listeners>
|
||
|
</tasklet>
|
||
|
...
|
||
|
</step>
|
||
|
</job>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
public Job importPaymentsJob() {
|
||
|
return jobBuilderFactory.get("importPayments")
|
||
|
.start(stepBuilderFactory.get("step1")
|
||
|
.chunk(200)
|
||
|
.listener(notificationExecutionsListener())
|
||
|
...
|
||
|
}
|
||
|
----
|
||
|
|
||
|
[[asynchronous-processors]]
|
||
|
|
||
|
==== Asynchronous Processors
|
||
|
|
||
|
|
||
|
Asynchronous Processors help you to scale the processing of
|
||
|
items. In the asynchronous processor use case, an
|
||
|
`AsyncItemProcessor` serves as a dispatcher,
|
||
|
executing the logic of the `ItemProcessor` for an
|
||
|
item on a new thread. Once the item completes, the `Future` is passed to
|
||
|
the `AsynchItemWriter` to be written.
|
||
|
|
||
|
|
||
|
|
||
|
Therefore, you can increase performance by using asynchronous item
|
||
|
processing, basically allowing you to implement
|
||
|
__fork-join__ scenarios. The
|
||
|
`AsyncItemWriter` gathers the results and
|
||
|
writes back the chunk as soon as all the results become available.
|
||
|
|
||
|
|
||
|
|
||
|
The following example shows how to configuration the `AsyncItemProcessor`:
|
||
|
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<bean id="processor"
|
||
|
class="org.springframework.batch.integration.async.AsyncItemProcessor">
|
||
|
<property name="delegate">
|
||
|
<bean class="your.ItemProcessor"/>
|
||
|
</property>
|
||
|
<property name="taskExecutor">
|
||
|
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
|
||
|
</property>
|
||
|
</bean>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
@Bean
|
||
|
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
|
||
|
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
|
||
|
asyncItemProcessor.setTaskExecutor(taskExecutor);
|
||
|
asyncItemProcessor.setDelegate(itemProcessor);
|
||
|
return asyncItemProcessor;
|
||
|
}
|
||
|
----
|
||
|
|
||
|
The `delegate` property refers
|
||
|
to your `ItemProcessor` bean, and
|
||
|
the `taskExecutor` property
|
||
|
refers to the `TaskExecutor` of your choice.
|
||
|
|
||
|
The following example shows how to configure the `AsyncItemWriter`:
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<bean id="itemWriter"
|
||
|
class="org.springframework.batch.integration.async.AsyncItemWriter">
|
||
|
<property name="delegate">
|
||
|
<bean id="itemWriter" class="your.ItemWriter"/>
|
||
|
</property>
|
||
|
</bean>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
@Bean
|
||
|
public AsyncItemWriter writer(ItemWriter itemWriter) {
|
||
|
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
|
||
|
asyncItemWriter.setDelegate(itemWriter);
|
||
|
return asyncItemWriter;
|
||
|
}
|
||
|
----
|
||
|
|
||
|
Again, the `delegate` property is
|
||
|
actually a reference to your `ItemWriter` bean.
|
||
|
|
||
|
|
||
|
[[externalizing-batch-process-execution]]
|
||
|
|
||
|
==== Externalizing Batch Process Execution
|
||
|
|
||
|
|
||
|
The integration approaches discussed so far suggest use cases
|
||
|
where Spring Integration wraps Spring Batch like an outer-shell.
|
||
|
However, Spring Batch can also use Spring Integration internally.
|
||
|
Using this approach, Spring Batch users can delegate the
|
||
|
processing of items or even chunks to outside processes. This
|
||
|
allows you to offload complex processing. Spring Batch Integration
|
||
|
provides dedicated support for:
|
||
|
|
||
|
|
||
|
|
||
|
* Remote Chunking
|
||
|
|
||
|
|
||
|
|
||
|
* Remote Partitioning
|
||
|
|
||
|
|
||
|
[[remote-chunking]]
|
||
|
|
||
|
===== Remote Chunking
|
||
|
|
||
|
.Remote Chunking
|
||
|
image::{batch-asciidoc}images/remote-chunking-sbi.png[Remote Chunking, scaledwidth="60%"]
|
||
|
|
||
|
Taking things one step further, one can also externalize the
|
||
|
chunk processing by using the
|
||
|
`ChunkMessageChannelItemWriter`
|
||
|
(provided by Spring Batch Integration), which sends items out
|
||
|
and collects the result. Once sent, Spring Batch continues the
|
||
|
process of reading and grouping items, without waiting for the results.
|
||
|
Rather, it is the responsibility of the `ChunkMessageChannelItemWriter`
|
||
|
to gather the results and integrate them back into the Spring Batch process.
|
||
|
|
||
|
|
||
|
With Spring Integration, you have full
|
||
|
control over the concurrency of your processes (for instance, by
|
||
|
using a `QueueChannel` instead of a
|
||
|
`DirectChannel`). Furthermore, by relying on
|
||
|
Spring Integration's rich collection of Channel Adapters (such as
|
||
|
JMS and AMQP), you can distribute chunks of a Batch job to
|
||
|
external systems for processing.
|
||
|
|
||
|
A simple job with a step to be remotely chunked might have a
|
||
|
configuration similar to the following:
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<job id="personJob">
|
||
|
<step id="step1">
|
||
|
<tasklet>
|
||
|
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
|
||
|
</tasklet>
|
||
|
...
|
||
|
</step>
|
||
|
</job>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
public Job chunkJob() {
|
||
|
return jobBuilderFactory.get("personJob")
|
||
|
.start(stepBuilderFactory.get("step1")
|
||
|
.<Person, Person>chunk(200)
|
||
|
.reader(itemReader())
|
||
|
.writer(itemWriter())
|
||
|
.build())
|
||
|
.build();
|
||
|
}
|
||
|
----
|
||
|
|
||
|
The `ItemReader` reference points to the bean you want
|
||
|
to use for reading data on the manager. The `ItemWriter` reference
|
||
|
points to a special `ItemWriter`
|
||
|
(called `ChunkMessageChannelItemWriter`),
|
||
|
as described above. The processor (if any) is left off the
|
||
|
manager configuration, as it is configured on the worker. The
|
||
|
following configuration provides a basic manager setup. You
|
||
|
should check any additional component properties, such as
|
||
|
throttle limits and so on, when implementing your use case.
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
|
||
|
<property name="brokerURL" value="tcp://localhost:61616"/>
|
||
|
</bean>
|
||
|
|
||
|
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
|
||
|
|
||
|
<bean id="messagingTemplate"
|
||
|
class="org.springframework.integration.core.MessagingTemplate">
|
||
|
<property name="defaultChannel" ref="requests"/>
|
||
|
<property name="receiveTimeout" value="2000"/>
|
||
|
</bean>
|
||
|
|
||
|
<bean id="itemWriter"
|
||
|
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
|
||
|
scope="step">
|
||
|
<property name="messagingOperations" ref="messagingTemplate"/>
|
||
|
<property name="replyChannel" ref="replies"/>
|
||
|
</bean>
|
||
|
|
||
|
<int:channel id="replies">
|
||
|
<int:queue/>
|
||
|
</int:channel>
|
||
|
|
||
|
<int-jms:message-driven-channel-adapter id="jmsReplies"
|
||
|
destination-name="replies"
|
||
|
channel="replies"/>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
@Bean
|
||
|
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
|
||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||
|
factory.setBrokerURL("tcp://localhost:61616");
|
||
|
return factory;
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Configure outbound flow (requests going to workers)
|
||
|
*/
|
||
|
@Bean
|
||
|
public DirectChannel requests() {
|
||
|
return new DirectChannel();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
|
||
|
return IntegrationFlows
|
||
|
.from(requests())
|
||
|
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
|
||
|
.get();
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Configure inbound flow (replies coming from workers)
|
||
|
*/
|
||
|
@Bean
|
||
|
public QueueChannel replies() {
|
||
|
return new QueueChannel();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
|
||
|
return IntegrationFlows
|
||
|
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
|
||
|
.channel(replies())
|
||
|
.get();
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Configure the ChunkMessageChannelItemWriter
|
||
|
*/
|
||
|
@Bean
|
||
|
public ItemWriter<Integer> itemWriter() {
|
||
|
MessagingTemplate messagingTemplate = new MessagingTemplate();
|
||
|
messagingTemplate.setDefaultChannel(requests());
|
||
|
messagingTemplate.setReceiveTimeout(2000);
|
||
|
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
|
||
|
= new ChunkMessageChannelItemWriter<>();
|
||
|
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
|
||
|
chunkMessageChannelItemWriter.setReplyChannel(replies());
|
||
|
return chunkMessageChannelItemWriter;
|
||
|
}
|
||
|
----
|
||
|
|
||
|
The preceding configuration provides us with a number of beans. We
|
||
|
configure our messaging middleware using ActiveMQ and the
|
||
|
inbound/outbound JMS adapters provided by Spring Integration. As
|
||
|
shown, our `itemWriter` bean, which is
|
||
|
referenced by our job step, uses the
|
||
|
`ChunkMessageChannelItemWriter` for writing chunks over the
|
||
|
configured middleware.
|
||
|
|
||
|
Now we can move on to the worker configuration, as shown in the following example:
|
||
|
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
|
||
|
<property name="brokerURL" value="tcp://localhost:61616"/>
|
||
|
</bean>
|
||
|
|
||
|
<int:channel id="requests"/>
|
||
|
<int:channel id="replies"/>
|
||
|
|
||
|
<int-jms:message-driven-channel-adapter id="incomingRequests"
|
||
|
destination-name="requests"
|
||
|
channel="requests"/>
|
||
|
|
||
|
<int-jms:outbound-channel-adapter id="outgoingReplies"
|
||
|
destination-name="replies"
|
||
|
channel="replies">
|
||
|
</int-jms:outbound-channel-adapter>
|
||
|
|
||
|
<int:service-activator id="serviceActivator"
|
||
|
input-channel="requests"
|
||
|
output-channel="replies"
|
||
|
ref="chunkProcessorChunkHandler"
|
||
|
method="handleChunk"/>
|
||
|
|
||
|
<bean id="chunkProcessorChunkHandler"
|
||
|
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
|
||
|
<property name="chunkProcessor">
|
||
|
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
|
||
|
<property name="itemWriter">
|
||
|
<bean class="io.spring.sbi.PersonItemWriter"/>
|
||
|
</property>
|
||
|
<property name="itemProcessor">
|
||
|
<bean class="io.spring.sbi.PersonItemProcessor"/>
|
||
|
</property>
|
||
|
</bean>
|
||
|
</property>
|
||
|
</bean>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
@Bean
|
||
|
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
|
||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||
|
factory.setBrokerURL("tcp://localhost:61616");
|
||
|
return factory;
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Configure inbound flow (requests coming from the manager)
|
||
|
*/
|
||
|
@Bean
|
||
|
public DirectChannel requests() {
|
||
|
return new DirectChannel();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
|
||
|
return IntegrationFlows
|
||
|
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
|
||
|
.channel(requests())
|
||
|
.get();
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Configure outbound flow (replies going to the manager)
|
||
|
*/
|
||
|
@Bean
|
||
|
public DirectChannel replies() {
|
||
|
return new DirectChannel();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
|
||
|
return IntegrationFlows
|
||
|
.from(replies())
|
||
|
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
|
||
|
.get();
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Configure the ChunkProcessorChunkHandler
|
||
|
*/
|
||
|
@Bean
|
||
|
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
|
||
|
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
|
||
|
ChunkProcessor<Integer> chunkProcessor
|
||
|
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
|
||
|
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
|
||
|
= new ChunkProcessorChunkHandler<>();
|
||
|
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
|
||
|
return chunkProcessorChunkHandler;
|
||
|
}
|
||
|
----
|
||
|
|
||
|
Most of these configuration items should look familiar from the
|
||
|
manager configuration. Workers do not need access to
|
||
|
the Spring Batch `JobRepository` nor
|
||
|
to the actual job configuration file. The main bean of interest
|
||
|
is the `chunkProcessorChunkHandler`. The
|
||
|
`chunkProcessor` property of `ChunkProcessorChunkHandler` takes a
|
||
|
configured `SimpleChunkProcessor`, which is where you would provide a reference to your
|
||
|
`ItemWriter` (and, optionally, your
|
||
|
`ItemProcessor`) that will run on the worker
|
||
|
when it receives chunks from the manager.
|
||
|
|
||
|
For more information, see the section of the "Scalability" chapter on
|
||
|
link:$$https://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking].
|
||
|
|
||
|
Starting from version 4.1, Spring Batch Integration introduces the `@EnableBatchIntegration`
|
||
|
annotation that can be used to simplify a remote chunking setup. This annotation provides
|
||
|
two beans that can be autowired in the application context:
|
||
|
|
||
|
* `RemoteChunkingManagerStepBuilderFactory`: used to configure the manager step
|
||
|
* `RemoteChunkingWorkerBuilder`: used to configure the remote worker integration flow
|
||
|
|
||
|
These APIs take care of configuring a number of components as described in the following diagram:
|
||
|
|
||
|
.Remote Chunking Configuration
|
||
|
image::{batch-asciidoc}images/remote-chunking-config.png[Remote Chunking Configuration, scaledwidth="80%"]
|
||
|
|
||
|
On the manager side, the `RemoteChunkingManagerStepBuilderFactory` lets you
|
||
|
configure a manager step by declaring:
|
||
|
|
||
|
* the item reader to read items and send them to workers
|
||
|
* the output channel ("Outgoing requests") to send requests to workers
|
||
|
* the input channel ("Incoming replies") to receive replies from workers
|
||
|
|
||
|
A `ChunkMessageChannelItemWriter` and the `MessagingTemplate` are not needed to be explicitly configured
|
||
|
(Those can still be explicitly configured if required).
|
||
|
|
||
|
On the worker side, the `RemoteChunkingWorkerBuilder` allows you to configure a worker to:
|
||
|
|
||
|
* listen to requests sent by the manager on the input channel ("Incoming requests")
|
||
|
* call the `handleChunk` method of `ChunkProcessorChunkHandler` for each request
|
||
|
with the configured `ItemProcessor` and `ItemWriter`
|
||
|
* send replies on the output channel ("Outgoing replies") to the manager
|
||
|
|
||
|
There is no need to explicitly configure the `SimpleChunkProcessor`
|
||
|
and the `ChunkProcessorChunkHandler` (Those can be explicitly configured if required).
|
||
|
|
||
|
The following example shows how to use these APIs:
|
||
|
|
||
|
[source, java]
|
||
|
----
|
||
|
@EnableBatchIntegration
|
||
|
@EnableBatchProcessing
|
||
|
public class RemoteChunkingJobConfiguration {
|
||
|
|
||
|
@Configuration
|
||
|
public static class ManagerConfiguration {
|
||
|
|
||
|
@Autowired
|
||
|
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
|
||
|
|
||
|
@Bean
|
||
|
public TaskletStep managerStep() {
|
||
|
return this.managerStepBuilderFactory.get("managerStep")
|
||
|
.chunk(100)
|
||
|
.reader(itemReader())
|
||
|
.outputChannel(requests()) // requests sent to workers
|
||
|
.inputChannel(replies()) // replies received from workers
|
||
|
.build();
|
||
|
}
|
||
|
|
||
|
// Middleware beans setup omitted
|
||
|
|
||
|
}
|
||
|
|
||
|
@Configuration
|
||
|
public static class WorkerConfiguration {
|
||
|
|
||
|
@Autowired
|
||
|
private RemoteChunkingWorkerBuilder workerBuilder;
|
||
|
|
||
|
@Bean
|
||
|
public IntegrationFlow workerFlow() {
|
||
|
return this.workerBuilder
|
||
|
.itemProcessor(itemProcessor())
|
||
|
.itemWriter(itemWriter())
|
||
|
.inputChannel(requests()) // requests received from the manager
|
||
|
.outputChannel(replies()) // replies sent to the manager
|
||
|
.build();
|
||
|
}
|
||
|
|
||
|
// Middleware beans setup omitted
|
||
|
|
||
|
}
|
||
|
|
||
|
}
|
||
|
----
|
||
|
|
||
|
You can find a complete example of a remote chunking job
|
||
|
link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample$$[here].
|
||
|
|
||
|
[[remote-partitioning]]
|
||
|
|
||
|
===== Remote Partitioning
|
||
|
|
||
|
.Remote Partitioning
|
||
|
image::{batch-asciidoc}images/remote-partitioning.png[Remote Partitioning, scaledwidth="60%"]
|
||
|
|
||
|
|
||
|
Remote Partitioning, on the other hand, is useful when it
|
||
|
is not the processing of items but rather the associated I/O that
|
||
|
causes the bottleneck. Using Remote Partitioning, work can
|
||
|
be farmed out to workers that execute complete Spring Batch
|
||
|
steps. Thus, each worker has its own `ItemReader`, `ItemProcessor`, and
|
||
|
`ItemWriter`. For this purpose, Spring Batch
|
||
|
Integration provides the `MessageChannelPartitionHandler`.
|
||
|
|
||
|
|
||
|
|
||
|
This implementation of the `PartitionHandler`
|
||
|
interface uses `MessageChannel` instances to
|
||
|
send instructions to remote workers and receive their responses.
|
||
|
This provides a nice abstraction from the transports (such as JMS
|
||
|
and AMQP) being used to communicate with the remote workers.
|
||
|
|
||
|
|
||
|
|
||
|
The section of the "Scalability" chapter that addresses
|
||
|
<<scalability.adoc#partitioning,remote partitioning>> provides an overview of the concepts and
|
||
|
components needed to configure remote partitioning and shows an
|
||
|
example of using the default
|
||
|
`TaskExecutorPartitionHandler` to partition
|
||
|
in separate local threads of execution. For remote partitioning
|
||
|
to multiple JVMs, two additional components are required:
|
||
|
|
||
|
* A remoting fabric or grid environment
|
||
|
* A `PartitionHandler` implementation that supports the desired
|
||
|
remoting fabric or grid environment
|
||
|
|
||
|
|
||
|
|
||
|
Similar to remote chunking, JMS can be used as the "remoting
|
||
|
fabric". In that case, use a `MessageChannelPartitionHandler` instance as the `PartitionHandler` implementation,
|
||
|
as described above.
|
||
|
The following example
|
||
|
assumes an existing partitioned job and focuses on
|
||
|
the `MessageChannelPartitionHandler` and JMS
|
||
|
configuration:
|
||
|
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<bean id="partitionHandler"
|
||
|
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
|
||
|
<property name="stepName" value="step1"/>
|
||
|
<property name="gridSize" value="3"/>
|
||
|
<property name="replyChannel" ref="outbound-replies"/>
|
||
|
<property name="messagingOperations">
|
||
|
<bean class="org.springframework.integration.core.MessagingTemplate">
|
||
|
<property name="defaultChannel" ref="outbound-requests"/>
|
||
|
<property name="receiveTimeout" value="100000"/>
|
||
|
</bean>
|
||
|
</property>
|
||
|
</bean>
|
||
|
|
||
|
<int:channel id="outbound-requests"/>
|
||
|
<int-jms:outbound-channel-adapter destination="requestsQueue"
|
||
|
channel="outbound-requests"/>
|
||
|
|
||
|
<int:channel id="inbound-requests"/>
|
||
|
<int-jms:message-driven-channel-adapter destination="requestsQueue"
|
||
|
channel="inbound-requests"/>
|
||
|
|
||
|
<bean id="stepExecutionRequestHandler"
|
||
|
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
|
||
|
<property name="jobExplorer" ref="jobExplorer"/>
|
||
|
<property name="stepLocator" ref="stepLocator"/>
|
||
|
</bean>
|
||
|
|
||
|
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
|
||
|
output-channel="outbound-staging"/>
|
||
|
|
||
|
<int:channel id="outbound-staging"/>
|
||
|
<int-jms:outbound-channel-adapter destination="stagingQueue"
|
||
|
channel="outbound-staging"/>
|
||
|
|
||
|
<int:channel id="inbound-staging"/>
|
||
|
<int-jms:message-driven-channel-adapter destination="stagingQueue"
|
||
|
channel="inbound-staging"/>
|
||
|
|
||
|
<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
|
||
|
output-channel="outbound-replies"/>
|
||
|
|
||
|
<int:channel id="outbound-replies">
|
||
|
<int:queue/>
|
||
|
</int:channel>
|
||
|
|
||
|
<bean id="stepLocator"
|
||
|
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
|
||
|
/*
|
||
|
* Configuration of the manager side
|
||
|
*/
|
||
|
@Bean
|
||
|
public PartitionHandler partitionHandler() {
|
||
|
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
|
||
|
partitionHandler.setStepName("step1");
|
||
|
partitionHandler.setGridSize(3);
|
||
|
partitionHandler.setReplyChannel(outboundReplies());
|
||
|
MessagingTemplate template = new MessagingTemplate();
|
||
|
template.setDefaultChannel(outboundRequests());
|
||
|
template.setReceiveTimeout(100000);
|
||
|
partitionHandler.setMessagingOperations(template);
|
||
|
return partitionHandler;
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public QueueChannel outboundReplies() {
|
||
|
return new QueueChannel();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public DirectChannel outboundRequests() {
|
||
|
return new DirectChannel();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public IntegrationFlow outboundJmsRequests() {
|
||
|
return IntegrationFlows.from("outboundRequests")
|
||
|
.handle(Jms.outboundGateway(connectionFactory())
|
||
|
.requestDestination("requestsQueue"))
|
||
|
.get();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
@ServiceActivator(inputChannel = "inboundStaging")
|
||
|
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
|
||
|
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
|
||
|
aggregatorFactoryBean.setProcessorBean(partitionHandler());
|
||
|
aggregatorFactoryBean.setOutputChannel(outboundReplies());
|
||
|
// configure other propeties of the aggregatorFactoryBean
|
||
|
return aggregatorFactoryBean;
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public DirectChannel inboundStaging() {
|
||
|
return new DirectChannel();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public IntegrationFlow inboundJmsStaging() {
|
||
|
return IntegrationFlows
|
||
|
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
|
||
|
.configureListenerContainer(c -> c.subscriptionDurable(false))
|
||
|
.destination("stagingQueue"))
|
||
|
.channel(inboundStaging())
|
||
|
.get();
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Configuration of the worker side
|
||
|
*/
|
||
|
@Bean
|
||
|
public StepExecutionRequestHandler stepExecutionRequestHandler() {
|
||
|
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
|
||
|
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
|
||
|
stepExecutionRequestHandler.setStepLocator(stepLocator());
|
||
|
return stepExecutionRequestHandler;
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
|
||
|
public StepExecutionRequestHandler serviceActivator() throws Exception {
|
||
|
return stepExecutionRequestHandler();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public DirectChannel inboundRequests() {
|
||
|
return new DirectChannel();
|
||
|
}
|
||
|
|
||
|
public IntegrationFlow inboundJmsRequests() {
|
||
|
return IntegrationFlows
|
||
|
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
|
||
|
.configureListenerContainer(c -> c.subscriptionDurable(false))
|
||
|
.destination("requestsQueue"))
|
||
|
.channel(inboundRequests())
|
||
|
.get();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public DirectChannel outboundStaging() {
|
||
|
return new DirectChannel();
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public IntegrationFlow outboundJmsStaging() {
|
||
|
return IntegrationFlows.from("outboundStaging")
|
||
|
.handle(Jms.outboundGateway(connectionFactory())
|
||
|
.requestDestination("stagingQueue"))
|
||
|
.get();
|
||
|
}
|
||
|
----
|
||
|
|
||
|
You must also ensure that the partition `handler` attribute maps to the `partitionHandler` bean, as shown in the following example:
|
||
|
|
||
|
.XML Configuration
|
||
|
[source, xml, role="xmlContent"]
|
||
|
----
|
||
|
<job id="personJob">
|
||
|
<step id="step1.manager">
|
||
|
<partition partitioner="partitioner" handler="partitionHandler"/>
|
||
|
...
|
||
|
</step>
|
||
|
</job>
|
||
|
----
|
||
|
|
||
|
.Java Configuration
|
||
|
[source, java, role="javaContent"]
|
||
|
----
|
||
|
public Job personJob() {
|
||
|
return jobBuilderFactory.get("personJob")
|
||
|
.start(stepBuilderFactory.get("step1.manager")
|
||
|
.partitioner("step1.worker", partitioner())
|
||
|
.partitionHandler(partitionHandler())
|
||
|
.build())
|
||
|
.build();
|
||
|
}
|
||
|
----
|
||
|
|
||
|
You can find a complete example of a remote partitioning job
|
||
|
link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-partitioning-sample$$[here].
|
||
|
|
||
|
The `@EnableBatchIntegration` annotation that can be used to simplify a remote
|
||
|
partitioning setup. This annotation provides two beans useful for remote partitioning:
|
||
|
|
||
|
* `RemotePartitioningManagerStepBuilderFactory`: used to configure the manager step
|
||
|
* `RemotePartitioningWorkerStepBuilderFactory`: used to configure the worker step
|
||
|
|
||
|
These APIs take care of configuring a number of components as described in the following diagram:
|
||
|
|
||
|
.Remote Partitioning Configuration (with job repository polling)
|
||
|
image::{batch-asciidoc}images/remote-partitioning-polling-config.png[Remote Partitioning Configuration (with job repository polling), scaledwidth="80%"]
|
||
|
|
||
|
.Remote Partitioning Configuration (with replies aggregation)
|
||
|
image::{batch-asciidoc}images/remote-partitioning-aggregation-config.png[Remote Partitioning Configuration (with replies aggregation), scaledwidth="80%"]
|
||
|
|
||
|
On the manager side, the `RemotePartitioningManagerStepBuilderFactory` allows you to
|
||
|
configure a manager step by declaring:
|
||
|
|
||
|
* the `Partitioner` used to partition data
|
||
|
* the output channel ("Outgoing requests") to send requests to workers
|
||
|
* the input channel ("Incoming replies") to receive replies from workers (when configuring replies aggregation)
|
||
|
* the poll interval and timeout parameters (when configuring job repository polling)
|
||
|
|
||
|
The `MessageChannelPartitionHandler` and the `MessagingTemplate` are not needed to be explicitly configured
|
||
|
(Those can still be explicitly configured if required).
|
||
|
|
||
|
On the worker side, the `RemotePartitioningWorkerStepBuilderFactory` allows you to configure a worker to:
|
||
|
|
||
|
* listen to requests sent by the manager on the input channel ("Incoming requests")
|
||
|
* call the `handle` method of `StepExecutionRequestHandler` for each request
|
||
|
* send replies on the output channel ("Outgoing replies") to the manager
|
||
|
|
||
|
There is no need to explicitly configure the `StepExecutionRequestHandler` (which can be explicitly configured if required).
|
||
|
|
||
|
The following example shows how to use these APIs:
|
||
|
|
||
|
[source, java]
|
||
|
----
|
||
|
@Configuration
|
||
|
@EnableBatchProcessing
|
||
|
@EnableBatchIntegration
|
||
|
public class RemotePartitioningJobConfiguration {
|
||
|
|
||
|
@Configuration
|
||
|
public static class ManagerConfiguration {
|
||
|
|
||
|
@Autowired
|
||
|
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
|
||
|
|
||
|
@Bean
|
||
|
public Step managerStep() {
|
||
|
return this.managerStepBuilderFactory
|
||
|
.get("managerStep")
|
||
|
.partitioner("workerStep", partitioner())
|
||
|
.gridSize(10)
|
||
|
.outputChannel(outgoingRequestsToWorkers())
|
||
|
.inputChannel(incomingRepliesFromWorkers())
|
||
|
.build();
|
||
|
}
|
||
|
|
||
|
// Middleware beans setup omitted
|
||
|
|
||
|
}
|
||
|
|
||
|
@Configuration
|
||
|
public static class WorkerConfiguration {
|
||
|
|
||
|
@Autowired
|
||
|
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
|
||
|
|
||
|
@Bean
|
||
|
public Step workerStep() {
|
||
|
return this.workerStepBuilderFactory
|
||
|
.get("workerStep")
|
||
|
.inputChannel(incomingRequestsFromManager())
|
||
|
.outputChannel(outgoingRepliesToManager())
|
||
|
.chunk(100)
|
||
|
.reader(itemReader())
|
||
|
.processor(itemProcessor())
|
||
|
.writer(itemWriter())
|
||
|
.build();
|
||
|
}
|
||
|
|
||
|
// Middleware beans setup omitted
|
||
|
|
||
|
}
|
||
|
|
||
|
}
|
||
|
----
|