- have only one path through the equeueReady method

- fixed tests
This commit is contained in:
tyner 2024-04-10 07:51:04 -04:00
parent 0aabdbdf4f
commit 1c1ceb0b27
8 changed files with 166 additions and 182 deletions

View File

@ -132,13 +132,18 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
private String storeFirstWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) {
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData, false);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
@Test
public void testStoreAndFetchInstance() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
runInTransaction(() -> {
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new);
Batch2JobInstanceEntity instanceEntity = findInstanceByIdOrThrow(instanceId);
assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus());
});
@ -151,7 +156,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(instance.getReport(), foundInstance.getReport());
runInTransaction(() -> {
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new);
Batch2JobInstanceEntity instanceEntity = findInstanceByIdOrThrow(instanceId);
assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus());
});
}
@ -372,7 +377,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance(true, false);
String instanceId = mySvc.storeNewInstance(instance);
Date updateTime = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime()));
Date updateTime = runInTransaction(() -> new Date(findInstanceByIdOrThrow(instanceId).getUpdateTime().getTime()));
sleepUntilTimeChange();
@ -380,7 +385,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
runInTransaction(() -> mySvc.updateInstanceUpdateTime(instanceId));
// Verify
Date updateTime2 = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime()));
Date updateTime2 = runInTransaction(() -> new Date(findInstanceByIdOrThrow(instanceId).getUpdateTime().getTime()));
assertNotEquals(updateTime, updateTime2);
}
@ -389,20 +394,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
// setup
JobInstance instance = createInstance(true, true);
String instanceId = mySvc.storeNewInstance(instance);
String chunkIdFirstStep = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, true);
String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
runInTransaction(() -> assertEquals(TARGET_STEP_ID, myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalArgumentException::new).getCurrentGatedStepId()));
runInTransaction(() -> assertEquals(TARGET_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()));
// execute
runInTransaction(() -> mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, LAST_STEP_ID));
// verify
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(chunkIdFirstStep).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkIdSecondStep1).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkIdSecondStep2).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(LAST_STEP_ID, myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalArgumentException::new).getCurrentGatedStepId()));
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus());
assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus());
assertEquals(LAST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId());
});
}
@Test
@ -416,17 +421,21 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true);
runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdFirstStep).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdSecondStep1).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdSecondStep2).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> {
assertEquals(expectedNoChangeStatus, findChunkByIdOrThrow(chunkIdFirstStep).getStatus());
assertEquals(expectedNoChangeStatus, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus());
assertEquals(expectedNoChangeStatus, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus());
});
// execute
runInTransaction(() -> mySvc.updateAllChunksForStepWithStatus(instanceId, LAST_STEP_ID, expectedChangedStatus, WorkChunkStatusEnum.GATE_WAITING));
// verify
runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdFirstStep).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(expectedChangedStatus, myWorkChunkRepository.findById(chunkIdSecondStep1).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(expectedChangedStatus, myWorkChunkRepository.findById(chunkIdSecondStep2).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> {
assertEquals(expectedNoChangeStatus, findChunkByIdOrThrow(chunkIdFirstStep).getStatus());
assertEquals(expectedChangedStatus, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus());
assertEquals(expectedChangedStatus, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus());
});
}
@Test
@ -445,15 +454,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String instanceId = mySvc.storeNewInstance(instance);
// execute & verify
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, theGatedExecution);
runInTransaction(() -> assertEquals(theExpectedStatusOnCreate, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, theGatedExecution);
runInTransaction(() -> assertEquals(theExpectedStatusOnCreate, findChunkByIdOrThrow(id).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(theExpectedStatusAfterTransition, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
if (WorkChunkStatusEnum.READY.equals(theExpectedStatusAfterTransition)) {
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
}
runInTransaction(() -> assertEquals(theExpectedStatusAfterTransition, findChunkByIdOrThrow(id).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
@ -469,30 +473,38 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String instanceId = mySvc.storeNewInstance(instance);
// execute & verify
String firstChunkId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, expectedFirstChunkData, isGatedExecution);
String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, expectedFirstChunkData);
String secondChunkId = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, expectedSecondChunkData, isGatedExecution);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(firstChunkId).getStatus());
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus());
});
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(firstChunkId).getStatus());
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus());
});
WorkChunk actualFirstChunkData = mySvc.onWorkChunkDequeue(firstChunkId).orElseThrow(IllegalArgumentException::new);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(firstChunkId).getStatus()));
assertEquals(expectedFirstChunkData, actualFirstChunkData.getData());
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(firstChunkId, 50, 0));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.COMPLETED, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.COMPLETED, findChunkByIdOrThrow(firstChunkId).getStatus());
assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus());
});
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.COMPLETED, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> {
assertEquals(WorkChunkStatusEnum.COMPLETED, findChunkByIdOrThrow(firstChunkId).getStatus());
assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(secondChunkId).getStatus());
});
WorkChunk actualSecondChunkData = mySvc.onWorkChunkDequeue(secondChunkId).orElseThrow(IllegalArgumentException::new);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(secondChunkId).getStatus()));
assertEquals(expectedSecondChunkData, actualSecondChunkData.getData());
}
@ -572,16 +584,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
JobInstance instance = createInstance(true, theGatedExecution);
String instanceId = mySvc.storeNewInstance(instance);
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution);
String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution);
assertNotNull(id);
runInTransaction(() -> assertEquals(theExpectedCreatedStatus, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(theExpectedCreatedStatus, findChunkByIdOrThrow(id).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(theExpectedTransitionStatus, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
if (WorkChunkStatusEnum.READY.equals(theExpectedTransitionStatus)){
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
}
runInTransaction(() -> assertEquals(theExpectedTransitionStatus, findChunkByIdOrThrow(id).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
@ -590,7 +597,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(id).getStatus()));
}
@Test
@ -601,9 +608,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA, isGatedExecution);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
@ -613,13 +620,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNull(chunk.getEndTime());
assertNull(chunk.getRecordsProcessed());
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(chunkId).getStatus()));
sleepUntilTimeChange();
mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0));
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId);
assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus());
assertEquals(50, entity.getRecordsProcessed());
assertNotNull(entity.getCreateTime());
@ -639,9 +646,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
@ -652,7 +659,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message");
mySvc.onWorkChunkError(request);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
@ -668,7 +675,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message 2");
mySvc.onWorkChunkError(request2);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId);
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message 2", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
@ -692,9 +699,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus()));
myBatch2JobHelper.runMaintenancePass();
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus()));
WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
@ -704,7 +711,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
mySvc.onWorkChunkFailed(chunkId, "This is an error message");
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId);
assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
@ -863,4 +870,12 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
Arguments.of(WorkChunkStatusEnum.COMPLETED, false)
);
}
private Batch2JobInstanceEntity findInstanceByIdOrThrow(String instanceId) {
return myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new);
}
private Batch2WorkChunkEntity findChunkByIdOrThrow(String secondChunkId) {
return myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new);
}
}

