Add Spring Batch project

YuCheng Hu 2019-10-11 09:15:05 -04:00
commit 0c31a4a882
52 changed files with 2013 additions and 0 deletions

spring-batch/.gitignore vendored Normal file

@@ -0,0 +1 @@
output.csv

spring-batch/README.md Normal file

@@ -0,0 +1,9 @@
## Spring Batch

This module contains articles about Spring Batch. A minimal configuration is sketched in the quick example at the end of this README.

### Relevant Articles:
- [Introduction to Spring Batch](https://www.baeldung.com/introduction-to-spring-batch)
- [Spring Batch using Partitioner](https://www.baeldung.com/spring-batch-partitioner)
- [Spring Batch Tasklets vs Chunks](https://www.baeldung.com/spring-batch-tasklet-chunk)
- [How to Trigger and Stop a Scheduled Spring Batch Job](https://www.baeldung.com/spring-batch-start-stop-job)
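
### Quick Example

A minimal chunk-oriented configuration, of the kind the articles above build on, might look roughly like this. This is only a sketch: the class and bean names are illustrative, it assumes Spring Batch 4 (as used by this module's `pom.xml`), and it assumes `ItemReader`/`ItemProcessor`/`ItemWriter` beans of the matching types are defined elsewhere.

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// illustrative class name; not part of this module
@Configuration
@EnableBatchProcessing
public class QuickStartConfig {

    // chunk-oriented step: items are read and processed one at a time
    // and written in groups of ten, one transaction per chunk
    @Bean
    public Step step(StepBuilderFactory steps, ItemReader<String> reader,
                     ItemProcessor<String, String> processor, ItemWriter<String> writer) {
        return steps.get("step")
                .<String, String>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, Step step) {
        return jobs.get("job").start(step).build();
    }
}
```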

spring-batch/pom.xml Normal file

@@ -0,0 +1,72 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung</groupId>
<artifactId>spring-batch</artifactId>
<version>0.1-SNAPSHOT</version>
<name>spring-batch</name>
<packaging>jar</packaging>
<url>http://maven.apache.org</url>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<!-- SQLite database driver -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>${sqlite.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>${opencsv.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<spring.version>5.0.3.RELEASE</spring.version>
<spring.batch.version>4.0.0.RELEASE</spring.batch.version>
<sqlite.version>3.15.1</sqlite.version>
<opencsv.version>4.1</opencsv.version>
<awaitility.version>3.1.1</awaitility.version>
</properties>
</project>

Binary file not shown.

@@ -0,0 +1,45 @@
package org.baeldung.batch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
public class App {
public static void main(final String[] args) {
// Spring Java config
final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(SpringConfig.class);
context.register(SpringBatchConfig.class);
context.refresh();
// Spring xml config
// ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch.xml");
runJob(context, "firstBatchJob");
runJob(context, "skippingBatchJob");
runJob(context, "skipPolicyBatchJob");
}
private static void runJob(AnnotationConfigApplicationContext context, String batchJobName) {
final JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
final Job job = (Job) context.getBean(batchJobName);
System.out.println("----------------------------------------");
System.out.println("Starting the batch job: " + batchJobName);
try {
// To enable multiple execution of a job with the same parameters
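// (a JobInstance is identified by the job name plus its identifying parameters, so a fresh timestamp makes every launch a new instance)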
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
final JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("Job Status : " + execution.getStatus());
System.out.println("Job succeeded");
} catch (final Exception e) {
e.printStackTrace();
System.out.println("Job failed");
}
}
}

@@ -0,0 +1,145 @@
package org.baeldung.batch;
import org.baeldung.batch.model.Transaction;
import org.baeldung.batch.service.CustomItemProcessor;
import org.baeldung.batch.service.CustomSkipPolicy;
import org.baeldung.batch.service.MissingUsernameException;
import org.baeldung.batch.service.NegativeAmountException;
import org.baeldung.batch.service.RecordFieldSetMapper;
import org.baeldung.batch.service.SkippingItemProcessor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import java.text.ParseException;
public class SpringBatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Value("input/record.csv")
private Resource inputCsv;
@Value("input/recordWithInvalidData.csv")
private Resource invalidInputCsv;
@Value("file:xml/output.xml")
private Resource outputXml;
public ItemReader<Transaction> itemReader(Resource inputData) throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = {"username", "userid", "transactiondate", "amount"};
tokenizer.setNames(tokens);
reader.setResource(inputData);
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLinesToSkip(1);
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
public ItemProcessor<Transaction, Transaction> itemProcessor() {
return new CustomItemProcessor();
}
@Bean
public ItemProcessor<Transaction, Transaction> skippingItemProcessor() {
return new SkippingItemProcessor();
}
@Bean
public ItemWriter<Transaction> itemWriter(Marshaller marshaller) {
StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(outputXml);
return itemWriter;
}
@Bean
public Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(Transaction.class);
return marshaller;
}
@Bean
protected Step step1(@Qualifier("itemProcessor") ItemProcessor<Transaction, Transaction> processor,
ItemWriter<Transaction> writer) throws ParseException {
return stepBuilderFactory.get("step1").<Transaction, Transaction>chunk(10).reader(itemReader(inputCsv)).processor(processor).writer(writer).build();
}
@Bean(name = "firstBatchJob")
public Job job(@Qualifier("step1") Step step1) {
return jobBuilderFactory.get("firstBatchJob").start(step1).build();
}
@Bean
public Step skippingStep(@Qualifier("skippingItemProcessor") ItemProcessor<Transaction, Transaction> processor,
ItemWriter<Transaction> writer) throws ParseException {
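// fault-tolerant step: records failing with the two exceptions below are skipped, but at most skipLimit(2) times before the step itself fails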
return stepBuilderFactory
.get("skippingStep")
.<Transaction, Transaction>chunk(10)
.reader(itemReader(invalidInputCsv))
.processor(processor)
.writer(writer)
.faultTolerant()
.skipLimit(2)
.skip(MissingUsernameException.class)
.skip(NegativeAmountException.class)
.build();
}
@Bean(name = "skippingBatchJob")
public Job skippingJob(@Qualifier("skippingStep") Step skippingStep) {
return jobBuilderFactory
.get("skippingBatchJob")
.start(skippingStep)
.build();
}
@Bean
public Step skipPolicyStep(@Qualifier("skippingItemProcessor") ItemProcessor<Transaction, Transaction> processor,
ItemWriter<Transaction> writer) throws ParseException {
return stepBuilderFactory
.get("skipPolicyStep")
.<Transaction, Transaction>chunk(10)
.reader(itemReader(invalidInputCsv))
.processor(processor)
.writer(writer)
.faultTolerant()
.skipPolicy(new CustomSkipPolicy())
.build();
}
@Bean(name = "skipPolicyBatchJob")
public Job skipPolicyBatchJob(@Qualifier("skipPolicyStep") Step skipPolicyStep) {
return jobBuilderFactory
.get("skipPolicyBatchJob")
.start(skipPolicyStep)
.build();
}
}

@@ -0,0 +1,78 @@
package org.baeldung.batch;
import java.net.MalformedURLException;
import javax.sql.DataSource;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@EnableBatchProcessing
public class SpringConfig {
@Value("org/springframework/batch/core/schema-drop-sqlite.sql")
private Resource dropReopsitoryTables;
@Value("org/springframework/batch/core/schema-sqlite.sql")
private Resource dataReopsitorySchema;
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.sqlite.JDBC");
dataSource.setUrl("jdbc:sqlite:repository.sqlite");
return dataSource;
}
@Bean
public DataSourceInitializer dataSourceInitializer(DataSource dataSource) throws MalformedURLException {
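// recreate the Spring Batch metadata tables in the SQLite job repository at startup; failed drops are ignored so the first run works too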
ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
databasePopulator.addScript(dropRepositoryTables);
databasePopulator.addScript(dataRepositorySchema);
databasePopulator.setIgnoreFailedDrops(true);
DataSourceInitializer initializer = new DataSourceInitializer();
initializer.setDataSource(dataSource);
initializer.setDatabasePopulator(databasePopulator);
return initializer;
}
private JobRepository getJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(getTransactionManager());
// JobRepositoryFactoryBean's methods throw a generic Exception;
// a more specific exception type would have been better
factory.afterPropertiesSet();
return (JobRepository) factory.getObject();
}
private PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}
public JobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
// SimpleJobLauncher's methods throw a generic Exception;
// a more specific exception type would have been better
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}

@@ -0,0 +1,54 @@
package org.baeldung.batch.model;
import java.util.Date;
import javax.xml.bind.annotation.XmlRootElement;
@SuppressWarnings("restriction")
@XmlRootElement(name = "transactionRecord")
public class Transaction {
private String username;
private int userId;
private Date transactionDate;
private double amount;
/* getters and setters for the attributes */
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public Date getTransactionDate() {
return transactionDate;
}
public void setTransactionDate(Date transactionDate) {
this.transactionDate = transactionDate;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
@Override
public String toString() {
return "Transaction [username=" + username + ", userId=" + userId + ", transactionDate=" + transactionDate + ", amount=" + amount + "]";
}
}

@@ -0,0 +1,77 @@
/*
* Copyright 2006-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.baeldung.batch.partitioner;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
public class CustomMultiResourcePartitioner implements Partitioner {
private static final String DEFAULT_KEY_NAME = "fileName";
private static final String PARTITION_KEY = "partition";
private Resource[] resources = new Resource[0];
private String keyName = DEFAULT_KEY_NAME;
/**
* The resources to assign to each partition. In Spring configuration you
* can use a pattern to select multiple resources.
* @param resources the resources to use
*/
public void setResources(Resource[] resources) {
this.resources = resources;
}
/**
* The name of the key for the file name in each {@link ExecutionContext}.
* Defaults to "fileName".
* @param keyName the value of the key
*/
public void setKeyName(String keyName) {
this.keyName = keyName;
}
/**
* Assign the filename of each of the injected resources to an
* {@link ExecutionContext}.
*
* @see Partitioner#partition(int)
*/
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize);
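// one ExecutionContext per input file: the slave step reads the "fileName" key and writes to the matching "opFileName"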
int i = 0, k = 1;
for (Resource resource : resources) {
ExecutionContext context = new ExecutionContext();
Assert.state(resource.exists(), "Resource does not exist: " + resource);
context.putString(keyName, resource.getFilename());
context.putString("opFileName", "output" + k++ + ".xml");
map.put(PARTITION_KEY + i, context);
i++;
}
return map;
}
}

@@ -0,0 +1,166 @@
package org.baeldung.batch.partitioner;
import org.baeldung.batch.model.Transaction;
import org.baeldung.batch.service.RecordFieldSetMapper;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.io.IOException;
import java.net.MalformedURLException;
import java.text.ParseException;
@Configuration
@EnableBatchProcessing
public class SpringbatchPartitionConfig {
@Autowired
ResourcePatternResolver resourcePatternResolver;
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean(name = "partitionerJob")
public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException {
return jobs.get("partitionerJob")
.start(partitionStep())
.build();
}
@Bean
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
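// master step: builds one partition per input CSV and runs slaveStep for each of them on the task executor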
return steps.get("partitionStep")
.partitioner("slaveStep", partitioner())
.step(slaveStep())
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("slaveStep")
.<Transaction, Transaction>chunk(1)
.reader(itemReader(null))
.writer(itemWriter(marshaller(), null))
.build();
}
@Bean
public CustomMultiResourcePartitioner partitioner() {
CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner();
Resource[] resources;
try {
resources = resourcePatternResolver.getResources("file:src/main/resources/input/partitioner/*.csv");
} catch (IOException e) {
throw new RuntimeException("I/O problems when resolving the input file pattern.", e);
}
partitioner.setResources(resources);
return partitioner;
}
@Bean
@StepScope
public FlatFileItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException {
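// step-scoped, so the fileName placed in each partition's ExecutionContext is injected late, at step execution time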
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = {"username", "userid", "transactiondate", "amount"};
tokenizer.setNames(tokens);
reader.setResource(new ClassPathResource("input/partitioner/" + filename));
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLinesToSkip(1);
reader.setLineMapper(lineMapper);
return reader;
}
@Bean(destroyMethod = "")
@StepScope
public StaxEventItemWriter<Transaction> itemWriter(Marshaller marshaller, @Value("#{stepExecutionContext[opFileName]}") String filename) throws MalformedURLException {
StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(new FileSystemResource("src/main/resources/output/" + filename));
return itemWriter;
}
@Bean
public Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(Transaction.class);
return marshaller;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(5);
taskExecutor.setCorePoolSize(5);
taskExecutor.setQueueCapacity(5);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
private JobRepository getJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(getTransactionManager());
// JobRepositoryFactoryBean's methods throw a generic Exception;
// a more specific exception type would have been better
factory.afterPropertiesSet();
return factory.getObject();
}
private DataSource dataSource() {
EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
return builder.setType(EmbeddedDatabaseType.HSQL)
.addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
.addScript("classpath:org/springframework/batch/core/schema-h2.sql")
.build();
}
private PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}
public JobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
// SimpleJobLauncher's methods throw a generic Exception;
// a more specific exception type would have been better
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}

@@ -0,0 +1,28 @@
package org.baeldung.batch.partitioner;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
public class SpringbatchPartitionerApp {
public static void main(final String[] args) {
// Spring Java config
final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(SpringbatchPartitionConfig.class);
context.refresh();
final JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
final Job job = (Job) context.getBean("partitionerJob");
System.out.println("Starting the batch job");
try {
final JobExecution execution = jobLauncher.run(job, new JobParameters());
System.out.println("Job Status : " + execution.getStatus());
System.out.println("Job succeeded");
} catch (final Exception e) {
e.printStackTrace();
System.out.println("Job failed");
}
}
}

@@ -0,0 +1,12 @@
package org.baeldung.batch.service;
import org.baeldung.batch.model.Transaction;
import org.springframework.batch.item.ItemProcessor;
public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {
public Transaction process(Transaction item) {
System.out.println("Processing..." + item);
return item;
}
}

@@ -0,0 +1,29 @@
package org.baeldung.batch.service;
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
import org.springframework.batch.core.step.skip.SkipPolicy;
public class CustomSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 2;
private static final int INVALID_TX_AMOUNT_LIMIT = -1000;
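// skip records with a missing username, and negative amounts no worse than INVALID_TX_AMOUNT_LIMIT, but at most MAX_SKIP_COUNT times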
@Override
public boolean shouldSkip(Throwable throwable, int skipCount) throws SkipLimitExceededException {
if (throwable instanceof MissingUsernameException && skipCount < MAX_SKIP_COUNT) {
return true;
}
if (throwable instanceof NegativeAmountException && skipCount < MAX_SKIP_COUNT ) {
NegativeAmountException ex = (NegativeAmountException) throwable;
if(ex.getAmount() < INVALID_TX_AMOUNT_LIMIT){
return false;
} else{
return true;
}
}
return false;
}
}

@@ -0,0 +1,4 @@
package org.baeldung.batch.service;
public class MissingUsernameException extends RuntimeException {
}

@@ -0,0 +1,14 @@
package org.baeldung.batch.service;
public class NegativeAmountException extends RuntimeException {
private double amount;
public NegativeAmountException(double amount){
this.amount = amount;
}
public double getAmount() {
return amount;
}
}

@@ -0,0 +1,35 @@
package org.baeldung.batch.service;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.baeldung.batch.model.Transaction;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {
public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
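// transaction dates in the sample CSV files are day/month/year, e.g. 31/10/2015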
Transaction transaction = new Transaction();
// you can map fields either by index or by the custom names;
// the custom names are easier to use when debugging and
// validating the pipeline
transaction.setUsername(fieldSet.readString("username"));
transaction.setUserId(fieldSet.readInt("userid"));
transaction.setAmount(fieldSet.readDouble(3));
// Converting the date
String dateString = fieldSet.readString(2);
try {
transaction.setTransactionDate(dateFormat.parse(dateString));
} catch (ParseException e) {
e.printStackTrace();
}
return transaction;
}
}

@@ -0,0 +1,24 @@
package org.baeldung.batch.service;
import org.baeldung.batch.model.Transaction;
import org.springframework.batch.item.ItemProcessor;
public class SkippingItemProcessor implements ItemProcessor<Transaction, Transaction> {
@Override
public Transaction process(Transaction transaction) {
System.out.println("SkippingItemProcessor: " + transaction);
if (transaction.getUsername() == null || transaction.getUsername().isEmpty()) {
throw new MissingUsernameException();
}
double txAmount = transaction.getAmount();
if (txAmount < 0) {
throw new NegativeAmountException(txAmount);
}
return transaction;
}
}

@@ -0,0 +1,172 @@
package org.baeldung.batchscheduler;
import java.util.Date;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.baeldung.batchscheduler.model.Book;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.ScheduledMethodRunnable;
@Configuration
@EnableBatchProcessing
@EnableScheduling
public class SpringBatchScheduler {
private final Logger logger = LoggerFactory.getLogger(SpringBatchScheduler.class);
private AtomicBoolean enabled = new AtomicBoolean(true);
private AtomicInteger batchRunCounter = new AtomicInteger(0);
private final Map<Object, ScheduledFuture<?>> scheduledTasks = new IdentityHashMap<>();
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Scheduled(fixedRate = 2000)
public void launchJob() throws Exception {
Date date = new Date();
logger.debug("scheduler starts at " + date);
if (enabled.get()) {
JobExecution jobExecution = jobLauncher().run(job(), new JobParametersBuilder().addDate("launchDate", date)
.toJobParameters());
batchRunCounter.incrementAndGet();
logger.debug("Batch job ends with status as " + jobExecution.getStatus());
}
logger.debug("scheduler ends ");
}
public void stop() {
enabled.set(false);
}
public void start() {
enabled.set(true);
}
@Bean
public TaskScheduler poolScheduler() {
return new CustomTaskScheduler();
}
private class CustomTaskScheduler extends ThreadPoolTaskScheduler {
private static final long serialVersionUID = -7142624085505040603L;
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
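// remember the ScheduledFuture keyed by the target bean so cancelFutureSchedulerTasks() can cancel it later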
ScheduledFuture<?> future = super.scheduleAtFixedRate(task, period);
ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) task;
scheduledTasks.put(runnable.getTarget(), future);
return future;
}
}
public void cancelFutureSchedulerTasks() {
scheduledTasks.forEach((k, v) -> {
if (k instanceof SpringBatchScheduler) {
v.cancel(false);
}
});
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(readBooks())
.build();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public JobRepository jobRepository() throws Exception {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
factory.setTransactionManager(new ResourcelessTransactionManager());
return (JobRepository) factory.getObject();
}
@Bean
protected Step readBooks() {
return stepBuilderFactory.get("readBooks")
.<Book, Book> chunk(2)
.reader(reader())
.writer(writer())
.build();
}
@Bean
public FlatFileItemReader<Book> reader() {
return new FlatFileItemReaderBuilder<Book>().name("bookItemReader")
.resource(new ClassPathResource("books.csv"))
.delimited()
.names(new String[] { "id", "name" })
.fieldSetMapper(new BeanWrapperFieldSetMapper<Book>() {
{
setTargetType(Book.class);
}
})
.build();
}
@Bean
public ItemWriter<Book> writer() {
return new ItemWriter<Book>() {
@Override
public void write(List<? extends Book> items) throws Exception {
logger.debug("writer..." + items.size());
for (Book item : items) {
logger.debug(item.toString());
}
}
};
}
public AtomicInteger getBatchRunCounter() {
return batchRunCounter;
}
}

@@ -0,0 +1,35 @@
package org.baeldung.batchscheduler.model;
public class Book {
private int id;
private String name;
public Book() {}
public Book(int id, String name) {
super();
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return "Book [id=" + id + ", name=" + name + "]";
}
}

@@ -0,0 +1,36 @@
package org.baeldung.taskletsvschunks.chunks;
import org.baeldung.taskletsvschunks.model.Line;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemProcessor;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
public class LineProcessor implements ItemProcessor<Line, Line>, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public void beforeStep(StepExecution stepExecution) {
logger.debug("Line Processor initialized.");
}
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS.between(line.getDob(), LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Line Processor ended.");
return ExitStatus.COMPLETED;
}
}

@@ -0,0 +1,36 @@
package org.baeldung.taskletsvschunks.chunks;
import org.baeldung.taskletsvschunks.model.Line;
import org.baeldung.taskletsvschunks.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemReader;
public class LineReader implements ItemReader<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LineReader.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Line Reader initialized.");
}
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null) logger.debug("Read line: " + line.toString());
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
logger.debug("Line Reader ended.");
return ExitStatus.COMPLETED;
}
}

@@ -0,0 +1,39 @@
package org.baeldung.taskletsvschunks.chunks;
import org.baeldung.taskletsvschunks.model.Line;
import org.baeldung.taskletsvschunks.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
public class LinesWriter implements ItemWriter<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LinesWriter.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("output.csv");
logger.debug("Line Writer initialized.");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Line Writer ended.");
return ExitStatus.COMPLETED;
}
@Override
public void write(List<? extends Line> lines) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
}
}

@@ -0,0 +1,90 @@
package org.baeldung.taskletsvschunks.config;
import org.baeldung.taskletsvschunks.chunks.LineProcessor;
import org.baeldung.taskletsvschunks.chunks.LineReader;
import org.baeldung.taskletsvschunks.chunks.LinesWriter;
import org.baeldung.taskletsvschunks.model.Line;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@EnableBatchProcessing
public class ChunksConfig {
@Autowired private JobBuilderFactory jobs;
@Autowired private StepBuilderFactory steps;
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
@Bean
public JobRepository jobRepository() throws Exception {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
factory.setTransactionManager(transactionManager());
return (JobRepository) factory.getObject();
}
@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}
@Bean
public ItemReader<Line> itemReader() {
return new LineReader();
}
@Bean
public ItemProcessor<Line, Line> itemProcessor() {
return new LineProcessor();
}
@Bean
public ItemWriter<Line> itemWriter() {
return new LinesWriter();
}
@Bean
protected Step processLines(ItemReader<Line> reader, ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
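// chunk-oriented step: lines are read and processed one at a time and written two at a time, one transaction per chunk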
return steps.get("processLines").<Line, Line> chunk(2)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job job() {
return jobs
.get("chunksJob")
.start(processLines(itemReader(), itemProcessor(), itemWriter()))
.build();
}
}

@@ -0,0 +1,103 @@
package org.baeldung.taskletsvschunks.config;
import org.baeldung.taskletsvschunks.tasklets.LinesProcessor;
import org.baeldung.taskletsvschunks.tasklets.LinesReader;
import org.baeldung.taskletsvschunks.tasklets.LinesWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@EnableBatchProcessing
public class TaskletsConfig {
@Autowired private JobBuilderFactory jobs;
@Autowired private StepBuilderFactory steps;
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
@Bean
public JobRepository jobRepository() throws Exception {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
factory.setTransactionManager(transactionManager());
return (JobRepository) factory.getObject();
}
@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}
@Bean
public LinesReader linesReader() {
return new LinesReader();
}
@Bean
public LinesProcessor linesProcessor() {
return new LinesProcessor();
}
@Bean
public LinesWriter linesWriter() {
return new LinesWriter();
}
@Bean
protected Step readLines() {
return steps
.get("readLines")
.tasklet(linesReader())
.build();
}
@Bean
protected Step processLines() {
return steps
.get("processLines")
.tasklet(linesProcessor())
.build();
}
@Bean
protected Step writeLines() {
return steps
.get("writeLines")
.tasklet(linesWriter())
.build();
}
@Bean
public Job job() {
return jobs
.get("taskletsJob")
.start(readLines())
.next(processLines())
.next(writeLines())
.build();
}
}

@@ -0,0 +1,55 @@
package org.baeldung.taskletsvschunks.model;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
public class Line implements Serializable {
private String name;
private LocalDate dob;
private Long age;
public Line(String name, LocalDate dob) {
this.name = name;
this.dob = dob;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public LocalDate getDob() {
return dob;
}
public void setDob(LocalDate dob) {
this.dob = dob;
}
public Long getAge() {
return age;
}
public void setAge(Long age) {
this.age = age;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[");
sb.append(this.name);
sb.append(",");
sb.append(this.dob.format(DateTimeFormatter.ofPattern("MM/dd/yyyy")));
if (this.age != null) {
sb.append(",");
sb.append(this.age);
}
sb.append("]");
return sb.toString();
}
}

@@ -0,0 +1,49 @@
package org.baeldung.taskletsvschunks.tasklets;
import org.baeldung.taskletsvschunks.model.Line;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.List;
public class LinesProcessor implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LinesProcessor.class);
private List<Line> lines;
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
long age = ChronoUnit.YEARS.between(line.getDob(), LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
}
return RepeatStatus.FINISHED;
}
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
logger.debug("Lines Processor initialized.");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Lines Processor ended.");
return ExitStatus.COMPLETED;
}
}

@@ -0,0 +1,53 @@
package org.baeldung.taskletsvschunks.tasklets;
import org.baeldung.taskletsvschunks.model.Line;
import org.baeldung.taskletsvschunks.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import java.util.ArrayList;
import java.util.List;
public class LinesReader implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LinesReader.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
lines = new ArrayList<Line>();
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Lines Reader initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
Line line = fu.readLine();
while (line != null) {
lines.add(line);
logger.debug("Read line: " + line.toString());
line = fu.readLine();
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
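// share the lines read by this tasklet with the next steps through the job-level ExecutionContext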
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
logger.debug("Lines Reader ended.");
return ExitStatus.COMPLETED;
}
}

@@ -0,0 +1,50 @@
package org.baeldung.taskletsvschunks.tasklets;
import org.baeldung.taskletsvschunks.model.Line;
import org.baeldung.taskletsvschunks.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import java.util.List;
public class LinesWriter implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LinesWriter.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
fu = new FileUtils("output.csv");
logger.debug("Lines Writer initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Lines Writer ended.");
return ExitStatus.COMPLETED;
}
}

@@ -0,0 +1,95 @@
package org.baeldung.taskletsvschunks.utils;
import com.opencsv.CSVReader;
import com.opencsv.CSVWriter;
import org.baeldung.taskletsvschunks.model.Line;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
public class FileUtils {
private final Logger logger = LoggerFactory.getLogger(FileUtils.class);
private String fileName;
private CSVReader CSVReader;
private CSVWriter CSVWriter;
private FileReader fileReader;
private FileWriter fileWriter;
private File file;
public FileUtils(String fileName) {
this.fileName = fileName;
}
public Line readLine() {
try {
if (CSVReader == null) initReader();
String[] line = CSVReader.readNext();
if (line == null) return null;
return new Line(line[0], LocalDate.parse(line[1], DateTimeFormatter.ofPattern("MM/dd/yyyy")));
} catch (Exception e) {
logger.error("Error while reading line in file: " + this.fileName);
return null;
}
}
public void writeLine(Line line) {
try {
if (CSVWriter == null) initWriter();
String[] lineStr = new String[2];
lineStr[0] = line.getName();
lineStr[1] = line
.getAge()
.toString();
CSVWriter.writeNext(lineStr);
} catch (Exception e) {
logger.error("Error while writing line in file: " + this.fileName);
}
}
private void initReader() throws Exception {
ClassLoader classLoader = this
.getClass()
.getClassLoader();
if (file == null) file = new File(classLoader
.getResource(fileName)
.getFile());
if (fileReader == null) fileReader = new FileReader(file);
if (CSVReader == null) CSVReader = new CSVReader(fileReader);
}
private void initWriter() throws Exception {
if (file == null) {
file = new File(fileName);
file.createNewFile();
}
if (fileWriter == null) fileWriter = new FileWriter(file, true);
if (CSVWriter == null) CSVWriter = new CSVWriter(fileWriter);
}
public void closeWriter() {
try {
CSVWriter.close();
fileWriter.close();
} catch (IOException e) {
logger.error("Error while closing writer.");
}
}
public void closeReader() {
try {
CSVReader.close();
fileReader.close();
} catch (IOException e) {
logger.error("Error while closing reader.");
}
}
}

@@ -0,0 +1,4 @@
1,SHARP OBJECTS (MOVIE TIE-IN): A NOVEL
2,ARTEMIS: A NOVEL
3,HER PRETTY FACE
4,ALL WE EVER WANTED

@@ -0,0 +1,4 @@
username, user_id, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

@@ -0,0 +1,4 @@
username, user_id, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

@@ -0,0 +1,4 @@
username, user_id, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

@@ -0,0 +1,4 @@
username, user_id, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

@@ -0,0 +1,4 @@
username, user_id, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

@@ -0,0 +1,4 @@
username, user_id, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

@@ -0,0 +1,7 @@
username, user_id, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411
, 2536, 3/10/2019, 100
mike, 9876, 5/11/2018, -500
, 3425, 10/10/2017, 100

@@ -0,0 +1,24 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<logger name="org.baeldung.taskletsvschunks" level="debug"
additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.baeldung.batchscheduler" level="debug" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<root level="error">
<appender-ref ref="STDOUT" />
</root>
</configuration>

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<transactionRecord>
<transactionRecord>
<amount>10000.0</amount>
<transactionDate>2015-10-31T00:00:00+05:30</transactionDate>
<userId>1234</userId>
<username>devendra</username>
</transactionRecord>
<transactionRecord>
<amount>12321.0</amount>
<transactionDate>2015-12-03T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>john</username>
</transactionRecord>
<transactionRecord>
<amount>23411.0</amount>
<transactionDate>2015-02-02T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>robin</username>
</transactionRecord>
</transactionRecord>

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<transactionRecord>
<transactionRecord>
<amount>10000.0</amount>
<transactionDate>2015-10-31T00:00:00+05:30</transactionDate>
<userId>1234</userId>
<username>devendra</username>
</transactionRecord>
<transactionRecord>
<amount>12321.0</amount>
<transactionDate>2015-12-03T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>john</username>
</transactionRecord>
<transactionRecord>
<amount>23411.0</amount>
<transactionDate>2015-02-02T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>robin</username>
</transactionRecord>
</transactionRecord>

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<transactionRecord>
<transactionRecord>
<amount>10000.0</amount>
<transactionDate>2015-10-31T00:00:00+05:30</transactionDate>
<userId>1234</userId>
<username>devendra</username>
</transactionRecord>
<transactionRecord>
<amount>12321.0</amount>
<transactionDate>2015-12-03T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>john</username>
</transactionRecord>
<transactionRecord>
<amount>23411.0</amount>
<transactionDate>2015-02-02T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>robin</username>
</transactionRecord>
</transactionRecord>

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<transactionRecord>
<transactionRecord>
<amount>10000.0</amount>
<transactionDate>2015-10-31T00:00:00+05:30</transactionDate>
<userId>1234</userId>
<username>devendra</username>
</transactionRecord>
<transactionRecord>
<amount>12321.0</amount>
<transactionDate>2015-12-03T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>john</username>
</transactionRecord>
<transactionRecord>
<amount>23411.0</amount>
<transactionDate>2015-02-02T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>robin</username>
</transactionRecord>
</transactionRecord>

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<transactionRecord>
<transactionRecord>
<amount>10000.0</amount>
<transactionDate>2015-10-31T00:00:00+05:30</transactionDate>
<userId>1234</userId>
<username>devendra</username>
</transactionRecord>
<transactionRecord>
<amount>12321.0</amount>
<transactionDate>2015-12-03T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>john</username>
</transactionRecord>
<transactionRecord>
<amount>23411.0</amount>
<transactionDate>2015-02-02T00:00:00+05:30</transactionDate>
<userId>2134</userId>
<username>robin</username>
</transactionRecord>
</transactionRecord>

@@ -0,0 +1,57 @@
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">

    <import resource="spring.xml" />

    <!-- reader: parses the CSV input and maps each tokenized line to a domain object -->
    <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="input/record.csv" />
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="username,userid,transaction_date,transaction_amount" />
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="org.baeldung.batch.service.RecordFieldSetMapper" />
                </property>
            </bean>
        </property>
        <!-- skip the CSV header line -->
        <property name="linesToSkip" value="1" />
    </bean>

    <!-- processor: applies per-item logic between read and write -->
    <bean id="itemProcessor" class="org.baeldung.batch.service.CustomItemProcessor" />

    <!-- writer: marshals processed items into an XML file via JAXB -->
    <bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
        <property name="resource" value="file:xml/output.xml" />
        <property name="marshaller" ref="recordMarshaller" />
        <property name="rootTagName" value="transactionRecord" />
    </bean>

    <bean id="recordMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
        <property name="classesToBeBound">
            <list>
                <value>org.baeldung.batch.model.Transaction</value>
            </list>
        </property>
    </bean>

    <!-- the job: a single chunk-oriented step committing every 10 items -->
    <batch:job id="firstBatchJob">
        <batch:step id="step1">
            <batch:tasklet>
                <batch:chunk reader="itemReader" writer="itemWriter" processor="itemProcessor" commit-interval="10" />
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>
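
The FlatFileItemReader above delegates line-to-object mapping to org.baeldung.batch.service.RecordFieldSetMapper, whose source lives elsewhere in this commit. As a rough orientation only, a minimal FieldSetMapper of this kind could look like the sketch below; the Transaction setters and the dd/MM/yyyy date format are assumptions, not the implementation shipped in this module.

package org.baeldung.batch.service;

import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.baeldung.batch.model.Transaction;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

// Sketch only: maps one tokenized CSV line to a Transaction, using the
// column names configured on the DelimitedLineTokenizer above.
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

    @Override
    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        Transaction transaction = new Transaction();
        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt("userid"));
        transaction.setAmount(fieldSet.readDouble("transaction_amount"));
        try {
            // assumption: the CSV stores dates as dd/MM/yyyy
            SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
            transaction.setTransactionDate(dateFormat.parse(fieldSet.readString("transaction_date")));
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable transaction_date", e);
        }
        return transaction;
    }
}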

View File

@ -0,0 +1,45 @@
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/jdbc
        http://www.springframework.org/schema/jdbc/spring-jdbc-4.2.xsd">

    <!-- connect to SQLite database -->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="org.sqlite.JDBC" />
        <property name="url" value="jdbc:sqlite:repository.sqlite" />
        <property name="username" value="" />
        <property name="password" value="" />
    </bean>

    <!-- create job-meta tables automatically -->
    <jdbc:initialize-database data-source="dataSource">
        <jdbc:script location="org/springframework/batch/core/schema-drop-sqlite.sql" />
        <jdbc:script location="org/springframework/batch/core/schema-sqlite.sql" />
    </jdbc:initialize-database>

    <!-- store job-meta in memory -->
    <!-- <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" /> </bean> -->

    <!-- store job-meta in database -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="databaseType" value="sqlite" />
    </bean>

    <bean id="transactionManager"
        class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
</beans>

View File

@ -0,0 +1,6 @@
Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992

View File

@ -0,0 +1,12 @@
package org.baeldung;

import org.baeldung.batch.App;
import org.junit.Test;

public class SpringContextIntegrationTest {

    @Test
    public final void testMain() throws Exception {
        App.main(null);
    }
}

View File

@ -0,0 +1,12 @@
package org.baeldung;

import org.baeldung.batch.App;
import org.junit.Test;

public class SpringContextTest {

    @Test
    public final void testMain() throws Exception {
        App.main(null);
    }
}

View File

@ -0,0 +1,60 @@
package org.baeldung.batchscheduler;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringBatchScheduler.class)
public class SpringBatchSchedulerIntegrationTest {

    @Autowired
    private ApplicationContext context;

    @Test
    public void stopJobsWhenSchedulerDisabled() throws Exception {
        SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class);
        // wait until the scheduled job has run twice
        await().untilAsserted(() -> Assert.assertEquals(2, schedulerBean.getBatchRunCounter().get()));
        schedulerBean.stop();
        // pause for a few seconds, then verify the counter no longer increases
        await().pollDelay(3, SECONDS).until(() -> true);
        Assert.assertEquals(2, schedulerBean.getBatchRunCounter().get());
    }

    @Test
    public void stopJobSchedulerWhenSchedulerDestroyed() throws Exception {
        ScheduledAnnotationBeanPostProcessor bean = context.getBean(ScheduledAnnotationBeanPostProcessor.class);
        SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class);
        await().untilAsserted(() -> Assert.assertEquals(2, schedulerBean.getBatchRunCounter().get()));
        // destroying the scheduler bean removes its scheduled tasks
        bean.postProcessBeforeDestruction(schedulerBean, "SpringBatchScheduler");
        await().pollDelay(3, SECONDS).until(() -> true);
        Assert.assertEquals(2, schedulerBean.getBatchRunCounter().get());
    }

    @Test
    public void stopJobSchedulerWhenFutureTasksCancelled() throws Exception {
        SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class);
        await().untilAsserted(() -> Assert.assertEquals(2, schedulerBean.getBatchRunCounter().get()));
        schedulerBean.cancelFutureSchedulerTasks();
        await().pollDelay(3, SECONDS).until(() -> true);
        Assert.assertEquals(2, schedulerBean.getBatchRunCounter().get());
    }
}

View File

@ -0,0 +1,25 @@
package org.baeldung.taskletsvschunks.chunks;

import org.baeldung.taskletsvschunks.config.ChunksConfig;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenChunksJob_WhenJobEnds_ThenStatusCompleted() throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }
}

View File

@ -0,0 +1,25 @@
package org.baeldung.taskletsvschunks.tasklets;

import org.baeldung.taskletsvschunks.config.TaskletsConfig;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenTaskletsJob_WhenJobEnds_ThenStatusCompleted() throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }
}
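
Both ChunksIntegrationTest and TaskletsIntegrationTest autowire a JobLauncherTestUtils from spring-batch-test, so the referenced ChunksConfig and TaskletsConfig classes (defined elsewhere in this commit) need to expose such a bean. A minimal, hypothetical config fragment might look like this:

import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.context.annotation.Bean;

// Sketch only: JobLauncherTestUtils autowires the Job, JobLauncher and
// JobRepository beans it finds in the surrounding test context.
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
    return new JobLauncherTestUtils();
}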

View File

@ -0,0 +1 @@
<?xml version="1.0" encoding="UTF-8"?><transactionRecord><transactionRecord><amount>10000.0</amount><transactionDate>2015-10-31T00:00:00+05:30</transactionDate><userId>1234</userId><username>devendra</username></transactionRecord><transactionRecord><amount>12321.0</amount><transactionDate>2015-12-03T00:00:00+05:30</transactionDate><userId>2134</userId><username>john</username></transactionRecord><transactionRecord><amount>23411.0</amount><transactionDate>2015-02-02T00:00:00+05:30</transactionDate><userId>2134</userId><username>robin</username></transactionRecord></transactionRecord>