This commit is contained in:
leif stawnyczy 2024-03-15 09:45:06 -04:00
parent 65d9d0d34b
commit ea68bda5e6
3 changed files with 87 additions and 1 deletions

View File

@ -433,6 +433,57 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
assertEquals(1.0, jobInstance.getProgress());
}
@Test
public void testJobWithLongPollingStep() {
// create job definition
String jobId = new Exception().getStackTrace()[0].getMethodName();
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> first = (step, sink) -> {
myFirstStepLatch.call(1);
return RunOutcome.SUCCESS;
};
// step 2
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> second = (step, sink) -> {
// TODO - poll
return RunOutcome.SUCCESS;
};
// step 3
ILastJobStepWorker<TestJobParameters, SecondStepOutput> last = (step, sink) -> {
myLastStepLatch.call(1);
return RunOutcome.SUCCESS;
};
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(jobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"First step",
FirstStepOutput.class,
first
)
.addIntermediateStep(SECOND_STEP_ID,
"Second step",
SecondStepOutput.class,
second)
.addLastStep(
LAST_STEP_ID,
"Final step",
last
)
.completionHandler(myCompletionHandler)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
}
@Test
public void testFirstStepToSecondStep_doubleChunk_doesNotFastTrack() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
@ -465,7 +516,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
@Test
public void JobExecutionFailedException_CausesInstanceFailure() {
public void jobExecutionFailedException_CausesInstanceFailure() {
// setup
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
throw new JobExecutionFailedException("Expected Test Exception");

View File

@ -87,6 +87,11 @@ public class WorkChunk implements IModelJson {
@JsonDeserialize(using = JsonDateDeserializer.class)
private Date myUpdateTime;
@JsonProperty("nextPollTimestamp")
@JsonSerialize(using = JsonDateSerializer.class)
@JsonDeserialize(using = JsonDateDeserializer.class)
private Date myNextPollTime;
@JsonProperty(value = "recordsProcessed", access = JsonProperty.Access.READ_ONLY)
private Integer myRecordsProcessed;
@ -251,6 +256,14 @@ public class WorkChunk implements IModelJson {
myUpdateTime = theUpdateTime;
}
public Date getNextPollTime() {
return myNextPollTime;
}
public void setNextPollTime(Date theNextPollTime) {
myNextPollTime = theNextPollTime;
}
public String getWarningMessage() {
return myWarningMessage;
}
@ -276,6 +289,9 @@ public class WorkChunk implements IModelJson {
b.append("EndTime", myEndTime);
b.append("UpdateTime", myUpdateTime);
b.append("RecordsProcessed", myRecordsProcessed);
if (myNextPollTime != null) {
b.append("NextPollTime", myNextPollTime);
}
if (isNotBlank(myErrorMessage)) {
b.append("ErrorMessage", myErrorMessage);
}

View File

@ -32,11 +32,30 @@ import java.util.Set;
*/
public enum WorkChunkStatusEnum {
// wipmb For 6.8 Add WAITING for gated, and READY for in db, but not yet sent to channel.
/**
* The initial state all workchunks start in
*/
READY,
/**
* The state of workchunks that have been sent to the queue;
* or of workchunks that are about to be processed in a final
* reduction step (these workchunks are never queued)
*/
QUEUED,
/**
* The state of workchunks that are doing work.
*/
IN_PROGRESS,
/**
* A workchunk status for workchunks that are doing long-polling work
* that will not complete in a reasonably short amount of time
*/
POLL_WAITING,
ERRORED,
FAILED,
/**
* The state of workchunks that have finished their job's step.
*/
COMPLETED;
private static final EnumMap<WorkChunkStatusEnum, Set<WorkChunkStatusEnum>> ourPriorStates;