From 1c1ceb0b27be9f6cdf02054ad11ae4aa287d2dad Mon Sep 17 00:00:00 2001 From: tyner Date: Wed, 10 Apr 2024 07:51:04 -0400 Subject: [PATCH] - have only one path through the equeueReady method - fixed tests --- .../jpa/batch2/JpaJobPersistenceImplTest.java | 125 ++++++++++-------- ...tractIJobPersistenceSpecificationTest.java | 23 +++- .../batch2/test/IJobMaintenanceActions.java | 4 +- .../hapi/fhir/batch2/test/ITestFixture.java | 4 + .../test/IWorkChunkStateTransitions.java | 62 +++++++-- .../batch2/test/IWorkChunkStorageTests.java | 7 +- .../batch2/test/WorkChunkTestConstants.java | 4 +- .../maintenance/JobInstanceProcessor.java | 119 ++--------------- 8 files changed, 166 insertions(+), 182 deletions(-) diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index f2da87cd00f..d872def6b83 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -132,13 +132,18 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { return mySvc.onWorkChunkCreate(batchWorkChunk); } + private String storeFirstWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { + WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData, false); + return mySvc.onWorkChunkCreate(batchWorkChunk); + } + @Test public void testStoreAndFetchInstance() { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); runInTransaction(() -> { - Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new); + Batch2JobInstanceEntity instanceEntity = findInstanceByIdOrThrow(instanceId); assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus()); }); @@ -151,7 +156,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertEquals(instance.getReport(), foundInstance.getReport()); runInTransaction(() -> { - Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new); + Batch2JobInstanceEntity instanceEntity = findInstanceByIdOrThrow(instanceId); assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus()); }); } @@ -372,7 +377,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(true, false); String instanceId = mySvc.storeNewInstance(instance); - Date updateTime = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime())); + Date updateTime = runInTransaction(() -> new Date(findInstanceByIdOrThrow(instanceId).getUpdateTime().getTime())); sleepUntilTimeChange(); @@ -380,7 +385,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { runInTransaction(() -> mySvc.updateInstanceUpdateTime(instanceId)); // Verify - Date updateTime2 = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime())); + Date updateTime2 = runInTransaction(() -> new Date(findInstanceByIdOrThrow(instanceId).getUpdateTime().getTime())); assertNotEquals(updateTime, updateTime2); } @@ -389,20 +394,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { // setup JobInstance instance = createInstance(true, true); String instanceId = mySvc.storeNewInstance(instance); - String chunkIdFirstStep = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, true); String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true); String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true); - runInTransaction(() -> assertEquals(TARGET_STEP_ID, myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalArgumentException::new).getCurrentGatedStepId())); + runInTransaction(() -> assertEquals(TARGET_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId())); // execute runInTransaction(() -> mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, LAST_STEP_ID)); // verify - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(chunkIdFirstStep).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkIdSecondStep1).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkIdSecondStep2).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(LAST_STEP_ID, myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalArgumentException::new).getCurrentGatedStepId())); + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus()); + assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus()); + assertEquals(LAST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()); + }); } @Test @@ -416,17 +421,21 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true); String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true); - runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdFirstStep).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdSecondStep1).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdSecondStep2).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> { + assertEquals(expectedNoChangeStatus, findChunkByIdOrThrow(chunkIdFirstStep).getStatus()); + assertEquals(expectedNoChangeStatus, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus()); + assertEquals(expectedNoChangeStatus, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus()); + }); // execute runInTransaction(() -> mySvc.updateAllChunksForStepWithStatus(instanceId, LAST_STEP_ID, expectedChangedStatus, WorkChunkStatusEnum.GATE_WAITING)); // verify - runInTransaction(() -> assertEquals(expectedNoChangeStatus, myWorkChunkRepository.findById(chunkIdFirstStep).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(expectedChangedStatus, myWorkChunkRepository.findById(chunkIdSecondStep1).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(expectedChangedStatus, myWorkChunkRepository.findById(chunkIdSecondStep2).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> { + assertEquals(expectedNoChangeStatus, findChunkByIdOrThrow(chunkIdFirstStep).getStatus()); + assertEquals(expectedChangedStatus, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus()); + assertEquals(expectedChangedStatus, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus()); + }); } @Test @@ -445,15 +454,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String instanceId = mySvc.storeNewInstance(instance); // execute & verify - String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, theGatedExecution); - runInTransaction(() -> assertEquals(theExpectedStatusOnCreate, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); + String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, theGatedExecution); + runInTransaction(() -> assertEquals(theExpectedStatusOnCreate, findChunkByIdOrThrow(id).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(theExpectedStatusAfterTransition, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); - - if (WorkChunkStatusEnum.READY.equals(theExpectedStatusAfterTransition)) { - myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); - } + runInTransaction(() -> assertEquals(theExpectedStatusAfterTransition, findChunkByIdOrThrow(id).getStatus())); WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); assertNull(chunk.getData()); @@ -469,30 +473,38 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String instanceId = mySvc.storeNewInstance(instance); // execute & verify - String firstChunkId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, expectedFirstChunkData, isGatedExecution); + String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, expectedFirstChunkData); String secondChunkId = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, expectedSecondChunkData, isGatedExecution); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(firstChunkId).getStatus()); + assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus()); + }); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(firstChunkId).getStatus()); + assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus()); + }); WorkChunk actualFirstChunkData = mySvc.onWorkChunkDequeue(firstChunkId).orElseThrow(IllegalArgumentException::new); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(firstChunkId).getStatus())); assertEquals(expectedFirstChunkData, actualFirstChunkData.getData()); mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(firstChunkId, 50, 0)); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.COMPLETED, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.GATE_WAITING, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.COMPLETED, findChunkByIdOrThrow(firstChunkId).getStatus()); + assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus()); + }); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.COMPLETED, myWorkChunkRepository.findById(firstChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.COMPLETED, findChunkByIdOrThrow(firstChunkId).getStatus()); + assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(secondChunkId).getStatus()); + }); WorkChunk actualSecondChunkData = mySvc.onWorkChunkDequeue(secondChunkId).orElseThrow(IllegalArgumentException::new); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(secondChunkId).getStatus())); assertEquals(expectedSecondChunkData, actualSecondChunkData.getData()); } @@ -572,16 +584,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(true, theGatedExecution); String instanceId = mySvc.storeNewInstance(instance); - String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution); + String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution); assertNotNull(id); - runInTransaction(() -> assertEquals(theExpectedCreatedStatus, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(theExpectedCreatedStatus, findChunkByIdOrThrow(id).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(theExpectedTransitionStatus, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); - - if (WorkChunkStatusEnum.READY.equals(theExpectedTransitionStatus)){ - myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); - } + runInTransaction(() -> assertEquals(theExpectedTransitionStatus, findChunkByIdOrThrow(id).getStatus())); WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); assertEquals(36, chunk.getInstanceId().length()); @@ -590,7 +597,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); assertEquals(CHUNK_DATA, chunk.getData()); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(id).getStatus())); } @Test @@ -601,9 +608,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA, isGatedExecution); assertNotNull(chunkId); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus())); WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); @@ -613,13 +620,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertNull(chunk.getEndTime()); assertNull(chunk.getRecordsProcessed()); assertNotNull(chunk.getData()); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(chunkId).getStatus())); sleepUntilTimeChange(); mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0)); runInTransaction(() -> { - Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); + Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId); assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus()); assertEquals(50, entity.getRecordsProcessed()); assertNotNull(entity.getCreateTime()); @@ -639,9 +646,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution); assertNotNull(chunkId); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus())); WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); @@ -652,7 +659,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message"); mySvc.onWorkChunkError(request); runInTransaction(() -> { - Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); + Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId); assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); assertEquals("This is an error message", entity.getErrorMessage()); assertNotNull(entity.getCreateTime()); @@ -668,7 +675,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message 2"); mySvc.onWorkChunkError(request2); runInTransaction(() -> { - Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); + Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId); assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); assertEquals("This is an error message 2", entity.getErrorMessage()); assertNotNull(entity.getCreateTime()); @@ -692,9 +699,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution); assertNotNull(chunkId); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus())); WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); @@ -704,7 +711,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { mySvc.onWorkChunkFailed(chunkId, "This is an error message"); runInTransaction(() -> { - Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); + Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId); assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus()); assertEquals("This is an error message", entity.getErrorMessage()); assertNotNull(entity.getCreateTime()); @@ -863,4 +870,12 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { Arguments.of(WorkChunkStatusEnum.COMPLETED, false) ); } + + private Batch2JobInstanceEntity findInstanceByIdOrThrow(String instanceId) { + return myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new); + } + + private Batch2WorkChunkEntity findChunkByIdOrThrow(String secondChunkId) { + return myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new); + } } diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java index 53608062525..0d2a00bc622 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java @@ -98,9 +98,9 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa .setJobDefinitionVersion(JOB_DEF_VER) .setJobDescription("A job description") .setParametersType(TestJobParameters.class) - .addFirstStep(TARGET_STEP_ID, "the first step", TestJobStep2InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)) - .addIntermediateStep("2nd-step-id", "the second step", TestJobStep3InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)) - .addLastStep("last-step-id", "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)); + .addFirstStep(FIRST_STEP_ID, "the first step", TestJobStep2InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)) + .addIntermediateStep(SECOND_STEP_ID, "the second step", TestJobStep3InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)) + .addLastStep(LAST_STEP_ID, "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)); if (theIsGatedBoolean) { builder.gatedExecution(); } @@ -177,6 +177,11 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa return mySvc.onWorkChunkCreate(batchWorkChunk); } + public String storeFirstWorkChunk(JobDefinition theJobDefinition, String theInstanceId) { + WorkChunkCreateEvent batchWorkChunk = WorkChunkCreateEvent.firstChunk(theJobDefinition, theInstanceId); + return mySvc.onWorkChunkCreate(batchWorkChunk); + } + public abstract PlatformTransactionManager getTxManager(); public JobInstance freshFetchJobInstance(String theInstanceId) { @@ -233,11 +238,19 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa } public String createChunk(String theInstanceId) { - return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA, false); + return storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, theInstanceId, 0, CHUNK_DATA, false); } public String createChunk(String theInstanceId, boolean theGatedExecution) { - return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA, theGatedExecution); + return storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, theInstanceId, 0, CHUNK_DATA, theGatedExecution); + } + + public String createChunkInStep(String theInstanceId, String theStepId, boolean theGatedExecution) { + return storeWorkChunk(JOB_DEFINITION_ID, theStepId, theInstanceId, 0, CHUNK_DATA, theGatedExecution); + } + + public String createFirstChunk(JobDefinition theJobDefinition, String theJobInstanceId){ + return storeFirstWorkChunk(theJobDefinition, theJobInstanceId); } public void enableMaintenanceRunner(boolean theToEnable) { diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java index 1fa09807103..843ddecb7c8 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java @@ -79,8 +79,8 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC 1|COMPLETED 2|COMPLETED 2|IN_PROGRESS - 3|READY - 3|READY + 3|GATE_WAITING + 3|GATE_WAITING """, """ # when current step is not all queued, should queue READY chunks diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/ITestFixture.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/ITestFixture.java index 8263ac0eb98..47a4230728a 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/ITestFixture.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/ITestFixture.java @@ -64,6 +64,10 @@ public interface ITestFixture { String createChunk(String theJobInstanceId, boolean theGatedExecution); + String createChunkInStep(String theJobInstanceId, String theStepId, boolean theGatedExecution); + + String createFirstChunk(JobDefinition theJobDefinition, String theJobInstanceId); + /** * Enable/disable the maintenance runner (So it doesn't run on a scheduler) */ diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java index 084b8bd0e2a..5d9444e9664 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java @@ -21,15 +21,20 @@ package ca.uhn.hapi.fhir.batch2.test; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; +import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; import ca.uhn.test.concurrency.PointcutLatch; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; + import static org.junit.jupiter.api.Assertions.assertEquals; public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants { @@ -41,7 +46,7 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT "false, READY", "true, GATE_WAITING" }) - default void chunkCreation_isInExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum expectedStatus) { + default void chunkCreation_nonFirstChunk_isInExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum expectedStatus) { String jobInstanceId = getTestManager().createAndStoreJobInstance(null); String myChunkId = getTestManager().createChunk(jobInstanceId, theGatedExecution); @@ -50,17 +55,24 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT } @ParameterizedTest - @CsvSource({ - "false", - "true" - }) - default void chunkReceived_queuedToInProgress(boolean theGatedExecution) throws InterruptedException { + @ValueSource(booleans = {true, false}) + default void chunkCreation_firstChunk_isInReady(boolean theGatedExecution) { + JobDefinition jobDef = getTestManager().withJobDefinition(theGatedExecution); + String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef); + String myChunkId = getTestManager().createFirstChunk(jobDef, jobInstanceId); + + WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are " + WorkChunkStatusEnum.READY); + } + + @Test + default void chunkReceived_forNongatedJob_queuedToInProgress() throws InterruptedException { PointcutLatch sendLatch = getTestManager().disableWorkChunkMessageHandler(); sendLatch.setExpectedCount(1); - JobDefinition jobDef = getTestManager().withJobDefinition(false); + JobDefinition jobDef = getTestManager().withJobDefinition(false); String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef); - String myChunkId = getTestManager().createChunk(jobInstanceId, theGatedExecution); + String myChunkId = getTestManager().createChunk(jobInstanceId, false); getTestManager().runMaintenancePass(); // the worker has received the chunk, and marks it started. @@ -75,6 +87,40 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT getTestManager().verifyWorkChunkMessageHandlerCalled(sendLatch, 1); } + @Test + default void chunkReceived_forGatedJob_queuedToInProgress() throws InterruptedException { + PointcutLatch sendLatch = getTestManager().disableWorkChunkMessageHandler(); + sendLatch.setExpectedCount(2); + + JobDefinition jobDef = getTestManager().withJobDefinition(true); + String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef); + String chunkInStep1 = getTestManager().createFirstChunk(jobDef, jobInstanceId); + String chunkInStep2 = getTestManager().createChunkInStep(jobInstanceId, SECOND_STEP_ID, true); + + // dequeue and completes the first chunk + getTestManager().runMaintenancePass(); + // the worker has received the chunk, and marks it started. + WorkChunk chunk1 = getTestManager().getSvc().onWorkChunkDequeue(chunkInStep1).orElseThrow(IllegalArgumentException::new); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk1.getStatus()); + WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(chunkInStep1); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk.getStatus()); + + getTestManager().getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkInStep1, 50, 0)); + fetchedWorkChunk = getTestManager().freshFetchWorkChunk(chunkInStep1); + assertEquals(WorkChunkStatusEnum.COMPLETED, fetchedWorkChunk.getStatus()); + + // dequeue the second chunk + getTestManager().runMaintenancePass(); + WorkChunk chunk2 = getTestManager().getSvc().onWorkChunkDequeue(chunkInStep2).orElseThrow(IllegalArgumentException::new); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk2.getStatus()); + assertEquals(CHUNK_DATA, chunk2.getData()); + + WorkChunk fetchedWorkChunk2 = getTestManager().freshFetchWorkChunk(chunkInStep2); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk2.getStatus()); + + getTestManager().verifyWorkChunkMessageHandlerCalled(sendLatch, 2); + } + @Test default void enqueueWorkChunkForProcessing_enqueuesOnlyREADYChunks() throws InterruptedException { // setup diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java index 48bf7264f35..6fb3b0e2787 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java @@ -26,15 +26,12 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertNull; import ca.uhn.fhir.batch2.model.JobDefinition; -import ca.uhn.fhir.batch2.model.JobInstance; -import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; import ca.uhn.test.concurrency.PointcutLatch; import com.google.common.collect.ImmutableList; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -55,7 +52,7 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC JobInstance instance = createInstance(); String instanceId = getTestManager().getSvc().storeNewInstance(instance); - String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null, false); + String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null, false); getTestManager().runInTransaction(() -> { WorkChunk chunk = getTestManager().freshFetchWorkChunk(id); @@ -74,7 +71,7 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC getTestManager().enableMaintenanceRunner(false); - String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution); + String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution); assertNotNull(id); getTestManager().runInTransaction(() -> assertEquals(expectedStatus, getTestManager().freshFetchWorkChunk(id).getStatus())); diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java index 8ab0157e52b..10df1376af9 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java @@ -24,7 +24,9 @@ public interface WorkChunkTestConstants { // we use a separate id for gated jobs because these job definitions might not // be cleaned up after any given test run String GATED_JOB_DEFINITION_ID = "gated_job_def_id"; - public static final String TARGET_STEP_ID = "step-id"; + public static final String FIRST_STEP_ID = "step-id"; + public static final String SECOND_STEP_ID = "2nd-step-id"; + public static final String LAST_STEP_ID = "last-step-id"; public static final String DEF_CHUNK_ID = "definition-chunkId"; public static final int JOB_DEF_VER = 1; public static final int SEQUENCE_NUMBER = 1; diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index 455d3d2380a..8570d4b9252 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -99,9 +99,9 @@ public class JobInstanceProcessor { JobDefinition jobDefinition = myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance); - enqueueReadyChunks(theInstance, jobDefinition, false); cleanupInstance(theInstance); triggerGatedExecutions(theInstance, jobDefinition); + enqueueReadyChunks(theInstance, jobDefinition); ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch); } @@ -192,19 +192,15 @@ public class JobInstanceProcessor { String instanceId = theInstance.getInstanceId(); String currentStepId = jobWorkCursor.getCurrentStepId(); - boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, jobWorkCursor); + boolean canAdvance = canAdvanceGatedJob(theInstance); if (canAdvance) { if (jobWorkCursor.isReductionStep()) { // current step is the reduction step (all reduction steps are final) JobWorkCursor nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId( jobWorkCursor.getJobDefinition(), jobWorkCursor.getCurrentStepId()); myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor); - } else if (jobWorkCursor.isFinalStep()) { - // current step is the final step in a non-reduction gated job - processChunksForNextGatedSteps( - theInstance, theJobDefinition, jobWorkCursor, jobWorkCursor.getCurrentStepId()); - } else { - // all other gated job steps + } else if (!jobWorkCursor.isFinalStep()) { + // all other gated job steps except for final steps String nextStepId = jobWorkCursor.nextStep.getStepId(); ourLog.info( "All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", @@ -213,7 +209,7 @@ public class JobInstanceProcessor { nextStepId); // otherwise, continue processing as expected - processChunksForNextGatedSteps(theInstance, theJobDefinition, jobWorkCursor, nextStepId); + processChunksForNextGatedSteps(theInstance, nextStepId); } } else { String stepId = jobWorkCursor.nextStep != null @@ -227,8 +223,7 @@ public class JobInstanceProcessor { } } - private boolean canAdvanceGatedJob( - JobDefinition theJobDefinition, JobInstance theInstance, JobWorkCursor theWorkCursor) { + private boolean canAdvanceGatedJob(JobInstance theInstance) { // make sure our instance still exists if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) { // no more job @@ -245,49 +240,9 @@ public class JobInstanceProcessor { return true; } - if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.GATE_WAITING)) && theWorkCursor.isFirstStep()) { - // We are in the first step and all chunks are in GATE_WAITING - // this means that the job has just started, no workchunks have been queued yet -> proceed. - return true; - } - - if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) { - // all workchunks for the current step are in COMPLETED -> proceed. - return true; - } - - // all workchunks for gated jobs should be turned to QUEUED immediately after they are set to READY - // but in case we die in between, this conditional ought to catch the READY chunks. - if (workChunkStatuses.contains(WorkChunkStatusEnum.READY)) { - if (theWorkCursor.isFirstStep()) { - // first step - all ready means we're ready to proceed to enqueue - return true; - } else { - // it's a future step; - // make sure previous step's workchunks are completed - JobDefinitionStep previousStep = - theJobDefinition.getSteps().get(0); - for (JobDefinitionStep step : theJobDefinition.getSteps()) { - if (step.getStepId().equalsIgnoreCase(currentGatedStepId)) { - break; - } - previousStep = step; - } - Set previousStepWorkChunkStates = - myJobPersistence.getDistinctWorkChunkStatesForJobAndStep( - theInstance.getInstanceId(), previousStep.getStepId()); - - // completed means "all in COMPLETE state" or no previous chunks (they're cleaned up or never existed) - if (previousStepWorkChunkStates.isEmpty() - || previousStepWorkChunkStates.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) { - return true; - } - } - } - - // anything else - return false; - } + // all workchunks for the current step are in COMPLETED -> proceed. + return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED)); + } protected PagingIterator getReadyChunks() { return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> { @@ -311,27 +266,12 @@ public class JobInstanceProcessor { * completing prematurely. */ private void enqueueReadyChunks( - JobInstance theJobInstance, JobDefinition theJobDefinition, boolean theIsGatedExecutionBool) { + JobInstance theJobInstance, JobDefinition theJobDefinition) { Iterator iter = getReadyChunks(); AtomicInteger counter = new AtomicInteger(); - ConcurrentHashMap> stepToWorkCursor = new ConcurrentHashMap<>(); while (iter.hasNext()) { WorkChunkMetadata metadata = iter.next(); - JobWorkCursor jobWorkCursor = stepToWorkCursor.computeIfAbsent(metadata.getTargetStepId(), (e) -> { - return JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, metadata.getTargetStepId()); - }); - counter.getAndIncrement(); - if (!theIsGatedExecutionBool && (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) { - /* - * Gated executions are queued later when all work chunks are ready. - * - * Reduction steps are not submitted to the queue at all, but processed inline. - * Currently all reduction steps are also gated, but this might not always - * be true. - */ - return; - } /* * For each chunk id @@ -343,7 +283,7 @@ public class JobInstanceProcessor { updateChunkAndSendToQueue(metadata); } ourLog.debug( - "Encountered {} READY work chunks for job {}", counter.get(), theJobDefinition.getJobDefinitionId()); + "Encountered {} READY work chunks for job {} of type {}", counter.get(), theJobInstance.getInstanceId(), theJobDefinition.getJobDefinitionId()); } /** @@ -388,8 +328,6 @@ public class JobInstanceProcessor { private void processChunksForNextGatedSteps( JobInstance theInstance, - JobDefinition theJobDefinition, - JobWorkCursor theWorkCursor, String nextStepId) { String instanceId = theInstance.getInstanceId(); List readyChunksForNextStep = @@ -404,38 +342,7 @@ public class JobInstanceProcessor { readyChunksForNextStep.size()); } - boolean isEnqueue; - String currentStepId = theWorkCursor.getCurrentStepId(); - Set workChunkStatusesForCurrentStep = - myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(theInstance.getInstanceId(), currentStepId); - if (workChunkStatusesForCurrentStep.equals(Set.of(WorkChunkStatusEnum.GATE_WAITING)) - && theWorkCursor.isFirstStep()) { - // this means that the job has just started, no workchunks have been queued yet - // turn the first chunk to READY, do NOT advance the step. - isEnqueue = myJobPersistence.updateAllChunksForStepWithStatus( - instanceId, currentStepId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.GATE_WAITING) - != 0; - } else if (workChunkStatusesForCurrentStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) { - // update the job step so the workers will process them. - // if it's the last (gated) step, there will be no change - but we should - // queue up the chunks anyway - isEnqueue = theWorkCursor.isFinalStep() - || myJobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId); - } else { - // this means that the current step's chunks contains only READY and QUEUED chunks, possibly leftover from - // other maintenance job who died in the middle - // should enqueue the rest of the ready chunks - isEnqueue = true; - } - - if (!isEnqueue) { - // we collided with another maintenance job. - ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId); - return; - } - - // because we now have all gated job chunks in READY state, - // we can enqueue them - enqueueReadyChunks(theInstance, theJobDefinition, true); + // update the job step so the workers will process them. + myJobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId); } }