View File

@ -98,9 +98,9 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
.setJobDefinitionVersion(JOB_DEF_VER)
.setJobDescription("A job description")
.setParametersType(TestJobParameters.class)
.addFirstStep(TARGET_STEP_ID, "the first step", TestJobStep2InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addIntermediateStep("2nd-step-id", "the second step", TestJobStep3InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addLastStep("last-step-id", "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0));
.addFirstStep(FIRST_STEP_ID, "the first step", TestJobStep2InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addIntermediateStep(SECOND_STEP_ID, "the second step", TestJobStep3InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0))
.addLastStep(LAST_STEP_ID, "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0));
if (theIsGatedBoolean) {
builder.gatedExecution();
}
@ -177,6 +177,11 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
public String storeFirstWorkChunk(JobDefinition<TestJobParameters> theJobDefinition, String theInstanceId) {
WorkChunkCreateEvent batchWorkChunk = WorkChunkCreateEvent.firstChunk(theJobDefinition, theInstanceId);
return mySvc.onWorkChunkCreate(batchWorkChunk);
}
public abstract PlatformTransactionManager getTxManager();
public JobInstance freshFetchJobInstance(String theInstanceId) {
@ -233,11 +238,19 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa
}
public String createChunk(String theInstanceId) {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA, false);
return storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, theInstanceId, 0, CHUNK_DATA, false);
}
public String createChunk(String theInstanceId, boolean theGatedExecution) {
return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA, theGatedExecution);
return storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, theInstanceId, 0, CHUNK_DATA, theGatedExecution);
}
public String createChunkInStep(String theInstanceId, String theStepId, boolean theGatedExecution) {
return storeWorkChunk(JOB_DEFINITION_ID, theStepId, theInstanceId, 0, CHUNK_DATA, theGatedExecution);
}
public String createFirstChunk(JobDefinition<TestJobParameters> theJobDefinition, String theJobInstanceId){
return storeFirstWorkChunk(theJobDefinition, theJobInstanceId);
}
public void enableMaintenanceRunner(boolean theToEnable) {

View File

@ -79,8 +79,8 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC
1|COMPLETED
2|COMPLETED
2|IN_PROGRESS
3|READY
3|READY
3|GATE_WAITING
3|GATE_WAITING
""",
"""
# when current step is not all queued, should queue READY chunks

View File

@ -64,6 +64,10 @@ public interface ITestFixture {
String createChunk(String theJobInstanceId, boolean theGatedExecution);
String createChunkInStep(String theJobInstanceId, String theStepId, boolean theGatedExecution);
String createFirstChunk(JobDefinition<TestJobParameters> theJobDefinition, String theJobInstanceId);
/**
* Enable/disable the maintenance runner (So it doesn't run on a scheduler)
*/

View File

@ -21,15 +21,20 @@ package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import ca.uhn.test.concurrency.PointcutLatch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import static org.junit.jupiter.api.Assertions.assertEquals;
public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants {
@ -41,7 +46,7 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
"false, READY",
"true, GATE_WAITING"
})
default void chunkCreation_isInExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum expectedStatus) {
default void chunkCreation_nonFirstChunk_isInExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum expectedStatus) {
String jobInstanceId = getTestManager().createAndStoreJobInstance(null);
String myChunkId = getTestManager().createChunk(jobInstanceId, theGatedExecution);
@ -50,17 +55,24 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
}
@ParameterizedTest
@CsvSource({
"false",
"true"
})
default void chunkReceived_queuedToInProgress(boolean theGatedExecution) throws InterruptedException {
@ValueSource(booleans = {true, false})
default void chunkCreation_firstChunk_isInReady(boolean theGatedExecution) {
JobDefinition<TestJobParameters> jobDef = getTestManager().withJobDefinition(theGatedExecution);
String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef);
String myChunkId = getTestManager().createFirstChunk(jobDef, jobInstanceId);
WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(myChunkId);
assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are " + WorkChunkStatusEnum.READY);
}
@Test
default void chunkReceived_forNongatedJob_queuedToInProgress() throws InterruptedException {
PointcutLatch sendLatch = getTestManager().disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(1);
JobDefinition<?> jobDef = getTestManager().withJobDefinition(false);
JobDefinition<TestJobParameters> jobDef = getTestManager().withJobDefinition(false);
String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef);
String myChunkId = getTestManager().createChunk(jobInstanceId, theGatedExecution);
String myChunkId = getTestManager().createChunk(jobInstanceId, false);
getTestManager().runMaintenancePass();
// the worker has received the chunk, and marks it started.
@ -75,6 +87,40 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT
getTestManager().verifyWorkChunkMessageHandlerCalled(sendLatch, 1);
}
@Test
default void chunkReceived_forGatedJob_queuedToInProgress() throws InterruptedException {
PointcutLatch sendLatch = getTestManager().disableWorkChunkMessageHandler();
sendLatch.setExpectedCount(2);
JobDefinition<TestJobParameters> jobDef = getTestManager().withJobDefinition(true);
String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef);
String chunkInStep1 = getTestManager().createFirstChunk(jobDef, jobInstanceId);
String chunkInStep2 = getTestManager().createChunkInStep(jobInstanceId, SECOND_STEP_ID, true);
// dequeue and completes the first chunk
getTestManager().runMaintenancePass();
// the worker has received the chunk, and marks it started.
WorkChunk chunk1 = getTestManager().getSvc().onWorkChunkDequeue(chunkInStep1).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk1.getStatus());
WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(chunkInStep1);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk.getStatus());
getTestManager().getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkInStep1, 50, 0));
fetchedWorkChunk = getTestManager().freshFetchWorkChunk(chunkInStep1);
assertEquals(WorkChunkStatusEnum.COMPLETED, fetchedWorkChunk.getStatus());
// dequeue the second chunk
getTestManager().runMaintenancePass();
WorkChunk chunk2 = getTestManager().getSvc().onWorkChunkDequeue(chunkInStep2).orElseThrow(IllegalArgumentException::new);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk2.getStatus());
assertEquals(CHUNK_DATA, chunk2.getData());
WorkChunk fetchedWorkChunk2 = getTestManager().freshFetchWorkChunk(chunkInStep2);
assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk2.getStatus());
getTestManager().verifyWorkChunkMessageHandlerCalled(sendLatch, 2);
}
@Test
default void enqueueWorkChunkForProcessing_enqueuesOnlyREADYChunks() throws InterruptedException {
// setup

View File

@ -26,15 +26,12 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNull;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation;
import ca.uhn.test.concurrency.PointcutLatch;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@ -55,7 +52,7 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
JobInstance instance = createInstance();
String instanceId = getTestManager().getSvc().storeNewInstance(instance);
String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, false);
String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null, false);
getTestManager().runInTransaction(() -> {
WorkChunk chunk = getTestManager().freshFetchWorkChunk(id);
@ -74,7 +71,7 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC
getTestManager().enableMaintenanceRunner(false);
String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution);
String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution);
assertNotNull(id);
getTestManager().runInTransaction(() -> assertEquals(expectedStatus, getTestManager().freshFetchWorkChunk(id).getStatus()));

