BAEL-1543: Spring Batch: Tasklets vs Chunks

This commit is contained in:
Magdalena Krause 2018-02-15 12:57:27 -03:00
parent 4f6988b134
commit 443e990fd1
15 changed files with 599 additions and 2 deletions

View File

@ -18,9 +18,10 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>4.3.4.RELEASE</spring.version>
<spring.batch.version>3.0.7.RELEASE</spring.batch.version>
<spring.version>5.0.3.RELEASE</spring.version>
<spring.batch.version>4.0.0.RELEASE</spring.batch.version>
<sqlite.version>3.15.1</sqlite.version>
<opencsv.version>4.1</opencsv.version>
</properties>
<dependencies>
@ -51,6 +52,16 @@
<artifactId>spring-batch-core</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>${opencsv.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,36 @@
package org.baeldung.taskletsvschunks.chunks;
import org.baeldung.taskletsvschunks.model.Line;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemProcessor;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
public class LineProcessor implements ItemProcessor<Line, Line>, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public void beforeStep(StepExecution stepExecution) {
logger.debug("Line Processor initialized.");
}
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS.between(line.getDob(), LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Line Processor ended.");
return ExitStatus.COMPLETED;
}
}

View File

@ -0,0 +1,36 @@
package org.baeldung.taskletsvschunks.chunks;
import org.baeldung.taskletsvschunks.model.Line;
import org.baeldung.taskletsvschunks.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemReader;
public class LineReader implements ItemReader<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LineReader.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Line Reader initialized.");
}
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null) logger.debug("Read line: " + line.toString());
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
logger.debug("Line Reader ended.");
return ExitStatus.COMPLETED;
}
}

View File

@ -0,0 +1,39 @@
package org.baeldung.taskletsvschunks.chunks;
import org.baeldung.taskletsvschunks.model.Line;
import org.baeldung.taskletsvschunks.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
public class LinesWriter implements ItemWriter<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LinesWriter.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("output.csv");
logger.debug("Line Writer initialized.");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Line Writer ended.");
return ExitStatus.COMPLETED;
}
@Override
public void write(List<? extends Line> lines) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
}
}

View File

@ -0,0 +1,55 @@
package org.baeldung.taskletsvschunks.model;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
public class Line implements Serializable {
private String name;
private LocalDate dob;
private Long age;
public Line(String name, LocalDate dob) {
this.name = name;
this.dob = dob;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public LocalDate getDob() {
return dob;
}
public void setDob(LocalDate dob) {
this.dob = dob;
}
public Long getAge() {
return age;
}
public void setAge(Long age) {
this.age = age;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[");
sb.append(this.name);
sb.append(",");
sb.append(this.dob.format(DateTimeFormatter.ofPattern("MM/dd/yyyy")));
if (this.age != null) {
sb.append(",");
sb.append(this.age);
}
sb.append("]");
return sb.toString();
}
}

View File

@ -0,0 +1,49 @@
package org.baeldung.taskletsvschunks.tasklets;
import org.baeldung.taskletsvschunks.model.Line;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.List;
public class LinesProcessor implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LinesProcessor.class);
private List<Line> lines;
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
long age = ChronoUnit.YEARS.between(line.getDob(), LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
}
return RepeatStatus.FINISHED;
}
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
logger.debug("Lines Processor initialized.");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Lines Processor ended.");
return ExitStatus.COMPLETED;
}
}

View File

@ -0,0 +1,53 @@
package org.baeldung.taskletsvschunks.tasklets;
import org.baeldung.taskletsvschunks.model.Line;
import org.baeldung.taskletsvschunks.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import java.util.ArrayList;
import java.util.List;
public class LinesReader implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LinesReader.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
lines = new ArrayList<Line>();
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Lines Reader initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
Line line = fu.readLine();
while (line != null) {
lines.add(line);
logger.debug("Read line: " + line.toString());
line = fu.readLine();
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
logger.debug("Lines Reader ended.");
return ExitStatus.COMPLETED;
}
}

View File

@ -0,0 +1,50 @@
package org.baeldung.taskletsvschunks.tasklets;
import org.baeldung.taskletsvschunks.model.Line;
import org.baeldung.taskletsvschunks.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import java.util.List;
public class LinesWriter implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(LinesWriter.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
fu = new FileUtils("output.csv");
logger.debug("Lines Writer initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Lines Writer ended.");
return ExitStatus.COMPLETED;
}
}

View File

