BAEL-2168 Java EE 7 batch processing (#5861)
* review changes * Fixed public static count * review comments
This commit is contained in:
parent
547f99925f
commit
b0f62f4eca
@ -1,12 +1,22 @@
|
|||||||
package com.baeldung.batch.understanding;
|
package com.baeldung.batch.understanding;
|
||||||
|
|
||||||
import javax.batch.api.chunk.AbstractCheckpointAlgorithm;
|
import javax.batch.api.chunk.AbstractCheckpointAlgorithm;
|
||||||
|
import javax.batch.runtime.context.JobContext;
|
||||||
|
import javax.inject.Inject;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
|
|
||||||
@Named
|
@Named
|
||||||
public class CustomCheckPoint extends AbstractCheckpointAlgorithm {
|
public class CustomCheckPoint extends AbstractCheckpointAlgorithm {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
JobContext jobContext;
|
||||||
|
|
||||||
|
private Integer counterRead = 0;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isReadyToCheckpoint() throws Exception {
|
public boolean isReadyToCheckpoint() throws Exception {
|
||||||
return SimpleChunkItemReader.COUNT % 5 == 0;
|
counterRead = (Integer)jobContext.getTransientUserData();
|
||||||
|
System.out.println("counterRead : " + counterRead);
|
||||||
|
return counterRead % 5 == 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,20 +1,38 @@
|
|||||||
package com.baeldung.batch.understanding;
|
package com.baeldung.batch.understanding;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
import javax.batch.api.AbstractBatchlet;
|
import javax.batch.api.AbstractBatchlet;
|
||||||
import javax.batch.api.BatchProperty;
|
import javax.batch.api.BatchProperty;
|
||||||
import javax.batch.runtime.BatchStatus;
|
import javax.batch.runtime.BatchStatus;
|
||||||
|
import javax.batch.runtime.context.JobContext;
|
||||||
|
import javax.batch.runtime.context.StepContext;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
|
|
||||||
@Named
|
@Named
|
||||||
public class InjectSimpleBatchLet extends AbstractBatchlet {
|
public class InjectSimpleBatchLet extends AbstractBatchlet {
|
||||||
|
Logger logger = Logger.getLogger(InjectSimpleBatchLet.class.getName());
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
@BatchProperty(name = "name")
|
@BatchProperty(name = "name")
|
||||||
private String nameString;
|
private String nameString;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
StepContext stepContext;
|
||||||
|
private Properties stepProperties;
|
||||||
|
@Inject
|
||||||
|
JobContext jobContext;
|
||||||
|
private Properties jobProperties;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String process() throws Exception {
|
public String process() throws Exception {
|
||||||
System.out.println("Value passed in = " + nameString);
|
logger.info("BatchProperty : " + nameString);
|
||||||
|
stepProperties = stepContext.getProperties();
|
||||||
|
jobProperties = jobContext.getProperties();
|
||||||
|
logger.info("Step property : "+ stepProperties.getProperty("stepProp1"));
|
||||||
|
logger.info("job property : "+jobProperties.getProperty("jobProp1"));
|
||||||
return BatchStatus.COMPLETED.toString();
|
return BatchStatus.COMPLETED.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,20 +1,28 @@
|
|||||||
package com.baeldung.batch.understanding;
|
package com.baeldung.batch.understanding;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.Properties;
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
import javax.batch.api.chunk.AbstractItemReader;
|
import javax.batch.api.chunk.AbstractItemReader;
|
||||||
|
import javax.batch.runtime.BatchStatus;
|
||||||
|
import javax.batch.runtime.context.JobContext;
|
||||||
|
import javax.inject.Inject;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
|
|
||||||
@Named
|
@Named
|
||||||
public class SimpleChunkItemReader extends AbstractItemReader {
|
public class SimpleChunkItemReader extends AbstractItemReader {
|
||||||
private StringTokenizer tokens;
|
private StringTokenizer tokens;
|
||||||
public static int COUNT = 0;
|
private Integer count=0;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
JobContext jobContext;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer readItem() throws Exception {
|
public Integer readItem() throws Exception {
|
||||||
if (tokens.hasMoreTokens()) {
|
if (tokens.hasMoreTokens()) {
|
||||||
COUNT++;
|
this.count++;
|
||||||
String tempTokenize = tokens.nextToken();
|
String tempTokenize = tokens.nextToken();
|
||||||
|
jobContext.setTransientUserData(count);
|
||||||
return Integer.valueOf(tempTokenize);
|
return Integer.valueOf(tempTokenize);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
@ -24,4 +32,5 @@ public class SimpleChunkItemReader extends AbstractItemReader {
|
|||||||
public void open(Serializable checkpoint) throws Exception {
|
public void open(Serializable checkpoint) throws Exception {
|
||||||
tokens = new StringTokenizer("1,2,3,4,5,6,7,8,9,10", ",");
|
tokens = new StringTokenizer("1,2,3,4,5,6,7,8,9,10", ",");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -4,17 +4,22 @@ import java.io.Serializable;
|
|||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
|
|
||||||
import javax.batch.api.chunk.AbstractItemReader;
|
import javax.batch.api.chunk.AbstractItemReader;
|
||||||
|
import javax.batch.runtime.context.JobContext;
|
||||||
|
import javax.inject.Inject;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
|
|
||||||
@Named
|
@Named
|
||||||
public class SimpleChunkItemReaderError extends AbstractItemReader {
|
public class SimpleChunkItemReaderError extends AbstractItemReader {
|
||||||
|
@Inject
|
||||||
|
JobContext jobContext;
|
||||||
private StringTokenizer tokens;
|
private StringTokenizer tokens;
|
||||||
public static int COUNT = 0;
|
private Integer count = 0;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Integer readItem() throws Exception {
|
public Integer readItem() throws Exception {
|
||||||
if (tokens.hasMoreTokens()) {
|
if (tokens.hasMoreTokens()) {
|
||||||
COUNT++;
|
count++;
|
||||||
|
jobContext.setTransientUserData(count);
|
||||||
int token = Integer.valueOf(tokens.nextToken());
|
int token = Integer.valueOf(tokens.nextToken());
|
||||||
if (token == 3) {
|
if (token == 3) {
|
||||||
throw new RuntimeException("Something happened");
|
throw new RuntimeException("Something happened");
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package com.baeldung.batch.understanding;
|
package com.baeldung.batch.understanding;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import javax.batch.api.chunk.AbstractItemWriter;
|
import javax.batch.api.chunk.AbstractItemWriter;
|
||||||
@ -7,7 +8,9 @@ import javax.inject.Named;
|
|||||||
|
|
||||||
@Named
|
@Named
|
||||||
public class SimpleChunkWriter extends AbstractItemWriter {
|
public class SimpleChunkWriter extends AbstractItemWriter {
|
||||||
|
List<Integer> processed = new ArrayList<>();
|
||||||
@Override
|
@Override
|
||||||
public void writeItems(List<Object> items) throws Exception {
|
public void writeItems(List<Object> items) throws Exception {
|
||||||
|
items.stream().map(Integer.class::cast).forEach(this.processed::add);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,13 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<job id="injectSimpleBatchLet" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
|
<job id="injectSimpleBatchLet"
|
||||||
|
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
|
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
|
||||||
version="1.0">
|
version="1.0">
|
||||||
<step id="firstStep">
|
<step id="firstStep">
|
||||||
|
<properties>
|
||||||
|
<property name="stepProp1" value="value1" />
|
||||||
|
</properties>
|
||||||
<batchlet ref="injectSimpleBatchLet">
|
<batchlet ref="injectSimpleBatchLet">
|
||||||
<properties>
|
<properties>
|
||||||
<property name="name" value="helloThere" />
|
<property name="name" value="helloThere" />
|
||||||
|
@ -4,7 +4,13 @@
|
|||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
|
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
|
||||||
version="1.0">
|
version="1.0">
|
||||||
|
<properties>
|
||||||
|
<property name="jobProp1" value="job-value1"/>
|
||||||
|
</properties>
|
||||||
<step id="firstStep">
|
<step id="firstStep">
|
||||||
|
<properties>
|
||||||
|
<property name="stepProp1" value="step-value1" />
|
||||||
|
</properties>
|
||||||
<batchlet ref="injectSimpleBatchLet">
|
<batchlet ref="injectSimpleBatchLet">
|
||||||
<properties>
|
<properties>
|
||||||
<property name="name" value="#{partitionPlan['name']}" />
|
<property name="name" value="#{partitionPlan['name']}" />
|
||||||
|
@ -7,6 +7,7 @@ import javax.batch.runtime.BatchRuntime;
|
|||||||
import javax.batch.runtime.BatchStatus;
|
import javax.batch.runtime.BatchStatus;
|
||||||
import javax.batch.runtime.JobExecution;
|
import javax.batch.runtime.JobExecution;
|
||||||
import javax.batch.runtime.Metric;
|
import javax.batch.runtime.Metric;
|
||||||
|
import javax.batch.runtime.StepExecution;
|
||||||
|
|
||||||
public class BatchTestHelper {
|
public class BatchTestHelper {
|
||||||
private static final int MAX_TRIES = 40;
|
private static final int MAX_TRIES = 40;
|
||||||
@ -68,6 +69,16 @@ public class BatchTestHelper {
|
|||||||
return jobExecution;
|
return jobExecution;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static long getCommitCount(StepExecution stepExecution) {
|
||||||
|
Map<Metric.MetricType, Long> metricsMap = getMetricsMap(stepExecution.getMetrics());
|
||||||
|
return metricsMap.get(Metric.MetricType.COMMIT_COUNT);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long getProcessSkipCount(StepExecution stepExecution) {
|
||||||
|
Map<Metric.MetricType, Long> metricsMap = getMetricsMap(stepExecution.getMetrics());
|
||||||
|
return metricsMap.get(Metric.MetricType.PROCESS_SKIP_COUNT);
|
||||||
|
}
|
||||||
|
|
||||||
public static Map<Metric.MetricType, Long> getMetricsMap(Metric[] metrics) {
|
public static Map<Metric.MetricType, Long> getMetricsMap(Metric[] metrics) {
|
||||||
Map<Metric.MetricType, Long> metricsMap = new HashMap<>();
|
Map<Metric.MetricType, Long> metricsMap = new HashMap<>();
|
||||||
for (Metric metric : metrics) {
|
for (Metric metric : metrics) {
|
||||||
|
@ -15,7 +15,7 @@ import org.junit.jupiter.api.Test;
|
|||||||
|
|
||||||
class CustomCheckPointUnitTest {
|
class CustomCheckPointUnitTest {
|
||||||
@Test
|
@Test
|
||||||
public void givenChunk_whenCustomCheckPoint_thenCommitCount_3() throws Exception {
|
public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception {
|
||||||
JobOperator jobOperator = BatchRuntime.getJobOperator();
|
JobOperator jobOperator = BatchRuntime.getJobOperator();
|
||||||
Long executionId = jobOperator.start("customCheckPoint", new Properties());
|
Long executionId = jobOperator.start("customCheckPoint", new Properties());
|
||||||
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
|
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
|
||||||
@ -23,9 +23,10 @@ class CustomCheckPointUnitTest {
|
|||||||
for (StepExecution stepExecution : jobOperator.getStepExecutions(executionId)) {
|
for (StepExecution stepExecution : jobOperator.getStepExecutions(executionId)) {
|
||||||
if (stepExecution.getStepName()
|
if (stepExecution.getStepName()
|
||||||
.equals("firstChunkStep")) {
|
.equals("firstChunkStep")) {
|
||||||
Map<Metric.MetricType, Long> metricsMap = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
|
jobOperator.getStepExecutions(executionId)
|
||||||
assertEquals(3L, metricsMap.get(Metric.MetricType.COMMIT_COUNT)
|
.stream()
|
||||||
.longValue());
|
.map(BatchTestHelper::getCommitCount)
|
||||||
|
.forEach(count -> assertEquals(3L, count.longValue()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
|
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
|
||||||
|
@ -1,18 +1,11 @@
|
|||||||
package com.baeldung.batch.understanding;
|
package com.baeldung.batch.understanding;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
import javax.batch.operations.JobOperator;
|
import javax.batch.operations.JobOperator;
|
||||||
import javax.batch.runtime.BatchRuntime;
|
import javax.batch.runtime.BatchRuntime;
|
||||||
import javax.batch.runtime.BatchStatus;
|
import javax.batch.runtime.BatchStatus;
|
||||||
import javax.batch.runtime.JobExecution;
|
import javax.batch.runtime.JobExecution;
|
||||||
import javax.batch.runtime.Metric;
|
|
||||||
import javax.batch.runtime.StepExecution;
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
class SimpleBatchLetUnitTest {
|
class SimpleBatchLetUnitTest {
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package com.baeldung.batch.understanding;
|
package com.baeldung.batch.understanding;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
|
||||||
@ -23,7 +24,6 @@ class SimpleErrorChunkUnitTest {
|
|||||||
Long executionId = jobOperator.start("simpleErrorChunk", new Properties());
|
Long executionId = jobOperator.start("simpleErrorChunk", new Properties());
|
||||||
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
|
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
|
||||||
jobExecution = BatchTestHelper.keepTestFailed(jobExecution);
|
jobExecution = BatchTestHelper.keepTestFailed(jobExecution);
|
||||||
System.out.println(jobExecution.getBatchStatus());
|
|
||||||
assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED);
|
assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -37,10 +37,10 @@ class SimpleErrorChunkUnitTest {
|
|||||||
for (StepExecution stepExecution : stepExecutions) {
|
for (StepExecution stepExecution : stepExecutions) {
|
||||||
if (stepExecution.getStepName()
|
if (stepExecution.getStepName()
|
||||||
.equals("errorStep")) {
|
.equals("errorStep")) {
|
||||||
Map<MetricType, Long> metricsMap = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
|
jobOperator.getStepExecutions(executionId)
|
||||||
long skipCount = metricsMap.get(MetricType.PROCESS_SKIP_COUNT)
|
.stream()
|
||||||
.longValue();
|
.map(BatchTestHelper::getProcessSkipCount)
|
||||||
assertTrue("Skip count=" + skipCount, skipCount == 1l || skipCount == 2l);
|
.forEach(skipCount -> assertEquals(1L, skipCount.longValue()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
|
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user