BAEL-243 Spring Batch using Partitioner
This commit is contained in:
parent
3c5ed71063
commit
89d89e6dce
|
@ -0,0 +1,77 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2006-2013 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.baeldung.spring_batch_intro.partitioner;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.springframework.batch.core.partition.support.Partitioner;
|
||||||
|
import org.springframework.batch.item.ExecutionContext;
|
||||||
|
import org.springframework.core.io.Resource;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
public class CustomMultiResourcePartitioner implements Partitioner {
|
||||||
|
|
||||||
|
private static final String DEFAULT_KEY_NAME = "fileName";
|
||||||
|
|
||||||
|
private static final String PARTITION_KEY = "partition";
|
||||||
|
|
||||||
|
private Resource[] resources = new Resource[0];
|
||||||
|
|
||||||
|
private String keyName = DEFAULT_KEY_NAME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The resources to assign to each partition. In Spring configuration you
|
||||||
|
* can use a pattern to select multiple resources.
|
||||||
|
* @param resources the resources to use
|
||||||
|
*/
|
||||||
|
public void setResources(Resource[] resources) {
|
||||||
|
this.resources = resources;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The name of the key for the file name in each {@link ExecutionContext}.
|
||||||
|
* Defaults to "fileName".
|
||||||
|
* @param keyName the value of the key
|
||||||
|
*/
|
||||||
|
public void setKeyName(String keyName) {
|
||||||
|
this.keyName = keyName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assign the filename of each of the injected resources to an
|
||||||
|
* {@link ExecutionContext}.
|
||||||
|
*
|
||||||
|
* @see Partitioner#partition(int)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Map<String, ExecutionContext> partition(int gridSize) {
|
||||||
|
Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize);
|
||||||
|
int i = 0, k = 1;
|
||||||
|
for (Resource resource : resources) {
|
||||||
|
ExecutionContext context = new ExecutionContext();
|
||||||
|
Assert.state(resource.exists(), "Resource does not exist: " + resource);
|
||||||
|
context.putString(keyName, resource.getFilename());
|
||||||
|
context.putString("opFileName", "output" + k++ + ".xml");
|
||||||
|
|
||||||
|
map.put(PARTITION_KEY + i, context);
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,169 @@
|
||||||
|
package org.baeldung.spring_batch_intro.partitioner;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.text.ParseException;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
|
||||||
|
import org.baeldung.spring_batch_intro.model.Transaction;
|
||||||
|
import org.baeldung.spring_batch_intro.service.RecordFieldSetMapper;
|
||||||
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.Step;
|
||||||
|
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
|
||||||
|
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||||
|
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||||
|
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||||
|
import org.springframework.batch.core.launch.JobLauncher;
|
||||||
|
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
|
||||||
|
import org.springframework.batch.core.repository.JobRepository;
|
||||||
|
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
|
||||||
|
import org.springframework.batch.item.UnexpectedInputException;
|
||||||
|
import org.springframework.batch.item.file.FlatFileItemReader;
|
||||||
|
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
|
||||||
|
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
|
||||||
|
import org.springframework.batch.item.xml.StaxEventItemWriter;
|
||||||
|
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.core.io.FileSystemResource;
|
||||||
|
import org.springframework.core.io.Resource;
|
||||||
|
import org.springframework.core.io.support.ResourcePatternResolver;
|
||||||
|
import org.springframework.core.task.TaskExecutor;
|
||||||
|
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
|
||||||
|
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
|
||||||
|
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
|
||||||
|
import org.springframework.oxm.Marshaller;
|
||||||
|
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableBatchProcessing
|
||||||
|
public class SpringbatchPartitionConfig {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
ResourcePatternResolver resoursePatternResolver;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private JobBuilderFactory jobs;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private StepBuilderFactory steps;
|
||||||
|
|
||||||
|
@Bean(name = "partitionerJob")
|
||||||
|
public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException {
|
||||||
|
return jobs.get("partitionerJob")
|
||||||
|
.start(partitionStep())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
|
||||||
|
return steps.get("partitionStep")
|
||||||
|
.partitioner("slaveStep", partitioner())
|
||||||
|
.step(slaveStep())
|
||||||
|
.taskExecutor(taskExecutor())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException {
|
||||||
|
return steps.get("slaveStep")
|
||||||
|
.<Transaction, Transaction> chunk(1)
|
||||||
|
.reader(itemReader(null))
|
||||||
|
.writer(itemWriter(marshaller(), null))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public CustomMultiResourcePartitioner partitioner() {
|
||||||
|
CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner();
|
||||||
|
Resource[] resources;
|
||||||
|
try {
|
||||||
|
resources = resoursePatternResolver.getResources("file:src/main/resources/input/partitioner/*.csv");
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("I/O problems when resolving the input file pattern.", e);
|
||||||
|
}
|
||||||
|
partitioner.setResources(resources);
|
||||||
|
return partitioner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@StepScope
|
||||||
|
public FlatFileItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException {
|
||||||
|
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
|
||||||
|
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
|
||||||
|
String[] tokens = { "username", "userid", "transactiondate", "amount" };
|
||||||
|
tokenizer.setNames(tokens);
|
||||||
|
reader.setResource(new ClassPathResource("input/partitioner/" + filename));
|
||||||
|
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
|
||||||
|
lineMapper.setLineTokenizer(tokenizer);
|
||||||
|
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
|
||||||
|
reader.setLinesToSkip(1);
|
||||||
|
reader.setLineMapper(lineMapper);
|
||||||
|
return reader;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(destroyMethod = "")
|
||||||
|
@StepScope
|
||||||
|
public StaxEventItemWriter<Transaction> itemWriter(Marshaller marshaller, @Value("#{stepExecutionContext[opFileName]}") String filename) throws MalformedURLException {
|
||||||
|
StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
|
||||||
|
itemWriter.setMarshaller(marshaller);
|
||||||
|
itemWriter.setRootTagName("transactionRecord");
|
||||||
|
itemWriter.setResource(new FileSystemResource("src/main/resources/output/" + filename));
|
||||||
|
return itemWriter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Marshaller marshaller() {
|
||||||
|
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
|
||||||
|
marshaller.setClassesToBeBound(new Class[] { Transaction.class });
|
||||||
|
return marshaller;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public TaskExecutor taskExecutor() {
|
||||||
|
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
|
||||||
|
taskExecutor.setMaxPoolSize(5);
|
||||||
|
taskExecutor.setCorePoolSize(5);
|
||||||
|
taskExecutor.setQueueCapacity(5);
|
||||||
|
taskExecutor.afterPropertiesSet();
|
||||||
|
return taskExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
|
private JobRepository getJobRepository() throws Exception {
|
||||||
|
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
|
||||||
|
factory.setDataSource(dataSource());
|
||||||
|
factory.setTransactionManager(getTransactionManager());
|
||||||
|
// JobRepositoryFactoryBean's methods Throws Generic Exception,
|
||||||
|
// it would have been better to have a specific one
|
||||||
|
factory.afterPropertiesSet();
|
||||||
|
return (JobRepository) factory.getObject();
|
||||||
|
}
|
||||||
|
|
||||||
|
private DataSource dataSource() {
|
||||||
|
EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
|
||||||
|
EmbeddedDatabase db = builder.setType(EmbeddedDatabaseType.HSQL)
|
||||||
|
.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
|
||||||
|
.addScript("classpath:org/springframework/batch/core/schema-h2.sql")
|
||||||
|
.build();
|
||||||
|
return db;
|
||||||
|
}
|
||||||
|
|
||||||
|
private PlatformTransactionManager getTransactionManager() {
|
||||||
|
return new ResourcelessTransactionManager();
|
||||||
|
}
|
||||||
|
|
||||||
|
public JobLauncher getJobLauncher() throws Exception {
|
||||||
|
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
|
||||||
|
// SimpleJobLauncher's methods Throws Generic Exception,
|
||||||
|
// it would have been better to have a specific one
|
||||||
|
jobLauncher.setJobRepository(getJobRepository());
|
||||||
|
jobLauncher.afterPropertiesSet();
|
||||||
|
return jobLauncher;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package org.baeldung.spring_batch_intro.partitioner;
|
||||||
|
|
||||||
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.JobExecution;
|
||||||
|
import org.springframework.batch.core.JobParameters;
|
||||||
|
import org.springframework.batch.core.launch.JobLauncher;
|
||||||
|
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||||
|
|
||||||
|
public class SpringbatchPartitionerApp {
|
||||||
|
public static void main(final String[] args) {
|
||||||
|
// Spring Java config
|
||||||
|
final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
|
||||||
|
context.register(SpringbatchPartitionConfig.class);
|
||||||
|
context.refresh();
|
||||||
|
|
||||||
|
final JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
|
||||||
|
final Job job = (Job) context.getBean("partitionerJob");
|
||||||
|
System.out.println("Starting the batch job");
|
||||||
|
try {
|
||||||
|
final JobExecution execution = jobLauncher.run(job, new JobParameters());
|
||||||
|
System.out.println("Job Status : " + execution.getStatus());
|
||||||
|
System.out.println("Job succeeded");
|
||||||
|
} catch (final Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
System.out.println("Job failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue