BAEL-2168 Java EE 7 Batch Processing (#5645)

* jberet batch

* Batch Understanding

* partition

* exception

* some more changes
This commit is contained in:
Amitabh Mandal 2018-11-25 15:31:22 +05:30 committed by Grzegorz Piwowarek
parent 2f9fee0391
commit f784c3bb79
39 changed files with 1473 additions and 414 deletions

View File

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>jee-7</artifactId> <artifactId>jee-7</artifactId>
@ -121,6 +122,58 @@
<artifactId>spring-security-taglibs</artifactId> <artifactId>spring-security-taglibs</artifactId>
<version>${org.springframework.security.version}</version> <version>${org.springframework.security.version}</version>
</dependency> </dependency>
<!-- Batch dependencies -->
<dependency>
<groupId>org.jboss.spec.javax.batch</groupId>
<artifactId>jboss-batch-api_1.0_spec</artifactId>
<version>1.0.0.Final</version>
</dependency>
<dependency>
<groupId>org.jberet</groupId>
<artifactId>jberet-core</artifactId>
<version>1.0.2.Final</version>
</dependency>
<dependency>
<groupId>org.jberet</groupId>
<artifactId>jberet-support</artifactId>
<version>1.0.2.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.spec.javax.transaction</groupId>
<artifactId>jboss-transaction-api_1.2_spec</artifactId>
<version>1.0.0.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>1.4.2.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.weld</groupId>
<artifactId>weld-core</artifactId>
<version>2.1.1.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.weld.se</groupId>
<artifactId>weld-se</artifactId>
<version>2.1.1.Final</version>
</dependency>
<dependency>
<groupId>org.jberet</groupId>
<artifactId>jberet-se</artifactId>
<version>1.0.2.Final</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.178</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-jetty-servlet</artifactId>
<version>2.22.1</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
@ -135,6 +188,42 @@
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-pmd-plugin
</artifactId>
<versionRange>
[3.8,)
</versionRange>
<goals>
<goal>check</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build> </build>
<dependencyManagement> <dependencyManagement>
@ -196,6 +285,12 @@
<artifactId>wildfly-arquillian-container-managed</artifactId> <artifactId>wildfly-arquillian-container-managed</artifactId>
<version>${wildfly.version}</version> <version>${wildfly.version}</version>
<scope>test</scope> <scope>test</scope>
<exclusions>
<exclusion>
<groupId>sun.jdk</groupId>
<artifactId>jconsole</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
</dependencies> </dependencies>
@ -224,6 +319,12 @@
<type>zip</type> <type>zip</type>
<overWrite>false</overWrite> <overWrite>false</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory> <outputDirectory>${project.build.directory}</outputDirectory>
<exclusions>
<exclusion>
<groupId>sun.jdk</groupId>
<artifactId>jconsole</artifactId>
</exclusion>
</exclusions>
</artifactItem> </artifactItem>
</artifactItems> </artifactItems>
</configuration> </configuration>
@ -275,6 +376,12 @@
<artifactId>wildfly-arquillian-container-remote</artifactId> <artifactId>wildfly-arquillian-container-remote</artifactId>
<version>${wildfly.version}</version> <version>${wildfly.version}</version>
<scope>test</scope> <scope>test</scope>
<exclusions>
<exclusion>
<groupId>sun.jdk</groupId>
<artifactId>jconsole</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
</dependencies> </dependencies>
</profile> </profile>

View File

@ -0,0 +1,11 @@
package com.baeldung.batch.understanding;
import javax.batch.api.chunk.listener.SkipReadListener;
import javax.inject.Named;
@Named
public class ChunkExceptionSkipReadListener implements SkipReadListener {
@Override
public void onSkipReadItem(Exception e) throws Exception {
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.batch.understanding;
import javax.batch.api.chunk.AbstractCheckpointAlgorithm;
import javax.inject.Named;
@Named
public class CustomCheckPoint extends AbstractCheckpointAlgorithm {
@Override
public boolean isReadyToCheckpoint() throws Exception {
return SimpleChunkItemReader.COUNT % 5 == 0;
}
}

View File

@ -0,0 +1,14 @@
package com.baeldung.batch.understanding;
import javax.batch.api.Decider;
import javax.batch.runtime.StepExecution;
import javax.inject.Named;
@Named
public class DeciderJobSequence implements Decider {
@Override
public String decide(StepExecution[] ses) throws Exception {
return "nothing";
}
}

View File

@ -0,0 +1,20 @@
package com.baeldung.batch.understanding;
import javax.batch.api.AbstractBatchlet;
import javax.batch.api.BatchProperty;
import javax.batch.runtime.BatchStatus;
import javax.inject.Inject;
import javax.inject.Named;
@Named
public class InjectSimpleBatchLet extends AbstractBatchlet {
@Inject
@BatchProperty(name = "name")
private String nameString;
@Override
public String process() throws Exception {
System.out.println("Value passed in = " + nameString);
return BatchStatus.COMPLETED.toString();
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.batch.understanding;
import javax.batch.api.AbstractBatchlet;
import javax.batch.runtime.BatchStatus;
import javax.inject.Named;
@Named
public class SimpleBatchLet extends AbstractBatchlet {
@Override
public String process() throws Exception {
return BatchStatus.FAILED.toString();
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.batch.understanding;
import javax.batch.api.chunk.ItemProcessor;
import javax.inject.Named;
@Named
public class SimpleChunkItemProcessor implements ItemProcessor {
@Override
public Integer processItem(Object t) {
return ((Integer) t).intValue() % 2 == 0 ? null : ((Integer) t).intValue();
}
}

View File

@ -0,0 +1,27 @@
package com.baeldung.batch.understanding;
import java.io.Serializable;
import java.util.StringTokenizer;
import javax.batch.api.chunk.AbstractItemReader;
import javax.inject.Named;
@Named
public class SimpleChunkItemReader extends AbstractItemReader {
private StringTokenizer tokens;
public static int COUNT = 0;
@Override
public Integer readItem() throws Exception {
if (tokens.hasMoreTokens()) {
COUNT++;
String tempTokenize = tokens.nextToken();
return Integer.valueOf(tempTokenize);
}
return null;
}
@Override
public void open(Serializable checkpoint) throws Exception {
tokens = new StringTokenizer("1,2,3,4,5,6,7,8,9,10", ",");
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.batch.understanding;
import java.io.Serializable;
import java.util.StringTokenizer;
import javax.batch.api.chunk.AbstractItemReader;
import javax.inject.Named;
@Named
public class SimpleChunkItemReaderError extends AbstractItemReader {
private StringTokenizer tokens;
public static int COUNT = 0;
@Override
public Integer readItem() throws Exception {
if (tokens.hasMoreTokens()) {
COUNT++;
int token = Integer.valueOf(tokens.nextToken());
if (token == 3) {
throw new RuntimeException("Something happened");
}
return Integer.valueOf(token);
}
return null;
}
@Override
public void open(Serializable checkpoint) throws Exception {
tokens = new StringTokenizer("1,2,3,4,5,6,7,8,9,10", ",");
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.batch.understanding;
import java.util.List;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.inject.Named;
@Named
public class SimpleChunkWriter extends AbstractItemWriter {
@Override
public void writeItems(List<Object> items) throws Exception {
}
}

View File

@ -0,0 +1,41 @@
package com.baeldung.batch.understanding.exception;
import java.io.Serializable;
public class MyInputRecord implements Serializable {
private int id;
public MyInputRecord(int id) {
this.id = id;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
MyInputRecord that = (MyInputRecord) o;
return id == that.id;
}
@Override
public int hashCode() {
return id;
}
@Override
public String toString() {
return "MyInputRecord: " + id;
}
}

View File

@ -0,0 +1,15 @@
package com.baeldung.batch.understanding.exception;
import javax.batch.api.chunk.ItemProcessor;
import javax.inject.Named;
@Named
public class MyItemProcessor implements ItemProcessor {
@Override
public Object processItem(Object t) {
if (((MyInputRecord) t).getId() == 6) {
throw new NullPointerException();
}
return new MyOutputRecord(((MyInputRecord) t).getId());
}
}

View File

@ -0,0 +1,42 @@
package com.baeldung.batch.understanding.exception;
import javax.batch.api.chunk.AbstractItemReader;
import javax.inject.Named;
import java.io.Serializable;
import java.util.StringTokenizer;
@Named
public class MyItemReader extends AbstractItemReader {
private StringTokenizer tokens;
private MyInputRecord lastElement;
private boolean alreadyFailed;
@Override
public void open(Serializable checkpoint) {
tokens = new StringTokenizer("1,2,3,4,5,6,7,8,9,10", ",");
if (checkpoint != null) {
while (!Integer.valueOf(tokens.nextToken())
.equals(((MyInputRecord) checkpoint).getId())) {
}
}
}
@Override
public Object readItem() {
if (tokens.hasMoreTokens()) {
int token = Integer.valueOf(tokens.nextToken());
if (token == 5 && !alreadyFailed) {
alreadyFailed = true;
throw new IllegalArgumentException("Could not read record");
}
lastElement = new MyInputRecord(token);
return lastElement;
}
return null;
}
@Override
public Serializable checkpointInfo() throws Exception {
return lastElement;
}
}

View File

@ -0,0 +1,19 @@
package com.baeldung.batch.understanding.exception;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.inject.Named;
import java.util.List;
@Named
public class MyItemWriter extends AbstractItemWriter {
private static int retries = 0;
@Override
public void writeItems(List list) {
if (retries <= 3 && list.contains(new MyOutputRecord(8))) {
retries++;
throw new UnsupportedOperationException();
}
}
}

View File

@ -0,0 +1,39 @@
package com.baeldung.batch.understanding.exception;
import java.io.Serializable;
public class MyOutputRecord implements Serializable {
private int id;
public MyOutputRecord(int id) {
this.id = id;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MyOutputRecord that = (MyOutputRecord) o;
return id == that.id;
}
@Override
public int hashCode() {
return id;
}
@Override
public String toString() {
return "MyOutputRecord: " + id;
}
}

View File

@ -0,0 +1,11 @@
package com.baeldung.batch.understanding.exception;
import javax.batch.api.chunk.listener.RetryProcessListener;
import javax.inject.Named;
@Named
public class MyRetryProcessorListener implements RetryProcessListener {
@Override
public void onRetryProcessException(Object item, Exception ex) throws Exception {
}
}

View File

@ -0,0 +1,11 @@
package com.baeldung.batch.understanding.exception;
import javax.batch.api.chunk.listener.RetryReadListener;
import javax.inject.Named;
@Named
public class MyRetryReadListener implements RetryReadListener {
@Override
public void onRetryReadException(Exception ex) throws Exception {
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.batch.understanding.exception;
import javax.batch.api.chunk.listener.RetryWriteListener;
import javax.inject.Named;
import java.util.List;
@Named
public class MyRetryWriteListener implements RetryWriteListener {
@Override
public void onRetryWriteException(List<Object> items, Exception ex) throws Exception {
}
}

View File

@ -0,0 +1,11 @@
package com.baeldung.batch.understanding.exception;
import javax.batch.api.chunk.listener.SkipProcessListener;
import javax.inject.Named;
@Named
public class MySkipProcessorListener implements SkipProcessListener {
@Override
public void onSkipProcessItem(Object t, Exception e) throws Exception {
}
}

View File

@ -0,0 +1,11 @@
package com.baeldung.batch.understanding.exception;
import javax.batch.api.chunk.listener.SkipReadListener;
import javax.inject.Named;
@Named
public class MySkipReadListener implements SkipReadListener {
@Override
public void onSkipReadItem(Exception e) throws Exception {
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.batch.understanding.exception;
import javax.batch.api.chunk.listener.SkipWriteListener;
import javax.inject.Named;
import java.util.List;
@Named
public class MySkipWriteListener implements SkipWriteListener {
@Override
public void onSkipWriteItem(List list, Exception e) throws Exception {
list.remove(new MyOutputRecord(2));
}
}

View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="customCheckPoint" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd" version="1.0">
<step id="firstChunkStep" >
<chunk item-count="3" checkpoint-policy="custom">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
<checkpoint-algorithm ref="customCheckPoint"/>
</chunk>
</step>
</job>

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="decideJobSequence" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="firstBatchStepStep1" next="firstDecider">
<batchlet ref="simpleBatchLet" />
</step>
<decision id="firstDecider" ref="deciderJobSequence">
<next on="nothing" to="firstBatchStepStep3"/>
</decision>
<step id="firstBatchStepStep2">
<batchlet ref="simpleBatchLet" />
</step>
<step id="firstBatchStepStep3">
<batchlet ref="simpleBatchLet" />
</step>
</job>

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="flowJobSequence" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<flow id="flow1" next="firstBatchStepStep3">
<step id="firstChunkStepStep1" next="firstBatchStepStep2">
<chunk item-count="3">
<reader ref="simpleChunkItemReader" />
<processor ref="simpleChunkItemProcessor" />
<writer ref="simpleChunkWriter" />
</chunk>
</step>
<step id="firstBatchStepStep2">
<batchlet ref="simpleBatchLet" />
</step>
</flow>
<step id="firstBatchStepStep3">
<batchlet ref="simpleBatchLet" />
</step>
</job>

View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="injectSimpleBatchLet" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="firstStep" >
<batchlet ref="injectSimpleBatchLet">
<properties>
<property name="name" value="helloThere" />
</properties>
</batchlet>
</step>
</job>

View File

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="injectSimpleBatchLet"
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="firstStep">
<batchlet ref="injectSimpleBatchLet">
<properties>
<property name="name" value="#{partitionPlan['name']}" />
</properties>
</batchlet>
<partition>
<plan partitions="2">
<properties partition="0">
<property name="name" value="firstPartition" />
</properties>
<properties partition="1">
<property name="name" value="secondpPartition" />
</properties>
</plan>
</partition>
</step>
</job>

View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="simpleBatchLet" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="firstStep" >
<batchlet ref="simpleBatchLet"/>
</step>
</job>

View File

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="simpleChunk" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd" version="1.0">
<step id="firstChunkStep" >
<chunk item-count="3">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
</chunk>
</step>
</job>

View File

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="simpleErrorChunk"
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="firstErrorChunkStep" >
<chunk item-count="3">
<reader ref="simpleChunkItemReaderError"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
</chunk>
</step>
<step id="firstErrorSkipChunkStep" >
<chunk item-count="3" skip-limit="3">
<reader ref="simpleChunkItemReaderError"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
<skippable-exception-classes>
<include class="java.lang.RuntimeException"/>
</skippable-exception-classes>
</chunk>
</step>
</job>

View File

@ -0,0 +1,26 @@
<job id="simpleErrorSkipChunk" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd" version="1.0">
<step id="errorStep" >
<listeners>
<listener ref="mySkipReadListener"/>
<listener ref="mySkipProcessorListener"/>
<listener ref="mySkipWriteListener"/>
<listener ref="myRetryReadListener"/>
<listener ref="myRetryProcessorListener"/>
<listener ref="myRetryWriteListener"/>
</listeners>
<chunk checkpoint-policy="item" item-count="3" skip-limit="3" retry-limit="3">
<reader ref="myItemReader"/>
<processor ref="myItemProcessor"/>
<writer ref="myItemWriter"/>
<skippable-exception-classes>
<include class="java.lang.RuntimeException"/>
<include class="java.lang.UnsupportedOperationException"/>
</skippable-exception-classes>
<retryable-exception-classes>
<include class="java.lang.IllegalArgumentException"/>
<include class="java.lang.UnsupportedOperationException"/>
</retryable-exception-classes>
</chunk>
</step>
</job>

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="simpleJobSequence"
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="firstChunkStepStep1" next="firstBatchStepStep2">
<chunk item-count="3">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
</chunk>
</step>
<step id="firstBatchStepStep2" >
<batchlet ref="simpleBatchLet"/>
</step>
</job>

View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="splitJobSequence"
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<split id="split1" next="splitJobSequenceStep3">
<flow id="flow1">
<step id="splitJobSequenceStep1">
<batchlet ref="simpleBatchLet" />
</step>
</flow>
<flow id="flow2">
<step id="splitJobSequenceStep2">
<batchlet ref="simpleBatchLet" />
</step>
</flow>
</split>
<step id="splitJobSequenceStep3">
<batchlet ref="simpleBatchLet" />
</step>
</job>

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
bean-discovery-mode="all">
</beans>

View File

@ -0,0 +1,79 @@
package com.baeldung.batch.understanding;
import java.util.HashMap;
import java.util.Map;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.Metric;
public class BatchTestHelper {
private static final int MAX_TRIES = 40;
private static final int THREAD_SLEEP = 1000;
private BatchTestHelper() {
throw new UnsupportedOperationException();
}
public static JobExecution keepTestAlive(JobExecution jobExecution) throws InterruptedException {
int maxTries = 0;
while (!jobExecution.getBatchStatus()
.equals(BatchStatus.COMPLETED)) {
if (maxTries < MAX_TRIES) {
maxTries++;
Thread.sleep(THREAD_SLEEP);
jobExecution = BatchRuntime.getJobOperator()
.getJobExecution(jobExecution.getExecutionId());
} else {
break;
}
}
Thread.sleep(THREAD_SLEEP);
return jobExecution;
}
public static JobExecution keepTestFailed(JobExecution jobExecution) throws InterruptedException {
int maxTries = 0;
while (!jobExecution.getBatchStatus()
.equals(BatchStatus.FAILED)) {
if (maxTries < MAX_TRIES) {
maxTries++;
Thread.sleep(THREAD_SLEEP);
jobExecution = BatchRuntime.getJobOperator()
.getJobExecution(jobExecution.getExecutionId());
} else {
break;
}
}
Thread.sleep(THREAD_SLEEP);
return jobExecution;
}
public static JobExecution keepTestStopped(JobExecution jobExecution) throws InterruptedException {
int maxTries = 0;
while (!jobExecution.getBatchStatus()
.equals(BatchStatus.STOPPED)) {
if (maxTries < MAX_TRIES) {
maxTries++;
Thread.sleep(THREAD_SLEEP);
jobExecution = BatchRuntime.getJobOperator()
.getJobExecution(jobExecution.getExecutionId());
} else {
break;
}
}
Thread.sleep(THREAD_SLEEP);
return jobExecution;
}
public static Map<Metric.MetricType, Long> getMetricsMap(Metric[] metrics) {
Map<Metric.MetricType, Long> metricsMap = new HashMap<>();
for (Metric metric : metrics) {
metricsMap.put(metric.getType(), metric.getValue());
}
return metricsMap;
}
}

View File

@ -0,0 +1,33 @@
package com.baeldung.batch.understanding;
import static org.junit.jupiter.api.Assertions.*;
import java.util.Map;
import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.Metric;
import javax.batch.runtime.StepExecution;
import com.baeldung.batch.understanding.BatchTestHelper;
import org.junit.jupiter.api.Test;
class CustomCheckPointUnitTest {
@Test
public void givenChunk_whenCustomCheckPoint_thenCommitCount_3() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("customCheckPoint", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
for (StepExecution stepExecution : jobOperator.getStepExecutions(executionId)) {
if (stepExecution.getStepName()
.equals("firstChunkStep")) {
Map<Metric.MetricType, Long> metricsMap = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
assertEquals(3L, metricsMap.get(Metric.MetricType.COMMIT_COUNT)
.longValue());
}
}
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
}

View File

@ -0,0 +1,74 @@
package com.baeldung.batch.understanding;
import static org.junit.jupiter.api.Assertions.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.StepExecution;
import org.junit.jupiter.api.Test;
class JobSequenceUnitTest {
@Test
public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleJobSequence", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
assertEquals(2 , jobOperator.getStepExecutions(executionId).size());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
@Test
public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("flowJobSequence", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
assertEquals(3 , jobOperator.getStepExecutions(executionId).size());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
@Test
public void givenDecider_thenBatch_CompleteWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("decideJobSequence", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
List<String> executedSteps = new ArrayList<>();
for (StepExecution stepExecution : stepExecutions) {
executedSteps.add(stepExecution.getStepName());
}
assertEquals(2, jobOperator.getStepExecutions(executionId).size());
assertArrayEquals(new String[] { "firstBatchStepStep1", "firstBatchStepStep3" }, executedSteps.toArray());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
@Test
public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("splitJobSequence", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
List<String> executedSteps = new ArrayList<>();
for (StepExecution stepExecution : stepExecutions) {
executedSteps.add(stepExecution.getStepName());
}
assertEquals(3, stepExecutions.size());
assertTrue(executedSteps.contains("splitJobSequenceStep1"));
assertTrue(executedSteps.contains("splitJobSequenceStep2"));
assertTrue(executedSteps.contains("splitJobSequenceStep3"));
assertTrue(executedSteps.get(0).equals("splitJobSequenceStep1") || executedSteps.get(0).equals("splitJobSequenceStep2"));
assertTrue(executedSteps.get(1).equals("splitJobSequenceStep1") || executedSteps.get(1).equals("splitJobSequenceStep2"));
assertTrue(executedSteps.get(2).equals("splitJobSequenceStep3"));
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
}

View File

@ -0,0 +1,68 @@
package com.baeldung.batch.understanding;
import static org.junit.jupiter.api.Assertions.*;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.Metric;
import javax.batch.runtime.StepExecution;
import org.junit.jupiter.api.Test;
class SimpleBatchLetUnitTest {
@Test
public void givenBatchLet_thenBatch_CompleteWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleBatchLet", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
@Test
public void givenBatchLetProperty_thenBatch_CompleteWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("injectionSimpleBatchLet", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
@Test
public void givenBatchLetPartition_thenBatch_CompleteWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("partitionSimpleBatchLet", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
@Test
public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleBatchLet", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobOperator.stop(executionId);
jobExecution = BatchTestHelper.keepTestStopped(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
}
@Test
public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleBatchLet", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobOperator.stop(executionId);
jobExecution = BatchTestHelper.keepTestStopped(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties());
jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId));
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
}

View File

@ -0,0 +1,67 @@
package com.baeldung.batch.understanding;
import static org.junit.jupiter.api.Assertions.*;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.Metric;
import javax.batch.runtime.StepExecution;
import org.junit.jupiter.api.Test;
class SimpleChunkUnitTest {
@Test
public void givenChunk_thenBatch_CompletesWithSucess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleChunk", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
for (StepExecution stepExecution : stepExecutions) {
if (stepExecution.getStepName()
.equals("firstChunkStep")) {
Map<Metric.MetricType, Long> metricsMap = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
assertEquals(10L, metricsMap.get(Metric.MetricType.READ_COUNT)
.longValue());
assertEquals(10L / 2L, metricsMap.get(Metric.MetricType.WRITE_COUNT)
.longValue());
assertEquals(10L / 3 + (10L % 3 > 0 ? 1 : 0), metricsMap.get(Metric.MetricType.COMMIT_COUNT)
.longValue());
}
}
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
@Test
public void givenChunk__thenBatch_fetchInformation() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleChunk", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
// job name contains simpleBatchLet which is the name of the file
assertTrue(jobOperator.getJobNames().contains("simpleChunk"));
// job parameters are empty
assertTrue(jobOperator.getParameters(executionId).isEmpty());
// step execution information
List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
assertEquals("firstChunkStep", stepExecutions.get(0).getStepName());
// finding out batch status
assertEquals(BatchStatus.COMPLETED, stepExecutions.get(0).getBatchStatus());
Map<Metric.MetricType, Long> metricTest = BatchTestHelper.getMetricsMap(stepExecutions.get(0).getMetrics());
assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue());
assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue());
assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue());
assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue());
assertEquals(0L, metricTest.get(Metric.MetricType.READ_SKIP_COUNT).longValue());
assertEquals(0L, metricTest.get(Metric.MetricType.WRITE_SKIP_COUNT).longValue());
assertEquals(0L, metricTest.get(Metric.MetricType.PROCESS_SKIP_COUNT).longValue());
assertEquals(0L, metricTest.get(Metric.MetricType.ROLLBACK_COUNT).longValue());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
}

View File

@ -0,0 +1,48 @@
package com.baeldung.batch.understanding;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.Metric.MetricType;
import javax.batch.runtime.StepExecution;
import org.junit.jupiter.api.Test;
class SimpleErrorChunkUnitTest {
@Test
public void givenChunkError_thenBatch_CompletesWithFailed() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleErrorChunk", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestFailed(jobExecution);
System.out.println(jobExecution.getBatchStatus());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED);
}
@Test
public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleErrorSkipChunk", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
for (StepExecution stepExecution : stepExecutions) {
if (stepExecution.getStepName()
.equals("errorStep")) {
Map<MetricType, Long> metricsMap = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
long skipCount = metricsMap.get(MetricType.PROCESS_SKIP_COUNT)
.longValue();
assertTrue("Skip count=" + skipCount, skipCount == 1l || skipCount == 2l);
}
}
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
}