diff --git a/spring-batch/src/main/java/org/baeldung/batch/App.java b/spring-batch/src/main/java/org/baeldung/batch/App.java index 91b99ba571..634f0a9014 100644 --- a/spring-batch/src/main/java/org/baeldung/batch/App.java +++ b/spring-batch/src/main/java/org/baeldung/batch/App.java @@ -26,6 +26,8 @@ public class App { runJob(context, "firstBatchJob"); runJob(context, "skippingBatchJob"); runJob(context, "skipPolicyBatchJob"); + runJob(context, "retryBatchJob"); + } private static void runJob(AnnotationConfigApplicationContext context, String batchJobName) { diff --git a/spring-batch/src/main/java/org/baeldung/batch/SpringBatchConfig.java b/spring-batch/src/main/java/org/baeldung/batch/SpringBatchConfig.java index 07dd65bcfd..90eb038052 100644 --- a/spring-batch/src/main/java/org/baeldung/batch/SpringBatchConfig.java +++ b/spring-batch/src/main/java/org/baeldung/batch/SpringBatchConfig.java @@ -1,14 +1,17 @@ package org.baeldung.batch; +import org.apache.http.conn.ConnectTimeoutException; import org.baeldung.batch.model.Transaction; import org.baeldung.batch.service.CustomItemProcessor; import org.baeldung.batch.service.CustomSkipPolicy; import org.baeldung.batch.service.MissingUsernameException; import org.baeldung.batch.service.NegativeAmountException; import org.baeldung.batch.service.RecordFieldSetMapper; +import org.baeldung.batch.service.RetryItemProcessor; import org.baeldung.batch.service.SkippingItemProcessor; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemProcessor; @@ -23,12 +26,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.core.io.Resource; +import org.springframework.dao.DeadlockLoserDataAccessException; import org.springframework.oxm.Marshaller; import org.springframework.oxm.jaxb.Jaxb2Marshaller; import java.text.ParseException; - +@Configuration +@EnableBatchProcessing public class SpringBatchConfig { @Autowired private JobBuilderFactory jobBuilderFactory; @@ -69,6 +75,11 @@ public class SpringBatchConfig { return new SkippingItemProcessor(); } + @Bean + public ItemProcessor retryItemProcessor() { + return new RetryItemProcessor(); + } + @Bean public ItemWriter itemWriter(Marshaller marshaller) { StaxEventItemWriter itemWriter = new StaxEventItemWriter<>(); @@ -117,6 +128,22 @@ public class SpringBatchConfig { .build(); } + @Bean + public Step retryStep(@Qualifier("retryItemProcessor") ItemProcessor processor, + ItemWriter writer) throws ParseException { + return stepBuilderFactory + .get("retryStep") + .chunk(10) + .reader(itemReader(inputCsv)) + .processor(processor) + .writer(writer) + .faultTolerant() + .retryLimit(3) + .retry(ConnectTimeoutException.class) + .retry(DeadlockLoserDataAccessException.class) + .build(); + } + @Bean(name = "skippingBatchJob") public Job skippingJob(@Qualifier("skippingStep") Step skippingStep) { return jobBuilderFactory @@ -125,6 +152,14 @@ public class SpringBatchConfig { .build(); } + @Bean(name = "retryBatchJob") + public Job retryJob(@Qualifier("retryStep") Step retryStep) { + return jobBuilderFactory + .get("retryBatchJob") + .start(retryStep) + .build(); + } + @Bean public Step skipPolicyStep(@Qualifier("skippingItemProcessor") ItemProcessor processor, ItemWriter writer) throws ParseException { diff --git a/spring-batch/src/main/java/org/baeldung/batch/model/Transaction.java b/spring-batch/src/main/java/org/baeldung/batch/model/Transaction.java index 0ce3a413ab..f34462eadd 100644 --- a/spring-batch/src/main/java/org/baeldung/batch/model/Transaction.java +++ b/spring-batch/src/main/java/org/baeldung/batch/model/Transaction.java @@ -9,6 +9,8 @@ import javax.xml.bind.annotation.XmlRootElement; public class Transaction { private String username; private int userId; + private int age; + private String postCode; private Date transactionDate; private double amount; @@ -46,9 +48,25 @@ public class Transaction { this.amount = amount; } + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public String getPostCode() { + return postCode; + } + + public void setPostCode(String postCode) { + this.postCode = postCode; + } + @Override public String toString() { - return "Transaction [username=" + username + ", userId=" + userId + ", transactionDate=" + transactionDate + ", amount=" + amount + "]"; + return "Transaction [username=" + username + ", userId=" + userId + ", age=" + age + ", postCode=" + postCode + ", transactionDate=" + transactionDate + ", amount=" + amount + "]"; } } diff --git a/spring-batch/src/main/java/org/baeldung/batch/service/RetryItemProcessor.java b/spring-batch/src/main/java/org/baeldung/batch/service/RetryItemProcessor.java new file mode 100644 index 0000000000..8eb279caf0 --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/batch/service/RetryItemProcessor.java @@ -0,0 +1,34 @@ +package org.baeldung.batch.service; + +import org.apache.http.HttpResponse; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.baeldung.batch.model.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ItemProcessor; + +import java.io.IOException; + +public class RetryItemProcessor implements ItemProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(RetryItemProcessor.class); + + @Override + public Transaction process(Transaction transaction) throws IOException { + LOGGER.info("Attempting to process user with id={}", transaction.getUserId()); + HttpResponse response = fetchMoreUserDetails(transaction.getUserId()); + //parse user's age and postCode from response and update transaction + return transaction; + } + + private HttpResponse fetchMoreUserDetails(int id) throws IOException { + final RequestConfig config = RequestConfig.custom().setConnectTimeout(2 * 1000).build(); + final CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build(); + + final HttpGet request = new HttpGet("http://www.baeldung.com:81/user/" + id); + return client.execute(request); + } +} diff --git a/spring-batch/src/main/resources/spring-batch-intro.xml b/spring-batch/src/main/resources/spring-batch-intro.xml index 0f76dd50ff..908b8aa2e1 100644 --- a/spring-batch/src/main/resources/spring-batch-intro.xml +++ b/spring-batch/src/main/resources/spring-batch-intro.xml @@ -54,4 +54,19 @@ + + + + + + + + + + + + +