BAEL-3093: Configuring Skip Logic in Spring Batch (#7946)
This commit is contained in:
parent
1d0596f23c
commit
c2b029fd85
@ -18,9 +18,17 @@ public class App {
|
|||||||
// Spring xml config
|
// Spring xml config
|
||||||
// ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch.xml");
|
// ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch.xml");
|
||||||
|
|
||||||
|
runJob(context, "firstBatchJob");
|
||||||
|
runJob(context, "skippingBatchJob");
|
||||||
|
runJob(context, "skipPolicyBatchJob");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void runJob(AnnotationConfigApplicationContext context, String batchJobName) {
|
||||||
final JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
|
final JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
|
||||||
final Job job = (Job) context.getBean("firstBatchJob");
|
final Job job = (Job) context.getBean(batchJobName);
|
||||||
System.out.println("Starting the batch job");
|
|
||||||
|
System.out.println("----------------------------------------");
|
||||||
|
System.out.println("Starting the batch job: " + batchJobName);
|
||||||
try {
|
try {
|
||||||
// To enable multiple execution of a job with the same parameters
|
// To enable multiple execution of a job with the same parameters
|
||||||
JobParameters jobParameters = new JobParametersBuilder()
|
JobParameters jobParameters = new JobParametersBuilder()
|
||||||
|
@ -2,7 +2,11 @@ package org.baeldung.batch;
|
|||||||
|
|
||||||
import org.baeldung.batch.model.Transaction;
|
import org.baeldung.batch.model.Transaction;
|
||||||
import org.baeldung.batch.service.CustomItemProcessor;
|
import org.baeldung.batch.service.CustomItemProcessor;
|
||||||
|
import org.baeldung.batch.service.CustomSkipPolicy;
|
||||||
|
import org.baeldung.batch.service.MissingUsernameException;
|
||||||
|
import org.baeldung.batch.service.NegativeAmountException;
|
||||||
import org.baeldung.batch.service.RecordFieldSetMapper;
|
import org.baeldung.batch.service.RecordFieldSetMapper;
|
||||||
|
import org.baeldung.batch.service.SkippingItemProcessor;
|
||||||
import org.springframework.batch.core.Job;
|
import org.springframework.batch.core.Job;
|
||||||
import org.springframework.batch.core.Step;
|
import org.springframework.batch.core.Step;
|
||||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||||
@ -23,30 +27,31 @@ import org.springframework.core.io.Resource;
|
|||||||
import org.springframework.oxm.Marshaller;
|
import org.springframework.oxm.Marshaller;
|
||||||
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
|
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
|
||||||
|
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
|
|
||||||
public class SpringBatchConfig {
|
public class SpringBatchConfig {
|
||||||
@Autowired
|
@Autowired
|
||||||
private JobBuilderFactory jobs;
|
private JobBuilderFactory jobBuilderFactory;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private StepBuilderFactory steps;
|
private StepBuilderFactory stepBuilderFactory;
|
||||||
|
|
||||||
@Value("input/record.csv")
|
@Value("input/record.csv")
|
||||||
private Resource inputCsv;
|
private Resource inputCsv;
|
||||||
|
|
||||||
|
@Value("input/recordWithInvalidData.csv")
|
||||||
|
private Resource invalidInputCsv;
|
||||||
|
|
||||||
@Value("file:xml/output.xml")
|
@Value("file:xml/output.xml")
|
||||||
private Resource outputXml;
|
private Resource outputXml;
|
||||||
|
|
||||||
@Bean
|
public ItemReader<Transaction> itemReader(Resource inputData) throws UnexpectedInputException, ParseException {
|
||||||
public ItemReader<Transaction> itemReader() throws UnexpectedInputException, ParseException {
|
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
|
||||||
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
|
|
||||||
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
|
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
|
||||||
String[] tokens = {"username", "userid", "transactiondate", "amount"};
|
String[] tokens = {"username", "userid", "transactiondate", "amount"};
|
||||||
tokenizer.setNames(tokens);
|
tokenizer.setNames(tokens);
|
||||||
reader.setResource(inputCsv);
|
reader.setResource(inputData);
|
||||||
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
|
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
|
||||||
lineMapper.setLineTokenizer(tokenizer);
|
lineMapper.setLineTokenizer(tokenizer);
|
||||||
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
|
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
|
||||||
reader.setLinesToSkip(1);
|
reader.setLinesToSkip(1);
|
||||||
@ -60,8 +65,13 @@ public class SpringBatchConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ItemWriter<Transaction> itemWriter(Marshaller marshaller) throws MalformedURLException {
|
public ItemProcessor<Transaction, Transaction> skippingItemProcessor() {
|
||||||
StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<Transaction>();
|
return new SkippingItemProcessor();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ItemWriter<Transaction> itemWriter(Marshaller marshaller) {
|
||||||
|
StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
|
||||||
itemWriter.setMarshaller(marshaller);
|
itemWriter.setMarshaller(marshaller);
|
||||||
itemWriter.setRootTagName("transactionRecord");
|
itemWriter.setRootTagName("transactionRecord");
|
||||||
itemWriter.setResource(outputXml);
|
itemWriter.setResource(outputXml);
|
||||||
@ -76,13 +86,60 @@ public class SpringBatchConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
protected Step step1(ItemReader<Transaction> reader, ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) {
|
protected Step step1(@Qualifier("itemProcessor") ItemProcessor<Transaction, Transaction> processor,
|
||||||
return steps.get("step1").<Transaction, Transaction>chunk(10).reader(reader).processor(processor).writer(writer).build();
|
ItemWriter<Transaction> writer) throws ParseException {
|
||||||
|
return stepBuilderFactory.get("step1").<Transaction, Transaction>chunk(10).reader(itemReader(inputCsv)).processor(processor).writer(writer).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean(name = "firstBatchJob")
|
@Bean(name = "firstBatchJob")
|
||||||
public Job job(@Qualifier("step1") Step step1) {
|
public Job job(@Qualifier("step1") Step step1) {
|
||||||
return jobs.get("firstBatchJob").start(step1).build();
|
return jobBuilderFactory.get("firstBatchJob").start(step1).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Step skippingStep(@Qualifier("skippingItemProcessor") ItemProcessor<Transaction, Transaction> processor,
|
||||||
|
ItemWriter<Transaction> writer) throws ParseException {
|
||||||
|
return stepBuilderFactory
|
||||||
|
.get("skippingStep")
|
||||||
|
.<Transaction, Transaction>chunk(10)
|
||||||
|
.reader(itemReader(invalidInputCsv))
|
||||||
|
.processor(processor)
|
||||||
|
.writer(writer)
|
||||||
|
.faultTolerant()
|
||||||
|
.skipLimit(2)
|
||||||
|
.skip(MissingUsernameException.class)
|
||||||
|
.skip(NegativeAmountException.class)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "skippingBatchJob")
|
||||||
|
public Job skippingJob(@Qualifier("skippingStep") Step skippingStep) {
|
||||||
|
return jobBuilderFactory
|
||||||
|
.get("skippingBatchJob")
|
||||||
|
.start(skippingStep)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Step skipPolicyStep(@Qualifier("skippingItemProcessor") ItemProcessor<Transaction, Transaction> processor,
|
||||||
|
ItemWriter<Transaction> writer) throws ParseException {
|
||||||
|
return stepBuilderFactory
|
||||||
|
.get("skipPolicyStep")
|
||||||
|
.<Transaction, Transaction>chunk(10)
|
||||||
|
.reader(itemReader(invalidInputCsv))
|
||||||
|
.processor(processor)
|
||||||
|
.writer(writer)
|
||||||
|
.faultTolerant()
|
||||||
|
.skipPolicy(new CustomSkipPolicy())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "skipPolicyBatchJob")
|
||||||
|
public Job skipPolicyBatchJob(@Qualifier("skipPolicyStep") Step skipPolicyStep) {
|
||||||
|
return jobBuilderFactory
|
||||||
|
.get("skipPolicyBatchJob")
|
||||||
|
.start(skipPolicyStep)
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,29 @@
|
|||||||
|
package org.baeldung.batch.service;
|
||||||
|
|
||||||
|
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
|
||||||
|
import org.springframework.batch.core.step.skip.SkipPolicy;
|
||||||
|
|
||||||
|
public class CustomSkipPolicy implements SkipPolicy {
|
||||||
|
|
||||||
|
private static final int MAX_SKIP_COUNT = 2;
|
||||||
|
private static final int INVALID_TX_AMOUNT_LIMIT = -1000;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean shouldSkip(Throwable throwable, int skipCount) throws SkipLimitExceededException {
|
||||||
|
|
||||||
|
if (throwable instanceof MissingUsernameException && skipCount < MAX_SKIP_COUNT) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (throwable instanceof NegativeAmountException && skipCount < MAX_SKIP_COUNT ) {
|
||||||
|
NegativeAmountException ex = (NegativeAmountException) throwable;
|
||||||
|
if(ex.getAmount() < INVALID_TX_AMOUNT_LIMIT){
|
||||||
|
return false;
|
||||||
|
} else{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,4 @@
|
|||||||
|
package org.baeldung.batch.service;
|
||||||
|
|
||||||
|
public class MissingUsernameException extends RuntimeException {
|
||||||
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
package org.baeldung.batch.service;
|
||||||
|
|
||||||
|
public class NegativeAmountException extends RuntimeException {
|
||||||
|
|
||||||
|
private double amount;
|
||||||
|
|
||||||
|
public NegativeAmountException(double amount){
|
||||||
|
this.amount = amount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getAmount() {
|
||||||
|
return amount;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,24 @@
|
|||||||
|
package org.baeldung.batch.service;
|
||||||
|
|
||||||
|
import org.baeldung.batch.model.Transaction;
|
||||||
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
|
|
||||||
|
public class SkippingItemProcessor implements ItemProcessor<Transaction, Transaction> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transaction process(Transaction transaction) {
|
||||||
|
|
||||||
|
System.out.println("SkippingItemProcessor: " + transaction);
|
||||||
|
|
||||||
|
if (transaction.getUsername() == null || transaction.getUsername().isEmpty()) {
|
||||||
|
throw new MissingUsernameException();
|
||||||
|
}
|
||||||
|
|
||||||
|
double txAmount = transaction.getAmount();
|
||||||
|
if (txAmount < 0) {
|
||||||
|
throw new NegativeAmountException(txAmount);
|
||||||
|
}
|
||||||
|
|
||||||
|
return transaction;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,7 @@
|
|||||||
|
username, user_id, transaction_date, transaction_amount
|
||||||
|
devendra, 1234, 31/10/2015, 10000
|
||||||
|
john, 2134, 3/12/2015, 12321
|
||||||
|
robin, 2134, 2/02/2015, 23411
|
||||||
|
, 2536, 3/10/2019, 100
|
||||||
|
mike, 9876, 5/11/2018, -500
|
||||||
|
, 3425, 10/10/2017, 100
|
|
Loading…
x
Reference in New Issue
Block a user