Partitioner Refactor (#2434)

* Refactor Batch

* Refactor Batch
This commit is contained in:
Grzegorz Piwowarek 2017-08-14 11:00:51 +02:00 committed by GitHub
parent 290e759d4a
commit 43357b0809
10 changed files with 49 additions and 52 deletions

View File

@ -1,4 +1,4 @@
package org.baeldung.spring_batch_intro; package org.baeldung.batch;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecution;

View File

@ -1,11 +1,8 @@
package org.baeldung.spring_batch_intro; package org.baeldung.batch;
import java.net.MalformedURLException; import org.baeldung.batch.model.Transaction;
import java.text.ParseException; import org.baeldung.batch.service.CustomItemProcessor;
import org.baeldung.batch.service.RecordFieldSetMapper;
import org.baeldung.spring_batch_intro.model.Transaction;
import org.baeldung.spring_batch_intro.service.CustomItemProcessor;
import org.baeldung.spring_batch_intro.service.RecordFieldSetMapper;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
@ -26,6 +23,9 @@ import org.springframework.core.io.Resource;
import org.springframework.oxm.Marshaller; import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller; import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import java.net.MalformedURLException;
import java.text.ParseException;
public class SpringBatchConfig { public class SpringBatchConfig {
@Autowired @Autowired
private JobBuilderFactory jobs; private JobBuilderFactory jobs;
@ -43,7 +43,7 @@ public class SpringBatchConfig {
public ItemReader<Transaction> itemReader() throws UnexpectedInputException, ParseException { public ItemReader<Transaction> itemReader() throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>(); FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = { "username", "userid", "transactiondate", "amount" }; String[] tokens = {"username", "userid", "transactiondate", "amount"};
tokenizer.setNames(tokens); tokenizer.setNames(tokens);
reader.setResource(inputCsv); reader.setResource(inputCsv);
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>(); DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
@ -71,13 +71,13 @@ public class SpringBatchConfig {
@Bean @Bean
public Marshaller marshaller() { public Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(new Class[] { Transaction.class }); marshaller.setClassesToBeBound(Transaction.class);
return marshaller; return marshaller;
} }
@Bean @Bean
protected Step step1(ItemReader<Transaction> reader, ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) { protected Step step1(ItemReader<Transaction> reader, ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) {
return steps.get("step1").<Transaction, Transaction> chunk(10).reader(reader).processor(processor).writer(writer).build(); return steps.get("step1").<Transaction, Transaction>chunk(10).reader(reader).processor(processor).writer(writer).build();
} }
@Bean(name = "firstBatchJob") @Bean(name = "firstBatchJob")

View File

@ -1,4 +1,4 @@
package org.baeldung.spring_batch_intro; package org.baeldung.batch;
import java.net.MalformedURLException; import java.net.MalformedURLException;

View File

@ -1,4 +1,4 @@
package org.baeldung.spring_batch_intro.model; package org.baeldung.batch.model;
import java.util.Date; import java.util.Date;

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.baeldung.spring_batch_intro.partitioner; package org.baeldung.batch.partitioner;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;

View File

@ -1,13 +1,7 @@
package org.baeldung.spring_batch_intro.partitioner; package org.baeldung.batch.partitioner;
import java.io.IOException; import org.baeldung.batch.model.Transaction;
import java.net.MalformedURLException; import org.baeldung.batch.service.RecordFieldSetMapper;
import java.text.ParseException;
import javax.sql.DataSource;
import org.baeldung.spring_batch_intro.model.Transaction;
import org.baeldung.spring_batch_intro.service.RecordFieldSetMapper;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
@ -33,7 +27,6 @@ import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.core.task.TaskExecutor; import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.oxm.Marshaller; import org.springframework.oxm.Marshaller;
@ -41,6 +34,11 @@ import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.io.IOException;
import java.net.MalformedURLException;
import java.text.ParseException;
@Configuration @Configuration
@EnableBatchProcessing @EnableBatchProcessing
public class SpringbatchPartitionConfig { public class SpringbatchPartitionConfig {
@ -57,26 +55,26 @@ public class SpringbatchPartitionConfig {
@Bean(name = "partitionerJob") @Bean(name = "partitionerJob")
public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException { public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException {
return jobs.get("partitionerJob") return jobs.get("partitionerJob")
.start(partitionStep()) .start(partitionStep())
.build(); .build();
} }
@Bean @Bean
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("partitionStep") return steps.get("partitionStep")
.partitioner("slaveStep", partitioner()) .partitioner("slaveStep", partitioner())
.step(slaveStep()) .step(slaveStep())
.taskExecutor(taskExecutor()) .taskExecutor(taskExecutor())
.build(); .build();
} }
@Bean @Bean
public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException { public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("slaveStep") return steps.get("slaveStep")
.<Transaction, Transaction> chunk(1) .<Transaction, Transaction>chunk(1)
.reader(itemReader(null)) .reader(itemReader(null))
.writer(itemWriter(marshaller(), null)) .writer(itemWriter(marshaller(), null))
.build(); .build();
} }
@Bean @Bean
@ -95,12 +93,12 @@ public class SpringbatchPartitionConfig {
@Bean @Bean
@StepScope @StepScope
public FlatFileItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException { public FlatFileItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>(); FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = { "username", "userid", "transactiondate", "amount" }; String[] tokens = {"username", "userid", "transactiondate", "amount"};
tokenizer.setNames(tokens); tokenizer.setNames(tokens);
reader.setResource(new ClassPathResource("input/partitioner/" + filename)); reader.setResource(new ClassPathResource("input/partitioner/" + filename));
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>(); DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer); lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLinesToSkip(1); reader.setLinesToSkip(1);
@ -121,7 +119,7 @@ public class SpringbatchPartitionConfig {
@Bean @Bean
public Marshaller marshaller() { public Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(new Class[] { Transaction.class }); marshaller.setClassesToBeBound(Transaction.class);
return marshaller; return marshaller;
} }
@ -142,16 +140,15 @@ public class SpringbatchPartitionConfig {
// JobRepositoryFactoryBean's methods Throws Generic Exception, // JobRepositoryFactoryBean's methods Throws Generic Exception,
// it would have been better to have a specific one // it would have been better to have a specific one
factory.afterPropertiesSet(); factory.afterPropertiesSet();
return (JobRepository) factory.getObject(); return factory.getObject();
} }
private DataSource dataSource() { private DataSource dataSource() {
EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder(); EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
EmbeddedDatabase db = builder.setType(EmbeddedDatabaseType.HSQL) return builder.setType(EmbeddedDatabaseType.HSQL)
.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql") .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
.addScript("classpath:org/springframework/batch/core/schema-h2.sql") .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
.build(); .build();
return db;
} }
private PlatformTransactionManager getTransactionManager() { private PlatformTransactionManager getTransactionManager() {

View File

@ -1,4 +1,4 @@
package org.baeldung.spring_batch_intro.partitioner; package org.baeldung.batch.partitioner;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecution;

View File

@ -1,6 +1,6 @@
package org.baeldung.spring_batch_intro.service; package org.baeldung.batch.service;
import org.baeldung.spring_batch_intro.model.Transaction; import org.baeldung.batch.model.Transaction;
import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemProcessor;
public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> { public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

View File

@ -1,9 +1,9 @@
package org.baeldung.spring_batch_intro.service; package org.baeldung.batch.service;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import org.baeldung.spring_batch_intro.model.Transaction; import org.baeldung.batch.model.Transaction;
import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException; import org.springframework.validation.BindException;

View File

@ -22,14 +22,14 @@
</property> </property>
<property name="fieldSetMapper"> <property name="fieldSetMapper">
<bean <bean
class="org.baeldung.spring_batch_intro.service.RecordFieldSetMapper" /> class="org.baeldung.batch.service.RecordFieldSetMapper" />
</property> </property>
</bean> </bean>
</property> </property>
<property name="linesToSkip" value="1" /> <property name="linesToSkip" value="1" />
</bean> </bean>
<bean id="itemProcessor" class="org.baeldung.spring_batch_intro.service.CustomItemProcessor" /> <bean id="itemProcessor" class="org.baeldung.batch.service.CustomItemProcessor" />
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter"> <bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output.xml" /> <property name="resource" value="file:xml/output.xml" />
@ -40,7 +40,7 @@
<bean id="recordMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller"> <bean id="recordMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
<property name="classesToBeBound"> <property name="classesToBeBound">
<list> <list>
<value>org.baeldung.spring_batch_intro.model.Transaction</value> <value>org.baeldung.batch.model.Transaction</value>
</list> </list>
</property> </property>
</bean> </bean>