diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkEntity.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkEntity.java index 0875f47a678..a781d3f510c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkEntity.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkEntity.java @@ -164,7 +164,7 @@ public class Batch2WorkChunkEntity implements Serializable { } public static Batch2WorkChunkEntity fromWorkChunk(WorkChunk theWorkChunk) { - return new Batch2WorkChunkEntity( + Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity( theWorkChunk.getId(), theWorkChunk.getSequence(), theWorkChunk.getJobDefinitionId(), @@ -181,6 +181,9 @@ public class Batch2WorkChunkEntity implements Serializable { theWorkChunk.getRecordsProcessed(), theWorkChunk.getWarningMessage() ); + entity.setSerializedData(theWorkChunk.getData()); + + return entity; } public int getErrorCount() { diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java index 303b5b2882a..4ffc4a05b2f 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java @@ -26,19 +26,19 @@ import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; import ca.uhn.fhir.batch2.model.JobDefinition; -import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.StatusEnum; -import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.util.StopWatch; -import ca.uhn.hapi.fhir.batch2.test.models.JobMaintenanceStateInformation; +import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep2InputType; import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType; import jakarta.annotation.Nonnull; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Nested; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -48,21 +48,20 @@ import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * Specification tests for batch2 storage and event system. * These tests are abstract, and do not depend on JPA. * Test setups should use the public batch2 api to create scenarios. */ -public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions, IWorkChunkStateTransitions, IWorkChunkStorageTests, IWorkChunkErrorActionsTests, WorkChunkTestConstants { +public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMaintenanceActions, IInProgressActionsTests, IInstanceStateTransitions, IWorkChunkCommon, WorkChunkTestConstants { private static final Logger ourLog = LoggerFactory.getLogger(AbstractIJobPersistenceSpecificationTest.class); @@ -107,9 +106,43 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa @AfterEach public void after() { myJobDefinitionRegistry.removeJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER); + + // re-enable our runner after every test (just in case) myMaintenanceService.enableMaintenancePass(true); } + @Nested + class WorkChunkStorage implements IWorkChunkStorageTests { + + @Override + public IWorkChunkCommon getTestManager() { + return AbstractIJobPersistenceSpecificationTest.this; + } + + @Nested + class StateTransitions implements IWorkChunkStateTransitions { + + @Override + public IWorkChunkCommon getTestManager() { + return AbstractIJobPersistenceSpecificationTest.this; + } + + @Nested + class ErrorActions implements IWorkChunkErrorActionsTests { + + @Override + public IWorkChunkCommon getTestManager() { + return AbstractIJobPersistenceSpecificationTest.this; + } + } + } + } + + @Override + public IWorkChunkCommon getTestManager() { + return this; + } + @Nonnull public JobInstance createInstance(JobDefinition theJobDefinition) { JobDefinition jobDefinition = theJobDefinition == null ? withJobDefinition(false) @@ -185,50 +218,24 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IJobMa return chunkId; } + public String createChunk(String theInstanceId) { + return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theInstanceId, 0, CHUNK_DATA); + } + public void enableMaintenanceRunner(boolean theToEnable) { myMaintenanceService.enableMaintenancePass(theToEnable); } - public BatchJobSender getBatchJobSender() { - return myBatchJobSender; + public void disableWorkChunkMessageHandler() { + doNothing().when(myBatchJobSender).sendWorkChannelMessage(any(JobWorkNotification.class)); + } + + public void verifyWorkChunkMessageHandlerCalled(int theNumberOfTimes) { + verify(myBatchJobSender, times(theNumberOfTimes)) + .sendWorkChannelMessage(any(JobWorkNotification.class)); } public void createChunksInStates(JobMaintenanceStateInformation theJobMaintenanceStateInformation) { - // should have as many input workchunks as output workchunks - // unless we have newly created ones somewhere - assertEquals(theJobMaintenanceStateInformation.getInitialWorkChunks().size(), theJobMaintenanceStateInformation.getFinalWorkChunk().size()); - - Set stepIds = new HashSet<>(); - for (int i = 0; i < theJobMaintenanceStateInformation.getInitialWorkChunks().size(); i++) { - WorkChunk workChunk = theJobMaintenanceStateInformation.getInitialWorkChunks().get(i); - WorkChunk saved = mySvc.createWorkChunk(workChunk); - ourLog.info("Created WorkChunk: " + saved.toString()); - workChunk.setId(saved.getId()); - - theJobMaintenanceStateInformation.getFinalWorkChunk().get(i) - .setId(saved.getId()); - - stepIds.add(workChunk.getTargetStepId()); - } - // if it's a gated job, we'll manually set the step id for the instance - JobDefinition jobDef = theJobMaintenanceStateInformation.getJobDefinition(); - if (jobDef.isGatedExecution()) { - AtomicReference latestStepId = new AtomicReference<>(); - int totalSteps = jobDef.getSteps().size(); - for (int i = totalSteps - 1; i >= 0; i--) { - JobDefinitionStep step = jobDef.getSteps().get(i); - if (stepIds.contains(step.getStepId())) { - latestStepId.set(step.getStepId()); - break; - } - } - // should def have a value - assertNotNull(latestStepId.get()); - String instanceId = theJobMaintenanceStateInformation.getInstanceId(); - mySvc.updateInstance(instanceId, instance -> { - instance.setCurrentGatedStepId(latestStepId.get()); - return true; - }); - } + theJobMaintenanceStateInformation.initialize(mySvc); } } diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java index a03ee9c95c8..c2ba9eed7be 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IJobMaintenanceActions.java @@ -1,42 +1,27 @@ package ca.uhn.hapi.fhir.batch2.test; -import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobWorkNotification; -import ca.uhn.fhir.batch2.model.WorkChunk; -import ca.uhn.hapi.fhir.batch2.test.models.JobMaintenanceStateInformation; +import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestConstants { Logger ourLog = LoggerFactory.getLogger(IJobMaintenanceActions.class); - void enableMaintenanceRunner(boolean theToEnable); - - void createChunksInStates(JobMaintenanceStateInformation theInitialState); - - BatchJobSender getBatchJobSender(); - @Test default void test_gatedJob_stepReady_advances() { - // given + // setup String initialState = """ # chunks ready - move to queued 1|COMPLETED @@ -44,18 +29,16 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC 2|READY,2|QUEUED """; enableMaintenanceRunner(false); + disableWorkChunkMessageHandler(); JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(initialState, true); - - // setup createChunksInStates(result); - // TEST run job maintenance - force transition - doNothing().when(getBatchJobSender()).sendWorkChannelMessage(any(JobWorkNotification.class)); - enableMaintenanceRunner(true); + // test runMaintenancePass(); // verify verifyWorkChunkFinalStates(result); + verifyWorkChunkMessageHandlerCalled(2); } @ParameterizedTest @@ -120,21 +103,20 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC """ }) default void testGatedStep2NotReady_notAdvance(String theChunkState) { - // given + // setup enableMaintenanceRunner(false); + disableWorkChunkMessageHandler(); JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true); - // setup createChunksInStates(result); - // TEST run job maintenance - force transition - enableMaintenanceRunner(true); - lenient().doNothing().when(getBatchJobSender()).sendWorkChannelMessage(any(JobWorkNotification.class)); - + // test runMaintenancePass(); // verify verifyWorkChunkFinalStates(result); + // nothing ever queued -> nothing ever sent to queue + verifyWorkChunkMessageHandlerCalled(0); } @ParameterizedTest @@ -163,25 +145,25 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC """ }) default void testGatedStep2ReadyToAdvance_advanceToStep3(String theChunkState) { - JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true); - // setup enableMaintenanceRunner(false); + disableWorkChunkMessageHandler(); + JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, true); createChunksInStates(result); - // TEST run job maintenance - force transition - enableMaintenanceRunner(true); - lenient().doNothing().when(getBatchJobSender()).sendWorkChannelMessage(any(JobWorkNotification.class)); - + // test runMaintenancePass(); // verify verifyWorkChunkFinalStates(result); + // things are being set to READY; is anything being queued? + verifyWorkChunkMessageHandlerCalled(0); } - @ParameterizedTest - @ValueSource(strings = { - """ + @Test + default void test_ungatedJob_advancesSteps() { + // setup + String state = """ # READY chunks should transition; others should stay 1|COMPLETED 2|READY,2|QUEUED @@ -189,31 +171,28 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC 2|COMPLETED 2|IN_PROGRESS 3|IN_PROGRESS - """ - }) - default void test_ungatedJob_advancesSteps(String theChunkState) { - JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(theChunkState, false); + """; + JobMaintenanceStateInformation result = setupGatedWorkChunkTransitionTest(state, false); - // setup enableMaintenanceRunner(false); + disableWorkChunkMessageHandler(); createChunksInStates(result); // TEST run job maintenance - force transition enableMaintenanceRunner(true); - lenient().doNothing().when(getBatchJobSender()).sendWorkChannelMessage(any(JobWorkNotification.class)); runMaintenancePass(); // verify verifyWorkChunkFinalStates(result); + verifyWorkChunkMessageHandlerCalled(2); } private JobMaintenanceStateInformation setupGatedWorkChunkTransitionTest(String theChunkState, boolean theIsGated) { // get the job def and store the instance JobDefinition definition = withJobDefinition(theIsGated); String instanceId = createAndStoreJobInstance(definition); - JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(instanceId, definition); - stateInformation.initialize(theChunkState); + JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(instanceId, definition, theChunkState); ourLog.info("Starting test case \n {}", theChunkState); // display comments if there are any @@ -222,28 +201,6 @@ public interface IJobMaintenanceActions extends IWorkChunkCommon, WorkChunkTestC } private void verifyWorkChunkFinalStates(JobMaintenanceStateInformation theStateInformation) { - assertEquals(theStateInformation.getInitialWorkChunks().size(), theStateInformation.getFinalWorkChunk().size()); - - HashMap workchunkMap = new HashMap<>(); - for (WorkChunk fs : theStateInformation.getFinalWorkChunk()) { - workchunkMap.put(fs.getId(), fs); - } - - // fetch all workchunks - Iterator workChunkIterator = getSvc().fetchAllWorkChunksIterator(theStateInformation.getInstanceId(), true); - List workchunks = new ArrayList<>(); - workChunkIterator.forEachRemaining(workchunks::add); - - assertEquals(workchunks.size(), workchunkMap.size()); - workchunks.forEach(c -> ourLog.info("Returned " + c.toString())); - - for (WorkChunk wc : workchunks) { - WorkChunk expected = workchunkMap.get(wc.getId()); - assertNotNull(expected); - - // verify status and step id - assertEquals(expected.getTargetStepId(), wc.getTargetStepId()); - assertEquals(expected.getStatus(), wc.getStatus()); - } + theStateInformation.verifyFinalStates(getSvc()); } } diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java index 6738e3307aa..010006d7664 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java @@ -1,40 +1,74 @@ package ca.uhn.hapi.fhir.batch2.test; import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.channel.BatchJobSender; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.WorkChunk; -import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; +import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.support.TransactionTemplate; public interface IWorkChunkCommon extends WorkChunkTestConstants { - String createAndStoreJobInstance(JobDefinition theJobDefinition); - String createAndDequeueWorkChunk(String theJobInstanceId); + /** + * Returns the concrete class that is implementing this stuff. + * Used primarily for structure + */ + IWorkChunkCommon getTestManager(); - WorkChunk freshFetchWorkChunk(String theChunkId); + default String createAndStoreJobInstance(JobDefinition theJobDefinition) { + return getTestManager().createAndStoreJobInstance(theJobDefinition); + } - JobInstance createInstance(); + default String createAndDequeueWorkChunk(String theJobInstanceId) { + return getTestManager().createAndDequeueWorkChunk(theJobInstanceId); + } - String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData); + default WorkChunk freshFetchWorkChunk(String theChunkId) { + return getTestManager().freshFetchWorkChunk(theChunkId); + } - void runInTransaction(Runnable theRunnable); + default JobInstance createInstance() { + return getTestManager().createInstance(); + } - public void sleepUntilTimeChanges(); + default String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { + return getTestManager().storeWorkChunk(theJobDefinitionId, theTargetStepId, theInstanceId, theSequence, theSerializedData); + } - JobDefinition withJobDefinition(boolean theIsGatedJob); + default void runInTransaction(Runnable theRunnable) { + getTestManager().runInTransaction(theRunnable); + } - TransactionTemplate newTxTemplate(); + default void sleepUntilTimeChanges() { + getTestManager().sleepUntilTimeChanges(); + } - JobInstance freshFetchJobInstance(String theInstanceId); + default JobDefinition withJobDefinition(boolean theIsGatedJob) { + return getTestManager().withJobDefinition(theIsGatedJob); + } - void runMaintenancePass(); + default TransactionTemplate newTxTemplate() { + return getTestManager().newTxTemplate(); + } - PlatformTransactionManager getTransactionManager(); + default JobInstance freshFetchJobInstance(String theInstanceId) { + return getTestManager().freshFetchJobInstance(theInstanceId); + } - IJobPersistence getSvc(); + default void runMaintenancePass() { + getTestManager().runMaintenancePass(); + } + + default PlatformTransactionManager getTransactionManager() { + return getTestManager().getTransactionManager(); + } + + default IJobPersistence getSvc() { + return getTestManager().getSvc(); + } /** * This assumes a creation of JOB_DEFINITION already @@ -42,6 +76,36 @@ public interface IWorkChunkCommon extends WorkChunkTestConstants { * @return */ default String createChunk(String theJobInstanceId) { - return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theJobInstanceId, 0, CHUNK_DATA); + return getTestManager().createChunk(theJobInstanceId); + } + + /** + * Enable/disable the maintenance runner (So it doesn't run on a scheduler) + */ + default void enableMaintenanceRunner(boolean theToEnable) { + getTestManager().enableMaintenanceRunner(theToEnable); + } + + /** + * Uses the JobMaintenanceState information and the format: + * "step_number|initialstate,step_number|finalstate" to construct + * an initial state for a test scenario + */ + default void createChunksInStates(JobMaintenanceStateInformation theInitialState) { + getTestManager().createChunksInStates(theInitialState); + } + + /** + * Disables the workchunk message handler + * so that we do not actually send messages to the queue; + * useful if mocking state transitions and we don't want to test + * dequeuing. + */ + default void disableWorkChunkMessageHandler() { + getTestManager().disableWorkChunkMessageHandler(); + } + + default void verifyWorkChunkMessageHandlerCalled(int theNumberOfTimes) { + getTestManager().verifyWorkChunkMessageHandlerCalled(theNumberOfTimes); } } diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java index dd4c98c584a..59f117bfe09 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java @@ -1,22 +1,29 @@ package ca.uhn.hapi.fhir.batch2.test; +import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import ca.uhn.hapi.fhir.batch2.test.support.JobMaintenanceStateInformation; import com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestConstants { @@ -26,28 +33,75 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC String instanceId = getSvc().storeNewInstance(instance); String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null); - runMaintenancePass(); - WorkChunk chunk = getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); - assertNull(chunk.getData()); + runInTransaction(() -> { + WorkChunk chunk = freshFetchWorkChunk(id); + assertNull(chunk.getData()); + }); } - // fixme add test - testWorkChunkCreate_inReady - - // fixme add test - testNonGatedWorkChunkInReady_IsQueuedDuringMaintentaince - @Test - default void testStoreAndFetchWorkChunk_WithData() { + default void testWorkChunkCreate_inReadyState() { JobInstance instance = createInstance(); String instanceId = getSvc().storeNewInstance(instance); + enableMaintenanceRunner(false); + String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); assertNotNull(id); - // fixme drop this verify against READY state. + + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(id).getStatus())); + } + + @Test + default void testNonGatedWorkChunkInReady_IsQueuedDuringMaintenance() { + // setup + enableMaintenanceRunner(false); + disableWorkChunkMessageHandler(); + String state = "1|READY,1|QUEUED"; + JobDefinition jobDefinition = withJobDefinition(false); + String instanceId = createAndStoreJobInstance(jobDefinition); + JobMaintenanceStateInformation stateInformation = new JobMaintenanceStateInformation(instanceId, jobDefinition, state); + + createChunksInStates(stateInformation); + String id = stateInformation.getInitialWorkChunks().stream().findFirst().orElseThrow().getId(); + + // verify created in ready + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(id).getStatus())); + + // test runMaintenancePass(); + + // verify it's in QUEUED now + stateInformation.verifyFinalStates(getSvc()); + verifyWorkChunkMessageHandlerCalled(1); + } + + @Test + default void testStoreAndFetchWorkChunk_WithData() { + // setup + disableWorkChunkMessageHandler(); + enableMaintenanceRunner(false); + JobDefinition jobDefinition = withJobDefinition(false); + JobInstance instance = createInstance(); + String instanceId = getSvc().storeNewInstance(instance); + + // we're not transitioning this state; we're just checking storage of data + JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(instanceId, jobDefinition, "1|QUEUED"); + info.addWorkChunkModifier((chunk) -> { + chunk.setData(CHUNK_DATA); + }); + + createChunksInStates(info); + String id = info.getInitialWorkChunks().stream().findFirst().orElseThrow().getId(); + + // verify created in QUEUED runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus())); + // test; manually dequeue chunk WorkChunk chunk = getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); + + // verify assertEquals(36, chunk.getInstanceId().length()); assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId()); assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion()); @@ -59,69 +113,56 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC @Test default void testMarkChunkAsCompleted_Success() { - JobInstance instance = createInstance(); - String instanceId = getSvc().storeNewInstance(instance); - String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA); - assertNotNull(chunkId); + // setup + String state = "2|IN_PROGRESS,2|COMPLETED"; + disableWorkChunkMessageHandler(); + enableMaintenanceRunner(false); - runMaintenancePass(); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus())); - sleepUntilTimeChanges(); + JobDefinition jobDefinition = withJobDefinition(false); + String instanceId = createAndStoreJobInstance(jobDefinition); + JobMaintenanceStateInformation info = new JobMaintenanceStateInformation(instanceId, jobDefinition, state); + info.addWorkChunkModifier(chunk -> { + chunk.setCreateTime(new Date()); + chunk.setData(CHUNK_DATA); + }); + createChunksInStates(info); - WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); - assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); - assertNotNull(chunk.getCreateTime()); - assertNotNull(chunk.getStartTime()); - assertNull(chunk.getEndTime()); - assertNull(chunk.getRecordsProcessed()); - assertNotNull(chunk.getData()); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(chunkId).getStatus())); - - sleepUntilTimeChanges(); + String chunkId = info.getInitialWorkChunks().stream().findFirst().orElseThrow().getId(); + // run test runInTransaction(() -> getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0))); + // verify + info.verifyFinalStates(getSvc()); WorkChunk entity = freshFetchWorkChunk(chunkId); assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus()); assertEquals(50, entity.getRecordsProcessed()); assertNotNull(entity.getCreateTime()); - assertNotNull(entity.getStartTime()); - assertNotNull(entity.getEndTime()); assertNull(entity.getData()); - assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); - assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); } @Test default void testMarkChunkAsCompleted_Error() { - JobInstance instance = createInstance(); - String instanceId = getSvc().storeNewInstance(instance); - String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, null); - assertNotNull(chunkId); - - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(chunkId).getStatus())); - runMaintenancePass(); - sleepUntilTimeChanges(); - - WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); - assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); - - sleepUntilTimeChanges(); + // setup + String state = "1|IN_PROGRESS,1|ERRORED"; + disableWorkChunkMessageHandler(); + enableMaintenanceRunner(false); + JobDefinition jobDef = withJobDefinition(false); + String instanceId = createAndStoreJobInstance(jobDef); + JobMaintenanceStateInformation info = new JobMaintenanceStateInformation( + instanceId, jobDef, state + ); + createChunksInStates(info); + String chunkId = info.getInitialWorkChunks().stream().findFirst().orElseThrow().getId(); + // test WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId, ERROR_MESSAGE_A); getSvc().onWorkChunkError(request); runInTransaction(() -> { WorkChunk entity = freshFetchWorkChunk(chunkId); assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); assertEquals(ERROR_MESSAGE_A, entity.getErrorMessage()); - assertNotNull(entity.getCreateTime()); - assertNotNull(entity.getStartTime()); - assertNotNull(entity.getEndTime()); assertEquals(1, entity.getErrorCount()); - assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); - assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); }); // Mark errored again @@ -132,68 +173,64 @@ public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestC WorkChunk entity = freshFetchWorkChunk(chunkId); assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); assertEquals("This is an error message 2", entity.getErrorMessage()); - assertNotNull(entity.getCreateTime()); - assertNotNull(entity.getStartTime()); - assertNotNull(entity.getEndTime()); assertEquals(2, entity.getErrorCount()); - assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); - assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); }); List chunks = ImmutableList.copyOf(getSvc().fetchAllWorkChunksIterator(instanceId, true)); assertEquals(1, chunks.size()); assertEquals(2, chunks.get(0).getErrorCount()); + + info.verifyFinalStates(getSvc()); } @Test default void testMarkChunkAsCompleted_Fail() { - JobInstance instance = createInstance(); - String instanceId = getSvc().storeNewInstance(instance); - String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, null); - assertNotNull(chunkId); - - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(chunkId).getStatus())); - runMaintenancePass(); - sleepUntilTimeChanges(); - - WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); - assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); - - sleepUntilTimeChanges(); + // setup + String state = "1|IN_PROGRESS,1|FAILED"; + disableWorkChunkMessageHandler(); + enableMaintenanceRunner(false); + JobDefinition jobDef = withJobDefinition(false); + String instanceId = createAndStoreJobInstance(jobDef); + JobMaintenanceStateInformation info = new JobMaintenanceStateInformation( + instanceId, jobDef, state + ); + createChunksInStates(info); + String chunkId = info.getInitialWorkChunks().stream().findFirst().orElseThrow().getId(); + // test getSvc().onWorkChunkFailed(chunkId, "This is an error message"); + + // verify runInTransaction(() -> { WorkChunk entity = freshFetchWorkChunk(chunkId); assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus()); assertEquals("This is an error message", entity.getErrorMessage()); - assertNotNull(entity.getCreateTime()); - assertNotNull(entity.getStartTime()); - assertNotNull(entity.getEndTime()); - assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); - assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); }); + + info.verifyFinalStates(getSvc()); } @Test default void markWorkChunksWithStatusAndWipeData_marksMultipleChunksWithStatus_asExpected() { - JobInstance instance = createInstance(); - String instanceId = getSvc().storeNewInstance(instance); - ArrayList chunkIds = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - WorkChunkCreateEvent chunk = new WorkChunkCreateEvent( - "defId", - 1, - "stepId", - instanceId, - 0, - "{}" - ); - String id = getSvc().onWorkChunkCreate(chunk); - chunkIds.add(id); - } + // setup + String state = """ + 1|IN_PROGRESS,1|COMPLETED + 1|ERRORED,1|COMPLETED + 1|QUEUED,1|COMPLETED + 1|IN_PROGRESS,1|COMPLETED + """; + disableWorkChunkMessageHandler(); + enableMaintenanceRunner(false); + JobDefinition jobDef = withJobDefinition(false); + String instanceId = createAndStoreJobInstance(jobDef); + JobMaintenanceStateInformation info = new JobMaintenanceStateInformation( + instanceId, jobDef, state + ); + createChunksInStates(info); + List chunkIds = info.getInitialWorkChunks().stream().map(WorkChunk::getId) + .collect(Collectors.toList()); - runInTransaction(() -> getSvc().markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, WorkChunkStatusEnum.COMPLETED, null)); + runInTransaction(() -> getSvc().markWorkChunksWithStatusAndWipeData(instanceId, chunkIds, WorkChunkStatusEnum.COMPLETED, null)); Iterator reducedChunks = getSvc().fetchAllWorkChunksIterator(instanceId, true); diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/models/JobMaintenanceStateInformation.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/support/JobMaintenanceStateInformation.java similarity index 56% rename from hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/models/JobMaintenanceStateInformation.java rename to hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/support/JobMaintenanceStateInformation.java index 5e67250f844..dcfb255d6ea 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/models/JobMaintenanceStateInformation.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/support/JobMaintenanceStateInformation.java @@ -1,5 +1,6 @@ -package ca.uhn.hapi.fhir.batch2.test.models; +package ca.uhn.hapi.fhir.batch2.test.support; +import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.WorkChunk; @@ -9,13 +10,19 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; -import java.util.UUID; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class JobMaintenanceStateInformation { @@ -33,9 +40,20 @@ public class JobMaintenanceStateInformation { private final String myInstanceId; - public JobMaintenanceStateInformation(String theInstanceId, JobDefinition theJobDefinition) { + private Consumer myWorkChunkModifier = (chunk) -> {}; + + public JobMaintenanceStateInformation( + String theInstanceId, + JobDefinition theJobDefinition, + String theStateUnderTest) { myInstanceId = theInstanceId; myJobDefinition = theJobDefinition; + + setState(theStateUnderTest); + } + + public void addWorkChunkModifier(Consumer theModifier) { + myWorkChunkModifier = theModifier; } public List getLineComments() { @@ -58,7 +76,73 @@ public class JobMaintenanceStateInformation { return myInstanceId; } - public void initialize(String theState) { + public void verifyFinalStates(IJobPersistence theJobPersistence) { + assertEquals(getInitialWorkChunks().size(), getFinalWorkChunk().size()); + + HashMap workchunkMap = new HashMap<>(); + for (WorkChunk fs : getFinalWorkChunk()) { + workchunkMap.put(fs.getId(), fs); + } + + // fetch all workchunks + Iterator workChunkIterator = theJobPersistence.fetchAllWorkChunksIterator(getInstanceId(), true); + List workchunks = new ArrayList<>(); + workChunkIterator.forEachRemaining(workchunks::add); + + assertEquals(workchunks.size(), workchunkMap.size()); + workchunks.forEach(c -> ourLog.info("Returned " + c.toString())); + + for (WorkChunk wc : workchunks) { + WorkChunk expected = workchunkMap.get(wc.getId()); + assertNotNull(expected); + + // verify status and step id + assertEquals(expected.getTargetStepId(), wc.getTargetStepId()); + assertEquals(expected.getStatus(), wc.getStatus()); + } + } + + public void initialize(IJobPersistence theJobPersistence) { + // should have as many input workchunks as output workchunks + // unless we have newly created ones somewhere + assertEquals(getInitialWorkChunks().size(), getFinalWorkChunk().size()); + + Set stepIds = new HashSet<>(); + for (int i = 0; i < getInitialWorkChunks().size(); i++) { + WorkChunk workChunk = getInitialWorkChunks().get(i); + myWorkChunkModifier.accept(workChunk); + WorkChunk saved = theJobPersistence.createWorkChunk(workChunk); + ourLog.info("Created WorkChunk: " + saved.toString()); + workChunk.setId(saved.getId()); + + getFinalWorkChunk().get(i) + .setId(saved.getId()); + + stepIds.add(workChunk.getTargetStepId()); + } + // if it's a gated job, we'll manually set the step id for the instance + JobDefinition jobDef = getJobDefinition(); + if (jobDef.isGatedExecution()) { + AtomicReference latestStepId = new AtomicReference<>(); + int totalSteps = jobDef.getSteps().size(); + for (int i = totalSteps - 1; i >= 0; i--) { + JobDefinitionStep step = jobDef.getSteps().get(i); + if (stepIds.contains(step.getStepId())) { + latestStepId.set(step.getStepId()); + break; + } + } + // should def have a value + assertNotNull(latestStepId.get()); + String instanceId = getInstanceId(); + theJobPersistence.updateInstance(instanceId, instance -> { + instance.setCurrentGatedStepId(latestStepId.get()); + return true; + }); + } + } + + private void setState(String theState) { String[] chunkLines = theState.split("\n"); Pattern pattern = Pattern.compile(COMMENT_PATTERN); for (String chunkLine : chunkLines) { @@ -117,10 +201,10 @@ public class JobMaintenanceStateInformation { String stepId = getJobStepId(parts[0]); WorkChunkStatusEnum initialStatus = WorkChunkStatusEnum.valueOf(parts[1].trim()); - WorkChunk initial = createBaseWorkChunk(); - initial.setStatus(initialStatus); - initial.setTargetStepId(stepId); - theAdder.accept(initial); + WorkChunk chunk = createBaseWorkChunk(); + chunk.setStatus(initialStatus); + chunk.setTargetStepId(stepId); + theAdder.accept(chunk); } private String getJobStepId(String theIndexId) { @@ -150,7 +234,6 @@ public class JobMaintenanceStateInformation { private WorkChunk createBaseWorkChunk() { WorkChunk chunk = new WorkChunk(); -// chunk.setId(UUID.randomUUID().toString()); chunk.setJobDefinitionId(myJobDefinition.getJobDefinitionId()); chunk.setInstanceId(myInstanceId); chunk.setJobDefinitionVersion(myJobDefinition.getJobDefinitionVersion());