BAEL-3298 Spring batch retry job
This commit is contained in:
		
							parent
							
								
									0a369c4a16
								
							
						
					
					
						commit
						49f97ae586
					
				| @ -26,6 +26,8 @@ public class App { | |||||||
|         runJob(context, "firstBatchJob"); |         runJob(context, "firstBatchJob"); | ||||||
|         runJob(context, "skippingBatchJob"); |         runJob(context, "skippingBatchJob"); | ||||||
|         runJob(context, "skipPolicyBatchJob"); |         runJob(context, "skipPolicyBatchJob"); | ||||||
|  |         runJob(context, "retryBatchJob"); | ||||||
|  | 
 | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private static void runJob(AnnotationConfigApplicationContext context, String batchJobName) { |     private static void runJob(AnnotationConfigApplicationContext context, String batchJobName) { | ||||||
|  | |||||||
| @ -1,14 +1,17 @@ | |||||||
| package org.baeldung.batch; | package org.baeldung.batch; | ||||||
| 
 | 
 | ||||||
|  | import org.apache.http.conn.ConnectTimeoutException; | ||||||
| import org.baeldung.batch.model.Transaction; | import org.baeldung.batch.model.Transaction; | ||||||
| import org.baeldung.batch.service.CustomItemProcessor; | import org.baeldung.batch.service.CustomItemProcessor; | ||||||
| import org.baeldung.batch.service.CustomSkipPolicy; | import org.baeldung.batch.service.CustomSkipPolicy; | ||||||
| import org.baeldung.batch.service.MissingUsernameException; | import org.baeldung.batch.service.MissingUsernameException; | ||||||
| import org.baeldung.batch.service.NegativeAmountException; | import org.baeldung.batch.service.NegativeAmountException; | ||||||
| import org.baeldung.batch.service.RecordFieldSetMapper; | import org.baeldung.batch.service.RecordFieldSetMapper; | ||||||
|  | import org.baeldung.batch.service.RetryItemProcessor; | ||||||
| import org.baeldung.batch.service.SkippingItemProcessor; | import org.baeldung.batch.service.SkippingItemProcessor; | ||||||
| import org.springframework.batch.core.Job; | import org.springframework.batch.core.Job; | ||||||
| import org.springframework.batch.core.Step; | import org.springframework.batch.core.Step; | ||||||
|  | import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; | ||||||
| import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; | import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; | ||||||
| import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; | import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; | ||||||
| import org.springframework.batch.item.ItemProcessor; | import org.springframework.batch.item.ItemProcessor; | ||||||
| @ -23,12 +26,15 @@ import org.springframework.beans.factory.annotation.Autowired; | |||||||
| import org.springframework.beans.factory.annotation.Qualifier; | import org.springframework.beans.factory.annotation.Qualifier; | ||||||
| import org.springframework.beans.factory.annotation.Value; | import org.springframework.beans.factory.annotation.Value; | ||||||
| import org.springframework.context.annotation.Bean; | import org.springframework.context.annotation.Bean; | ||||||
|  | import org.springframework.context.annotation.Configuration; | ||||||
| import org.springframework.core.io.Resource; | import org.springframework.core.io.Resource; | ||||||
|  | import org.springframework.dao.DeadlockLoserDataAccessException; | ||||||
| import org.springframework.oxm.Marshaller; | import org.springframework.oxm.Marshaller; | ||||||
| import org.springframework.oxm.jaxb.Jaxb2Marshaller; | import org.springframework.oxm.jaxb.Jaxb2Marshaller; | ||||||
| 
 | 
 | ||||||
| import java.text.ParseException; | import java.text.ParseException; | ||||||
| 
 | @Configuration | ||||||
|  | @EnableBatchProcessing | ||||||
| public class SpringBatchConfig { | public class SpringBatchConfig { | ||||||
|     @Autowired |     @Autowired | ||||||
|     private JobBuilderFactory jobBuilderFactory; |     private JobBuilderFactory jobBuilderFactory; | ||||||
| @ -69,6 +75,11 @@ public class SpringBatchConfig { | |||||||
|         return new SkippingItemProcessor(); |         return new SkippingItemProcessor(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     @Bean | ||||||
|  |     public ItemProcessor<Transaction, Transaction> retryItemProcessor() { | ||||||
|  |         return new RetryItemProcessor(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Bean |     @Bean | ||||||
|     public ItemWriter<Transaction> itemWriter(Marshaller marshaller) { |     public ItemWriter<Transaction> itemWriter(Marshaller marshaller) { | ||||||
|         StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>(); |         StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>(); | ||||||
| @ -117,6 +128,22 @@ public class SpringBatchConfig { | |||||||
|                 .build(); |                 .build(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     @Bean | ||||||
|  |     public Step retryStep(@Qualifier("retryItemProcessor") ItemProcessor<Transaction, Transaction> processor, | ||||||
|  |         ItemWriter<Transaction> writer) throws ParseException { | ||||||
|  |         return stepBuilderFactory | ||||||
|  |             .get("retryStep") | ||||||
|  |             .<Transaction, Transaction>chunk(10) | ||||||
|  |             .reader(itemReader(inputCsv)) | ||||||
|  |             .processor(processor) | ||||||
|  |             .writer(writer) | ||||||
|  |             .faultTolerant() | ||||||
|  |             .retryLimit(3) | ||||||
|  |             .retry(ConnectTimeoutException.class) | ||||||
|  |             .retry(DeadlockLoserDataAccessException.class) | ||||||
|  |             .build(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Bean(name = "skippingBatchJob") |     @Bean(name = "skippingBatchJob") | ||||||
|     public Job skippingJob(@Qualifier("skippingStep") Step skippingStep) { |     public Job skippingJob(@Qualifier("skippingStep") Step skippingStep) { | ||||||
|         return jobBuilderFactory |         return jobBuilderFactory | ||||||
| @ -125,6 +152,14 @@ public class SpringBatchConfig { | |||||||
|                 .build(); |                 .build(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     @Bean(name = "retryBatchJob") | ||||||
|  |     public Job retryJob(@Qualifier("retryStep") Step retryStep) { | ||||||
|  |         return jobBuilderFactory | ||||||
|  |             .get("retryBatchJob") | ||||||
|  |             .start(retryStep) | ||||||
|  |             .build(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Bean |     @Bean | ||||||
|     public Step skipPolicyStep(@Qualifier("skippingItemProcessor") ItemProcessor<Transaction, Transaction> processor, |     public Step skipPolicyStep(@Qualifier("skippingItemProcessor") ItemProcessor<Transaction, Transaction> processor, | ||||||
|                                ItemWriter<Transaction> writer) throws ParseException { |                                ItemWriter<Transaction> writer) throws ParseException { | ||||||
|  | |||||||
| @ -9,6 +9,8 @@ import javax.xml.bind.annotation.XmlRootElement; | |||||||
| public class Transaction { | public class Transaction { | ||||||
|     private String username; |     private String username; | ||||||
|     private int userId; |     private int userId; | ||||||
|  |     private int age; | ||||||
|  |     private String postCode; | ||||||
|     private Date transactionDate; |     private Date transactionDate; | ||||||
|     private double amount; |     private double amount; | ||||||
| 
 | 
 | ||||||
| @ -46,9 +48,25 @@ public class Transaction { | |||||||
|         this.amount = amount; |         this.amount = amount; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     public int getAge() { | ||||||
|  |         return age; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public void setAge(int age) { | ||||||
|  |         this.age = age; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public String getPostCode() { | ||||||
|  |         return postCode; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public void setPostCode(String postCode) { | ||||||
|  |         this.postCode = postCode; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Override |     @Override | ||||||
|     public String toString() { |     public String toString() { | ||||||
|         return "Transaction [username=" + username + ", userId=" + userId + ", transactionDate=" + transactionDate + ", amount=" + amount + "]"; |         return "Transaction [username=" + username + ", userId=" + userId + ", age=" + age + ", postCode=" + postCode + ", transactionDate=" + transactionDate + ", amount=" + amount + "]"; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| } | } | ||||||
|  | |||||||
| @ -0,0 +1,34 @@ | |||||||
|  | package org.baeldung.batch.service; | ||||||
|  | 
 | ||||||
|  | import org.apache.http.HttpResponse; | ||||||
|  | import org.apache.http.client.config.RequestConfig; | ||||||
|  | import org.apache.http.client.methods.HttpGet; | ||||||
|  | import org.apache.http.impl.client.CloseableHttpClient; | ||||||
|  | import org.apache.http.impl.client.HttpClientBuilder; | ||||||
|  | import org.baeldung.batch.model.Transaction; | ||||||
|  | import org.slf4j.Logger; | ||||||
|  | import org.slf4j.LoggerFactory; | ||||||
|  | import org.springframework.batch.item.ItemProcessor; | ||||||
|  | 
 | ||||||
|  | import java.io.IOException; | ||||||
|  | 
 | ||||||
|  | public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> { | ||||||
|  | 
 | ||||||
|  |     private static final Logger LOGGER = LoggerFactory.getLogger(RetryItemProcessor.class); | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public Transaction process(Transaction transaction) throws IOException { | ||||||
|  |         LOGGER.info("Attempting to process user with id={}", transaction.getUserId()); | ||||||
|  |         HttpResponse response = fetchMoreUserDetails(transaction.getUserId()); | ||||||
|  |         //parse user's age and postCode from response and update transaction | ||||||
|  |         return transaction; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     private HttpResponse fetchMoreUserDetails(int id) throws IOException { | ||||||
|  |         final RequestConfig config = RequestConfig.custom().setConnectTimeout(2 * 1000).build(); | ||||||
|  |         final CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build(); | ||||||
|  | 
 | ||||||
|  |         final HttpGet request = new HttpGet("http://www.baeldung.com:81/user/" + id); | ||||||
|  |         return client.execute(request); | ||||||
|  |     } | ||||||
|  | } | ||||||
| @ -54,4 +54,19 @@ | |||||||
|             </batch:tasklet> |             </batch:tasklet> | ||||||
|         </batch:step> |         </batch:step> | ||||||
|     </batch:job> |     </batch:job> | ||||||
|  | 
 | ||||||
|  |     <batch:job id="retryBatchJob"> | ||||||
|  |         <batch:step id="retryStep"> | ||||||
|  |             <batch:tasklet> | ||||||
|  |                 <batch:chunk reader="itemReader" writer="itemWriter" | ||||||
|  |                              processor="retryItemProcessor" commit-interval="10" | ||||||
|  |                              retry-limit="3"> | ||||||
|  |                     <batch:retryable-exception-classes> | ||||||
|  |                         <batch:include class="org.apache.http.conn.ConnectTimeoutException"/> | ||||||
|  |                         <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/> | ||||||
|  |                     </batch:retryable-exception-classes> | ||||||
|  |                 </batch:chunk> | ||||||
|  |             </batch:tasklet> | ||||||
|  |         </batch:step> | ||||||
|  |     </batch:job> | ||||||
| </beans> | </beans> | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user