diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index c2cdca398fb..f453fc737ab 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -18,6 +18,7 @@ import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity; import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; +import ca.uhn.fhir.jpa.test.Batch2JobHelper; import ca.uhn.fhir.util.JsonUtil; import ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest; import com.google.common.collect.ImmutableList; @@ -77,6 +78,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @Autowired private IBatch2JobInstanceRepository myJobInstanceRepository; + @Autowired + public Batch2JobHelper myBatch2JobHelper; + @Test public void testDeleteInstance() { // Setup @@ -323,14 +327,19 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { class Batch2SpecTest extends AbstractIJobPersistenceSpecificationTest { @Override - protected PlatformTransactionManager getTxManager() { + public PlatformTransactionManager getTxManager() { return JpaJobPersistenceImplTest.this.getTxManager(); } @Override - protected WorkChunk freshFetchWorkChunk(String chunkId) { + public WorkChunk freshFetchWorkChunk(String chunkId) { return JpaJobPersistenceImplTest.this.freshFetchWorkChunk(chunkId); } + + @Override + public void runMaintenancePass() { + myBatch2JobHelper.runMaintenancePass(); + } } @Test diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java index a85f58c6d7b..d90129dea19 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java @@ -23,29 +23,18 @@ package ca.uhn.hapi.fhir.batch2.test; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.RunOutcome; import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; -import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator; -import ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor; import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; -import ca.uhn.fhir.batch2.model.WorkChunk; -import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; -import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; -import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.util.StopWatch; import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep2InputType; import ca.uhn.hapi.fhir.batch2.test.support.TestJobStep3InputType; -import com.google.common.collect.ImmutableList; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import jakarta.annotation.Nonnull; +import org.junit.jupiter.api.AfterEach; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; @@ -53,572 +42,35 @@ import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; -import jakarta.annotation.Nonnull; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.concurrent.Callable; import static org.awaitility.Awaitility.await; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.emptyString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.theInstance; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; /** * Specification tests for batch2 storage and event system. * These tests are abstract, and do not depend on JPA. * Test setups should use the public batch2 api to create scenarios. */ -public abstract class AbstractIJobPersistenceSpecificationTest { - private static final Logger ourLog = LoggerFactory.getLogger(AbstractIJobPersistenceSpecificationTest.class); - - public static final String JOB_DEFINITION_ID = "definition-id"; - public static final String TARGET_STEP_ID = "step-id"; - public static final String DEF_CHUNK_ID = "definition-chunkId"; - public static final String STEP_CHUNK_ID = "step-chunkId"; - public static final int JOB_DEF_VER = 1; - public static final int SEQUENCE_NUMBER = 1; - public static final String CHUNK_DATA = "{\"key\":\"value\"}"; - public static final String ERROR_MESSAGE_A = "This is an error message: A"; - public static final String ERROR_MESSAGE_B = "This is a different error message: B"; - public static final String ERROR_MESSAGE_C = "This is a different error message: C"; +public abstract class AbstractIJobPersistenceSpecificationTest implements IInProgressActionsTests, IInstanceStateTransitions, IWorkChunkStateTransitions, IWorkChunkStorageTests, IWorkChunkErrorActionsTests, WorkChunkTestConstants { @Autowired private IJobPersistence mySvc; - @Nested - class WorkChunkStorage { + @Autowired + private JobDefinitionRegistry myJobDefinitionRegistry; - @Test - public void testStoreAndFetchWorkChunk_NoData() { - JobInstance instance = createInstance(); - String instanceId = mySvc.storeNewInstance(instance); + @Autowired + private IHapiTransactionService myTransactionService; - String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null); - - WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); - assertNull(chunk.getData()); - } - - @Test - public void testStoreAndFetchWorkChunk_WithData() { - JobInstance instance = createInstance(); - String instanceId = mySvc.storeNewInstance(instance); - - String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); - assertNotNull(id); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus())); - - WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); - assertEquals(36, chunk.getInstanceId().length()); - assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId()); - assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion()); - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); - assertEquals(CHUNK_DATA, chunk.getData()); - - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(id).getStatus())); - } - - /** - * Should match the diagram in batch2_states.md - * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md - */ - @Nested - class StateTransitions { - - private String myInstanceId; - private String myChunkId; - - @BeforeEach - void setUp() { - JobInstance jobInstance = createInstance(); - myInstanceId = mySvc.storeNewInstance(jobInstance); - - } - - private String createChunk() { - return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, myInstanceId, 0, CHUNK_DATA); - } - - @Test - public void chunkCreation_isQueued() { - - myChunkId = createChunk(); - - WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.QUEUED, fetchedWorkChunk.getStatus(), "New chunks are QUEUED"); - } - - @Test - public void chunkReceived_queuedToInProgress() { - - myChunkId = createChunk(); - - // the worker has received the chunk, and marks it started. - WorkChunk chunk = mySvc.onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new); - - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); - assertEquals(CHUNK_DATA, chunk.getData()); - - // verify the db was updated too - WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk.getStatus()); - } - - @Nested - class InProgressActions { - @BeforeEach - void setUp() { - // setup - the worker has received the chunk, and has marked it IN_PROGRESS. - myChunkId = createChunk(); - mySvc.onWorkChunkDequeue(myChunkId); - } - - @Test - public void processingOk_inProgressToSuccess_clearsDataSavesRecordCount() { - - // execution ok - mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 0)); - - // verify the db was updated - var workChunkEntity = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus()); - assertNull(workChunkEntity.getData()); - assertEquals(3, workChunkEntity.getRecordsProcessed()); - assertNull(workChunkEntity.getErrorMessage()); - assertEquals(0, workChunkEntity.getErrorCount()); - } - - @Test - public void processingRetryableError_inProgressToError_bumpsCountRecordsMessage() { - - // execution had a retryable error - mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A)); - - // verify the db was updated - var workChunkEntity = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus()); - assertEquals(ERROR_MESSAGE_A, workChunkEntity.getErrorMessage()); - assertEquals(1, workChunkEntity.getErrorCount()); - } - - @Test - public void processingFailure_inProgressToFailed() { - - // execution had a failure - mySvc.onWorkChunkFailed(myChunkId, "some error"); - - // verify the db was updated - var workChunkEntity = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.FAILED, workChunkEntity.getStatus()); - assertEquals("some error", workChunkEntity.getErrorMessage()); - } - } - - @Nested - class ErrorActions { - public static final String FIRST_ERROR_MESSAGE = ERROR_MESSAGE_A; - @BeforeEach - void setUp() { - // setup - the worker has received the chunk, and has marked it IN_PROGRESS. - myChunkId = createChunk(); - mySvc.onWorkChunkDequeue(myChunkId); - // execution had a retryable error - mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE)); - } - - /** - * The consumer will retry after a retryable error is thrown - */ - @Test - void errorRetry_errorToInProgress() { - - // when consumer restarts chunk - WorkChunk chunk = mySvc.onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new); - - // then - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); - - // verify the db state, error message, and error count - var workChunkEntity = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, workChunkEntity.getStatus()); - assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Original error message kept"); - assertEquals(1, workChunkEntity.getErrorCount(), "error count kept"); - } - - @Test - void errorRetry_repeatError_increasesErrorCount() { - // setup - the consumer is re-trying, and marks it IN_PROGRESS - mySvc.onWorkChunkDequeue(myChunkId); - - - // when another error happens - mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); - - - // verify the state, new message, and error count - var workChunkEntity = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus()); - assertEquals(ERROR_MESSAGE_B, workChunkEntity.getErrorMessage(), "new error message"); - assertEquals(2, workChunkEntity.getErrorCount(), "error count inc"); - } - - @Test - void errorThenRetryAndComplete_addsErrorCounts() { - // setup - the consumer is re-trying, and marks it IN_PROGRESS - mySvc.onWorkChunkDequeue(myChunkId); - - // then it completes ok. - mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 1)); - - // verify the state, new message, and error count - var workChunkEntity = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus()); - assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Error message kept."); - assertEquals(2, workChunkEntity.getErrorCount(), "error combined with earlier error"); - } - - @Test - void errorRetry_maxErrors_movesToFailed() { - // we start with 1 error already - - // 2nd try - mySvc.onWorkChunkDequeue(myChunkId); - mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); - var chunk = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus()); - assertEquals(2, chunk.getErrorCount()); - - // 3rd try - mySvc.onWorkChunkDequeue(myChunkId); - mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); - chunk = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus()); - assertEquals(3, chunk.getErrorCount()); - - // 4th try - mySvc.onWorkChunkDequeue(myChunkId); - mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_C)); - chunk = freshFetchWorkChunk(myChunkId); - assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus()); - assertEquals(4, chunk.getErrorCount()); - assertThat("Error message contains last error", chunk.getErrorMessage(), containsString(ERROR_MESSAGE_C)); - assertThat("Error message contains error count and complaint", chunk.getErrorMessage(), containsString("many errors: 4")); - } - } - } - - @Test - public void testMarkChunkAsCompleted_Success() { - JobInstance instance = createInstance(); - String instanceId = mySvc.storeNewInstance(instance); - String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA); - assertNotNull(chunkId); - - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus())); - - sleepUntilTimeChanges(); - - WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); - assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); - assertNotNull(chunk.getCreateTime()); - assertNotNull(chunk.getStartTime()); - assertNull(chunk.getEndTime()); - assertNull(chunk.getRecordsProcessed()); - assertNotNull(chunk.getData()); - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(chunkId).getStatus())); - - sleepUntilTimeChanges(); - - runInTransaction(() -> mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0))); - - WorkChunk entity = freshFetchWorkChunk(chunkId); - assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus()); - assertEquals(50, entity.getRecordsProcessed()); - assertNotNull(entity.getCreateTime()); - assertNotNull(entity.getStartTime()); - assertNotNull(entity.getEndTime()); - assertNull(entity.getData()); - assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); - assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); - } - - - @Test - public void testMarkChunkAsCompleted_Error() { - JobInstance instance = createInstance(); - String instanceId = mySvc.storeNewInstance(instance); - String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null); - assertNotNull(chunkId); - - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus())); - - sleepUntilTimeChanges(); - - WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); - assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); - - sleepUntilTimeChanges(); - - WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId, ERROR_MESSAGE_A); - mySvc.onWorkChunkError(request); - runInTransaction(() -> { - WorkChunk entity = freshFetchWorkChunk(chunkId); - assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); - assertEquals(ERROR_MESSAGE_A, entity.getErrorMessage()); - assertNotNull(entity.getCreateTime()); - assertNotNull(entity.getStartTime()); - assertNotNull(entity.getEndTime()); - assertEquals(1, entity.getErrorCount()); - assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); - assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); - }); - - // Mark errored again - - WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId, "This is an error message 2"); - mySvc.onWorkChunkError(request2); - runInTransaction(() -> { - WorkChunk entity = freshFetchWorkChunk(chunkId); - assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); - assertEquals("This is an error message 2", entity.getErrorMessage()); - assertNotNull(entity.getCreateTime()); - assertNotNull(entity.getStartTime()); - assertNotNull(entity.getEndTime()); - assertEquals(2, entity.getErrorCount()); - assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); - assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); - }); - - List chunks = ImmutableList.copyOf(mySvc.fetchAllWorkChunksIterator(instanceId, true)); - assertEquals(1, chunks.size()); - assertEquals(2, chunks.get(0).getErrorCount()); - } - - @Test - public void testMarkChunkAsCompleted_Fail() { - JobInstance instance = createInstance(); - String instanceId = mySvc.storeNewInstance(instance); - String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null); - assertNotNull(chunkId); - - runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus())); - - sleepUntilTimeChanges(); - - WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); - assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); - assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); - - sleepUntilTimeChanges(); - - mySvc.onWorkChunkFailed(chunkId, "This is an error message"); - runInTransaction(() -> { - WorkChunk entity = freshFetchWorkChunk(chunkId); - assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus()); - assertEquals("This is an error message", entity.getErrorMessage()); - assertNotNull(entity.getCreateTime()); - assertNotNull(entity.getStartTime()); - assertNotNull(entity.getEndTime()); - assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); - assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); - }); - } - - @Test - public void markWorkChunksWithStatusAndWipeData_marksMultipleChunksWithStatus_asExpected() { - JobInstance instance = createInstance(); - String instanceId = mySvc.storeNewInstance(instance); - ArrayList chunkIds = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - WorkChunkCreateEvent chunk = new WorkChunkCreateEvent( - "defId", - 1, - "stepId", - instanceId, - 0, - "{}" - ); - String id = mySvc.onWorkChunkCreate(chunk); - chunkIds.add(id); - } - - runInTransaction(() -> mySvc.markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, WorkChunkStatusEnum.COMPLETED, null)); - - Iterator reducedChunks = mySvc.fetchAllWorkChunksIterator(instanceId, true); - - while (reducedChunks.hasNext()) { - WorkChunk reducedChunk = reducedChunks.next(); - assertTrue(chunkIds.contains(reducedChunk.getId())); - assertEquals(WorkChunkStatusEnum.COMPLETED, reducedChunk.getStatus()); - } - } + public IHapiTransactionService getTransactionManager() { + return myTransactionService; } - /** - * Test - * * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md - */ - @Nested - class InstanceStateTransitions { - - @Test - void createInstance_createsInQueuedWithChunk() { - // given - JobDefinition jd = withJobDefinition(); - - // when - IJobPersistence.CreateResult createResult = - newTxTemplate().execute(status-> - mySvc.onCreateWithFirstChunk(jd, "{}")); - - // then - ourLog.info("job and chunk created {}", createResult); - assertNotNull(createResult); - assertThat(createResult.jobInstanceId, not(emptyString())); - assertThat(createResult.workChunkId, not(emptyString())); - - JobInstance jobInstance = freshFetchJobInstance(createResult.jobInstanceId); - assertThat(jobInstance.getStatus(), equalTo(StatusEnum.QUEUED)); - assertThat(jobInstance.getParameters(), equalTo("{}")); - - WorkChunk firstChunk = freshFetchWorkChunk(createResult.workChunkId); - assertThat(firstChunk.getStatus(), equalTo(WorkChunkStatusEnum.QUEUED)); - assertNull(firstChunk.getData(), "First chunk data is null - only uses parameters"); - } - - @Test - void testCreateInstance_firstChunkDequeued_movesToInProgress() { - // given - JobDefinition jd = withJobDefinition(); - IJobPersistence.CreateResult createResult = newTxTemplate().execute(status-> - mySvc.onCreateWithFirstChunk(jd, "{}")); - assertNotNull(createResult); - - // when - newTxTemplate().execute(status -> mySvc.onChunkDequeued(createResult.jobInstanceId)); - - // then - JobInstance jobInstance = freshFetchJobInstance(createResult.jobInstanceId); - assertThat(jobInstance.getStatus(), equalTo(StatusEnum.IN_PROGRESS)); - } - - - - @ParameterizedTest - @EnumSource(StatusEnum.class) - void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) { - // given - JobInstance cancelledInstance = createInstance(); - cancelledInstance.setStatus(theState); - String instanceId1 = mySvc.storeNewInstance(cancelledInstance); - mySvc.cancelInstance(instanceId1); - - JobInstance normalInstance = createInstance(); - normalInstance.setStatus(theState); - String instanceId2 = mySvc.storeNewInstance(normalInstance); - - JobDefinitionRegistry jobDefinitionRegistry = new JobDefinitionRegistry(); - jobDefinitionRegistry.addJobDefinitionIfNotRegistered(withJobDefinition()); - - - // when - runInTransaction(()-> { - new JobInstanceProcessor( - mySvc, - null, - instanceId1, - new JobChunkProgressAccumulator(), - null, - jobDefinitionRegistry, - null // transaction manager - ).process(); - }); - - // then - JobInstance freshInstance1 = mySvc.fetchInstance(instanceId1).orElseThrow(); - if (theState.isCancellable()) { - assertEquals(StatusEnum.CANCELLED, freshInstance1.getStatus(), "cancel request processed"); - assertThat(freshInstance1.getErrorMessage(), containsString("Job instance cancelled")); - } else { - assertEquals(theState, freshInstance1.getStatus(), "cancel request ignored - state unchanged"); - assertNull(freshInstance1.getErrorMessage(), "no error message"); - } - JobInstance freshInstance2 = mySvc.fetchInstance(instanceId2).orElseThrow(); - assertEquals(theState, freshInstance2.getStatus(), "cancel request ignored - cancelled not set"); - } + public IJobPersistence getSvc() { + return mySvc; } - @Test - void testDeleteChunksAndMarkInstanceAsChunksPurged_doesWhatItSays() { - // given - JobDefinition jd = withJobDefinition(); - IJobPersistence.CreateResult createResult = newTxTemplate().execute(status-> - mySvc.onCreateWithFirstChunk(jd, "{}")); - String instanceId = createResult.jobInstanceId; - for (int i = 0; i < 10; i++) { - storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA); - } - JobInstance readback = freshFetchJobInstance(instanceId); - assertFalse(readback.isWorkChunksPurged()); - assertTrue(mySvc.fetchAllWorkChunksIterator(instanceId, true).hasNext(), "has chunk"); - - // when - mySvc.deleteChunksAndMarkInstanceAsChunksPurged(instanceId); - - // then - readback = freshFetchJobInstance(instanceId); - assertTrue(readback.isWorkChunksPurged(), "purged set"); - assertFalse(mySvc.fetchAllWorkChunksIterator(instanceId, true).hasNext(), "chunks gone"); - } - - @Test - void testInstanceUpdate_modifierApplied() { - // given - String instanceId = mySvc.storeNewInstance(createInstance()); - - // when - mySvc.updateInstance(instanceId, instance ->{ - instance.setErrorCount(42); - return true; - }); - - // then - JobInstance jobInstance = freshFetchJobInstance(instanceId); - assertEquals(42, jobInstance.getErrorCount()); - } - - @Test - void testInstanceUpdate_modifierNotAppliedWhenPredicateReturnsFalse() { - // given - JobInstance instance1 = createInstance(); - boolean initialValue = true; - instance1.setFastTracking(initialValue); - String instanceId = mySvc.storeNewInstance(instance1); - - // when - mySvc.updateInstance(instanceId, instance ->{ - instance.setFastTracking(false); - return false; - }); - - // then - JobInstance jobInstance = freshFetchJobInstance(instanceId); - assertEquals(initialValue, jobInstance.isFastTracking()); - } - - private JobDefinition withJobDefinition() { + public JobDefinition withJobDefinition() { return JobDefinition.newBuilder() .setJobDefinitionId(JOB_DEFINITION_ID) .setJobDefinitionVersion(JOB_DEF_VER) @@ -630,11 +82,20 @@ public abstract class AbstractIJobPersistenceSpecificationTest { .build(); } + @AfterEach + public void after() { + myJobDefinitionRegistry.removeJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER); + } @Nonnull - private JobInstance createInstance() { + public JobInstance createInstance() { + JobDefinition jobDefinition = withJobDefinition(); + if (myJobDefinitionRegistry.getJobDefinition(JOB_DEFINITION_ID, JOB_DEF_VER).isEmpty()) { + myJobDefinitionRegistry.addJobDefinition(jobDefinition); + } + JobInstance instance = new JobInstance(); - instance.setJobDefinitionId(JOB_DEFINITION_ID); + instance.setJobDefinitionId(jobDefinition.getJobDefinitionId()); instance.setStatus(StatusEnum.QUEUED); instance.setJobDefinitionVersion(JOB_DEF_VER); instance.setParameters(CHUNK_DATA); @@ -642,15 +103,14 @@ public abstract class AbstractIJobPersistenceSpecificationTest { return instance; } - private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { + public String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData); return mySvc.onWorkChunkCreate(batchWorkChunk); } + public abstract PlatformTransactionManager getTxManager(); - protected abstract PlatformTransactionManager getTxManager(); - protected abstract WorkChunk freshFetchWorkChunk(String theChunkId); - protected JobInstance freshFetchJobInstance(String theInstanceId) { + public JobInstance freshFetchJobInstance(String theInstanceId) { return runInTransaction(() -> mySvc.fetchInstance(theInstanceId).orElseThrow()); } @@ -689,6 +149,14 @@ public abstract class AbstractIJobPersistenceSpecificationTest { await().until(() -> sw.getMillis() > 0); } + public String createAndStoreJobInstance() { + JobInstance jobInstance = createInstance(); + return mySvc.storeNewInstance(jobInstance); + } - + public String createAndDequeueWorkChunk(String theJobInstanceId) { + String chunkId = createChunk(theJobInstanceId); + mySvc.onWorkChunkDequeue(chunkId); + return chunkId; + } } diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInProgressActionsTests.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInProgressActionsTests.java new file mode 100644 index 00000000000..9e1db2803b1 --- /dev/null +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInProgressActionsTests.java @@ -0,0 +1,55 @@ +package ca.uhn.hapi.fhir.batch2.test; + +import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; +import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; +import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public interface IInProgressActionsTests extends IWorkChunkCommon, WorkChunkTestConstants { + + @Test + default void processingOk_inProgressToSuccess_clearsDataSavesRecordCount() { + String jobId = createAndStoreJobInstance(); + String myChunkId = createAndDequeueWorkChunk(jobId); + // execution ok + getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 0)); + + // verify the db was updated + var workChunkEntity = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus()); + assertNull(workChunkEntity.getData()); + assertEquals(3, workChunkEntity.getRecordsProcessed()); + assertNull(workChunkEntity.getErrorMessage()); + assertEquals(0, workChunkEntity.getErrorCount()); + } + + @Test + default void processingRetryableError_inProgressToError_bumpsCountRecordsMessage() { + String jobId = createAndStoreJobInstance(); + String myChunkId = createAndDequeueWorkChunk(jobId); + // execution had a retryable error + getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A)); + + // verify the db was updated + var workChunkEntity = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus()); + assertEquals(ERROR_MESSAGE_A, workChunkEntity.getErrorMessage()); + assertEquals(1, workChunkEntity.getErrorCount()); + } + + @Test + default void processingFailure_inProgressToFailed() { + String jobId = createAndStoreJobInstance(); + String myChunkId = createAndDequeueWorkChunk(jobId); + // execution had a failure + getSvc().onWorkChunkFailed(myChunkId, "some error"); + + // verify the db was updated + var workChunkEntity = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.FAILED, workChunkEntity.getStatus()); + assertEquals("some error", workChunkEntity.getErrorMessage()); + } +} diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java new file mode 100644 index 00000000000..dfea2959784 --- /dev/null +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IInstanceStateTransitions.java @@ -0,0 +1,122 @@ +package ca.uhn.hapi.fhir.batch2.test; + +import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; +import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator; +import ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor; +import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.StatusEnum; +import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.emptyString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public interface IInstanceStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants { + Logger ourLog = LoggerFactory.getLogger(IInstanceStateTransitions.class); + + @Test + default void createInstance_createsInQueuedWithChunk() { + // given + JobDefinition jd = withJobDefinition(); + + // when + IJobPersistence.CreateResult createResult = + newTxTemplate().execute(status-> + getSvc().onCreateWithFirstChunk(jd, "{}")); + + // then + ourLog.info("job and chunk created {}", createResult); + assertNotNull(createResult); + assertThat(createResult.jobInstanceId, not(emptyString())); + assertThat(createResult.workChunkId, not(emptyString())); + + JobInstance jobInstance = freshFetchJobInstance(createResult.jobInstanceId); + assertThat(jobInstance.getStatus(), equalTo(StatusEnum.QUEUED)); + assertThat(jobInstance.getParameters(), equalTo("{}")); + + WorkChunk firstChunk = freshFetchWorkChunk(createResult.workChunkId); + assertThat(firstChunk.getStatus(), equalTo(WorkChunkStatusEnum.READY)); + assertNull(firstChunk.getData(), "First chunk data is null - only uses parameters"); + } + + @Test + default void testCreateInstance_firstChunkDequeued_movesToInProgress() { + // given + JobDefinition jd = withJobDefinition(); + IJobPersistence.CreateResult createResult = newTxTemplate().execute(status-> + getSvc().onCreateWithFirstChunk(jd, "{}")); + assertNotNull(createResult); + + // when + newTxTemplate().execute(status -> getSvc().onChunkDequeued(createResult.jobInstanceId)); + + // then + JobInstance jobInstance = freshFetchJobInstance(createResult.jobInstanceId); + assertThat(jobInstance.getStatus(), equalTo(StatusEnum.IN_PROGRESS)); + } + + @ParameterizedTest + @EnumSource(StatusEnum.class) + default void cancelRequest_cancelsJob_whenNotFinalState(StatusEnum theState) { + // given + JobInstance cancelledInstance = createInstance(); + cancelledInstance.setStatus(theState); + String instanceId1 = getSvc().storeNewInstance(cancelledInstance); + getSvc().cancelInstance(instanceId1); + + JobInstance normalInstance = createInstance(); + normalInstance.setStatus(theState); + String instanceId2 = getSvc().storeNewInstance(normalInstance); + + JobDefinitionRegistry jobDefinitionRegistry = new JobDefinitionRegistry(); + jobDefinitionRegistry.addJobDefinitionIfNotRegistered(withJobDefinition()); + + // when + runInTransaction(()-> { + new JobInstanceProcessor( + getSvc(), + null, + instanceId1, + new JobChunkProgressAccumulator(), + null, + jobDefinitionRegistry, + getTransactionManager() + ).process(); + }); + + // then + JobInstance freshInstance1 = getSvc().fetchInstance(instanceId1).orElseThrow(); + if (theState.isCancellable()) { + assertEquals(StatusEnum.CANCELLED, freshInstance1.getStatus(), "cancel request processed"); + assertThat(freshInstance1.getErrorMessage(), containsString("Job instance cancelled")); + } else { + assertEquals(theState, freshInstance1.getStatus(), "cancel request ignored - state unchanged"); + assertNull(freshInstance1.getErrorMessage(), "no error message"); + } + JobInstance freshInstance2 = getSvc().fetchInstance(instanceId2).orElseThrow(); + assertEquals(theState, freshInstance2.getStatus(), "cancel request ignored - cancelled not set"); + } + + default JobInstance createInstance() { + JobInstance instance = new JobInstance(); + instance.setJobDefinitionId(JOB_DEFINITION_ID); + instance.setStatus(StatusEnum.QUEUED); + instance.setJobDefinitionVersion(JOB_DEF_VER); + instance.setParameters(CHUNK_DATA); + instance.setReport("TEST"); + return instance; + } +} diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java new file mode 100644 index 00000000000..163c1bb7bb3 --- /dev/null +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java @@ -0,0 +1,46 @@ +package ca.uhn.hapi.fhir.batch2.test; + +import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; +import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; +import org.springframework.transaction.support.TransactionTemplate; + +public interface IWorkChunkCommon extends WorkChunkTestConstants { + String createAndStoreJobInstance(); + + String createAndDequeueWorkChunk(String theJobInstanceId); + + WorkChunk freshFetchWorkChunk(String theChunkId); + + JobInstance createInstance(); + + String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData); + + void runInTransaction(Runnable theRunnable); + + public void sleepUntilTimeChanges(); + + JobDefinition withJobDefinition(); + + TransactionTemplate newTxTemplate(); + + JobInstance freshFetchJobInstance(String theInstanceId); + + void runMaintenancePass(); + + IHapiTransactionService getTransactionManager(); + + IJobPersistence getSvc(); + + /** + * This assumes a creation of JOB_DEFINITION already + * @param theJobInstanceId + * @return + */ + default String createChunk(String theJobInstanceId) { + return storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, theJobInstanceId, 0, CHUNK_DATA); + } +} diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java new file mode 100644 index 00000000000..074c558763b --- /dev/null +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java @@ -0,0 +1,110 @@ +package ca.uhn.hapi.fhir.batch2.test; + +import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; +import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; +import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public interface IWorkChunkErrorActionsTests extends IWorkChunkCommon, WorkChunkTestConstants { + + + /** + * The consumer will retry after a retryable error is thrown + */ + @Test + default void errorRetry_errorToInProgress() { + String jobId = createAndStoreJobInstance(); + String myChunkId = createAndDequeueWorkChunk(jobId); + getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE)); + + // when consumer restarts chunk + WorkChunk chunk = getSvc().onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new); + + // then + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); + + // verify the db state, error message, and error count + var workChunkEntity = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, workChunkEntity.getStatus()); + assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Original error message kept"); + assertEquals(1, workChunkEntity.getErrorCount(), "error count kept"); + } + + @Test + default void errorRetry_repeatError_increasesErrorCount() { + String jobId = createAndStoreJobInstance(); + String myChunkId = createAndDequeueWorkChunk(jobId); + getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE)); + + // setup - the consumer is re-trying, and marks it IN_PROGRESS + getSvc().onWorkChunkDequeue(myChunkId); + + + // when another error happens + getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); + + + // verify the state, new message, and error count + var workChunkEntity = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.ERRORED, workChunkEntity.getStatus()); + assertEquals(ERROR_MESSAGE_B, workChunkEntity.getErrorMessage(), "new error message"); + assertEquals(2, workChunkEntity.getErrorCount(), "error count inc"); + } + + @Test + default void errorThenRetryAndComplete_addsErrorCounts() { + String jobId = createAndStoreJobInstance(); + String myChunkId = createAndDequeueWorkChunk(jobId); + getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE)); + + // setup - the consumer is re-trying, and marks it IN_PROGRESS + getSvc().onWorkChunkDequeue(myChunkId); + + // then it completes ok. + getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 1)); + + // verify the state, new message, and error count + var workChunkEntity = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.COMPLETED, workChunkEntity.getStatus()); + assertEquals(FIRST_ERROR_MESSAGE, workChunkEntity.getErrorMessage(), "Error message kept."); + assertEquals(2, workChunkEntity.getErrorCount(), "error combined with earlier error"); + } + + @Test + default void errorRetry_maxErrors_movesToFailed() { + // we start with 1 error already + String jobId = createAndStoreJobInstance(); + String myChunkId = createAndDequeueWorkChunk(jobId); + getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE)); + + + // 2nd try + getSvc().onWorkChunkDequeue(myChunkId); + getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); + var chunk = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus()); + assertEquals(2, chunk.getErrorCount()); + + // 3rd try + getSvc().onWorkChunkDequeue(myChunkId); + getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); + chunk = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus()); + assertEquals(3, chunk.getErrorCount()); + + // 4th try + getSvc().onWorkChunkDequeue(myChunkId); + getSvc().onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_C)); + chunk = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus()); + assertEquals(4, chunk.getErrorCount()); + assertThat("Error message contains last error", chunk.getErrorMessage(), containsString(ERROR_MESSAGE_C)); + assertThat("Error message contains error count and complaint", chunk.getErrorMessage(), containsString("many errors: 4")); + } + +} diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java new file mode 100644 index 00000000000..99073ab9f1f --- /dev/null +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStateTransitions.java @@ -0,0 +1,36 @@ +package ca.uhn.hapi.fhir.batch2.test; + +import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public interface IWorkChunkStateTransitions extends IWorkChunkCommon, WorkChunkTestConstants { + + @Test + default void chunkCreation_isQueued() { + String jobInstanceId = createAndStoreJobInstance(); + String myChunkId = createChunk(jobInstanceId); + + WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.READY, fetchedWorkChunk.getStatus(), "New chunks are READY"); + } + + @Test + default void chunkReceived_queuedToInProgress() { + String jobInstanceId = createAndStoreJobInstance(); + String myChunkId = createChunk(jobInstanceId); + + runMaintenancePass(); + // the worker has received the chunk, and marks it started. + WorkChunk chunk = getSvc().onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new); + + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); + assertEquals(CHUNK_DATA, chunk.getData()); + + // verify the db was updated too + WorkChunk fetchedWorkChunk = freshFetchWorkChunk(myChunkId); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, fetchedWorkChunk.getStatus()); + } +} diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java new file mode 100644 index 00000000000..2bcf79f4f2c --- /dev/null +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkStorageTests.java @@ -0,0 +1,202 @@ +package ca.uhn.hapi.fhir.batch2.test; + +import ca.uhn.fhir.batch2.model.JobInstance; +import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; +import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; +import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public interface IWorkChunkStorageTests extends IWorkChunkCommon, WorkChunkTestConstants { + + @Test + default void testStoreAndFetchWorkChunk_NoData() { + JobInstance instance = createInstance(); + String instanceId = getSvc().storeNewInstance(instance); + + String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null); + runMaintenancePass(); + + WorkChunk chunk = getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); + assertNull(chunk.getData()); + } + + @Test + default void testStoreAndFetchWorkChunk_WithData() { + JobInstance instance = createInstance(); + String instanceId = getSvc().storeNewInstance(instance); + + String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); + assertNotNull(id); + runMaintenancePass(); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus())); + + WorkChunk chunk = getSvc().onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); + assertEquals(36, chunk.getInstanceId().length()); + assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId()); + assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion()); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); + assertEquals(CHUNK_DATA, chunk.getData()); + + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(id).getStatus())); + } + + @Test + default void testMarkChunkAsCompleted_Success() { + JobInstance instance = createInstance(); + String instanceId = getSvc().storeNewInstance(instance); + String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA); + assertNotNull(chunkId); + + runMaintenancePass(); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(chunkId).getStatus())); + sleepUntilTimeChanges(); + + + WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); + assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); + assertNotNull(chunk.getCreateTime()); + assertNotNull(chunk.getStartTime()); + assertNull(chunk.getEndTime()); + assertNull(chunk.getRecordsProcessed()); + assertNotNull(chunk.getData()); + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.IN_PROGRESS, freshFetchWorkChunk(chunkId).getStatus())); + + sleepUntilTimeChanges(); + + runInTransaction(() -> getSvc().onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0))); + + WorkChunk entity = freshFetchWorkChunk(chunkId); + assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus()); + assertEquals(50, entity.getRecordsProcessed()); + assertNotNull(entity.getCreateTime()); + assertNotNull(entity.getStartTime()); + assertNotNull(entity.getEndTime()); + assertNull(entity.getData()); + assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); + assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); + } + + @Test + default void testMarkChunkAsCompleted_Error() { + JobInstance instance = createInstance(); + String instanceId = getSvc().storeNewInstance(instance); + String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, null); + assertNotNull(chunkId); + + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(chunkId).getStatus())); + runMaintenancePass(); + sleepUntilTimeChanges(); + + WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); + assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); + + sleepUntilTimeChanges(); + + WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId, ERROR_MESSAGE_A); + getSvc().onWorkChunkError(request); + runInTransaction(() -> { + WorkChunk entity = freshFetchWorkChunk(chunkId); + assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); + assertEquals(ERROR_MESSAGE_A, entity.getErrorMessage()); + assertNotNull(entity.getCreateTime()); + assertNotNull(entity.getStartTime()); + assertNotNull(entity.getEndTime()); + assertEquals(1, entity.getErrorCount()); + assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); + assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); + }); + + // Mark errored again + + WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId, "This is an error message 2"); + getSvc().onWorkChunkError(request2); + runInTransaction(() -> { + WorkChunk entity = freshFetchWorkChunk(chunkId); + assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); + assertEquals("This is an error message 2", entity.getErrorMessage()); + assertNotNull(entity.getCreateTime()); + assertNotNull(entity.getStartTime()); + assertNotNull(entity.getEndTime()); + assertEquals(2, entity.getErrorCount()); + assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); + assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); + }); + + List chunks = ImmutableList.copyOf(getSvc().fetchAllWorkChunksIterator(instanceId, true)); + assertEquals(1, chunks.size()); + assertEquals(2, chunks.get(0).getErrorCount()); + } + + @Test + default void testMarkChunkAsCompleted_Fail() { + JobInstance instance = createInstance(); + String instanceId = getSvc().storeNewInstance(instance); + String chunkId = storeWorkChunk(DEF_CHUNK_ID, TARGET_STEP_ID, instanceId, SEQUENCE_NUMBER, null); + assertNotNull(chunkId); + + runInTransaction(() -> assertEquals(WorkChunkStatusEnum.READY, freshFetchWorkChunk(chunkId).getStatus())); + runMaintenancePass(); + sleepUntilTimeChanges(); + + WorkChunk chunk = getSvc().onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); + assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); + assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); + + sleepUntilTimeChanges(); + + getSvc().onWorkChunkFailed(chunkId, "This is an error message"); + runInTransaction(() -> { + WorkChunk entity = freshFetchWorkChunk(chunkId); + assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus()); + assertEquals("This is an error message", entity.getErrorMessage()); + assertNotNull(entity.getCreateTime()); + assertNotNull(entity.getStartTime()); + assertNotNull(entity.getEndTime()); + assertTrue(entity.getCreateTime().getTime() < entity.getStartTime().getTime()); + assertTrue(entity.getStartTime().getTime() < entity.getEndTime().getTime()); + }); + } + + @Test + default void markWorkChunksWithStatusAndWipeData_marksMultipleChunksWithStatus_asExpected() { + JobInstance instance = createInstance(); + String instanceId = getSvc().storeNewInstance(instance); + ArrayList chunkIds = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + WorkChunkCreateEvent chunk = new WorkChunkCreateEvent( + "defId", + 1, + "stepId", + instanceId, + 0, + "{}" + ); + String id = getSvc().onWorkChunkCreate(chunk); + chunkIds.add(id); + } + + runInTransaction(() -> getSvc().markWorkChunksWithStatusAndWipeData(instance.getInstanceId(), chunkIds, WorkChunkStatusEnum.COMPLETED, null)); + + Iterator reducedChunks = getSvc().fetchAllWorkChunksIterator(instanceId, true); + + while (reducedChunks.hasNext()) { + WorkChunk reducedChunk = reducedChunks.next(); + assertTrue(chunkIds.contains(reducedChunk.getId())); + assertEquals(WorkChunkStatusEnum.COMPLETED, reducedChunk.getStatus()); + } + } +} diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java new file mode 100644 index 00000000000..d2875499ce6 --- /dev/null +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/WorkChunkTestConstants.java @@ -0,0 +1,16 @@ +package ca.uhn.hapi.fhir.batch2.test; + +public interface WorkChunkTestConstants { + public static final String JOB_DEFINITION_ID = "definition-id"; + public static final String TARGET_STEP_ID = "step-id"; + public static final String DEF_CHUNK_ID = "definition-chunkId"; +// public static final String STEP_CHUNK_ID = "step-chunkId"; + public static final int JOB_DEF_VER = 1; + public static final int SEQUENCE_NUMBER = 1; + public static final String CHUNK_DATA = "{\"key\":\"value\"}"; + public static final String ERROR_MESSAGE_A = "This is an error message: A"; + public static final String ERROR_MESSAGE_B = "This is a different error message: B"; + public static final String ERROR_MESSAGE_C = "This is a different error message: C"; + + final String FIRST_ERROR_MESSAGE = ERROR_MESSAGE_A; +}