diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md index a431755fc1d..a5ced6d3945 100644 --- a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md @@ -48,10 +48,10 @@ stateDiagram-v2 state FAILED state COMPLETED direction LR - [*] --> QUEUED : on store + [*] --> QUEUED : on create %% worker processing states - QUEUED --> on_receive : on receive by worker + QUEUED --> on_receive : on deque by worker on_receive --> IN_PROGRESS : start execution IN_PROGRESS --> execute: execute diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index 800f2456e69..462dc0079ec 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -21,13 +21,13 @@ package ca.uhn.fhir.jpa.batch2; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.JobOperationResultJson; -import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk; import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; import ca.uhn.fhir.batch2.model.JobInstance; -import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; -import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; +import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; @@ -90,7 +90,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { @Override @Transactional(propagation = Propagation.REQUIRED) - public String storeWorkChunk(BatchWorkChunk theBatchWorkChunk) { + public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) { Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity(); entity.setId(UUID.randomUUID().toString()); entity.setSequence(theBatchWorkChunk.sequence); @@ -108,7 +108,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { @Override @Transactional(propagation = Propagation.REQUIRED) - public Optional fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId) { + public Optional onWorkChunkDequeue(String theChunkId) { int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS)); if (rowsModified == 0) { ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId); @@ -239,7 +239,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { @Override @Transactional(propagation = Propagation.REQUIRES_NEW) - public WorkChunkStatusEnum workChunkErrorEvent(WorkChunkErrorEvent theParameters) { + public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) { String chunkId = theParameters.getChunkId(); String errorMessage = truncateErrorMessage(theParameters.getErrorMsg()); int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED); @@ -264,7 +264,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { @Override @Transactional - public void markWorkChunkAsFailed(String theChunkId, String theErrorMessage) { + public void onWorkChunkFailed(String theChunkId, String theErrorMessage) { ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage); String errorMessage = truncateErrorMessage(theErrorMessage); myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED); @@ -272,7 +272,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { @Override @Transactional - public void workChunkCompletionEvent(WorkChunkCompletionEvent theEvent) { + public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) { myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theEvent.getChunkId(), new Date(), theEvent.getRecordsProcessed(), theEvent.getRecoveredErrorCount(), WorkChunkStatusEnum.COMPLETED); } diff --git a/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/BulkGroupExportWithIndexedSearchParametersTest.java b/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/BulkGroupExportWithIndexedSearchParametersTest.java index ce3411b07a9..4973213ac12 100644 --- a/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/BulkGroupExportWithIndexedSearchParametersTest.java +++ b/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/bulk/BulkGroupExportWithIndexedSearchParametersTest.java @@ -16,8 +16,6 @@ import org.hl7.fhir.r4.model.Bundle; import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.Meta; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java index 3cd4fa46c0a..31f7386da7b 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java @@ -46,8 +46,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; * {@link JobInstanceProcessor#cleanupInstance()} * For chunks: - * {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#storeWorkChunk} - * {@link JpaJobPersistenceImpl#fetchWorkChunkSetStartTimeAndMarkInProgress(String)} + * {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#onWorkChunkCreate} + * {@link JpaJobPersistenceImpl#onWorkChunkDequeue(String)} * Chunk execution {@link ca.uhn.fhir.batch2.coordinator.StepExecutor#executeStep} * wipmb figure this out state transition triggers. diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index 4b4027cbbb9..4f759d7c7fb 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -2,12 +2,12 @@ package ca.uhn.fhir.jpa.batch2; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.JobOperationResultJson; -import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk; import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; @@ -94,8 +94,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { } private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { - BatchWorkChunk batchWorkChunk = new BatchWorkChunk(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData); - return mySvc.storeWorkChunk(batchWorkChunk); + WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData); + return mySvc.onWorkChunkCreate(batchWorkChunk); } @Test @@ -231,15 +231,15 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); - BatchWorkChunk batchWorkChunk = new BatchWorkChunk(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); - String chunkId = mySvc.storeWorkChunk(batchWorkChunk); + WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, JOB_DEF_VER, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); + String chunkId = mySvc.onWorkChunkCreate(batchWorkChunk); Optional byId = myWorkChunkRepository.findById(chunkId); Batch2WorkChunkEntity entity = byId.get(); entity.setStatus(theStatus); myWorkChunkRepository.save(entity); // Execute - Optional workChunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId); + Optional workChunk = mySvc.onWorkChunkDequeue(chunkId); // Verify boolean chunkStarted = workChunk.isPresent(); @@ -378,7 +378,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); assertNull(chunk.getData()); } @@ -391,7 +391,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertNotNull(id); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus())); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); assertEquals(36, chunk.getInstanceId().length()); assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId()); assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion()); @@ -412,7 +412,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { sleepUntilTimeChanges(); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); assertNotNull(chunk.getCreateTime()); @@ -424,7 +424,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { sleepUntilTimeChanges(); - mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(chunkId, 50, 0)); + mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0)); runInTransaction(() -> { Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus()); @@ -444,7 +444,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null); - mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(chunkId, 0, 0)); + mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 0, 0)); boolean canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID); assertTrue(canAdvance); @@ -456,18 +456,18 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { assertFalse(canAdvance); //Toggle it to complete - mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(newChunkId, 50, 0)); + mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(newChunkId, 50, 0)); canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID); assertTrue(canAdvance); //Create a new chunk and set it in progress. String newerChunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null); - mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(newerChunkId); + mySvc.onWorkChunkDequeue(newerChunkId); canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID); assertFalse(canAdvance); //Toggle IN_PROGRESS to complete - mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(newerChunkId, 50, 0)); + mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(newerChunkId, 50, 0)); canAdvance = mySvc.canAdvanceInstanceToNextStep(instanceId, STEP_CHUNK_ID); assertTrue(canAdvance); } @@ -483,14 +483,14 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { sleepUntilTimeChanges(); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); sleepUntilTimeChanges(); WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message"); - mySvc.workChunkErrorEvent(request); + mySvc.onWorkChunkError(request); runInTransaction(() -> { Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); @@ -506,7 +506,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { // Mark errored again WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId).setErrorMsg("This is an error message 2"); - mySvc.workChunkErrorEvent(request2); + mySvc.onWorkChunkError(request2); runInTransaction(() -> { Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); @@ -535,13 +535,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { sleepUntilTimeChanges(); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); sleepUntilTimeChanges(); - mySvc.markWorkChunkAsFailed(chunkId, "This is an error message"); + mySvc.onWorkChunkFailed(chunkId, "This is an error message"); runInTransaction(() -> { Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus()); @@ -608,7 +608,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { String instanceId = mySvc.storeNewInstance(instance); ArrayList chunkIds = new ArrayList<>(); for (int i = 0; i < 10; i++) { - BatchWorkChunk chunk = new BatchWorkChunk( + WorkChunkCreateEvent chunk = new WorkChunkCreateEvent( "defId", 1, "stepId", @@ -616,7 +616,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { 0, "{}" ); - String id = mySvc.storeWorkChunk(chunk); + String id = mySvc.onWorkChunkCreate(chunk); chunkIds.add(id); } diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java index 33af84cfb25..ae48324c18d 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java +++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/Batch2JobHelper.java @@ -127,7 +127,6 @@ public class Batch2JobHelper { return true; } myJobMaintenanceService.runMaintenancePass(); - Thread.sleep(1000); return hasStatus(theBatchJobId, theExpectedStatuses); } diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java index 05e165a4c93..18f9d1e4182 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java @@ -21,11 +21,11 @@ package ca.uhn.hapi.fhir.batch2.test; */ import ca.uhn.fhir.batch2.api.IJobPersistence; -import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; @@ -92,7 +92,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); assertNull(chunk.getData()); } @@ -105,7 +105,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { assertNotNull(id); runInTransaction(() -> assertEquals(WorkChunkStatusEnum.QUEUED, freshFetchWorkChunk(id).getStatus())); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(id).orElseThrow(IllegalArgumentException::new); assertEquals(36, chunk.getInstanceId().length()); assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId()); assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion()); @@ -151,7 +151,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { myChunkId = createChunk(); // the worker has received the chunk, and marks it started. - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new); assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); assertEquals(CHUNK_DATA, chunk.getData()); @@ -167,14 +167,14 @@ public abstract class AbstractIJobPersistenceSpecificationTest { void setUp() { // setup - the worker has received the chunk, and has marked it IN_PROGRESS. myChunkId = createChunk(); - mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId); + mySvc.onWorkChunkDequeue(myChunkId); } @Test public void processingOk_inProgressToSuccess_clearsDataSavesRecordCount() { // execution ok - mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(myChunkId, 3, 0)); + mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 0)); // verify the db was updated var workChunkEntity = freshFetchWorkChunk(myChunkId); @@ -189,7 +189,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { public void processingRetryableError_inProgressToError_bumpsCountRecordsMessage() { // execution had a retryable error - mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A)); + mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_A)); // verify the db was updated var workChunkEntity = freshFetchWorkChunk(myChunkId); @@ -202,7 +202,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { public void processingFailure_inProgressToFailed() { // execution had a failure - mySvc.markWorkChunkAsFailed(myChunkId, "some error"); + mySvc.onWorkChunkFailed(myChunkId, "some error"); // verify the db was updated var workChunkEntity = freshFetchWorkChunk(myChunkId); @@ -218,9 +218,9 @@ public abstract class AbstractIJobPersistenceSpecificationTest { void setUp() { // setup - the worker has received the chunk, and has marked it IN_PROGRESS. myChunkId = createChunk(); - mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId); + mySvc.onWorkChunkDequeue(myChunkId); // execution had a retryable error - mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE)); + mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, FIRST_ERROR_MESSAGE)); } /** @@ -230,7 +230,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { void errorRetry_errorToInProgress() { // when consumer restarts chunk - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(myChunkId).orElseThrow(IllegalArgumentException::new); // then assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); @@ -245,11 +245,11 @@ public abstract class AbstractIJobPersistenceSpecificationTest { @Test void errorRetry_repeatError_increasesErrorCount() { // setup - the consumer is re-trying, and marks it IN_PROGRESS - mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId); + mySvc.onWorkChunkDequeue(myChunkId); // when another error happens - mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); + mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); // verify the state, new message, and error count @@ -262,10 +262,10 @@ public abstract class AbstractIJobPersistenceSpecificationTest { @Test void errorThenRetryAndComplete_addsErrorCounts() { // setup - the consumer is re-trying, and marks it IN_PROGRESS - mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId); + mySvc.onWorkChunkDequeue(myChunkId); // then it completes ok. - mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(myChunkId, 3, 1)); + mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(myChunkId, 3, 1)); // verify the state, new message, and error count var workChunkEntity = freshFetchWorkChunk(myChunkId); @@ -279,22 +279,22 @@ public abstract class AbstractIJobPersistenceSpecificationTest { // we start with 1 error already // 2nd try - mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId); - mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); + mySvc.onWorkChunkDequeue(myChunkId); + mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); var chunk = freshFetchWorkChunk(myChunkId); assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus()); assertEquals(2, chunk.getErrorCount()); // 3rd try - mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId); - mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); + mySvc.onWorkChunkDequeue(myChunkId); + mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_B)); chunk = freshFetchWorkChunk(myChunkId); assertEquals(WorkChunkStatusEnum.ERRORED, chunk.getStatus()); assertEquals(3, chunk.getErrorCount()); // 4th try - mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(myChunkId); - mySvc.workChunkErrorEvent(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_C)); + mySvc.onWorkChunkDequeue(myChunkId); + mySvc.onWorkChunkError(new WorkChunkErrorEvent(myChunkId, ERROR_MESSAGE_C)); chunk = freshFetchWorkChunk(myChunkId); assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus()); assertEquals(4, chunk.getErrorCount()); @@ -351,7 +351,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { sleepUntilTimeChanges(); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); assertNotNull(chunk.getCreateTime()); @@ -363,7 +363,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { sleepUntilTimeChanges(); - runInTransaction(() -> mySvc.workChunkCompletionEvent(new WorkChunkCompletionEvent(chunkId, 50, 0))); + runInTransaction(() -> mySvc.onWorkChunkCompletion(new WorkChunkCompletionEvent(chunkId, 50, 0))); WorkChunk entity = freshFetchWorkChunk(chunkId); assertEquals(WorkChunkStatusEnum.COMPLETED, entity.getStatus()); @@ -388,14 +388,14 @@ public abstract class AbstractIJobPersistenceSpecificationTest { sleepUntilTimeChanges(); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); sleepUntilTimeChanges(); WorkChunkErrorEvent request = new WorkChunkErrorEvent(chunkId, ERROR_MESSAGE_A); - mySvc.workChunkErrorEvent(request); + mySvc.onWorkChunkError(request); runInTransaction(() -> { WorkChunk entity = freshFetchWorkChunk(chunkId); assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); @@ -411,7 +411,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { // Mark errored again WorkChunkErrorEvent request2 = new WorkChunkErrorEvent(chunkId, "This is an error message 2"); - mySvc.workChunkErrorEvent(request2); + mySvc.onWorkChunkError(request2); runInTransaction(() -> { WorkChunk entity = freshFetchWorkChunk(chunkId); assertEquals(WorkChunkStatusEnum.ERRORED, entity.getStatus()); @@ -440,13 +440,13 @@ public abstract class AbstractIJobPersistenceSpecificationTest { sleepUntilTimeChanges(); - WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new); + WorkChunk chunk = mySvc.onWorkChunkDequeue(chunkId).orElseThrow(IllegalArgumentException::new); assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); assertEquals(WorkChunkStatusEnum.IN_PROGRESS, chunk.getStatus()); sleepUntilTimeChanges(); - mySvc.markWorkChunkAsFailed(chunkId, "This is an error message"); + mySvc.onWorkChunkFailed(chunkId, "This is an error message"); runInTransaction(() -> { WorkChunk entity = freshFetchWorkChunk(chunkId); assertEquals(WorkChunkStatusEnum.FAILED, entity.getStatus()); @@ -465,7 +465,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { String instanceId = mySvc.storeNewInstance(instance); ArrayList chunkIds = new ArrayList<>(); for (int i = 0; i < 10; i++) { - BatchWorkChunk chunk = new BatchWorkChunk( + WorkChunkCreateEvent chunk = new WorkChunkCreateEvent( "defId", 1, "stepId", @@ -473,7 +473,7 @@ public abstract class AbstractIJobPersistenceSpecificationTest { 0, "{}" ); - String id = mySvc.storeWorkChunk(chunk); + String id = mySvc.onWorkChunkCreate(chunk); chunkIds.add(id); } @@ -534,8 +534,8 @@ public abstract class AbstractIJobPersistenceSpecificationTest { } private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { - BatchWorkChunk batchWorkChunk = new BatchWorkChunk(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData); - return mySvc.storeWorkChunk(batchWorkChunk); + WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData); + return mySvc.onWorkChunkCreate(batchWorkChunk); } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java index 4343f69400e..970c785a8dd 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java @@ -27,8 +27,6 @@ import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.i18n.Msg; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; import java.util.Date; import java.util.Iterator; diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWorkChunkPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWorkChunkPersistence.java index d0c573a44a0..81201da3370 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWorkChunkPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWorkChunkPersistence.java @@ -22,6 +22,7 @@ package ca.uhn.fhir.batch2.api; import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; @@ -42,7 +43,6 @@ public interface IWorkChunkPersistence { // WorkChunk calls ////////////////////////////////// - /** * Stores a chunk of work for later retrieval. * The first state event, as the chunk is created. @@ -53,42 +53,90 @@ public interface IWorkChunkPersistence { * @param theBatchWorkChunk the batch work chunk to be stored * @return a globally unique identifier for this chunk. */ - String storeWorkChunk(BatchWorkChunk theBatchWorkChunk); + default String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) { + // back-compat for one minor version + return storeWorkChunk(theBatchWorkChunk); + } + // wipmb for deletion + @Deprecated(since="6.5.6") + default String storeWorkChunk(BatchWorkChunk theBatchWorkChunk) { + // dead code in 6.5.7 + return null; + } /** - * Fetches a chunk of work from storage, and update the stored status to {@link WorkChunkStatusEnum#IN_PROGRESS}. + * On arrival at a worker. * The second state event, as the worker starts processing. - * This will only fetch chunks which are currently QUEUED or ERRORRED. + * Transition to {@link WorkChunkStatusEnum#IN_PROGRESS} if unless not in QUEUED or ERRORRED state. * - * @param theChunkId The ID from {@link #storeWorkChunk(BatchWorkChunk theBatchWorkChunk)} - * @return The WorkChunk or empty if no chunk with that id exists in the QUEUED or ERRORRED states + * @param theChunkId The ID from {@link #onWorkChunkCreate(BatchWorkChunk theBatchWorkChunk)} + * @return The WorkChunk or empty if no chunk exists, or not in a runnable state (QUEUED or ERRORRED) */ - Optional fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId); + default Optional onWorkChunkDequeue(String theChunkId) { + // back-compat for one minor version + return fetchWorkChunkSetStartTimeAndMarkInProgress(theChunkId); + } + // wipmb for deletion + @Deprecated(since="6.5.6") + default Optional fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId) { + // dead code + return null; + } /** - * Marks a given chunk as having errored (ie, may be recoverable) - *

- * Returns the work chunk. + * A retryable error. + * Transition to {@link WorkChunkStatusEnum#ERRORED} unless max-retries passed, then + * transition to {@link WorkChunkStatusEnum#FAILED}. * - * @param theParameters - the parameters for marking the workchunk with error - * @return - workchunk optional, if available. + * @param theParameters - the error message and max retry count. + * @return - the new status - ERRORED or ERRORED, depending on retry count */ - WorkChunkStatusEnum workChunkErrorEvent(WorkChunkErrorEvent theParameters); + default WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) { + // back-compat for one minor version + return workChunkErrorEvent(theParameters); + } + + // wipmb for deletion + @Deprecated(since="6.5.6") + default WorkChunkStatusEnum workChunkErrorEvent(WorkChunkErrorEvent theParameters) { + // dead code in 6.5.7 + return null; + } /** - * Marks a given chunk as having failed (i.e. probably not recoverable) + * An unrecoverable error. + * Transition to {@link WorkChunkStatusEnum#FAILED} * * @param theChunkId The chunk ID */ - void markWorkChunkAsFailed(String theChunkId, String theErrorMessage); + default void onWorkChunkFailed(String theChunkId, String theErrorMessage) { + // back-compat for one minor version + markWorkChunkAsFailed(theChunkId, theErrorMessage); + } + + + // wipmb for deletion + @Deprecated(since="6.5.6") + default void markWorkChunkAsFailed(String theChunkId, String theErrorMessage) { + // dead code in 6.5.7 + } /** * Report success and complete the chunk. + * Transition to {@link WorkChunkStatusEnum#COMPLETED} * * @param theEvent with record and error count */ - void workChunkCompletionEvent(WorkChunkCompletionEvent theEvent); + default void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) { + // back-compat for one minor version + workChunkCompletionEvent(theEvent); + } + // wipmb for deletion + @Deprecated(since="6.5.6") + default void workChunkCompletionEvent(WorkChunkCompletionEvent theEvent) { + // dead code in 6.5.7 + } /** * Marks all work chunks with the provided status and erases the data diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BatchWorkChunk.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BatchWorkChunk.java index 4c19e44c98f..8832a5ae316 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BatchWorkChunk.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BatchWorkChunk.java @@ -20,12 +20,16 @@ package ca.uhn.fhir.batch2.coordinator; import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import javax.annotation.Nonnull; import javax.annotation.Nullable; +/** + * wipmb delete, and push down to WorkChunkCreateEvent + */ public class BatchWorkChunk { public final String jobDefinitionId; @@ -54,11 +58,11 @@ public class BatchWorkChunk { serializedData = theSerializedData; } - public static BatchWorkChunk firstChunk(JobDefinition theJobDefinition, String theInstanceId) { + public static WorkChunkCreateEvent firstChunk(JobDefinition theJobDefinition, String theInstanceId) { String firstStepId = theJobDefinition.getFirstStepId(); String jobDefinitionId = theJobDefinition.getJobDefinitionId(); int jobDefinitionVersion = theJobDefinition.getJobDefinitionVersion(); - return new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null); + return new WorkChunkCreateEvent(jobDefinitionId, jobDefinitionVersion, firstStepId, theInstanceId, 0, null); } @Override diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java index 9514feeb6fe..2a82241120e 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java @@ -30,6 +30,7 @@ import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.StatusEnum; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; @@ -126,8 +127,8 @@ public class JobCoordinatorImpl implements IJobCoordinator { ourLog.info("Stored new {} job {} with status {}", jobDefinition.getJobDefinitionId(), instanceId, instance.getStatus()); ourLog.debug("Job parameters: {}", instance.getParameters()); - BatchWorkChunk batchWorkChunk = BatchWorkChunk.firstChunk(jobDefinition, instanceId); - String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk); + WorkChunkCreateEvent batchWorkChunk = WorkChunkCreateEvent.firstChunk(jobDefinition, instanceId); + String chunkId = myJobPersistence.onWorkChunkCreate(batchWorkChunk); JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(jobDefinition, instanceId, chunkId); myBatchJobSender.sendWorkChannelMessage(workNotification); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java index 4011d04206d..333fe0fe00b 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java @@ -25,6 +25,7 @@ import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobWorkCursor; import ca.uhn.fhir.batch2.model.JobWorkNotification; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkData; import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.util.Logs; @@ -67,12 +68,13 @@ class JobDataSink cursor = null; WorkChunk workChunk = null; - Optional chunkOpt = myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId); + Optional chunkOpt = myJobPersistence.onWorkChunkDequeue(chunkId); if (chunkOpt.isEmpty()) { ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId); return; diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunk.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunk.java index f82977df17b..43cb2e23c72 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunk.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunk.java @@ -45,6 +45,7 @@ public class WorkChunk implements IModelJson { private String myId; @JsonProperty("sequence") + // wipmb this seems unused. private int mySequence; @JsonProperty("status") diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCreateEvent.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCreateEvent.java new file mode 100644 index 00000000000..904b4f225df --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCreateEvent.java @@ -0,0 +1,27 @@ +package ca.uhn.fhir.batch2.model; + +import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * The data required for the create transition. + * Payload for the work-chunk creation event including all the job coordinates, the chunk data, and a sequence within the step. + * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md + */ +public class WorkChunkCreateEvent extends BatchWorkChunk { + /** + * Constructor + * + * @param theJobDefinitionId The job definition ID + * @param theJobDefinitionVersion The job definition version + * @param theTargetStepId The step ID that will be responsible for consuming this chunk + * @param theInstanceId The instance ID associated with this chunk + * @param theSequence + * @param theSerializedData The data. This will be in the form of a map where the values may be strings, lists, and other maps (i.e. JSON) + */ + public WorkChunkCreateEvent(@Nonnull String theJobDefinitionId, int theJobDefinitionVersion, @Nonnull String theTargetStepId, @Nonnull String theInstanceId, int theSequence, @Nullable String theSerializedData) { + super(theJobDefinitionId, theJobDefinitionVersion, theTargetStepId, theInstanceId, theSequence, theSerializedData); + } +} diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java index b8affd8d346..684652e9d26 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java @@ -16,6 +16,7 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; @@ -35,7 +36,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -139,7 +139,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { assertEquals(PARAM_2_VALUE, params.getParam2()); assertEquals(PASSWORD_VALUE, params.getPassword()); - verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0)); + verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0)); verify(myJobInstancePersister, times(0)).fetchWorkChunksWithoutData(any(), anyInt(), anyInt()); verify(myBatchJobSender, times(2)).sendWorkChannelMessage(any()); } @@ -147,7 +147,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { private void setupMocks(JobDefinition theJobDefinition, WorkChunk theWorkChunk) { mockJobRegistry(theJobDefinition); when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance())); - when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(theWorkChunk)); + when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(theWorkChunk)); } private void mockJobRegistry(JobDefinition theJobDefinition) { @@ -257,7 +257,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { assertEquals(PARAM_2_VALUE, params.getParam2()); assertEquals(PASSWORD_VALUE, params.getPassword()); - verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0)); + verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0)); verify(myBatchJobSender, times(0)).sendWorkChannelMessage(any()); } @@ -266,7 +266,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { // Setup - when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE)))); + when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE)))); doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1)); when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance())); when(myStep2Worker.run(any(), any())).thenReturn(new RunOutcome(50)); @@ -284,7 +284,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { assertEquals(PARAM_2_VALUE, params.getParam2()); assertEquals(PASSWORD_VALUE, params.getPassword()); - verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0)); + verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0)); } @Test @@ -293,7 +293,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { // Setup AtomicInteger counter = new AtomicInteger(); doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1)); - when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE)))); + when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE)))); when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance())); when(myStep2Worker.run(any(), any())).thenAnswer(t -> { if (counter.getAndIncrement() == 0) { @@ -317,12 +317,12 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { assertEquals(PASSWORD_VALUE, params.getPassword()); ArgumentCaptor parametersArgumentCaptor = ArgumentCaptor.forClass(WorkChunkErrorEvent.class); - verify(myJobInstancePersister, times(1)).workChunkErrorEvent(parametersArgumentCaptor.capture()); + verify(myJobInstancePersister, times(1)).onWorkChunkError(parametersArgumentCaptor.capture()); WorkChunkErrorEvent capturedParams = parametersArgumentCaptor.getValue(); assertEquals(CHUNK_ID, capturedParams.getChunkId()); assertEquals("This is an error message", capturedParams.getErrorMsg()); - verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 0, 0)); + verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 0, 0)); } @@ -331,7 +331,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { // Setup - when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE)))); + when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_2, new TestJobStep2InputType(DATA_1_VALUE, DATA_2_VALUE)))); doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1)); when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance())); when(myStep2Worker.run(any(), any())).thenAnswer(t -> { @@ -354,7 +354,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { assertEquals(PARAM_2_VALUE, params.getParam2()); assertEquals(PASSWORD_VALUE, params.getPassword()); - verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(eq(new WorkChunkCompletionEvent(CHUNK_ID, 50, 2))); + verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(eq(new WorkChunkCompletionEvent(CHUNK_ID, 50, 2))); } @Test @@ -362,7 +362,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { // Setup - when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep3())); + when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep3())); doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1)); when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance())); when(myStep3Worker.run(any(), any())).thenReturn(new RunOutcome(50)); @@ -380,7 +380,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { assertEquals(PARAM_2_VALUE, params.getParam2()); assertEquals(PASSWORD_VALUE, params.getPassword()); - verify(myJobInstancePersister, times(1)).workChunkCompletionEvent(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0)); + verify(myJobInstancePersister, times(1)).onWorkChunkCompletion(new WorkChunkCompletionEvent(CHUNK_ID, 50, 0)); } @SuppressWarnings("unchecked") @@ -389,7 +389,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { // Setup - when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_3, new TestJobStep3InputType().setData3(DATA_3_VALUE).setData4(DATA_4_VALUE)))); + when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunk(STEP_3, new TestJobStep3InputType().setData3(DATA_3_VALUE).setData4(DATA_4_VALUE)))); doReturn(createJobDefinition()).when(myJobDefinitionRegistry).getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1)); when(myJobInstancePersister.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(createInstance())); when(myStep3Worker.run(any(), any())).thenAnswer(t -> { @@ -406,7 +406,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { // Verify verify(myStep3Worker, times(1)).run(myStep3ExecutionDetailsCaptor.capture(), any()); - verify(myJobInstancePersister, times(1)).markWorkChunkAsFailed(eq(CHUNK_ID), any()); + verify(myJobInstancePersister, times(1)).onWorkChunkFailed(eq(CHUNK_ID), any()); } @Test @@ -416,7 +416,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { String exceptionMessage = "badbadnotgood"; when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(eq(JOB_DEFINITION_ID), eq(1))).thenThrow(new JobExecutionFailedException(exceptionMessage)); - when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep2())); + when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.of(createWorkChunkStep2())); mySvc.start(); // Execute @@ -442,7 +442,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { // Setup - when(myJobInstancePersister.fetchWorkChunkSetStartTimeAndMarkInProgress(eq(CHUNK_ID))).thenReturn(Optional.empty()); + when(myJobInstancePersister.onWorkChunkDequeue(eq(CHUNK_ID))).thenReturn(Optional.empty()); mySvc.start(); // Execute @@ -516,8 +516,8 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { assertEquals(1, myJobWorkNotificationCaptor.getAllValues().get(0).getJobDefinitionVersion()); assertEquals(STEP_1, myJobWorkNotificationCaptor.getAllValues().get(0).getTargetStepId()); - BatchWorkChunk expectedWorkChunk = new BatchWorkChunk(JOB_DEFINITION_ID, 1, STEP_1, INSTANCE_ID, 0, null); - verify(myJobInstancePersister, times(1)).storeWorkChunk(eq(expectedWorkChunk)); + WorkChunkCreateEvent expectedWorkChunk = new WorkChunkCreateEvent(JOB_DEFINITION_ID, 1, STEP_1, INSTANCE_ID, 0, null); + verify(myJobInstancePersister, times(1)).onWorkChunkCreate(eq(expectedWorkChunk)); verifyNoMoreInteractions(myJobInstancePersister); verifyNoMoreInteractions(myStep1Worker); diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java index 97960fc22d7..92c9f811f23 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java @@ -13,6 +13,7 @@ import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobWorkCursor; import ca.uhn.fhir.batch2.model.JobWorkNotification; +import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.JsonUtil; import com.fasterxml.jackson.annotation.JsonProperty; @@ -52,7 +53,7 @@ class JobDataSinkTest { @Captor private ArgumentCaptor myJobWorkNotificationCaptor; @Captor - private ArgumentCaptor myBatchWorkChunkCaptor; + private ArgumentCaptor myBatchWorkChunkCaptor; @Test public void test_sink_accept() { @@ -93,7 +94,7 @@ class JobDataSinkTest { // execute // Let's test our first step worker by calling run on it: - when(myJobPersistence.storeWorkChunk(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID); + when(myJobPersistence.onWorkChunkCreate(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID); JobInstance instance = JobInstance.fromInstanceId(JOB_INSTANCE_ID); StepExecutionDetails details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, instance, CHUNK_ID); JobWorkCursor cursor = new JobWorkCursor<>(job, true, firstStep, lastStep); diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java index a8472065c44..638b80ec31b 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/ReductionStepExecutorServiceImplTest.java @@ -204,7 +204,7 @@ public class ReductionStepExecutorServiceImplTest { ArgumentCaptor chunkIdCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(String.class); verify(myJobPersistence, times(chunkIds.size())) - .markWorkChunkAsFailed(chunkIdCaptor.capture(), errorCaptor.capture()); + .onWorkChunkFailed(chunkIdCaptor.capture(), errorCaptor.capture()); List chunkIdsCaptured = chunkIdCaptor.getAllValues(); List errorsCaptured = errorCaptor.getAllValues(); for (int i = 0; i < chunkIds.size(); i++) { diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessorTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessorTest.java index e5980df4436..83cc133cb95 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessorTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessorTest.java @@ -150,12 +150,12 @@ public class WorkChunkProcessorTest { // verify assertTrue(result.isSuccessful()); verify(myJobPersistence) - .workChunkCompletionEvent(any(WorkChunkCompletionEvent.class)); + .onWorkChunkCompletion(any(WorkChunkCompletionEvent.class)); assertTrue(myDataSink.myActualDataSink instanceof JobDataSink); if (theRecoveredErrorsForDataSink > 0) { verify(myJobPersistence) - .workChunkCompletionEvent(any(WorkChunkCompletionEvent.class)); + .onWorkChunkCompletion(any(WorkChunkCompletionEvent.class)); //.workChunkErrorEvent(anyString(new WorkChunkErrorEvent(chunk.getId(), theRecoveredErrorsForDataSink))); } @@ -207,7 +207,7 @@ public class WorkChunkProcessorTest { runExceptionThrowingTest(new JobExecutionFailedException("Failure")); verify(myJobPersistence) - .markWorkChunkAsFailed(anyString(), anyString()); + .onWorkChunkFailed(anyString(), anyString()); } @Test @@ -239,7 +239,7 @@ public class WorkChunkProcessorTest { // when when(myNonReductionStep.run(any(), any())) .thenThrow(new RuntimeException(errorMsg)); - when(myJobPersistence.workChunkErrorEvent(any(WorkChunkErrorEvent.class))) + when(myJobPersistence.onWorkChunkError(any(WorkChunkErrorEvent.class))) .thenAnswer((p) -> { WorkChunk ec = new WorkChunk(); ec.setId(chunk.getId()); @@ -314,17 +314,17 @@ public class WorkChunkProcessorTest { private void verifyNoErrors(int theRecoveredErrorCount) { if (theRecoveredErrorCount == 0) { verify(myJobPersistence, never()) - .workChunkErrorEvent(any()); + .onWorkChunkError(any()); } verify(myJobPersistence, never()) - .markWorkChunkAsFailed(anyString(), anyString()); + .onWorkChunkFailed(anyString(), anyString()); verify(myJobPersistence, never()) - .workChunkErrorEvent(any(WorkChunkErrorEvent.class)); + .onWorkChunkError(any(WorkChunkErrorEvent.class)); } private void verifyNonReductionStep() { verify(myJobPersistence, never()) - .fetchWorkChunkSetStartTimeAndMarkInProgress(anyString()); + .onWorkChunkDequeue(anyString()); verify(myJobPersistence, never()) .markWorkChunksWithStatusAndWipeData(anyString(), anyList(), any(), any()); verify(myJobPersistence, never())