diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index 6c9b2202221..f186ca79c45 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -374,7 +374,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @Test public void testUpdateTime() { // Setup - JobInstance instance = createInstance(true, false); + boolean isGatedExecution = false; + JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); Date updateTime = runInTransaction(() -> new Date(findInstanceByIdOrThrow(instanceId).getUpdateTime().getTime())); @@ -392,10 +393,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @Test public void advanceJobStepAndUpdateChunkStatus_forGatedJob_updatesCurrentStepAndChunkStatus() { // setup - JobInstance instance = createInstance(true, true); + boolean isGatedExecution = true; + JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); - String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true); - String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true); + String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution); + String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution); runInTransaction(() -> assertEquals(TARGET_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId())); @@ -416,10 +418,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @Test public void advanceJobStepAndUpdateChunkStatus_whenAlreadyInTargetStep_DoesNotUpdateStepOrChunks() { // setup - JobInstance instance = createInstance(true, true); + boolean isGatedExecution = true; + JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); - String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true); - String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, true); + String chunkIdSecondStep1 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution); + String chunkIdSecondStep2 = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, null, isGatedExecution); runInTransaction(() -> assertEquals(TARGET_STEP_ID, findInstanceByIdOrThrow(instanceId).getCurrentGatedStepId())); @@ -447,7 +450,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { "false, READY, QUEUED", "true, GATE_WAITING, QUEUED" }) - public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecution_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedStatusOnCreate, WorkChunkStatusEnum theExpectedStatusAfterTransition) { + public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionNoData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedStatusOnCreate, WorkChunkStatusEnum theExpectedStatusAfterTransition) { // setup JobInstance instance = createInstance(true, theGatedExecution); String instanceId = mySvc.storeNewInstance(instance); @@ -459,6 +462,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { runInTransaction(() -> assertEquals(theExpectedStatusAfterTransition, findChunkByIdOrThrow(id).getStatus())); WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); + // assert null since we did not input any data when creating the chunks assertNull(chunk.getData()); } @@ -476,6 +480,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String secondChunkId = storeWorkChunk(JOB_DEFINITION_ID, LAST_STEP_ID, instanceId, 0, expectedSecondChunkData, isGatedExecution); runInTransaction(() -> { + // check chunks created in expected states assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(firstChunkId).getStatus()); assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus()); }); @@ -483,6 +488,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { myBatch2JobHelper.runMaintenancePass(); runInTransaction(() -> { assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(firstChunkId).getStatus()); + // maintenance should not affect chunks in step 2 assertEquals(WorkChunkStatusEnum.GATE_WAITING, findChunkByIdOrThrow(secondChunkId).getStatus()); }); @@ -499,6 +505,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { myBatch2JobHelper.runMaintenancePass(); runInTransaction(() -> { assertEquals(WorkChunkStatusEnum.COMPLETED, findChunkByIdOrThrow(firstChunkId).getStatus()); + // now that all chunks for step 1 is COMPLETED, should enqueue chunks in step 2 assertEquals(WorkChunkStatusEnum.QUEUED, findChunkByIdOrThrow(secondChunkId).getStatus()); }); @@ -579,7 +586,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { "false, READY, QUEUED", "true, GATE_WAITING, QUEUED" }) - public void testStoreAndFetchWorkChunk_WithData(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum theExpectedTransitionStatus) { + public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionwithData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum theExpectedTransitionStatus) { JobInstance instance = createInstance(true, theGatedExecution); String instanceId = mySvc.storeNewInstance(instance); diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java index 5426fc54948..cb561ad87a1 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java @@ -1,6 +1,7 @@ package ca.uhn.hapi.fhir.batch2.test; import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; import ca.uhn.test.concurrency.PointcutLatch; import org.junit.jupiter.api.BeforeEach; @@ -10,6 +11,8 @@ import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.jupiter.api.Assertions.assertEquals; + public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestConstants { Logger ourLog = LoggerFactory.getLogger(IJobMaintenanceActions.class); @@ -107,14 +110,14 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC 3|GATE_WAITING """ }) - default void testGatedStep2NotReady_stepNotAdvance(String theChunkState) throws InterruptedException { + default void testGatedStep2NotReady_stepNotAdvanceToStep3(String theChunkState) throws InterruptedException { // setup int expectedLatchCount = getLatchCountFromState(theChunkState); PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler(); sendingLatch.setExpectedCount(expectedLatchCount); - JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true); + JobMaintenanceStateInformation state = setupGatedWorkChunkTransitionTest(theChunkState, true); - getTestManager().createChunksInStates(result); + getTestManager().createChunksInStates(state); // test getTestManager().runMaintenancePass(); @@ -122,7 +125,8 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC // verify // nothing ever queued -> nothing ever sent to queue getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, expectedLatchCount); - verifyWorkChunkFinalStates(result); + assertEquals(SECOND_STEP_ID, getJobInstanceFromState(state).getCurrentGatedStepId()); + verifyWorkChunkFinalStates(state); } /** @@ -167,15 +171,16 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC // setup PointcutLatch sendingLatch = getTestManager().disableWorkChunkMessageHandler(); sendingLatch.setExpectedCount(2); - JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true); - getTestManager().createChunksInStates(result); + JobMaintenanceStateInformation state = setupGatedWorkChunkTransitionTest(theChunkState, true); + getTestManager().createChunksInStates(state); // test getTestManager().runMaintenancePass(); // verify getTestManager().verifyWorkChunkMessageHandlerCalled(sendingLatch, 2); - verifyWorkChunkFinalStates(result); + assertEquals(LAST_STEP_ID, getJobInstanceFromState(state).getCurrentGatedStepId()); + verifyWorkChunkFinalStates(state); } @Test @@ -222,4 +227,8 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC private void verifyWorkChunkFinalStates(JobMaintenanceStateInformation theStateInformation) { theStateInformation.verifyFinalStates(getTestManager().getSvc()); } + + private JobInstance getJobInstanceFromState(JobMaintenanceStateInformation state) { + return getTestManager().freshFetchJobInstance(state.getInstanceId()); + } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java index 9b9d2517671..1c775b364cb 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkStatusEnum.java @@ -31,7 +31,6 @@ import java.util.Set; * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md */ public enum WorkChunkStatusEnum { - // wipmb For 6.8 Add WAITING for gated, and READY for in db, but not yet sent to channel. GATE_WAITING, READY, QUEUED,