diff --git a/spring-batch/src/main/java/org/baeldung/spring_batch_intro/partitioner/CustomMultiResourcePartitioner.java b/spring-batch/src/main/java/org/baeldung/spring_batch_intro/partitioner/CustomMultiResourcePartitioner.java new file mode 100644 index 0000000000..4cae69efbd --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/spring_batch_intro/partitioner/CustomMultiResourcePartitioner.java @@ -0,0 +1,77 @@ +/* + * Copyright 2006-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.baeldung.spring_batch_intro.partitioner; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.batch.core.partition.support.Partitioner; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.core.io.Resource; +import org.springframework.util.Assert; + +public class CustomMultiResourcePartitioner implements Partitioner { + + private static final String DEFAULT_KEY_NAME = "fileName"; + + private static final String PARTITION_KEY = "partition"; + + private Resource[] resources = new Resource[0]; + + private String keyName = DEFAULT_KEY_NAME; + + /** + * The resources to assign to each partition. In Spring configuration you + * can use a pattern to select multiple resources. + * @param resources the resources to use + */ + public void setResources(Resource[] resources) { + this.resources = resources; + } + + /** + * The name of the key for the file name in each {@link ExecutionContext}. + * Defaults to "fileName". + * @param keyName the value of the key + */ + public void setKeyName(String keyName) { + this.keyName = keyName; + } + + /** + * Assign the filename of each of the injected resources to an + * {@link ExecutionContext}. + * + * @see Partitioner#partition(int) + */ + @Override + public Map partition(int gridSize) { + Map map = new HashMap(gridSize); + int i = 0, k = 1; + for (Resource resource : resources) { + ExecutionContext context = new ExecutionContext(); + Assert.state(resource.exists(), "Resource does not exist: " + resource); + context.putString(keyName, resource.getFilename()); + context.putString("opFileName", "output" + k++ + ".xml"); + + map.put(PARTITION_KEY + i, context); + i++; + } + return map; + } + +} diff --git a/spring-batch/src/main/java/org/baeldung/spring_batch_intro/partitioner/SpringbatchPartitionConfig.java b/spring-batch/src/main/java/org/baeldung/spring_batch_intro/partitioner/SpringbatchPartitionConfig.java new file mode 100644 index 0000000000..fe8339a8b4 --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/spring_batch_intro/partitioner/SpringbatchPartitionConfig.java @@ -0,0 +1,169 @@ +package org.baeldung.spring_batch_intro.partitioner; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.text.ParseException; + +import javax.sql.DataSource; + +import org.baeldung.spring_batch_intro.model.Transaction; +import org.baeldung.spring_batch_intro.service.RecordFieldSetMapper; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.support.SimpleJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; +import org.springframework.batch.item.UnexpectedInputException; +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.mapping.DefaultLineMapper; +import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; +import org.springframework.batch.item.xml.StaxEventItemWriter; +import org.springframework.batch.support.transaction.ResourcelessTransactionManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.ResourcePatternResolver; +import org.springframework.core.task.TaskExecutor; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.oxm.Marshaller; +import org.springframework.oxm.jaxb.Jaxb2Marshaller; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.transaction.PlatformTransactionManager; + +@Configuration +@EnableBatchProcessing +public class SpringbatchPartitionConfig { + + @Autowired + ResourcePatternResolver resoursePatternResolver; + + @Autowired + private JobBuilderFactory jobs; + + @Autowired + private StepBuilderFactory steps; + + @Bean(name = "partitionerJob") + public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException { + return jobs.get("partitionerJob") + .start(partitionStep()) + .build(); + } + + @Bean + public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { + return steps.get("partitionStep") + .partitioner("slaveStep", partitioner()) + .step(slaveStep()) + .taskExecutor(taskExecutor()) + .build(); + } + + @Bean + public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException { + return steps.get("slaveStep") + . chunk(1) + .reader(itemReader(null)) + .writer(itemWriter(marshaller(), null)) + .build(); + } + + @Bean + public CustomMultiResourcePartitioner partitioner() { + CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner(); + Resource[] resources; + try { + resources = resoursePatternResolver.getResources("file:src/main/resources/input/partitioner/*.csv"); + } catch (IOException e) { + throw new RuntimeException("I/O problems when resolving the input file pattern.", e); + } + partitioner.setResources(resources); + return partitioner; + } + + @Bean + @StepScope + public FlatFileItemReader itemReader(@Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException { + FlatFileItemReader reader = new FlatFileItemReader(); + DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); + String[] tokens = { "username", "userid", "transactiondate", "amount" }; + tokenizer.setNames(tokens); + reader.setResource(new ClassPathResource("input/partitioner/" + filename)); + DefaultLineMapper lineMapper = new DefaultLineMapper(); + lineMapper.setLineTokenizer(tokenizer); + lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); + reader.setLinesToSkip(1); + reader.setLineMapper(lineMapper); + return reader; + } + + @Bean(destroyMethod = "") + @StepScope + public StaxEventItemWriter itemWriter(Marshaller marshaller, @Value("#{stepExecutionContext[opFileName]}") String filename) throws MalformedURLException { + StaxEventItemWriter itemWriter = new StaxEventItemWriter<>(); + itemWriter.setMarshaller(marshaller); + itemWriter.setRootTagName("transactionRecord"); + itemWriter.setResource(new FileSystemResource("src/main/resources/output/" + filename)); + return itemWriter; + } + + @Bean + public Marshaller marshaller() { + Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); + marshaller.setClassesToBeBound(new Class[] { Transaction.class }); + return marshaller; + } + + @Bean + public TaskExecutor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + taskExecutor.setMaxPoolSize(5); + taskExecutor.setCorePoolSize(5); + taskExecutor.setQueueCapacity(5); + taskExecutor.afterPropertiesSet(); + return taskExecutor; + } + + private JobRepository getJobRepository() throws Exception { + JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); + factory.setDataSource(dataSource()); + factory.setTransactionManager(getTransactionManager()); + // JobRepositoryFactoryBean's methods Throws Generic Exception, + // it would have been better to have a specific one + factory.afterPropertiesSet(); + return (JobRepository) factory.getObject(); + } + + private DataSource dataSource() { + EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder(); + EmbeddedDatabase db = builder.setType(EmbeddedDatabaseType.HSQL) + .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql") + .addScript("classpath:org/springframework/batch/core/schema-h2.sql") + .build(); + return db; + } + + private PlatformTransactionManager getTransactionManager() { + return new ResourcelessTransactionManager(); + } + + public JobLauncher getJobLauncher() throws Exception { + SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); + // SimpleJobLauncher's methods Throws Generic Exception, + // it would have been better to have a specific one + jobLauncher.setJobRepository(getJobRepository()); + jobLauncher.afterPropertiesSet(); + return jobLauncher; + } +} diff --git a/spring-batch/src/main/java/org/baeldung/spring_batch_intro/partitioner/SpringbatchPartitionerApp.java b/spring-batch/src/main/java/org/baeldung/spring_batch_intro/partitioner/SpringbatchPartitionerApp.java new file mode 100644 index 0000000000..1d6d922969 --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/spring_batch_intro/partitioner/SpringbatchPartitionerApp.java @@ -0,0 +1,28 @@ +package org.baeldung.spring_batch_intro.partitioner; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +public class SpringbatchPartitionerApp { + public static void main(final String[] args) { + // Spring Java config + final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(SpringbatchPartitionConfig.class); + context.refresh(); + + final JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher"); + final Job job = (Job) context.getBean("partitionerJob"); + System.out.println("Starting the batch job"); + try { + final JobExecution execution = jobLauncher.run(job, new JobParameters()); + System.out.println("Job Status : " + execution.getStatus()); + System.out.println("Job succeeded"); + } catch (final Exception e) { + e.printStackTrace(); + System.out.println("Job failed"); + } + } +} \ No newline at end of file