@ -0,0 +1,95 @@
package org.baeldung.taskletsvschunks.utils;
import com.opencsv.CSVReader;
import com.opencsv.CSVWriter;
import org.baeldung.taskletsvschunks.model.Line;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
public class FileUtils {
private final Logger logger = LoggerFactory.getLogger(FileUtils.class);
private String fileName;
private CSVReader CSVReader;
private CSVWriter CSVWriter;
private FileReader fileReader;
private FileWriter fileWriter;
private File file;
public FileUtils(String fileName) {
this.fileName = fileName;
}
public Line readLine() {
try {
if (CSVReader == null) initReader();
String[] line = CSVReader.readNext();
if (line == null) return null;
return new Line(line[0], LocalDate.parse(line[1], DateTimeFormatter.ofPattern("MM/dd/yyyy")));
} catch (Exception e) {
logger.error("Error while reading line in file: " + this.fileName);
return null;
}
}
public void writeLine(Line line) {
try {
if (CSVWriter == null) initWriter();
String[] lineStr = new String[2];
lineStr[0] = line.getName();
lineStr[1] = line
.getAge()
.toString();
CSVWriter.writeNext(lineStr);
} catch (Exception e) {
logger.error("Error while writing line in file: " + this.fileName);
}
}
private void initReader() throws Exception {
ClassLoader classLoader = this
.getClass()
.getClassLoader();
if (file == null) file = new File(classLoader
.getResource(fileName)
.getFile());
if (fileReader == null) fileReader = new FileReader(file);
if (CSVReader == null) CSVReader = new CSVReader(fileReader);
}
private void initWriter() throws Exception {
if (file == null) {
file = new File(fileName);
file.createNewFile();
}
if (fileWriter == null) fileWriter = new FileWriter(file, true);
if (CSVWriter == null) CSVWriter = new CSVWriter(fileWriter);
}
public void closeWriter() {
try {
CSVWriter.close();
fileWriter.close();
} catch (IOException e) {
logger.error("Error while closing writer.");
}
}
public void closeReader() {
try {
CSVReader.close();
fileReader.close();
} catch (IOException e) {
logger.error("Error while closing reader.");
}
}
}

View File

@ -0,0 +1,33 @@
<configuration>
<appender name="FILE-AUDIT"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>C:\Users\admin\Desktop\Baeldung\repo\magkrause\tutorials\spring-batch\src\main\resources\taskletsvschunks\logs.log</file>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>
%d{yyyy-MM-dd HH:mm:ss} [%thread] %level %logger{35} - %msg%n
</Pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- rollover daily -->
<fileNamePattern>${DEV_HOME}/archived/debug.%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>10MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
</appender>
<logger name="org.baeldung.taskletsvschunks" level="debug"
additivity="false">
<appender-ref ref="FILE-AUDIT" />
</logger>
<root level="error">
<appender-ref ref="FILE-AUDIT" />
</root>
</configuration>

View File

@ -0,0 +1,39 @@
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
<bean class="org.springframework.batch.test.JobLauncherTestUtils"/>
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
</bean>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>
<bean id="lineReader" class="org.baeldung.taskletsvschunks.chunks.LineReader"/>
<bean id="lineProcessor" class="org.baeldung.taskletsvschunks.chunks.LineProcessor"/>
<bean id="linesWriter" class="org.baeldung.taskletsvschunks.chunks.LinesWriter"/>
<batch:job id="chunksJob">
<batch:step id="processLines">
<batch:tasklet>
<batch:chunk reader="lineReader" writer="linesWriter" processor="lineProcessor" commit-interval="2"/>
</batch:tasklet>
<batch:end on="FAILED"/>
<batch:end on="COMPLETED"/>
</batch:step>
</batch:job>
</beans>

View File

@ -0,0 +1,6 @@
Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992
1 Mae Hodges 10/22/1972
2 Gary Potter 02/22/1953
3 Betty Wise 02/17/1968
4 Wayne Rose 04/06/1977
5 Adam Caldwell 09/27/1995
6 Lucille Phillips 05/14/1992

View File

@ -0,0 +1,47 @@
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
<bean class="org.springframework.batch.test.JobLauncherTestUtils"/>
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
</bean>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>
<bean id="linesReader" class="org.baeldung.taskletsvschunks.tasklets.LinesReader"/>
<bean id="linesProcessor" class="org.baeldung.taskletsvschunks.tasklets.LinesProcessor"/>
<bean id="linesWriter" class="org.baeldung.taskletsvschunks.tasklets.LinesWriter"/>
<batch:job id="taskletsJob">
<batch:step id="readLines">
<batch:tasklet ref="linesReader"/>
<batch:end on="FAILED"/>
<batch:next on="*" to="processLines"/>
</batch:step>
<batch:step id="processLines">
<batch:tasklet ref="linesProcessor"/>
<batch:end on="FAILED"/>
<batch:next on="*" to="writeLines"/>
</batch:step>
<batch:step id="writeLines">
<batch:tasklet ref="linesWriter"/>
<batch:end on="FAILED"/>
<batch:end on="COMPLETED"/>
</batch:step>
</batch:job>
</beans>

View File

@ -0,0 +1,24 @@
package org.baeldung.taskletsvschunks.chunks;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:/taskletsvschunks/chunks.xml")
public class ChunksTest {
@Autowired private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void test() throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}

View File

@ -0,0 +1,24 @@
package org.baeldung.taskletsvschunks.tasklets;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:/taskletsvschunks/tasklets.xml")
public class TaskletsTest {
@Autowired private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void test() throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}