diff --git a/spring-batch/src/main/java/org/baeldung/batch/App.java b/spring-batch/src/main/java/org/baeldung/batch/App.java index 8bf58e65d2..749591aa03 100644 --- a/spring-batch/src/main/java/org/baeldung/batch/App.java +++ b/spring-batch/src/main/java/org/baeldung/batch/App.java @@ -18,9 +18,17 @@ public class App { // Spring xml config // ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch.xml"); + runJob(context, "firstBatchJob"); + runJob(context, "skippingBatchJob"); + runJob(context, "skipPolicyBatchJob"); + } + + private static void runJob(AnnotationConfigApplicationContext context, String batchJobName) { final JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher"); - final Job job = (Job) context.getBean("firstBatchJob"); - System.out.println("Starting the batch job"); + final Job job = (Job) context.getBean(batchJobName); + + System.out.println("----------------------------------------"); + System.out.println("Starting the batch job: " + batchJobName); try { // To enable multiple execution of a job with the same parameters JobParameters jobParameters = new JobParametersBuilder() diff --git a/spring-batch/src/main/java/org/baeldung/batch/SpringBatchConfig.java b/spring-batch/src/main/java/org/baeldung/batch/SpringBatchConfig.java index 7b19935cc8..b318dda154 100644 --- a/spring-batch/src/main/java/org/baeldung/batch/SpringBatchConfig.java +++ b/spring-batch/src/main/java/org/baeldung/batch/SpringBatchConfig.java @@ -2,7 +2,11 @@ package org.baeldung.batch; import org.baeldung.batch.model.Transaction; import org.baeldung.batch.service.CustomItemProcessor; +import org.baeldung.batch.service.CustomSkipPolicy; +import org.baeldung.batch.service.MissingUsernameException; +import org.baeldung.batch.service.NegativeAmountException; import org.baeldung.batch.service.RecordFieldSetMapper; +import org.baeldung.batch.service.SkippingItemProcessor; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; @@ -23,30 +27,31 @@ import org.springframework.core.io.Resource; import org.springframework.oxm.Marshaller; import org.springframework.oxm.jaxb.Jaxb2Marshaller; -import java.net.MalformedURLException; import java.text.ParseException; public class SpringBatchConfig { @Autowired - private JobBuilderFactory jobs; + private JobBuilderFactory jobBuilderFactory; @Autowired - private StepBuilderFactory steps; + private StepBuilderFactory stepBuilderFactory; @Value("input/record.csv") private Resource inputCsv; + @Value("input/recordWithInvalidData.csv") + private Resource invalidInputCsv; + @Value("file:xml/output.xml") private Resource outputXml; - @Bean - public ItemReader itemReader() throws UnexpectedInputException, ParseException { - FlatFileItemReader reader = new FlatFileItemReader(); + public ItemReader itemReader(Resource inputData) throws UnexpectedInputException, ParseException { + FlatFileItemReader reader = new FlatFileItemReader<>(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); String[] tokens = {"username", "userid", "transactiondate", "amount"}; tokenizer.setNames(tokens); - reader.setResource(inputCsv); - DefaultLineMapper lineMapper = new DefaultLineMapper(); + reader.setResource(inputData); + DefaultLineMapper lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); reader.setLinesToSkip(1); @@ -60,8 +65,13 @@ public class SpringBatchConfig { } @Bean - public ItemWriter itemWriter(Marshaller marshaller) throws MalformedURLException { - StaxEventItemWriter itemWriter = new StaxEventItemWriter(); + public ItemProcessor skippingItemProcessor() { + return new SkippingItemProcessor(); + } + + @Bean + public ItemWriter itemWriter(Marshaller marshaller) { + StaxEventItemWriter itemWriter = new StaxEventItemWriter<>(); itemWriter.setMarshaller(marshaller); itemWriter.setRootTagName("transactionRecord"); itemWriter.setResource(outputXml); @@ -76,13 +86,60 @@ public class SpringBatchConfig { } @Bean - protected Step step1(ItemReader reader, ItemProcessor processor, ItemWriter writer) { - return steps.get("step1").chunk(10).reader(reader).processor(processor).writer(writer).build(); + protected Step step1(@Qualifier("itemProcessor") ItemProcessor processor, + ItemWriter writer) throws ParseException { + return stepBuilderFactory.get("step1").chunk(10).reader(itemReader(inputCsv)).processor(processor).writer(writer).build(); } @Bean(name = "firstBatchJob") public Job job(@Qualifier("step1") Step step1) { - return jobs.get("firstBatchJob").start(step1).build(); + return jobBuilderFactory.get("firstBatchJob").start(step1).build(); + } + + @Bean + public Step skippingStep(@Qualifier("skippingItemProcessor") ItemProcessor processor, + ItemWriter writer) throws ParseException { + return stepBuilderFactory + .get("skippingStep") + .chunk(10) + .reader(itemReader(invalidInputCsv)) + .processor(processor) + .writer(writer) + .faultTolerant() + .skipLimit(2) + .skip(MissingUsernameException.class) + .skip(NegativeAmountException.class) + .build(); + } + + @Bean(name = "skippingBatchJob") + public Job skippingJob(@Qualifier("skippingStep") Step skippingStep) { + return jobBuilderFactory + .get("skippingBatchJob") + .start(skippingStep) + .build(); + } + + @Bean + public Step skipPolicyStep(@Qualifier("skippingItemProcessor") ItemProcessor processor, + ItemWriter writer) throws ParseException { + return stepBuilderFactory + .get("skipPolicyStep") + .chunk(10) + .reader(itemReader(invalidInputCsv)) + .processor(processor) + .writer(writer) + .faultTolerant() + .skipPolicy(new CustomSkipPolicy()) + .build(); + } + + @Bean(name = "skipPolicyBatchJob") + public Job skipPolicyBatchJob(@Qualifier("skipPolicyStep") Step skipPolicyStep) { + return jobBuilderFactory + .get("skipPolicyBatchJob") + .start(skipPolicyStep) + .build(); } } diff --git a/spring-batch/src/main/java/org/baeldung/batch/service/CustomSkipPolicy.java b/spring-batch/src/main/java/org/baeldung/batch/service/CustomSkipPolicy.java new file mode 100644 index 0000000000..a156a65b6e --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/batch/service/CustomSkipPolicy.java @@ -0,0 +1,29 @@ +package org.baeldung.batch.service; + +import org.springframework.batch.core.step.skip.SkipLimitExceededException; +import org.springframework.batch.core.step.skip.SkipPolicy; + +public class CustomSkipPolicy implements SkipPolicy { + + private static final int MAX_SKIP_COUNT = 2; + private static final int INVALID_TX_AMOUNT_LIMIT = -1000; + + @Override + public boolean shouldSkip(Throwable throwable, int skipCount) throws SkipLimitExceededException { + + if (throwable instanceof MissingUsernameException && skipCount < MAX_SKIP_COUNT) { + return true; + } + + if (throwable instanceof NegativeAmountException && skipCount < MAX_SKIP_COUNT ) { + NegativeAmountException ex = (NegativeAmountException) throwable; + if(ex.getAmount() < INVALID_TX_AMOUNT_LIMIT){ + return false; + } else{ + return true; + } + } + + return false; + } +} diff --git a/spring-batch/src/main/java/org/baeldung/batch/service/MissingUsernameException.java b/spring-batch/src/main/java/org/baeldung/batch/service/MissingUsernameException.java new file mode 100644 index 0000000000..2cf8f4d334 --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/batch/service/MissingUsernameException.java @@ -0,0 +1,4 @@ +package org.baeldung.batch.service; + +public class MissingUsernameException extends RuntimeException { +} diff --git a/spring-batch/src/main/java/org/baeldung/batch/service/NegativeAmountException.java b/spring-batch/src/main/java/org/baeldung/batch/service/NegativeAmountException.java new file mode 100644 index 0000000000..c9c05be671 --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/batch/service/NegativeAmountException.java @@ -0,0 +1,14 @@ +package org.baeldung.batch.service; + +public class NegativeAmountException extends RuntimeException { + + private double amount; + + public NegativeAmountException(double amount){ + this.amount = amount; + } + + public double getAmount() { + return amount; + } +} diff --git a/spring-batch/src/main/java/org/baeldung/batch/service/SkippingItemProcessor.java b/spring-batch/src/main/java/org/baeldung/batch/service/SkippingItemProcessor.java new file mode 100644 index 0000000000..307a8213e2 --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/batch/service/SkippingItemProcessor.java @@ -0,0 +1,24 @@ +package org.baeldung.batch.service; + +import org.baeldung.batch.model.Transaction; +import org.springframework.batch.item.ItemProcessor; + +public class SkippingItemProcessor implements ItemProcessor { + + @Override + public Transaction process(Transaction transaction) { + + System.out.println("SkippingItemProcessor: " + transaction); + + if (transaction.getUsername() == null || transaction.getUsername().isEmpty()) { + throw new MissingUsernameException(); + } + + double txAmount = transaction.getAmount(); + if (txAmount < 0) { + throw new NegativeAmountException(txAmount); + } + + return transaction; + } +} diff --git a/spring-batch/src/main/resources/input/recordWithInvalidData.csv b/spring-batch/src/main/resources/input/recordWithInvalidData.csv new file mode 100644 index 0000000000..020edb9826 --- /dev/null +++ b/spring-batch/src/main/resources/input/recordWithInvalidData.csv @@ -0,0 +1,7 @@ +username, user_id, transaction_date, transaction_amount +devendra, 1234, 31/10/2015, 10000 +john, 2134, 3/12/2015, 12321 +robin, 2134, 2/02/2015, 23411 +, 2536, 3/10/2019, 100 +mike, 9876, 5/11/2018, -500 +, 3425, 10/10/2017, 100 \ No newline at end of file