View File

@ -24,7 +24,9 @@ public interface WorkChunkTestConstants {
// we use a separate id for gated jobs because these job definitions might not
// be cleaned up after any given test run
String GATED_JOB_DEFINITION_ID = "gated_job_def_id";
public static final String TARGET_STEP_ID = "step-id";
public static final String FIRST_STEP_ID = "step-id";
public static final String SECOND_STEP_ID = "2nd-step-id";
public static final String LAST_STEP_ID = "last-step-id";
public static final String DEF_CHUNK_ID = "definition-chunkId";
public static final int JOB_DEF_VER = 1;
public static final int SEQUENCE_NUMBER = 1;

View File

@ -99,9 +99,9 @@ public class JobInstanceProcessor {
JobDefinition<? extends IModelJson> jobDefinition =
myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
enqueueReadyChunks(theInstance, jobDefinition, false);
cleanupInstance(theInstance);
triggerGatedExecutions(theInstance, jobDefinition);
enqueueReadyChunks(theInstance, jobDefinition);
ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
}
@ -192,19 +192,15 @@ public class JobInstanceProcessor {
String instanceId = theInstance.getInstanceId();
String currentStepId = jobWorkCursor.getCurrentStepId();
boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, jobWorkCursor);
boolean canAdvance = canAdvanceGatedJob(theInstance);
if (canAdvance) {
if (jobWorkCursor.isReductionStep()) {
// current step is the reduction step (all reduction steps are final)
JobWorkCursor<?, ?, ?> nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobWorkCursor.getJobDefinition(), jobWorkCursor.getCurrentStepId());
myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
} else if (jobWorkCursor.isFinalStep()) {
// current step is the final step in a non-reduction gated job
processChunksForNextGatedSteps(
theInstance, theJobDefinition, jobWorkCursor, jobWorkCursor.getCurrentStepId());
} else {
// all other gated job steps
} else if (!jobWorkCursor.isFinalStep()) {
// all other gated job steps except for final steps
String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info(
"All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
@ -213,7 +209,7 @@ public class JobInstanceProcessor {
nextStepId);
// otherwise, continue processing as expected
processChunksForNextGatedSteps(theInstance, theJobDefinition, jobWorkCursor, nextStepId);
processChunksForNextGatedSteps(theInstance, nextStepId);
}
} else {
String stepId = jobWorkCursor.nextStep != null
@ -227,8 +223,7 @@ public class JobInstanceProcessor {
}
}
private boolean canAdvanceGatedJob(
JobDefinition<?> theJobDefinition, JobInstance theInstance, JobWorkCursor<?, ?, ?> theWorkCursor) {
private boolean canAdvanceGatedJob(JobInstance theInstance) {
// make sure our instance still exists
if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) {
// no more job
@ -245,49 +240,9 @@ public class JobInstanceProcessor {
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.GATE_WAITING)) && theWorkCursor.isFirstStep()) {
// We are in the first step and all chunks are in GATE_WAITING
// this means that the job has just started, no workchunks have been queued yet -> proceed.
return true;
}
if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// all workchunks for the current step are in COMPLETED -> proceed.
return true;
}
// all workchunks for gated jobs should be turned to QUEUED immediately after they are set to READY
// but in case we die in between, this conditional ought to catch the READY chunks.
if (workChunkStatuses.contains(WorkChunkStatusEnum.READY)) {
if (theWorkCursor.isFirstStep()) {
// first step - all ready means we're ready to proceed to enqueue
return true;
} else {
// it's a future step;
// make sure previous step's workchunks are completed
JobDefinitionStep<?, ?, ?> previousStep =
theJobDefinition.getSteps().get(0);
for (JobDefinitionStep<?, ?, ?> step : theJobDefinition.getSteps()) {
if (step.getStepId().equalsIgnoreCase(currentGatedStepId)) {
break;
}
previousStep = step;
}
Set<WorkChunkStatusEnum> previousStepWorkChunkStates =
myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
theInstance.getInstanceId(), previousStep.getStepId());
// completed means "all in COMPLETE state" or no previous chunks (they're cleaned up or never existed)
if (previousStepWorkChunkStates.isEmpty()
|| previousStepWorkChunkStates.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
return true;
}
}
}
// anything else
return false;
}
// all workchunks for the current step are in COMPLETED -> proceed.
return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
}
protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> {
@ -311,27 +266,12 @@ public class JobInstanceProcessor {
* completing prematurely.
*/
private void enqueueReadyChunks(
JobInstance theJobInstance, JobDefinition<?> theJobDefinition, boolean theIsGatedExecutionBool) {
JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
Iterator<WorkChunkMetadata> iter = getReadyChunks();
AtomicInteger counter = new AtomicInteger();
ConcurrentHashMap<String, JobWorkCursor<?, ?, ?>> stepToWorkCursor = new ConcurrentHashMap<>();
while (iter.hasNext()) {
WorkChunkMetadata metadata = iter.next();
JobWorkCursor<?, ?, ?> jobWorkCursor = stepToWorkCursor.computeIfAbsent(metadata.getTargetStepId(), (e) -> {
return JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, metadata.getTargetStepId());
});
counter.getAndIncrement();
if (!theIsGatedExecutionBool && (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) {
/*
* Gated executions are queued later when all work chunks are ready.
*
* Reduction steps are not submitted to the queue at all, but processed inline.
* Currently all reduction steps are also gated, but this might not always
* be true.
*/
return;
}
/*
* For each chunk id
@ -343,7 +283,7 @@ public class JobInstanceProcessor {
updateChunkAndSendToQueue(metadata);
}
ourLog.debug(
"Encountered {} READY work chunks for job {}", counter.get(), theJobDefinition.getJobDefinitionId());
"Encountered {} READY work chunks for job {} of type {}", counter.get(), theJobInstance.getInstanceId(), theJobDefinition.getJobDefinitionId());
}
/**
@ -388,8 +328,6 @@ public class JobInstanceProcessor {
private void processChunksForNextGatedSteps(
JobInstance theInstance,
JobDefinition<?> theJobDefinition,
JobWorkCursor<?, ?, ?> theWorkCursor,
String nextStepId) {
String instanceId = theInstance.getInstanceId();
List<String> readyChunksForNextStep =
@ -404,38 +342,7 @@ public class JobInstanceProcessor {
readyChunksForNextStep.size());
}
boolean isEnqueue;
String currentStepId = theWorkCursor.getCurrentStepId();
Set<WorkChunkStatusEnum> workChunkStatusesForCurrentStep =
myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(theInstance.getInstanceId(), currentStepId);
if (workChunkStatusesForCurrentStep.equals(Set.of(WorkChunkStatusEnum.GATE_WAITING))
&& theWorkCursor.isFirstStep()) {
// this means that the job has just started, no workchunks have been queued yet
// turn the first chunk to READY, do NOT advance the step.
isEnqueue = myJobPersistence.updateAllChunksForStepWithStatus(
instanceId, currentStepId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.GATE_WAITING)
!= 0;
} else if (workChunkStatusesForCurrentStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) {
// update the job step so the workers will process them.
// if it's the last (gated) step, there will be no change - but we should
// queue up the chunks anyway
isEnqueue = theWorkCursor.isFinalStep()
|| myJobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId);
} else {
// this means that the current step's chunks contains only READY and QUEUED chunks, possibly leftover from
// other maintenance job who died in the middle
// should enqueue the rest of the ready chunks
isEnqueue = true;
}
if (!isEnqueue) {
// we collided with another maintenance job.
ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId);
return;
}
// because we now have all gated job chunks in READY state,
// we can enqueue them
enqueueReadyChunks(theInstance, theJobDefinition, true);
// update the job step so the workers will process them.
myJobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId);
}
}