diff --git a/spring-batch/src/main/java/org/baeldung/batch/App.java b/spring-batch/src/main/java/org/baeldung/batch/App.java index 91b99ba571..764ef72a35 100644 --- a/spring-batch/src/main/java/org/baeldung/batch/App.java +++ b/spring-batch/src/main/java/org/baeldung/batch/App.java @@ -18,6 +18,8 @@ public class App { final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.register(SpringConfig.class); context.register(SpringBatchConfig.class); + context.register(SpringBatchRetryConfig.class); + context.refresh(); // Spring xml config @@ -26,6 +28,8 @@ public class App { runJob(context, "firstBatchJob"); runJob(context, "skippingBatchJob"); runJob(context, "skipPolicyBatchJob"); + runJob(context, "retryBatchJob"); + } private static void runJob(AnnotationConfigApplicationContext context, String batchJobName) { diff --git a/spring-batch/src/main/java/org/baeldung/batch/SpringBatchRetryConfig.java b/spring-batch/src/main/java/org/baeldung/batch/SpringBatchRetryConfig.java new file mode 100644 index 0000000000..56088f194b --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/batch/SpringBatchRetryConfig.java @@ -0,0 +1,117 @@ +package org.baeldung.batch; + +import org.apache.http.client.config.RequestConfig; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.baeldung.batch.model.Transaction; +import org.baeldung.batch.service.RecordFieldSetMapper; +import org.baeldung.batch.service.RetryItemProcessor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.mapping.DefaultLineMapper; +import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; +import org.springframework.batch.item.xml.StaxEventItemWriter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.Resource; +import org.springframework.dao.DeadlockLoserDataAccessException; +import org.springframework.oxm.Marshaller; +import org.springframework.oxm.jaxb.Jaxb2Marshaller; + +import java.text.ParseException; + +@Configuration +@EnableBatchProcessing +public class SpringBatchRetryConfig { + + private static final String[] tokens = { "username", "userid", "transactiondate", "amount" }; + private static final int TWO_SECONDS = 2000; + + @Autowired + private JobBuilderFactory jobBuilderFactory; + + @Autowired + private StepBuilderFactory stepBuilderFactory; + + @Value("input/recordRetry.csv") + private Resource inputCsv; + + @Value("file:xml/retryOutput.xml") + private Resource outputXml; + + public ItemReader itemReader(Resource inputData) throws ParseException { + DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); + tokenizer.setNames(tokens); + DefaultLineMapper lineMapper = new DefaultLineMapper<>(); + lineMapper.setLineTokenizer(tokenizer); + lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); + FlatFileItemReader reader = new FlatFileItemReader<>(); + reader.setResource(inputData); + reader.setLinesToSkip(1); + reader.setLineMapper(lineMapper); + return reader; + } + + @Bean + public CloseableHttpClient closeableHttpClient() { + final RequestConfig config = RequestConfig.custom() + .setConnectTimeout(TWO_SECONDS) + .build(); + return HttpClientBuilder.create().setDefaultRequestConfig(config).build(); + } + + @Bean + public ItemProcessor retryItemProcessor() { + return new RetryItemProcessor(); + } + + @Bean + public ItemWriter itemWriter(Marshaller marshaller) { + StaxEventItemWriter itemWriter = new StaxEventItemWriter<>(); + itemWriter.setMarshaller(marshaller); + itemWriter.setRootTagName("transactionRecord"); + itemWriter.setResource(outputXml); + return itemWriter; + } + + @Bean + public Marshaller marshaller() { + Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); + marshaller.setClassesToBeBound(Transaction.class); + return marshaller; + } + + @Bean + public Step retryStep(@Qualifier("retryItemProcessor") ItemProcessor processor, + ItemWriter writer) throws ParseException { + return stepBuilderFactory.get("retryStep") + .chunk(10) + .reader(itemReader(inputCsv)) + .processor(processor) + .writer(writer) + .faultTolerant() + .retryLimit(3) + .retry(ConnectTimeoutException.class) + .retry(DeadlockLoserDataAccessException.class) + .build(); + } + + @Bean(name = "retryBatchJob") + public Job retryJob(@Qualifier("retryStep") Step retryStep) { + return jobBuilderFactory + .get("retryBatchJob") + .start(retryStep) + .build(); + } +} diff --git a/spring-batch/src/main/java/org/baeldung/batch/model/Transaction.java b/spring-batch/src/main/java/org/baeldung/batch/model/Transaction.java index 0ce3a413ab..f34462eadd 100644 --- a/spring-batch/src/main/java/org/baeldung/batch/model/Transaction.java +++ b/spring-batch/src/main/java/org/baeldung/batch/model/Transaction.java @@ -9,6 +9,8 @@ import javax.xml.bind.annotation.XmlRootElement; public class Transaction { private String username; private int userId; + private int age; + private String postCode; private Date transactionDate; private double amount; @@ -46,9 +48,25 @@ public class Transaction { this.amount = amount; } + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public String getPostCode() { + return postCode; + } + + public void setPostCode(String postCode) { + this.postCode = postCode; + } + @Override public String toString() { - return "Transaction [username=" + username + ", userId=" + userId + ", transactionDate=" + transactionDate + ", amount=" + amount + "]"; + return "Transaction [username=" + username + ", userId=" + userId + ", age=" + age + ", postCode=" + postCode + ", transactionDate=" + transactionDate + ", amount=" + amount + "]"; } } diff --git a/spring-batch/src/main/java/org/baeldung/batch/service/RetryItemProcessor.java b/spring-batch/src/main/java/org/baeldung/batch/service/RetryItemProcessor.java new file mode 100644 index 0000000000..d4e82452a7 --- /dev/null +++ b/spring-batch/src/main/java/org/baeldung/batch/service/RetryItemProcessor.java @@ -0,0 +1,42 @@ +package org.baeldung.batch.service; + +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; +import org.baeldung.batch.model.Transaction; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.IOException; + +public class RetryItemProcessor implements ItemProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(RetryItemProcessor.class); + + @Autowired + private CloseableHttpClient closeableHttpClient; + + @Override + public Transaction process(Transaction transaction) throws IOException, JSONException { + LOGGER.info("Attempting to process user with id={}", transaction.getUserId()); + HttpResponse response = fetchMoreUserDetails(transaction.getUserId()); + + //parse user's age and postCode from response and update transaction + String result = EntityUtils.toString(response.getEntity()); + JSONObject userObject = new JSONObject(result); + transaction.setAge(Integer.parseInt(userObject.getString("age"))); + transaction.setPostCode(userObject.getString("postCode")); + + return transaction; + } + + private HttpResponse fetchMoreUserDetails(int id) throws IOException { + final HttpGet request = new HttpGet("http://www.baeldung.com:81/user/" + id); + return closeableHttpClient.execute(request); + } +} diff --git a/spring-batch/src/main/resources/input/recordRetry.csv b/spring-batch/src/main/resources/input/recordRetry.csv new file mode 100644 index 0000000000..1b1e3e1ac9 --- /dev/null +++ b/spring-batch/src/main/resources/input/recordRetry.csv @@ -0,0 +1,3 @@ +username, user_id, transaction_date, transaction_amount +sammy, 1234, 31/10/2015, 10000 +john, 9999, 3/12/2015, 12321 diff --git a/spring-batch/src/main/resources/spring-batch-intro.xml b/spring-batch/src/main/resources/spring-batch-intro.xml index 0f76dd50ff..908b8aa2e1 100644 --- a/spring-batch/src/main/resources/spring-batch-intro.xml +++ b/spring-batch/src/main/resources/spring-batch-intro.xml @@ -54,4 +54,19 @@ + + + + + + + + + + + + + diff --git a/spring-batch/src/test/java/org/baeldung/batch/SpringBatchRetryIntegrationTest.java b/spring-batch/src/test/java/org/baeldung/batch/SpringBatchRetryIntegrationTest.java new file mode 100644 index 0000000000..293c97ac5d --- /dev/null +++ b/spring-batch/src/test/java/org/baeldung/batch/SpringBatchRetryIntegrationTest.java @@ -0,0 +1,90 @@ +package org.baeldung.batch; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.test.AssertFile; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.batch.test.context.SpringBatchTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.core.io.FileSystemResource; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@RunWith(SpringRunner.class) +@SpringBatchTest +@EnableAutoConfiguration +@ContextConfiguration(classes = { SpringBatchRetryConfig.class }) +public class SpringBatchRetryIntegrationTest { + + private static final String TEST_OUTPUT = "xml/retryOutput.xml"; + private static final String EXPECTED_OUTPUT = "src/test/resources/output/batchRetry/retryOutput.xml"; + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @MockBean + private CloseableHttpClient closeableHttpClient; + + @Mock + private CloseableHttpResponse httpResponse; + + @Test + public void whenEndpointAlwaysFail_thenJobFails() throws Exception { + when(closeableHttpClient.execute(any())) + .thenThrow(new ConnectTimeoutException("Endpoint is down")); + + JobExecution jobExecution = jobLauncherTestUtils.launchJob(defaultJobParameters()); + JobInstance actualJobInstance = jobExecution.getJobInstance(); + ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); + + assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); + assertThat(actualJobExitStatus.getExitCode(), is("FAILED")); + assertThat(actualJobExitStatus.getExitDescription(), containsString("org.apache.http.conn.ConnectTimeoutException")); + } + + @Test + public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception { + FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT); + FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT); + + //fails for first two calls and passes third time onwards + when(httpResponse.getEntity()) + .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }")); + when(closeableHttpClient.execute(any())) + .thenThrow(new ConnectTimeoutException("Timeout count 1")) + .thenThrow(new ConnectTimeoutException("Timeout count 2")) + .thenReturn(httpResponse); + + JobExecution jobExecution = jobLauncherTestUtils.launchJob(defaultJobParameters()); + JobInstance actualJobInstance = jobExecution.getJobInstance(); + ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); + + assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); + assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED")); + AssertFile.assertFileEquals(expectedResult, actualResult); + } + + private JobParameters defaultJobParameters() { + JobParametersBuilder paramsBuilder = new JobParametersBuilder(); + paramsBuilder.addString("jobID", String.valueOf(System.currentTimeMillis())); + return paramsBuilder.toJobParameters(); + } +} diff --git a/spring-batch/src/test/resources/output/batchRetry/retryOutput.xml b/spring-batch/src/test/resources/output/batchRetry/retryOutput.xml new file mode 100644 index 0000000000..0de35670f4 --- /dev/null +++ b/spring-batch/src/test/resources/output/batchRetry/retryOutput.xml @@ -0,0 +1 @@ +1010000.04302222015-10-31T00:00:00+05:301234sammy1012321.04302222015-12-03T00:00:00+05:309999john diff --git a/spring-batch/xml/retryOutput.xml b/spring-batch/xml/retryOutput.xml new file mode 100644 index 0000000000..0de35670f4 --- /dev/null +++ b/spring-batch/xml/retryOutput.xml @@ -0,0 +1 @@ +1010000.04302222015-10-31T00:00:00+05:301234sammy1012321.04302222015-12-03T00:00:00+05:309999john