diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5818-update-batch2-framework-with-gate_waiting-state.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5818-update-batch2-framework-with-gate_waiting-state.yaml new file mode 100644 index 00000000000..852d110800a --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_2_0/5818-update-batch2-framework-with-gate_waiting-state.yaml @@ -0,0 +1,7 @@ +--- +type: add +issue: 5818 +title: "Added another state to the Batch2 work chunk state machine: GATE_WAITING. + This work chunk state will be the initial state on creation for gated jobs. + Once all chunks are completed for the previous step, they will transition to READY. +" diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md index 192444d9478..a435a7c3720 100644 --- a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md @@ -57,10 +57,11 @@ stateDiagram-v2 state FAILED state COMPLETED direction LR - [*] --> READY : on create - normal - [*] --> GATED : on create - gated - GATED --> READY : on create - gated - READY --> QUEUED : placed on kafka (maint.) + [*] --> READY : on create - normal or gated first chunk + [*] --> GATE_WAITING : on create - gated jobs for all but the first chunk + GATE_WAITING --> READY : on gate release - gated + QUEUED --> READY : on gate release - gated (for compatibility with legacy QUEUED state up to Hapi-fhir version 7.1) + READY --> QUEUED : placed on kafka (maint.) 
POLL_WAITING --> READY : after a poll delay on a POLL_WAITING work chunk has elapsed %% worker processing states diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md index efb5ff42a39..961cf193426 100644 --- a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/introduction.md @@ -19,19 +19,20 @@ A HAPI-FHIR batch job definition consists of a job name, version, parameter json After a job has been defined, *instances* of that job can be submitted for batch processing by populating a `JobInstanceStartRequest` with the job name and job parameters json and then submitting that request to the Batch Job Coordinator. The Batch Job Coordinator will then store two records in the database: -- Job Instance with status QUEUED: that is the parent record for all data concerning this job -- Batch Work Chunk with status READY: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data. +- Job Instance with status `QUEUED`: that is the parent record for all data concerning this job +- Batch Work Chunk with status `READY`: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data. ### The Maintenance Job A Scheduled Job runs periodically (once a minute). For each Job Instance in the database, it: +1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any left over work chunks still in the database. 1. Moves all `POLL_WAITING` work chunks to `READY` if their `nextPollTime` has expired. -1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \* 1. 
Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any leftover work chunks still in the database. 1. Cleans up any complete, failed, or cancelled jobs that need to be removed. -1. Moves any gated jobs onto their next step when complete. +1. When the current step is complete, moves any gated jobs onto their next step and updates all chunks in `GATE_WAITING` to `READY`. 1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered. +1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \* \* An exception is for the final reduction step, where work chunks are not published to the Batch Notification Message Channel, but instead processed inline. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index 0db2950f0f1..5a4a9ef0dc9 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -129,7 +129,11 @@ public class JpaJobPersistenceImpl implements IJobPersistence { entity.setSerializedData(theBatchWorkChunk.serializedData); entity.setCreateTime(new Date()); entity.setStartTime(new Date()); - entity.setStatus(WorkChunkStatusEnum.READY); + // set gated job chunks to GATE_WAITING; they will be transitioned to READY during maintenance pass when all + // chunks in the previous step are COMPLETED + entity.setStatus( + theBatchWorkChunk.isGatedExecution ? 
WorkChunkStatusEnum.GATE_WAITING : WorkChunkStatusEnum.READY); + ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId()); ourLog.trace( "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData()); @@ -595,4 +599,35 @@ public class JpaJobPersistenceImpl implements IJobPersistence { myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params); } } + + @Override + @Transactional(propagation = Propagation.REQUIRES_NEW) + public boolean advanceJobStepAndUpdateChunkStatus(String theJobInstanceId, String theNextStepId) { + boolean changed = updateInstance(theJobInstanceId, instance -> { + if (instance.getCurrentGatedStepId().equals(theNextStepId)) { + // someone else beat us here. No changes + return false; + } + ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId); + instance.setCurrentGatedStepId(theNextStepId); + return true; + }); + + if (changed) { + ourLog.debug( + "Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.", + theJobInstanceId, + theNextStepId); + // when we reach here, the current step id is equal to theNextStepId + int numChanged = + myWorkChunkRepository.updateAllChunksForStepFromGateWaitingToReady(theJobInstanceId, theNextStepId); + ourLog.debug( + "Updated {} chunks of gated instance {} for step {} from GATE_WAITING (or legacy QUEUED) to READY.", + numChanged, + theJobInstanceId, + theNextStepId); + } + + return changed; + } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java index 2832cfe5601..34c8b23d138 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java @@ 
-128,6 +128,19 @@ public interface IBatch2WorkChunkRepository @Param("newStatus") WorkChunkStatusEnum theNewStatus, @Param("oldStatus") WorkChunkStatusEnum theOldStatus); + // Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the + // workers. + // In order to keep them compatible, turn QUEUED chunks into READY, too. + // TODO: remove QUEUED from the IN clause when we are certain that no one is still running the old code. + @Modifying + @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.READY WHERE " + + "e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus in (" + + "ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.GATE_WAITING," + + "ca.uhn.fhir.batch2.model.WorkChunkStatusEnum.QUEUED" + + ")") + int updateAllChunksForStepFromGateWaitingToReady( + @Param("instanceId") String theInstanceId, @Param("stepId") String theStepId); + @Modifying @Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId") int deleteAllForInstance(@Param("instanceId") String theInstanceId); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index ba013f714bc..14b21559d52 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -4,6 +4,7 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson; import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; +import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; @@ 
-31,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java index 7fb41a9f1eb..ec40941af65 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java @@ -206,7 +206,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test { @Nonnull private JobDefinition buildGatedJobDefinition(String theJobId, IJobStepWorker theFirstStep, IJobStepWorker theLastStep) { - return TestJobDefinitionUtils.buildJobDefinition( + return TestJobDefinitionUtils.buildGatedJobDefinition( theJobId, theFirstStep, theLastStep, diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index a5e83893195..010b17c6739 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.context.annotation.Import; @@ -78,7 +79,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class JpaJobPersistenceImplTest extends BaseJpaR4Test { public static final String JOB_DEFINITION_ID = "definition-id"; - public static final String TARGET_STEP_ID = TestJobDefinitionUtils.FIRST_STEP_ID; + public static final String FIRST_STEP_ID = TestJobDefinitionUtils.FIRST_STEP_ID; + public static final String LAST_STEP_ID = TestJobDefinitionUtils.LAST_STEP_ID; public static final String DEF_CHUNK_ID = "definition-chunkId"; public static final String STEP_CHUNK_ID = TestJobDefinitionUtils.FIRST_STEP_ID; public static final int JOB_DEF_VER = 1; @@ -110,7 +112,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); for (int i = 0; i < 10; i++) { - storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}"))); + storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}")), false); } // Execute @@ -125,8 +127,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { }); } - private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { - WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData); + private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData, boolean theGatedExecution) { + WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData, theGatedExecution); + return 
mySvc.onWorkChunkCreate(batchWorkChunk); + } + + private String storeFirstWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { + WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, TestJobDefinitionUtils.TEST_JOB_VERSION, theTargetStepId, theInstanceId, theSequence, theSerializedData, false); return mySvc.onWorkChunkCreate(batchWorkChunk); } @@ -136,7 +143,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String instanceId = mySvc.storeNewInstance(instance); runInTransaction(() -> { - Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new); + Batch2JobInstanceEntity instanceEntity = findInstanceByIdOrThrow(instanceId); assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus()); }); @@ -149,7 +156,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertEquals(instance.getReport(), foundInstance.getReport()); runInTransaction(() -> { - Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new); + Batch2JobInstanceEntity instanceEntity = findInstanceByIdOrThrow(instanceId); assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus()); }); } @@ -240,8 +247,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { // Setup JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); - storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); - WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); + storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false); + WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false); String chunkId = 
mySvc.onWorkChunkCreate(batchWorkChunk); Optional byId = myWorkChunkRepository.findById(chunkId); Batch2WorkChunkEntity entity = byId.get(); @@ -367,10 +374,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @Test public void testUpdateTime() { // Setup - JobInstance instance = createInstance(); + boolean isGatedExecution = false; + JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); - Date updateTime = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime())); + Date updateTime = runInTransaction(() -> new Date(findInstanceByIdOrThrow(instanceId).getUpdateTime().getTime())); sleepUntilTimeChange(); @@ -378,39 +386,148 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { runInTransaction(() -> mySvc.updateInstanceUpdateTime(instanceId)); // Verify - Date updateTime2 = runInTransaction(() -> new Date(myJobInstanceRepository.findById(instanceId).orElseThrow().getUpdateTime().getTime())); + Date updateTime2 = runInTransaction(() -> new Date(findInstanceByIdOrThrow(instanceId).getUpdateTime().getTime())); assertNotEquals(updateTime, updateTime2); } + @Test + public void advanceJobStepAndUpdateChunkStatus_forGatedJob_updatesCurrentStepAndChunkStatus() { + // setup + boolean isGatedExecution = true; + JobInstance instance = createInstance(true, isGatedExecution); + String instanceId = mySvc.storeNewInstance(instance); + String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution); + String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution); + + runInTransaction(() -> assertEquals(FIRST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId())); + + // execute + runInTransaction(() -> { + boolean changed = mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, LAST_STEP_ID); + assertTrue(changed); + 
}); + + // verify + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus()); + assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus()); + assertEquals(LAST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()); + }); + } + + @Test + public void advanceJobStepAndUpdateChunkStatus_whenAlreadyInTargetStep_DoesNotUpdateStepOrChunks() { + // setup + boolean isGatedExecution = true; + JobInstance instance = createInstance(true, isGatedExecution); + String instanceId = mySvc.storeNewInstance(instance); + String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution); + String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution); + + runInTransaction(() -> assertEquals(FIRST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId())); + + // execute + runInTransaction(() -> { + boolean changed = mySvc.advanceJobStepAndUpdateChunkStatus(instanceId, FIRST_STEP_ID); + assertFalse(changed); + }); + + // verify + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdSecondStep1).getStatus()); + assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(chunkIdSecondStep2).getStatus()); + assertEquals(FIRST_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId()); + }); + } + @Test public void testFetchUnknownWork() { assertFalse(myWorkChunkRepository.findById("FOO").isPresent()); } - @Test - public void testStoreAndFetchWorkChunk_NoData() { - JobInstance instance = createInstance(true, false); + @ParameterizedTest + @CsvSource({ + "false, READY, QUEUED", + "true, GATE_WAITING, QUEUED" + }) + public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionNoData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedStatusOnCreate, 
WorkChunkStatusEnum theExpectedStatusAfterTransition) { + // setup + JobInstance instance = createInstance(true, theGatedExecution); String instanceId = mySvc.storeNewInstance(instance); - String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null); + // execute & verify + String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null); + // mark the first chunk as COMPLETED to allow step advance + runInTransaction(() -> myWorkChunkRepository.updateChunkStatus(firstChunkId, WorkChunkStatusEnum.COMPLETED, WorkChunkStatusEnum.READY)); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); + String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, theGatedExecution); + runInTransaction(() -> assertEquals(theExpectedStatusOnCreate, findChunkByIdOrThrow(id).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(theExpectedStatusAfterTransition, findChunkByIdOrThrow(id).getStatus())); WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); + // assert null since we did not input any data when creating the chunks assertNull(chunk.getData()); } + @Test + public void testStoreAndFetchWorkChunk_withGatedJobMultipleChunk_correctTransitions() { + // setup + boolean isGatedExecution = true; + String expectedFirstChunkData = "IAmChunk1"; + String expectedSecondChunkData = "IAmChunk2"; + JobInstance instance = createInstance(true, isGatedExecution); + String instanceId = mySvc.storeNewInstance(instance); + + // execute & verify + String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, expectedFirstChunkData); + String secondChunkId = 
storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, expectedSecondChunkData, isGatedExecution); + + runInTransaction(() -> { + // check chunks created in expected states + assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(firstChunkId).getStatus()); + assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus()); + }); + + myBatch2JobHelper.runMaintenancePass(); + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(firstChunkId).getStatus()); + // maintenance should not affect chunks in step 2 + assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus()); + }); + + WorkChunk actualFirstChunkData = mySvc.onWorkChunkDequeue(firstChunkId).orElseThrow(IllegalArgumentException::new); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(firstChunkId).getStatus())); + assertEquals(expectedFirstChunkData, actualFirstChunkData.getData()); + + mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(firstChunkId, 50, 0)); + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.COMPLETED, findChunkByIdOrThrow(firstChunkId).getStatus()); + assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus()); + }); + + myBatch2JobHelper.runMaintenancePass(); + runInTransaction(() -> { + assertEquals(WorkChunkStatusEnum.COMPLETED, findChunkByIdOrThrow(firstChunkId).getStatus()); + // now that all chunks for step 1 is COMPLETED, should enqueue chunks in step 2 + assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(secondChunkId).getStatus()); + }); + + WorkChunk actualSecondChunkData = mySvc.onWorkChunkDequeue(secondChunkId).orElseThrow(IllegalArgumentException::new); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(secondChunkId).getStatus())); + assertEquals(expectedSecondChunkData, actualSecondChunkData.getData()); + } + @Test 
void testStoreAndFetchChunksForInstance_NoData() { // given + boolean isGatedExecution = false; JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); - String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data"); - String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data"); - String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data"); + String queuedId = storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, "some data", isGatedExecution); + String erroredId = storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 1, "some more data", isGatedExecution); + String completedId = storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 2, "some more data", isGatedExecution); mySvc.onWorkChunkDequeue(erroredId); WorkChunkErrorEvent parameters = new WorkChunkErrorEvent(erroredId, "Our error message"); @@ -434,7 +551,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertEquals(JOB_DEFINITION_ID, workChunk.getJobDefinitionId()); assertEquals(JOB_DEF_VER, workChunk.getJobDefinitionVersion()); assertEquals(instanceId, workChunk.getInstanceId()); - assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId()); + assertEquals(FIRST_STEP_ID, workChunk.getTargetStepId()); assertEquals(0, workChunk.getSequence()); assertEquals(WorkChunkStatusEnum.READY, workChunk.getStatus()); @@ -468,17 +585,27 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { } - - @Test - public void testStoreAndFetchWorkChunk_WithData() { - JobInstance instance = createInstance(true, false); + @ParameterizedTest + @CsvSource({ + "false, READY, QUEUED", + "true, GATE_WAITING, QUEUED" + }) + public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionwithData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum 
theExpectedTransitionStatus) { + // setup + JobInstance instance = createInstance(true, theGatedExecution); String instanceId = mySvc.storeNewInstance(instance); - String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); + // execute & verify + String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null); + // mark the first chunk as COMPLETED to allow step advance + runInTransaction(() -> myWorkChunkRepository.updateChunkStatus(firstChunkId, WorkChunkStatusEnum.COMPLETED, WorkChunkStatusEnum.READY)); + + String id = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution); assertNotNull(id); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(theExpectedCreatedStatus, findChunkByIdOrThrow(id).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(theExpectedTransitionStatus, findChunkByIdOrThrow(id).getStatus())); + WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); assertEquals(36, chunk.getInstanceId().length()); assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId()); @@ -486,19 +613,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); assertEquals(CHUNK_DATA, chunk.getData()); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(id).getStatus())); } @Test public void 
testMarkChunkAsCompleted_Success() { - JobInstance instance = createInstance(true, false); + boolean isGatedExecution = false; + JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); - String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA); + String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA, isGatedExecution); assertNotNull(chunkId); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus())); WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); @@ -508,13 +636,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertNull(chunk.getEndTime()); assertNull(chunk.getRecordsProcessed()); assertNotNull(chunk.getData()); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(chunkId).getStatus())); sleepUntilTimeChange(); mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0)); runInTransaction(() -> { - Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); + Batch2WorkChunkEntity entity = 
findChunkByIdOrThrow(chunkId); assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus()); assertEquals(50, entity.getRecordsProcessed()); assertNotNull(entity.getCreateTime()); @@ -528,14 +656,15 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @Test public void testMarkChunkAsCompleted_Error() { - JobInstance instance = createInstance(true, false); + boolean isGatedExecution = false; + JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); - String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null); + String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution); assertNotNull(chunkId); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus())); WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); @@ -546,7 +675,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message"); mySvc.onWorkChunkError(request); runInTransaction(() -> { - Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); + Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId); 
assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); assertEquals("This is an error message", entity.getErrorMessage()); assertNotNull(entity.getCreateTime()); @@ -562,7 +691,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message 2"); mySvc.onWorkChunkError(request2); runInTransaction(() -> { - Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); + Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId); assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); assertEquals("This is an error message 2", entity.getErrorMessage()); assertNotNull(entity.getCreateTime()); @@ -580,14 +709,15 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @Test public void testMarkChunkAsCompleted_Fail() { - JobInstance instance = createInstance(true, false); + boolean isGatedExecution = false; + JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); - String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null); + String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution); assertNotNull(chunkId); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); myBatch2JobHelper.runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus())); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(chunkId).getStatus())); WorkChunk chunk = 
mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); @@ -597,7 +727,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { mySvc.onWorkChunkFailed(chunkId, "This is an error message"); runInTransaction(() -> { - Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); + Batch2WorkChunkEntity entity = findChunkByIdOrThrow(chunkId); assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus()); assertEquals("This is an error message", entity.getErrorMessage()); assertNotNull(entity.getCreateTime()); @@ -620,7 +750,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { "stepId", instanceId, 0, - "{}" + "{}", + false ); String id = mySvc.onWorkChunkCreate(chunk); chunkIds.add(id); @@ -698,6 +829,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { } ); + instance.setCurrentGatedStepId(jobDef.getFirstStepId()); } else { jobDef = TestJobDefinitionUtils.buildJobDefinition( JOB_DEFINITION_ID, @@ -754,4 +886,12 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { Arguments.of(WorkChunkStatusEnum.COMPLETED, false) ); } + + private Batch2JobInstanceEntity findInstanceByIdOrThrow(String instanceId) { + return myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new); + } + + private Batch2WorkChunkEntity findChunkByIdOrThrow(String secondChunkId) { + return myWorkChunkRepository.findById(secondChunkId).orElseThrow(IllegalArgumentException::new); + } } diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java index c06b3698da9..ab7affed055 100644 --- 
a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java @@ -39,6 +39,7 @@ import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType; import ca.uhn.test.concurrency.PointcutLatch; import jakarta.annotation.Nonnull; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -91,15 +92,16 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa return mySvc; } + @Nonnull public JobDefinition withJobDefinition(boolean theIsGatedBoolean) { JobDefinition.Builder builder = JobDefinition.newBuilder() .setJobDefinitionId(theIsGatedBoolean ? GATED_JOB_DEFINITION_ID : JOB_DEFINITION_ID) .setJobDefinitionVersion(JOB_DEF_VER) .setJobDescription("A job description") .setParametersType(TestJobParameters.class) - .addFirstStep(TARGET_STEP_ID, "the first step", TestJobStep2InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)) - .addIntermediateStep("2nd-step-id", "the second step", TestJobStep3InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)) - .addLastStep("last-step-id", "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)); + .addFirstStep(FIRST_STEP_ID, "the first step", TestJobStep2InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)) + .addIntermediateStep(SECOND_STEP_ID, "the second step", TestJobStep3InputType.class, (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)) + .addLastStep(LAST_STEP_ID, "the final step", (theStepExecutionDetails, theDataSink) -> new RunOutcome(0)); if (theIsGatedBoolean) { builder.gatedExecution(); } @@ -165,11 +167,19 @@ public abstract class 
AbstractIJobPersistenceSpecificationTest implements IJobMa instance.setJobDefinitionVersion(JOB_DEF_VER); instance.setParameters(CHUNK_DATA); instance.setReport("TEST"); + if (jobDefinition.isGatedExecution()) { + instance.setCurrentGatedStepId(jobDefinition.getFirstStepId()); + } return instance; } - public String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { - WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData); + public String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData, boolean theGatedExecution) { + WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData, theGatedExecution); + return mySvc.onWorkChunkCreate(batchWorkChunk); + } + + public String storeFirstWorkChunk(JobDefinition theJobDefinition, String theInstanceId) { + WorkChunkCreateEvent batchWorkChunk = WorkChunkCreateEvent.firstChunk(theJobDefinition, theInstanceId); return mySvc.onWorkChunkCreate(batchWorkChunk); } @@ -229,7 +239,15 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa } public String createChunk(String theInstanceId) { - return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA); + return storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, theInstanceId, 0, CHUNK_DATA, false); + } + + public String createChunk(String theInstanceId, boolean theGatedExecution) { + return storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, theInstanceId, 0, CHUNK_DATA, theGatedExecution); + } + + public String createFirstChunk(JobDefinition theJobDefinition, String theJobInstanceId){ + return storeFirstWorkChunk(theJobDefinition, theJobInstanceId); } public void 
enableMaintenanceRunner(boolean theToEnable) { diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java index 8bbcc689489..cb561ad87a1 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java @@ -1,16 +1,18 @@ package ca.uhn.hapi.fhir.batch2.test; import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; import ca.uhn.test.concurrency.PointcutLatch; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.jupiter.api.Assertions.assertEquals; + public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestConstants { Logger ourLog = LoggerFactory.getLogger(IJobMaintenanceActions.class); @@ -21,7 +23,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC } @Test - default void test_gatedJob_stepReady_advances() throws InterruptedException { + default void test_gatedJob_stepReady_stepAdvances() throws InterruptedException { // setup String initialState = """ # chunks ready - move to queued @@ -47,7 +49,7 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC @ValueSource(strings = { """ 1|COMPLETED - 2|GATED + 2|GATE_WAITING """, """ # Chunk already queued -> waiting for complete @@ -69,8 +71,9 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC """, """ 
# Not all steps ready to advance + # Latch Count: 1 1|COMPLETED - 2|READY # a single ready chunk + 2|READY,2|QUEUED # a single ready chunk 2|IN_PROGRESS """, """ @@ -78,50 +81,65 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC 1|COMPLETED 2|COMPLETED 2|IN_PROGRESS - 3|READY - 3|READY + 3|GATE_WAITING + 3|GATE_WAITING """, """ - 1|COMPLETED - 2|READY - 2|QUEUED - 2|COMPLETED - 2|ERRORED - 2|FAILED - 2|IN_PROGRESS - 3|GATED - 3|GATED + # when current step is not all queued, should queue READY chunks + # Latch Count: 1 + 1|COMPLETED + 2|READY,2|QUEUED + 2|QUEUED + 2|COMPLETED + 2|ERRORED + 2|FAILED + 2|IN_PROGRESS + 3|GATE_WAITING + 3|QUEUED """, """ - 1|COMPLETED - 2|READY - 2|QUEUED - 2|COMPLETED - 2|ERRORED - 2|FAILED - 2|IN_PROGRESS - 3|QUEUED # a lie - 3|GATED + # when current step is all queued but not done, should not proceed + 1|COMPLETED + 2|COMPLETED + 2|QUEUED + 2|COMPLETED + 2|ERRORED + 2|FAILED + 2|IN_PROGRESS + 3|GATE_WAITING + 3|GATE_WAITING """ }) - default void testGatedStep2NotReady_notAdvance(String theChunkState) throws InterruptedException { + default void testGatedStep2NotReady_stepNotAdvanceToStep3(String theChunkState) throws InterruptedException { // setup + int expectedLatchCount = getLatchCountFromState(theChunkState); PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler(); - sendingLatch.setExpectedCount(0); - JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true); + sendingLatch.setExpectedCount(expectedLatchCount); + JobMaintenanceStateInformation state = setupGatedWorkChunkTransitionTest(theChunkState, true); - getTestManager().createChunksInStates(result); + getTestManager().createChunksInStates(state); // test getTestManager().runMaintenancePass(); // verify // nothing ever queued -> nothing ever sent to queue - getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, 0); - verifyWorkChunkFinalStates(result); + 
getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, expectedLatchCount); + assertEquals(SECOND_STEP_ID, getJobInstanceFromState(state).getCurrentGatedStepId()); + verifyWorkChunkFinalStates(state); + } + + /** + * Returns the expected latch count specified in the state. Defaults to 0 if not found. + * Expected format: # Latch Count: {} + * e.g. # Latch Count: 3 + */ + private int getLatchCountFromState(String theState){ + String keyStr = "# Latch Count: "; + int index = theState.indexOf(keyStr); + return index == -1 ? 0 : theState.charAt(index + keyStr.length()) - '0'; } - @Disabled @ParameterizedTest @ValueSource(strings = { """ @@ -129,37 +147,40 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC 1|COMPLETED 2|COMPLETED 2|COMPLETED - 3|GATED|READY - 3|GATED|READY + 3|GATE_WAITING,3|QUEUED + 3|GATE_WAITING,3|QUEUED """, """ # OLD code only 1|COMPLETED - 2|QUEUED,2|READY - 2|QUEUED,2|READY + 2|COMPLETED + 2|COMPLETED + 3|QUEUED,3|QUEUED + 3|QUEUED,3|QUEUED """, """ - # mixed code only + # mixed code 1|COMPLETED 2|COMPLETED 2|COMPLETED - 3|GATED|READY - 3|QUEUED|READY + 3|GATE_WAITING,3|QUEUED + 3|QUEUED,3|QUEUED """ }) default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) throws InterruptedException { // setup PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler(); - JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true); - getTestManager().createChunksInStates(result); + sendingLatch.setExpectedCount(2); + JobMaintenanceStateInformation state = setupGatedWorkChunkTransitionTest(theChunkState, true); + getTestManager().createChunksInStates(state); // test getTestManager().runMaintenancePass(); // verify - // things are being set to READY; is anything being queued? 
- getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, 0); - verifyWorkChunkFinalStates(result); + getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, 2); + assertEquals(LAST_STEP_ID, getJobInstanceFromState(state).getCurrentGatedStepId()); + verifyWorkChunkFinalStates(state); } @Test @@ -206,4 +227,8 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC private void verifyWorkChunkFinalStates(JobMaintenanceStateInformation theStateInformation) { theStateInformation.verifyFinalStates(getTestManager().getSvc()); } + + private JobInstance getJobInstanceFromState(JobMaintenanceStateInformation state) { + return getTestManager().freshFetchJobInstance(state.getInstanceId()); + } } diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/ITestFixture.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/ITestFixture.java index 4425b062c6b..cd155fb5a7f 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/ITestFixture.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/ITestFixture.java @@ -37,7 +37,7 @@ public interface ITestFixture { WorkChunk freshFetchWorkChunk(String theChunkId); - String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData); + String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData, boolean theGatedExecution); void runInTransaction(Runnable theRunnable); @@ -62,6 +62,14 @@ public interface ITestFixture { */ String createChunk(String theJobInstanceId); + String createChunk(String theJobInstanceId, boolean theGatedExecution); + + /** + * Create chunk as the first chunk of a job. 
+ * @return the id of the created chunk + */ + String createFirstChunk(JobDefinition theJobDefinition, String theJobInstanceId); + /** * Enable/disable the maintenance runner (So it doesn't run on a scheduler) */ diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java index 70a0478cd8c..fcf857df7ed 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java @@ -19,16 +19,16 @@ */ package ca.uhn.hapi.fhir.batch2.test; -import ca.uhn.fhir.batch2.model.WorkChunk; -import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; - import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; +import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; import ca.uhn.test.concurrency.PointcutLatch; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,21 +44,44 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT Logger ourLog = LoggerFactory.getLogger(IWorkChunkStateTransitions.class); - @Test - default void chunkCreation_isQueued() { + @BeforeEach + default void before() { + getTestManager().enableMaintenanceRunner(false); + } + + @ParameterizedTest + @CsvSource({ + "false, READY", + "true, GATE_WAITING" + }) + default void chunkCreation_nonFirstChunk_isInExpectedStatus(boolean 
theGatedExecution, WorkChunkStatusEnum expectedStatus) { String jobInstanceId = getTestManager().createAndStoreJobInstance(null); - String myChunkId = getTestManager().createChunk(jobInstanceId); + String myChunkId = getTestManager().createChunk(jobInstanceId, theGatedExecution); WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are READY"); + assertEquals(expectedStatus, fetchedWorkChunk.getStatus(), "New chunks are " + expectedStatus); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + default void chunkCreation_firstChunk_isInReady(boolean theGatedExecution) { + JobDefinition jobDef = getTestManager().withJobDefinition(theGatedExecution); + String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef); + String myChunkId = getTestManager().createFirstChunk(jobDef, jobInstanceId); + + WorkChunk fetchedWorkChunk = getTestManager().freshFetchWorkChunk(myChunkId); + // the first chunk of both gated and non-gated job should start in READY + assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are " + WorkChunkStatusEnum.READY); } @Test - default void chunkReceived_queuedToInProgress() throws InterruptedException { + default void chunkReceived_forNongatedJob_queuedToInProgress() throws InterruptedException { PointcutLatch sendLatch = getTestManager().disableWorkChunkMessageHandler(); sendLatch.setExpectedCount(1); - String jobInstanceId = getTestManager().createAndStoreJobInstance(null); - String myChunkId = getTestManager().createChunk(jobInstanceId); + + JobDefinition jobDef = getTestManager().withJobDefinition(false); + String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef); + String myChunkId = getTestManager().createChunk(jobInstanceId, false); getTestManager().runMaintenancePass(); // the worker has received the chunk, and marks it started. 
@@ -73,11 +96,37 @@ public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkT getTestManager().verifyWorkChunkMessageHandlerCalled(sendLatch, 1); } + @Test + default void advanceJobStepAndUpdateChunkStatus_forGatedJob_updatesBothREADYAndQUEUEDChunks() { + // setup + getTestManager().disableWorkChunkMessageHandler(); + + String state = """ + 1|COMPLETED + 2|COMPLETED + 3|GATE_WAITING,3|READY + 3|QUEUED,3|READY + """; + + JobDefinition jobDef = getTestManager().withJobDefinition(true); + String jobInstanceId = getTestManager().createAndStoreJobInstance(jobDef); + + JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(jobInstanceId, jobDef, state); + getTestManager().createChunksInStates(info); + assertEquals(SECOND_STEP_ID, getTestManager().freshFetchJobInstance(jobInstanceId).getCurrentGatedStepId()); + + // execute + getTestManager().runInTransaction(() -> getTestManager().getSvc().advanceJobStepAndUpdateChunkStatus(jobInstanceId, LAST_STEP_ID)); + + // verify + assertEquals(LAST_STEP_ID, getTestManager().freshFetchJobInstance(jobInstanceId).getCurrentGatedStepId()); + info.verifyFinalStates(getTestManager().getSvc()); + } + @Test default void enqueueWorkChunkForProcessing_enqueuesOnlyREADYChunks() throws InterruptedException { // setup getTestManager().disableWorkChunkMessageHandler(); - getTestManager().enableMaintenanceRunner(false); StringBuilder sb = new StringBuilder(); // first step is always complete diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java index af0792e6f86..3d1c8d8af30 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java @@ -21,24 
+21,21 @@ package ca.uhn.hapi.fhir.batch2.test; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.WorkChunk; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertNull; import ca.uhn.fhir.batch2.model.JobDefinition; -import ca.uhn.fhir.batch2.model.JobInstance; -import ca.uhn.fhir.batch2.model.JobWorkNotification; -import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; -import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; import ca.uhn.test.concurrency.PointcutLatch; import com.google.common.collect.ImmutableList; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; -import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -48,17 +45,20 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestConstants { + @BeforeEach + default void before() { + getTestManager().enableMaintenanceRunner(false); + } + @Test default void testStoreAndFetchWorkChunk_NoData() { JobInstance instance = createInstance(); String instanceId = getTestManager().getSvc().storeNewInstance(instance); - String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null); + String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, 
instanceId, 0, null, false); getTestManager().runInTransaction(() -> { WorkChunk chunk = getTestManager().freshFetchWorkChunk(id); @@ -66,24 +66,25 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC }); } - @Test - default void testWorkChunkCreate_inReadyState() { + @ParameterizedTest + @CsvSource({ + "false, READY", + "true, GATE_WAITING" + }) + default void testWorkChunkCreate_inExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum expectedStatus) { JobInstance instance = createInstance(); String instanceId = getTestManager().getSvc().storeNewInstance(instance); - getTestManager().enableMaintenanceRunner(false); - - String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); + String id = getTestManager().storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, theGatedExecution); assertNotNull(id); - getTestManager().runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, getTestManager().freshFetchWorkChunk(id).getStatus())); + getTestManager().runInTransaction(() -> assertEquals(expectedStatus, getTestManager().freshFetchWorkChunk(id).getStatus())); } @Test default void testNonGatedWorkChunkInReady_IsQueuedDuringMaintenance() throws InterruptedException { // setup int expectedCalls = 1; - getTestManager().enableMaintenanceRunner(false); PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler(); sendingLatch.setExpectedCount(expectedCalls); String state = "1|READY,1|QUEUED"; @@ -109,7 +110,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC default void testStoreAndFetchWorkChunk_WithData() { // setup getTestManager().disableWorkChunkMessageHandler(); - getTestManager().enableMaintenanceRunner(false); JobDefinition jobDefinition = getTestManager().withJobDefinition(false); JobInstance instance = createInstance(); String instanceId = getTestManager().getSvc().storeNewInstance(instance); @@ -144,7 
+144,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC // setup String state = "2|IN_PROGRESS,2|COMPLETED"; getTestManager().disableWorkChunkMessageHandler(); - getTestManager().enableMaintenanceRunner(false); JobDefinition jobDefinition = getTestManager().withJobDefinition(false); String instanceId = getTestManager().createAndStoreJobInstance(jobDefinition); @@ -174,7 +173,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC // setup String state = "1|IN_PROGRESS,1|ERRORED"; getTestManager().disableWorkChunkMessageHandler(); - getTestManager().enableMaintenanceRunner(false); JobDefinition jobDef = getTestManager().withJobDefinition(false); String instanceId = getTestManager().createAndStoreJobInstance(jobDef); JobMaintenanceStateInformation info = new JobMaintenanceStateInformation( @@ -216,7 +214,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC // setup String state = "1|IN_PROGRESS,1|FAILED"; getTestManager().disableWorkChunkMessageHandler(); - getTestManager().enableMaintenanceRunner(false); JobDefinition jobDef = getTestManager().withJobDefinition(false); String instanceId = getTestManager().createAndStoreJobInstance(jobDef); JobMaintenanceStateInformation info = new JobMaintenanceStateInformation( @@ -248,7 +245,6 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC 1|IN_PROGRESS,1|COMPLETED """; getTestManager().disableWorkChunkMessageHandler(); - getTestManager().enableMaintenanceRunner(false); JobDefinition jobDef = getTestManager().withJobDefinition(false); String instanceId = getTestManager().createAndStoreJobInstance(jobDef); JobMaintenanceStateInformation info = new JobMaintenanceStateInformation( diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java 
b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java index 8ab0157e52b..10df1376af9 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java @@ -24,7 +24,9 @@ public interface WorkChunkTestConstants { // we use a separate id for gated jobs because these job definitions might not // be cleaned up after any given test run String GATED_JOB_DEFINITION_ID = "gated_job_def_id"; - public static final String TARGET_STEP_ID = "step-id"; + public static final String FIRST_STEP_ID = "step-id"; + public static final String SECOND_STEP_ID = "2nd-step-id"; + public static final String LAST_STEP_ID = "last-step-id"; public static final String DEF_CHUNK_ID = "definition-chunkId"; public static final int JOB_DEF_VER = 1; public static final int SEQUENCE_NUMBER = 1; diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/support/JobMaintenanceStateInformation.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/support/JobMaintenanceStateInformation.java index 245c33417c2..d83ca179928 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/support/JobMaintenanceStateInformation.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/support/JobMaintenanceStateInformation.java @@ -150,7 +150,8 @@ public class JobMaintenanceStateInformation { if (jobDef.isGatedExecution()) { AtomicReference latestStepId = new AtomicReference<>(); int totalSteps = jobDef.getSteps().size(); - for (int i = totalSteps - 1; i >= 0; i--) { + // ignore the last step since tests in gated jobs needs the current step to be the second-last step + for (int i = totalSteps - 2; i >= 0; i--) { JobDefinitionStep step = 
jobDef.getSteps().get(i); if (stepIds.contains(step.getStepId())) { latestStepId.set(step.getStepId()); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java index 731211fe8bb..1ec4ed915d8 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java @@ -289,4 +289,14 @@ public interface IJobPersistence extends IWorkChunkPersistence { @VisibleForTesting WorkChunk createWorkChunk(WorkChunk theWorkChunk); + + /** + * Atomically advance the given job to the given step and change the status of all QUEUED and GATE_WAITING chunks + * in the next step to READY + * @param theJobInstanceId the id of the job instance to be updated + * @param theNextStepId the id of the next job step + * @return whether any changes were made + */ + @Transactional(propagation = Propagation.REQUIRES_NEW) + boolean advanceJobStepAndUpdateChunkStatus(String theJobInstanceId, String theNextStepId); } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWorkChunkPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWorkChunkPersistence.java index f58687f3af4..49394e3e40f 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWorkChunkPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWorkChunkPersistence.java @@ -56,7 +56,8 @@ public interface IWorkChunkPersistence { * The first state event, as the chunk is created. * This method should be atomic and should only * return when the chunk has been successfully stored in the database. 
- * Chunk should be stored with a status of {@link WorkChunkStatusEnum#QUEUED} + * Chunk should be stored with a status of {@link WorkChunkStatusEnum#READY} or + * {@link WorkChunkStatusEnum#GATE_WAITING} for ungated and gated jobs, respectively. * * @param theBatchWorkChunk the batch work chunk to be stored * @return a globally unique identifier for this chunk. diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java index d7b00bd32d7..0c067ef7a7c 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java @@ -82,7 +82,13 @@ class JobDataSink jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId( + jobDefinition, updatedInstance.getCurrentGatedStepId()); + if (jobWorkCursor.isReductionStep()) { + // Reduction step work chunks should never be sent to the queue but to its specific service instead. 
+ triggerReductionStep(updatedInstance, jobWorkCursor); + return; + } + } + + // enqueue the chunks as normal + enqueueReadyChunks(updatedInstance, jobDefinition); + ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch); } @@ -190,22 +201,12 @@ public class JobInstanceProcessor { JobWorkCursor jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId( theJobDefinition, theInstance.getCurrentGatedStepId()); - String instanceId = theInstance.getInstanceId(); String currentStepId = jobWorkCursor.getCurrentStepId(); - boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, jobWorkCursor); + boolean canAdvance = canAdvanceGatedJob(theInstance); if (canAdvance) { - if (jobWorkCursor.isReductionStep()) { - // current step is the reduction step (all reduction steps are final) - JobWorkCursor nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId( - jobWorkCursor.getJobDefinition(), jobWorkCursor.getCurrentStepId()); - myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor); - } else if (jobWorkCursor.isFinalStep()) { - // current step is the final step in a non-reduction gated job - processChunksForNextGatedSteps( - theInstance, theJobDefinition, jobWorkCursor, jobWorkCursor.getCurrentStepId()); - } else { - // all other gated job steps + if (!jobWorkCursor.isFinalStep()) { + // all other gated job steps except for final steps - final steps does not need to be advanced String nextStepId = jobWorkCursor.nextStep.getStepId(); ourLog.info( "All processing is complete for gated execution of instance {} step {}. 
Proceeding to step {}", @@ -213,8 +214,12 @@ public class JobInstanceProcessor { currentStepId, nextStepId); - // otherwise, continue processing as expected - processChunksForNextGatedSteps(theInstance, theJobDefinition, jobWorkCursor, nextStepId); + processChunksForNextGatedSteps(theInstance, nextStepId); + } else { + ourLog.info( + "Ready to advance gated execution of instance {} but already at the final step {}. Not proceeding to advance steps.", + instanceId, + jobWorkCursor.getCurrentStepId()); } } else { String stepId = jobWorkCursor.nextStep != null @@ -228,8 +233,7 @@ public class JobInstanceProcessor { } } - private boolean canAdvanceGatedJob( - JobDefinition theJobDefinition, JobInstance theInstance, JobWorkCursor theWorkCursor) { + private boolean canAdvanceGatedJob(JobInstance theInstance) { // make sure our instance still exists if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) { // no more job @@ -240,36 +244,14 @@ public class JobInstanceProcessor { Set workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep( theInstance.getInstanceId(), currentGatedStepId); - // we only advance if all of the current steps workchunks are READY - if (workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.READY))) { - if (theWorkCursor.isFirstStep()) { - // first step - all ready means we're ready to proceed to the next step - return true; - } else { - // it's a future step; - // make sure previous step's workchunks are completed - JobDefinitionStep previousStep = - theJobDefinition.getSteps().get(0); - for (JobDefinitionStep step : theJobDefinition.getSteps()) { - if (step.getStepId().equalsIgnoreCase(currentGatedStepId)) { - break; - } - previousStep = step; - } - Set previousStepWorkChunkStates = - myJobPersistence.getDistinctWorkChunkStatesForJobAndStep( - theInstance.getInstanceId(), previousStep.getStepId()); - - // completed means "all in COMPLETE state" or no previous chunks (they're cleaned up or never existed) - if 
(previousStepWorkChunkStates.isEmpty() - || previousStepWorkChunkStates.equals(Set.of(WorkChunkStatusEnum.COMPLETED))) { - return true; - } - } + if (workChunkStatuses.isEmpty()) { + // no work chunks = no output + // trivial to advance to next step + return true; } - // anything else - return false; + // all workchunks for the current step are in COMPLETED -> proceed. + return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED)); } protected PagingIterator getReadyChunks() { @@ -283,39 +265,26 @@ public class JobInstanceProcessor { }); } + /** + * Trigger the reduction step for the given job instance. Reduction step chunks should never be queued. + */ + private void triggerReductionStep(JobInstance theInstance, JobWorkCursor jobWorkCursor) { + String instanceId = theInstance.getInstanceId(); + ourLog.debug("Triggering Reduction step {} of instance {}.", jobWorkCursor.getCurrentStepId(), instanceId); + myReductionStepExecutorService.triggerReductionStep(instanceId, jobWorkCursor); + } + /** * Chunks are initially created in READY state. * We will move READY chunks to QUEUE'd and send them to the queue/topic (kafka) * for processing. - * - * We could block chunks from being moved from QUEUE'd to READY here for gated steps - * but currently, progress is calculated by looking at completed chunks only; - * we'd need a new GATE_WAITING state to move chunks to to prevent jobs from - * completing prematurely. 
*/ - private void enqueueReadyChunks( - JobInstance theJobInstance, JobDefinition theJobDefinition, boolean theIsGatedExecutionAdvancementBool) { + private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition theJobDefinition) { Iterator iter = getReadyChunks(); - AtomicInteger counter = new AtomicInteger(); - ConcurrentHashMap> stepToWorkCursor = new ConcurrentHashMap<>(); + int counter = 0; while (iter.hasNext()) { WorkChunkMetadata metadata = iter.next(); - JobWorkCursor jobWorkCursor = stepToWorkCursor.computeIfAbsent(metadata.getTargetStepId(), (e) -> { - return JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, metadata.getTargetStepId()); - }); - counter.getAndIncrement(); - if (!theIsGatedExecutionAdvancementBool - && (theJobDefinition.isGatedExecution() || jobWorkCursor.isReductionStep())) { - /* - * Gated executions are queued later when all work chunks are ready. - * - * Reduction steps are not submitted to the queue at all, but processed inline. - * Currently all reduction steps are also gated, but this might not always - * be true. 
- */ - return; - } /* * For each chunk id @@ -325,9 +294,13 @@ public class JobInstanceProcessor { * * commit */ updateChunkAndSendToQueue(metadata); + counter++; } ourLog.debug( - "Encountered {} READY work chunks for job {}", counter.get(), theJobDefinition.getJobDefinitionId()); + "Encountered {} READY work chunks for job {} of type {}", + counter, + theJobInstance.getInstanceId(), + theJobDefinition.getJobDefinitionId()); } /** @@ -370,53 +343,22 @@ public class JobInstanceProcessor { myBatchJobSender.sendWorkChannelMessage(workNotification); } - private void processChunksForNextGatedSteps( - JobInstance theInstance, - JobDefinition theJobDefinition, - JobWorkCursor theWorkCursor, - String nextStepId) { + private void processChunksForNextGatedSteps(JobInstance theInstance, String nextStepId) { String instanceId = theInstance.getInstanceId(); List readyChunksForNextStep = - myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.READY); + myProgressAccumulator.getChunkIdsWithStatus(instanceId, nextStepId, WorkChunkStatusEnum.GATE_WAITING); int totalChunksForNextStep = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, nextStepId); if (totalChunksForNextStep != readyChunksForNextStep.size()) { ourLog.debug( - "Total ProgressAccumulator READY chunk count does not match READY chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]", + "Total ProgressAccumulator GATE_WAITING chunk count does not match GATE_WAITING chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]", instanceId, nextStepId, totalChunksForNextStep, readyChunksForNextStep.size()); } - // TODO - // create a new persistence transition for state advance - // update stepId to next step AND update all chunks in this step to READY (from GATED or QUEUED ;-P) - // so we can queue them safely. - // update the job step so the workers will process them. 
- // if it's the last (gated) step, there will be no change - but we should - // queue up the chunks anyways - boolean changed = theWorkCursor.isFinalStep() - || myJobPersistence.updateInstance(instanceId, instance -> { - if (instance.getCurrentGatedStepId().equals(nextStepId)) { - // someone else beat us here. No changes - return false; - } - instance.setCurrentGatedStepId(nextStepId); - return true; - }); - - if (!changed) { - // we collided with another maintenance job. - ourLog.warn("Skipping gate advance to {} for instance {} - already advanced.", nextStepId, instanceId); - return; - } - - ourLog.debug("Moving gated instance {} to next step.", theInstance.getInstanceId()); - - // because we now have all gated job chunks in READY state, - // we can enqueue them - enqueueReadyChunks(theInstance, theJobDefinition, true); + myJobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId); } /** diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCreateEvent.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCreateEvent.java index 95e07c87761..c2b489016b7 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCreateEvent.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCreateEvent.java @@ -36,6 +36,7 @@ public class WorkChunkCreateEvent { public final String instanceId; public final int sequence; public final String serializedData; + public final boolean isGatedExecution; /** * Constructor @@ -52,20 +53,28 @@ public class WorkChunkCreateEvent { @Nonnull String theTargetStepId, @Nonnull String theInstanceId, int theSequence, - @Nullable String theSerializedData) { + @Nullable String theSerializedData, + boolean theGatedExecution) { jobDefinitionId = theJobDefinitionId; jobDefinitionVersion = theJobDefinitionVersion; targetStepId = theTargetStepId; instanceId = theInstanceId; sequence = theSequence; serializedData = 
theSerializedData; + isGatedExecution = theGatedExecution; } + /** + * Creates the WorkChunkCreateEvent for the first chunk of a job. + */ public static WorkChunkCreateEvent firstChunk(JobDefinition theJobDefinition, String theInstanceId) { String firstStepId = theJobDefinition.getFirstStepId(); String jobDefinitionId = theJobDefinition.getJobDefinitionId(); int jobDefinitionVersion = theJobDefinition.getJobDefinitionVersion(); - return new WorkChunkCreateEvent(jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null); + // the first chunk of a job is always READY, no matter whether the job is gated + boolean isGatedExecution = false; + return new WorkChunkCreateEvent( + jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null, isGatedExecution); } @Override @@ -83,6 +92,7 @@ public class WorkChunkCreateEvent { .append(instanceId, that.instanceId) .append(sequence, that.sequence) .append(serializedData, that.serializedData) + .append(isGatedExecution, that.isGatedExecution) .isEquals(); } @@ -95,6 +105,7 @@ public class WorkChunkCreateEvent { .append(instanceId) .append(sequence) .append(serializedData) + .append(isGatedExecution) .toHashCode(); } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java index b898399a0f3..85909e3b624 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java @@ -31,17 +31,17 @@ import java.util.Set; * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md */ public enum WorkChunkStatusEnum { - // wipmb For 6.8 Add WAITING for gated, and READY for in db, but not yet sent to channel. 
/** * The initial state all workchunks start in */ READY, + GATE_WAITING, /** * The state of workchunks that have been sent to the queue; * or of workchunks that are about to be processed in a final * reduction step (these workchunks are never queued) */ - QUEUED, + QUEUED, /** * The state of workchunks that are doing work. */ @@ -84,6 +84,8 @@ public enum WorkChunkStatusEnum { public Set getNextStates() { switch (this) { + case GATE_WAITING: + return EnumSet.of(READY); case READY: return EnumSet.of(QUEUED); case QUEUED: diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java index 1014dbf5679..c38c7a5cc09 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java @@ -73,6 +73,7 @@ public class InstanceProgress { statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1); switch (theChunk.getStatus()) { + case GATE_WAITING: case READY: case QUEUED: case POLL_WAITING: