Merge pull request #8888 from mikebski/BAEL-3646
[BAEL-3646] Conditional Flow in Spring Batch
This commit is contained in:
commit
e9bedab7be
|
@ -0,0 +1,21 @@
|
||||||
|
package org.baeldung.conditionalflow;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.boot.CommandLineRunner;
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
|
@SpringBootApplication
|
||||||
|
public class ConditionalFlowApplication implements CommandLineRunner {
|
||||||
|
private static Logger logger = LoggerFactory.getLogger(ConditionalFlowApplication.class);
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication.run(ConditionalFlowApplication.class, args);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(String... args) throws Exception {
|
||||||
|
logger.info("Running conditional flow application...");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package org.baeldung.conditionalflow;
|
||||||
|
|
||||||
|
import org.springframework.batch.core.JobExecution;
|
||||||
|
import org.springframework.batch.core.StepExecution;
|
||||||
|
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
|
||||||
|
import org.springframework.batch.core.job.flow.JobExecutionDecider;
|
||||||
|
|
||||||
|
public class NumberInfoDecider implements JobExecutionDecider {
|
||||||
|
|
||||||
|
public static final String NOTIFY = "NOTIFY";
|
||||||
|
public static final String QUIET = "QUIET";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that determines notification status of job
|
||||||
|
* @return true if notifications should be sent.
|
||||||
|
*/
|
||||||
|
private boolean shouldNotify() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
|
||||||
|
if (shouldNotify()) {
|
||||||
|
return new FlowExecutionStatus(NOTIFY);
|
||||||
|
} else {
|
||||||
|
return new FlowExecutionStatus(QUIET);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,91 @@
|
||||||
|
package org.baeldung.conditionalflow.config;
|
||||||
|
|
||||||
|
import org.baeldung.conditionalflow.NumberInfoDecider;
|
||||||
|
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||||
|
import org.baeldung.conditionalflow.step.*;
|
||||||
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.Step;
|
||||||
|
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
|
||||||
|
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||||
|
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.Primary;
|
||||||
|
|
||||||
|
import static org.baeldung.conditionalflow.NumberInfoDecider.NOTIFY;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableBatchProcessing
|
||||||
|
public class NumberInfoConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Qualifier("NotificationStep")
|
||||||
|
public Step notificationStep(StepBuilderFactory sbf) {
|
||||||
|
return sbf.get("Notify step")
|
||||||
|
.tasklet(new NotifierTasklet())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Step numberGeneratorStep(StepBuilderFactory sbf, int[] values, String prepend) {
|
||||||
|
return sbf.get("Number generator")
|
||||||
|
.<NumberInfo, Integer> chunk(1)
|
||||||
|
.reader(new NumberInfoGenerator(values))
|
||||||
|
.processor(new NumberInfoClassifier())
|
||||||
|
.writer(new PrependingStdoutWriter<>(prepend))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Step numberGeneratorStepDecider(StepBuilderFactory sbf, int[] values, String prepend) {
|
||||||
|
return sbf.get("Number generator decider")
|
||||||
|
.<NumberInfo, Integer> chunk(1)
|
||||||
|
.reader(new NumberInfoGenerator(values))
|
||||||
|
.processor(new NumberInfoClassifierWithDecider())
|
||||||
|
.writer(new PrependingStdoutWriter<>(prepend))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Qualifier("first_job")
|
||||||
|
public Job numberGeneratorNonNotifierJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, @Qualifier("NotificationStep") Step notificationStep) {
|
||||||
|
int[] nonNotifierData = { -1, -2, -3 };
|
||||||
|
Step step = numberGeneratorStep(stepBuilderFactory, nonNotifierData, "First Dataset Processor");
|
||||||
|
return jobBuilderFactory.get("Number generator - first dataset")
|
||||||
|
.start(step)
|
||||||
|
.on(NOTIFY)
|
||||||
|
.to(notificationStep)
|
||||||
|
.from(step)
|
||||||
|
.on("*")
|
||||||
|
.stop()
|
||||||
|
.end()
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Qualifier("second_job")
|
||||||
|
public Job numberGeneratorNotifierJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, @Qualifier("NotificationStep") Step notificationStep) {
|
||||||
|
int[] billableData = { 11, -2, -3 };
|
||||||
|
Step dataProviderStep = numberGeneratorStep(stepBuilderFactory, billableData, "Second Dataset Processor");
|
||||||
|
return jobBuilderFactory.get("Number generator - second dataset")
|
||||||
|
.start(dataProviderStep)
|
||||||
|
.on(NOTIFY)
|
||||||
|
.to(notificationStep)
|
||||||
|
.end()
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Qualifier("third_job")
|
||||||
|
@Primary
|
||||||
|
public Job numberGeneratorNotifierJobWithDecider(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, @Qualifier("NotificationStep") Step notificationStep) {
|
||||||
|
int[] billableData = { 11, -2, -3 };
|
||||||
|
Step dataProviderStep = numberGeneratorStepDecider(stepBuilderFactory, billableData, "Third Dataset Processor");
|
||||||
|
return jobBuilderFactory.get("Number generator - third dataset")
|
||||||
|
.start(dataProviderStep)
|
||||||
|
.next(new NumberInfoDecider())
|
||||||
|
.on(NOTIFY)
|
||||||
|
.to(notificationStep)
|
||||||
|
.end()
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
package org.baeldung.conditionalflow.model;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public class NumberInfo {
|
||||||
|
private int number;
|
||||||
|
|
||||||
|
public NumberInfo(int number) {
|
||||||
|
this.number = number;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static NumberInfo from(int number) {
|
||||||
|
return new NumberInfo(number);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isPositive() {
|
||||||
|
return number > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEven() {
|
||||||
|
return number % 2 == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNumber() {
|
||||||
|
return number;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o)
|
||||||
|
return true;
|
||||||
|
if (o == null || getClass() != o.getClass())
|
||||||
|
return false;
|
||||||
|
NumberInfo that = (NumberInfo) o;
|
||||||
|
return number == that.number;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(number);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "NumberInfo{" + "number=" + number + '}';
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
package org.baeldung.conditionalflow.step;
|
||||||
|
|
||||||
|
import org.springframework.batch.core.StepContribution;
|
||||||
|
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||||
|
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||||
|
import org.springframework.batch.repeat.RepeatStatus;
|
||||||
|
|
||||||
|
public class NotifierTasklet implements Tasklet {
|
||||||
|
@Override
|
||||||
|
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
|
||||||
|
System.err.println("[" + chunkContext.getStepContext()
|
||||||
|
.getJobName() + "] contains interesting data!!");
|
||||||
|
return RepeatStatus.FINISHED;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
package org.baeldung.conditionalflow.step;
|
||||||
|
|
||||||
|
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||||
|
import org.springframework.batch.core.ExitStatus;
|
||||||
|
import org.springframework.batch.core.StepExecution;
|
||||||
|
import org.springframework.batch.core.annotation.BeforeStep;
|
||||||
|
import org.springframework.batch.core.listener.ItemListenerSupport;
|
||||||
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
|
|
||||||
|
import static org.baeldung.conditionalflow.NumberInfoDecider.NOTIFY;
|
||||||
|
import static org.baeldung.conditionalflow.NumberInfoDecider.QUIET;
|
||||||
|
|
||||||
|
public class NumberInfoClassifier extends ItemListenerSupport<NumberInfo, Integer>
|
||||||
|
implements ItemProcessor<NumberInfo, Integer> {
|
||||||
|
private StepExecution stepExecution;
|
||||||
|
|
||||||
|
@BeforeStep
|
||||||
|
public void beforeStep(StepExecution stepExecution) {
|
||||||
|
this.stepExecution = stepExecution;
|
||||||
|
this.stepExecution.setExitStatus(new ExitStatus(QUIET));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterProcess(NumberInfo item, Integer result) {
|
||||||
|
super.afterProcess(item, result);
|
||||||
|
if (item.isPositive()) {
|
||||||
|
stepExecution.setExitStatus(new ExitStatus(NOTIFY));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer process(NumberInfo numberInfo) throws Exception {
|
||||||
|
return Integer.valueOf(numberInfo.getNumber());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.baeldung.conditionalflow.step;
|
||||||
|
|
||||||
|
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||||
|
import org.springframework.batch.core.listener.ItemListenerSupport;
|
||||||
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
|
|
||||||
|
public class NumberInfoClassifierWithDecider extends ItemListenerSupport<NumberInfo, Integer> implements ItemProcessor<NumberInfo, Integer> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer process(NumberInfo numberInfo) throws Exception {
|
||||||
|
return Integer.valueOf(numberInfo.getNumber());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,23 @@
|
||||||
|
package org.baeldung.conditionalflow.step;
|
||||||
|
|
||||||
|
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||||
|
import org.springframework.batch.item.ItemReader;
|
||||||
|
|
||||||
|
public class NumberInfoGenerator implements ItemReader<NumberInfo> {
|
||||||
|
private int[] values;
|
||||||
|
private int counter;
|
||||||
|
|
||||||
|
public NumberInfoGenerator(int[] values) {
|
||||||
|
this.values = values;
|
||||||
|
counter = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NumberInfo read() {
|
||||||
|
if (counter == values.length) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return new NumberInfo(values[counter++]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
package org.baeldung.conditionalflow.step;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.springframework.batch.item.ItemWriter;
|
||||||
|
|
||||||
|
public class PrependingStdoutWriter<T> implements ItemWriter<T> {
|
||||||
|
private String prependText;
|
||||||
|
|
||||||
|
public PrependingStdoutWriter(String prependText) {
|
||||||
|
this.prependText = prependText;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(List<? extends T> list) {
|
||||||
|
for (T listItem : list) {
|
||||||
|
System.out.println(prependText + " " + listItem.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,56 @@
|
||||||
|
package org.baeldung.conditionalflow;
|
||||||
|
|
||||||
|
import org.baeldung.conditionalflow.config.NumberInfoConfig;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.batch.core.ExitStatus;
|
||||||
|
import org.springframework.batch.core.JobExecution;
|
||||||
|
import org.springframework.batch.core.StepExecution;
|
||||||
|
import org.springframework.batch.test.JobLauncherTestUtils;
|
||||||
|
import org.springframework.batch.test.context.SpringBatchTest;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||||
|
import org.springframework.test.annotation.DirtiesContext;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.test.context.TestExecutionListeners;
|
||||||
|
import org.springframework.test.context.junit4.SpringRunner;
|
||||||
|
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
|
||||||
|
import org.springframework.test.context.support.DirtiesContextTestExecutionListener;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@RunWith(SpringRunner.class)
|
||||||
|
@SpringBatchTest
|
||||||
|
@EnableAutoConfiguration
|
||||||
|
@ContextConfiguration(classes = { NumberInfoConfig.class })
|
||||||
|
@TestExecutionListeners({ DependencyInjectionTestExecutionListener.class, DirtiesContextTestExecutionListener.class })
|
||||||
|
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
|
||||||
|
public class DeciderJobIntegrationTest {
|
||||||
|
@Autowired
|
||||||
|
private JobLauncherTestUtils jobLauncherTestUtils;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenNumberGeneratorDecider_whenDeciderRuns_thenStatusIsNotify() throws Exception {
|
||||||
|
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
|
||||||
|
Collection<StepExecution> actualStepExecutions = jobExecution.getStepExecutions();
|
||||||
|
ExitStatus actualJobExitStatus = jobExecution.getExitStatus();
|
||||||
|
|
||||||
|
assertEquals("COMPLETED", actualJobExitStatus.getExitCode()
|
||||||
|
.toString());
|
||||||
|
assertEquals(2, actualStepExecutions.size());
|
||||||
|
boolean notifyStepDidRun = false;
|
||||||
|
Iterator<StepExecution> iterator = actualStepExecutions.iterator();
|
||||||
|
while (iterator.hasNext() && !notifyStepDidRun) {
|
||||||
|
if (iterator.next()
|
||||||
|
.getStepName()
|
||||||
|
.equals("Notify step")) {
|
||||||
|
notifyStepDidRun = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(notifyStepDidRun);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
package org.baeldung.conditionalflow.model;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
|
|
||||||
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
|
class NumberInfoUnitTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenPositive_whenFrom_isPositive() {
|
||||||
|
assertTrue(NumberInfo.from(1)
|
||||||
|
.isPositive());
|
||||||
|
assertTrue(NumberInfo.from(11)
|
||||||
|
.isPositive());
|
||||||
|
assertFalse(NumberInfo.from(0)
|
||||||
|
.isPositive());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenNegative_whenFrom_isNegative() {
|
||||||
|
assertFalse(NumberInfo.from(-1)
|
||||||
|
.isPositive());
|
||||||
|
assertFalse(NumberInfo.from(-10)
|
||||||
|
.isPositive());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenEven_whenFrom_isEven() {
|
||||||
|
assertTrue(NumberInfo.from(0)
|
||||||
|
.isEven());
|
||||||
|
assertTrue(NumberInfo.from(-2)
|
||||||
|
.isEven());
|
||||||
|
assertTrue(NumberInfo.from(2)
|
||||||
|
.isEven());
|
||||||
|
assertTrue(NumberInfo.from(-22)
|
||||||
|
.isEven());
|
||||||
|
assertTrue(NumberInfo.from(22)
|
||||||
|
.isEven());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenOdd_whenFrom_isOdd() {
|
||||||
|
|
||||||
|
assertFalse(NumberInfo.from(1)
|
||||||
|
.isEven());
|
||||||
|
assertFalse(NumberInfo.from(-1)
|
||||||
|
.isEven());
|
||||||
|
|
||||||
|
assertFalse(NumberInfo.from(13)
|
||||||
|
.isEven());
|
||||||
|
assertFalse(NumberInfo.from(-13)
|
||||||
|
.isEven());
|
||||||
|
assertFalse(NumberInfo.from(31)
|
||||||
|
.isEven());
|
||||||
|
assertFalse(NumberInfo.from(-51)
|
||||||
|
.isEven());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void giveGeneratedInt_whenFrom_isNumberFromGenerator() {
|
||||||
|
for (int i = -100; i < 100; i++) {
|
||||||
|
assertEquals(i, NumberInfo.from(i)
|
||||||
|
.getNumber());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,16 @@
|
||||||
|
package org.baeldung.conditionalflow.step;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
class NumberInfoClassifierUnitTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenNumberInfo_whenProcess_thenConvertsToInteger() throws Exception {
|
||||||
|
NumberInfoClassifier nic = new NumberInfoClassifier();
|
||||||
|
assertEquals(Integer.valueOf(4), nic.process(NumberInfo.from(4)));
|
||||||
|
assertEquals(Integer.valueOf(-4), nic.process(NumberInfo.from(-4)));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
package org.baeldung.conditionalflow.step;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
public class NumberInfoGeneratorUnitTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenArray_whenGenerator_correctOrderAndValue() {
|
||||||
|
int[] numbers = new int[] { 1, -2, 4, -10 };
|
||||||
|
NumberInfoGenerator numberGenerator = new NumberInfoGenerator(numbers);
|
||||||
|
assertEquals(new NumberInfo(numbers[0]), numberGenerator.read());
|
||||||
|
assertEquals(new NumberInfo(numbers[1]), numberGenerator.read());
|
||||||
|
assertEquals(new NumberInfo(numbers[2]), numberGenerator.read());
|
||||||
|
assertEquals(new NumberInfo(numbers[3]), numberGenerator.read());
|
||||||
|
assertNull(numberGenerator.read());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue