diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index 010b17c6739..aa55af1e827 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -1,12 +1,15 @@ package ca.uhn.fhir.jpa.batch2; +import ca.uhn.fhir.batch2.api.IJobMaintenanceService; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.JobOperationResultJson; import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; @@ -28,6 +31,7 @@ import ca.uhn.fhir.testjob.models.FirstStepOutput; import ca.uhn.fhir.util.JsonUtil; import ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest; import ca.uhn.hapi.fhir.batch2.test.configs.SpyOverrideConfig; +import ca.uhn.test.concurrency.PointcutLatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import jakarta.annotation.Nonnull; @@ -70,6 +74,12 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; @TestMethodOrder(MethodOrderer.MethodName.class) @ContextConfiguration(classes = { @@ -97,12 +107,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @Autowired public Batch2JobHelper myBatch2JobHelper; + // this is our spy + @Autowired + private BatchJobSender myBatchSender; + + @Autowired + private IJobMaintenanceService myMaintenanceService; + @Autowired public JobDefinitionRegistry myJobDefinitionRegistry; @AfterEach public void after() { myJobDefinitionRegistry.removeJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER); + myMaintenanceService.enableMaintenancePass(true); } @Test @@ -243,10 +261,18 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @ParameterizedTest @MethodSource("provideStatuses") - public void testStartChunkOnlyWorksOnValidChunks(WorkChunkStatusEnum theStatus, boolean theShouldBeStartedByConsumer) { + public void testStartChunkOnlyWorksOnValidChunks(WorkChunkStatusEnum theStatus, boolean theShouldBeStartedByConsumer) throws InterruptedException { // Setup JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); +// PointcutLatch latch = new PointcutLatch("senderlatch"); +// doAnswer(a -> { +// latch.call(1); +// return Void.class; +// }).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class)); +// latch.setExpectedCount(1); + myMaintenanceService.enableMaintenancePass(false); + storeWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false); WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, FIRST_STEP_ID, instanceId, 0, CHUNK_DATA, false); String chunkId = mySvc.onWorkChunkCreate(batchWorkChunk); @@ -260,7 +286,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { // Verify boolean chunkStarted = workChunk.isPresent(); - assertEquals(chunkStarted, theShouldBeStartedByConsumer); + assertEquals(theShouldBeStartedByConsumer, chunkStarted); + verify(myBatchSender, never()) + .sendWorkChannelMessage(any()); } @Test @@ -450,11 +478,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { "false, READY, QUEUED", "true, GATE_WAITING, QUEUED" }) - public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionNoData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedStatusOnCreate, WorkChunkStatusEnum theExpectedStatusAfterTransition) { + public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionNoData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedStatusOnCreate, WorkChunkStatusEnum theExpectedStatusAfterTransition) throws InterruptedException { // setup JobInstance instance = createInstance(true, theGatedExecution); String instanceId = mySvc.storeNewInstance(instance); + // when + PointcutLatch latch = new PointcutLatch("senderlatch"); + doAnswer(a -> { + latch.call(1); + return Void.class; + }).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class)); + latch.setExpectedCount(1); + myMaintenanceService.enableMaintenancePass(false); + // execute & verify String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null); // mark the first chunk as COMPLETED to allow step advance @@ -468,16 +505,27 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); // assert null since we did not input any data when creating the chunks assertNull(chunk.getData()); + + latch.awaitExpected(); + verify(myBatchSender).sendWorkChannelMessage(any()); + clearInvocations(myBatchSender); } @Test - public void testStoreAndFetchWorkChunk_withGatedJobMultipleChunk_correctTransitions() { + public void testStoreAndFetchWorkChunk_withGatedJobMultipleChunk_correctTransitions() throws InterruptedException { // setup boolean isGatedExecution = true; String expectedFirstChunkData = "IAmChunk1"; String expectedSecondChunkData = "IAmChunk2"; JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); + PointcutLatch latch = new PointcutLatch("senderlatch"); + doAnswer(a -> { + latch.call(1); + return Void.class; + }).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class)); + latch.setExpectedCount(2); + myMaintenanceService.enableMaintenancePass(false); // execute & verify String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, expectedFirstChunkData); @@ -516,6 +564,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { WorkChunk actualSecondChunkData = mySvc.onWorkChunkDequeue(secondChunkId).orElseThrow(IllegalArgumentException::new); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(secondChunkId).getStatus())); assertEquals(expectedSecondChunkData, actualSecondChunkData.getData()); + + latch.awaitExpected(); + verify(myBatchSender, times(2)) + .sendWorkChannelMessage(any()); + clearInvocations(myBatchSender); } @Test @@ -562,7 +615,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertNull(workChunk.getEndTime()); assertNull(workChunk.getErrorMessage()); assertEquals(0, workChunk.getErrorCount()); - assertEquals(null, workChunk.getRecordsProcessed()); + assertNull(workChunk.getRecordsProcessed()); } { @@ -570,7 +623,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertEquals(WorkChunkStatusEnum.ERRORED, workChunk1.getStatus()); assertEquals("Our error message", workChunk1.getErrorMessage()); assertEquals(1, workChunk1.getErrorCount()); - assertEquals(null, workChunk1.getRecordsProcessed()); + assertNull(workChunk1.getRecordsProcessed()); assertNotNull(workChunk1.getEndTime()); } @@ -582,7 +635,6 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertNull(workChunk2.getErrorMessage()); assertEquals(0, workChunk2.getErrorCount()); } - } @ParameterizedTest @@ -590,10 +642,17 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { "false, READY, QUEUED", "true, GATE_WAITING, QUEUED" }) - public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionwithData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum theExpectedTransitionStatus) { + public void testStoreAndFetchWorkChunk_withOrWithoutGatedExecutionwithData_createdAndTransitionToExpectedStatus(boolean theGatedExecution, WorkChunkStatusEnum theExpectedCreatedStatus, WorkChunkStatusEnum theExpectedTransitionStatus) throws InterruptedException { // setup JobInstance instance = createInstance(true, theGatedExecution); String instanceId = mySvc.storeNewInstance(instance); + PointcutLatch latch = new PointcutLatch("senderlatch"); + doAnswer(a -> { + latch.call(1); + return Void.class; + }).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class)); + latch.setExpectedCount(1); + myMaintenanceService.enableMaintenancePass(false); // execute & verify String firstChunkId = storeFirstWorkChunk(JOB_DEFINITION_ID, FIRST_STEP_ID, instanceId, 0, null); @@ -614,15 +673,25 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertEquals(CHUNK_DATA, chunk.getData()); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, findChunkByIdOrThrow(id).getStatus())); + latch.awaitExpected(); + verify(myBatchSender).sendWorkChannelMessage(any()); + clearInvocations(myBatchSender); } @Test - public void testMarkChunkAsCompleted_Success() { + public void testMarkChunkAsCompleted_Success() throws InterruptedException { boolean isGatedExecution = false; JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA, isGatedExecution); assertNotNull(chunkId); + PointcutLatch latch = new PointcutLatch("senderlatch"); + doAnswer(a -> { + latch.call(1); + return Void.class; + }).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class)); + latch.setExpectedCount(1); + myMaintenanceService.enableMaintenancePass(false); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); myBatch2JobHelper.runMaintenancePass(); @@ -652,11 +721,22 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); }); + latch.awaitExpected(); + verify(myBatchSender).sendWorkChannelMessage(any()); + clearInvocations(myBatchSender); } @Test public void testMarkChunkAsCompleted_Error() { boolean isGatedExecution = false; + PointcutLatch latch = new PointcutLatch("senderlatch"); + doAnswer(a -> { + latch.call(1); + return Void.class; + }).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class)); + latch.setExpectedCount(1); + myMaintenanceService.enableMaintenancePass(false); + JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); String chunkId = storeWorkChunk(JOB_DEFINITION_ID, TestJobDefinitionUtils.FIRST_STEP_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution); @@ -705,15 +785,25 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { List chunks = ImmutableList.copyOf(mySvc.fetchAllWorkChunksIterator(instanceId, true)); assertEquals(1, chunks.size()); assertEquals(2, chunks.get(0).getErrorCount()); + + verify(myBatchSender).sendWorkChannelMessage(any()); + clearInvocations(myBatchSender); } @Test - public void testMarkChunkAsCompleted_Fail() { + public void testMarkChunkAsCompleted_Fail() throws InterruptedException { boolean isGatedExecution = false; JobInstance instance = createInstance(true, isGatedExecution); String instanceId = mySvc.storeNewInstance(instance); String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null, isGatedExecution); assertNotNull(chunkId); + PointcutLatch latch = new PointcutLatch("senderlatch"); + doAnswer(a -> { + latch.call(1); + return Void.class; + }).when(myBatchSender).sendWorkChannelMessage(any(JobWorkNotification.class)); + latch.setExpectedCount(1); + myMaintenanceService.enableMaintenancePass(false); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, findChunkByIdOrThrow(chunkId).getStatus())); myBatch2JobHelper.runMaintenancePass(); @@ -736,6 +826,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); }); + latch.awaitExpected(); + verify(myBatchSender) + .sendWorkChannelMessage(any()); + clearInvocations(myBatchSender); } @Test diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java index ab7affed055..870d3f52a60 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java @@ -39,7 +39,6 @@ import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType; import ca.uhn.test.concurrency.PointcutLatch; import jakarta.annotation.Nonnull; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -65,7 +64,8 @@ import static org.mockito.Mockito.verify; * These tests are abstract, and do not depend on JPA. * Test setups should use the public batch2 api to create scenarios. */ -public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions, ITestFixture, IWorkChunkCommon, WorkChunkTestConstants { +public abstract class AbstractIJobPersistenceSpecificationTest + implements ITestFixture, IWorkChunkCommon, WorkChunkTestConstants, IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions { private static final Logger ourLog = LoggerFactory.getLogger(AbstractIJobPersistenceSpecificationTest.class);