From 40de855d17ad45819aa20e117a93d1b315cd312f Mon Sep 17 00:00:00 2001 From: Ken Stevens Date: Sat, 2 Apr 2022 16:34:26 -0400 Subject: [PATCH] Add tests. pull some inner classes out as package private classes. (#3516) * Add tests. pull some inner classes out as package private classes. * review feedback --- .../jpa/batch2/JpaJobPersistenceImpl.java | 16 +- .../jpa/batch2/JpaJobPersistenceImplTest.java | 63 ++++---- .../uhn/fhir/batch2/api/IJobPersistence.java | 13 +- .../fhir/batch2/config/BaseBatch2Config.java | 12 +- .../ca/uhn/fhir/batch2/impl/BaseDataSink.java | 35 +++++ .../uhn/fhir/batch2/impl/BatchJobSender.java | 24 +++ .../uhn/fhir/batch2/impl/BatchWorkChunk.java | 56 +++++++ .../fhir/batch2/impl/FinalStepDataSink.java | 34 +++++ .../fhir/batch2/impl/JobCoordinatorImpl.java | 119 ++------------- .../ca/uhn/fhir/batch2/impl/JobDataSink.java | 51 +++++++ .../SynchronizedJobPersistenceWrapper.java | 5 +- .../batch2/model/JobWorkNotification.java | 18 +++ .../batch2/impl/JobCoordinatorImplTest.java | 21 ++- .../uhn/fhir/batch2/impl/JobDataSinkTest.java | 139 ++++++++++++++++++ .../fhir/batch2/impl/JobDefinitionTest.java | 38 +++++ 15 files changed, 470 insertions(+), 174 deletions(-) create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BaseDataSink.java create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchJobSender.java create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchWorkChunk.java create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/FinalStepDataSink.java create mode 100644 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java create mode 100644 hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java create mode 100644 hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionTest.java diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index 9d9afbee8f9..1e694f6c04a 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.batch2; */ import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.impl.BatchWorkChunk; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; @@ -28,7 +29,6 @@ import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity; import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; -import ca.uhn.fhir.util.JsonUtil; import org.apache.commons.lang3.Validate; import org.springframework.data.domain.PageRequest; @@ -59,15 +59,15 @@ public class JpaJobPersistenceImpl implements IJobPersistence { } @Override - public String storeWorkChunk(String theJobDefinitionId, int theJobDefinitionVersion, String theTargetStepId, String theInstanceId, int theSequence, String theDataSerialized) { + public String storeWorkChunk(BatchWorkChunk theBatchWorkChunk) { Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity(); entity.setId(UUID.randomUUID().toString()); - entity.setSequence(theSequence); - entity.setJobDefinitionId(theJobDefinitionId); - entity.setJobDefinitionVersion(theJobDefinitionVersion); - entity.setTargetStepId(theTargetStepId); - entity.setInstanceId(theInstanceId); - entity.setSerializedData(theDataSerialized); + entity.setSequence(theBatchWorkChunk.sequence); + entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId); + entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion); + entity.setTargetStepId(theBatchWorkChunk.targetStepId); + entity.setInstanceId(theBatchWorkChunk.instanceId); + entity.setSerializedData(theBatchWorkChunk.serializedData); entity.setCreateTime(new Date()); entity.setStatus(StatusEnum.QUEUED); myWorkChunkRepository.save(entity); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java index 43b9abda8ca..84e7fe6fe44 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java @@ -1,6 +1,7 @@ package ca.uhn.fhir.jpa.batch2; import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.impl.BatchWorkChunk; import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; @@ -8,11 +9,9 @@ import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test; -import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity; import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; import ca.uhn.fhir.util.JsonUtil; -import com.github.jsonldjava.shaded.com.google.common.collect.Lists; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; @@ -20,17 +19,13 @@ import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.Nonnull; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.in; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -40,7 +35,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @TestMethodOrder(MethodOrderer.MethodName.class) public class JpaJobPersistenceImplTest extends BaseJpaR4Test { + public static final String JOB_DEFINITION_ID = "definition-id"; + public static final String TARGET_STEP_ID = "step-id"; + public static final String DEF_CHUNK_ID = "definition-chunkId"; + public static final String STEP_CHUNK_ID = "step-chunkId"; + public static final int JOB_DEF_VER = 1; + public static final int SEQUENCE_NUMBER = 1; public static final String CHUNK_DATA = "{\"key\":\"value\"}"; + @Autowired private IJobPersistence mySvc; @Autowired @@ -55,7 +57,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); for (int i = 0; i < 10; i++) { - mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}"))); + storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, JsonUtil.serialize(new NdJsonFileJson().setNdJsonText("{}"))); } // Execute @@ -70,6 +72,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { }); } + private String storeWorkChunk(String theJobDefinitionId, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { + BatchWorkChunk batchWorkChunk = new BatchWorkChunk(theJobDefinitionId, JOB_DEF_VER, theTargetStepId, theInstanceId, theSequence, theSerializedData); + return mySvc.storeWorkChunk(batchWorkChunk); + } + @Test public void testDeleteChunks() { // Setup @@ -77,7 +84,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); for (int i = 0; i < 10; i++) { - mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, i, CHUNK_DATA); + storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA); } // Execute @@ -104,8 +111,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance foundInstance = mySvc.fetchInstanceAndMarkInProgress(instanceId).orElseThrow(() -> new IllegalStateException()); assertEquals(instanceId, foundInstance.getInstanceId()); - assertEquals("definition-id", foundInstance.getJobDefinitionId()); - assertEquals(1, foundInstance.getJobDefinitionVersion()); + assertEquals(JOB_DEFINITION_ID, foundInstance.getJobDefinitionId()); + assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion()); assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus()); assertEquals(CHUNK_DATA, foundInstance.getParameters()); @@ -131,8 +138,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance foundInstance = mySvc.fetchInstanceAndMarkInProgress(instanceId).orElseThrow(() -> new IllegalStateException()); assertEquals(instanceId, foundInstance.getInstanceId()); - assertEquals("definition-id", foundInstance.getJobDefinitionId()); - assertEquals(1, foundInstance.getJobDefinitionVersion()); + assertEquals(JOB_DEFINITION_ID, foundInstance.getJobDefinitionId()); + assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion()); assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus()); assertTrue( foundInstance.isCancelled()); assertEquals(CHUNK_DATA, foundInstance.getParameters()); @@ -146,8 +153,8 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance foundInstance = mySvc.fetchInstanceAndMarkInProgress(instanceId).orElseThrow(() -> new IllegalStateException()); assertEquals(36, foundInstance.getInstanceId().length()); - assertEquals("definition-id", foundInstance.getJobDefinitionId()); - assertEquals(1, foundInstance.getJobDefinitionVersion()); + assertEquals(JOB_DEFINITION_ID, foundInstance.getJobDefinitionId()); + assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion()); assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus()); assertEquals(CHUNK_DATA, foundInstance.getParameters()); } @@ -159,7 +166,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { List ids =new ArrayList<>(); for (int i = 0; i < 10; i++) { - String id = mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, i, CHUNK_DATA); + String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, i, CHUNK_DATA); ids.add(id); } @@ -197,7 +204,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); - String id = mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, 0, null); + String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null); WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(() -> new IllegalArgumentException()); assertEquals(null, chunk.getData()); @@ -208,14 +215,14 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); - String id = mySvc.storeWorkChunk("definition-id", 1, "step-id", instanceId, 0, CHUNK_DATA); + String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA); assertNotNull(id); runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(() -> new IllegalArgumentException()).getStatus())); WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(() -> new IllegalArgumentException()); assertEquals(36, chunk.getInstanceId().length()); - assertEquals("definition-id", chunk.getJobDefinitionId()); - assertEquals(1, chunk.getJobDefinitionVersion()); + assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId()); + assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion()); assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus()); assertEquals(CHUNK_DATA, chunk.getData()); @@ -226,7 +233,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { public void testMarkChunkAsCompleted_Success() { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); - String chunkId = mySvc.storeWorkChunk("definition-chunkId", 1, "step-chunkId", instanceId, 1, CHUNK_DATA); + String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA); assertNotNull(chunkId); runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus())); @@ -234,7 +241,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { sleepUntilTimeChanges(); WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(() -> new IllegalArgumentException()); - assertEquals(1, chunk.getSequence()); + assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus()); assertNotNull(chunk.getCreateTime()); assertNotNull(chunk.getStartTime()); @@ -267,7 +274,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); - String chunkId = mySvc.storeWorkChunk("definition-chunkId", 1, "step-chunkId", instanceId, 1, null); + String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null); assertNotNull(chunkId); // Execute @@ -286,7 +293,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { public void testMarkChunkAsCompleted_Error() { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); - String chunkId = mySvc.storeWorkChunk("definition-chunkId", 1, "step-chunkId", instanceId, 1, null); + String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null); assertNotNull(chunkId); runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus())); @@ -294,7 +301,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { sleepUntilTimeChanges(); WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(() -> new IllegalArgumentException()); - assertEquals(1, chunk.getSequence()); + assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus()); sleepUntilTimeChanges(); @@ -336,7 +343,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { public void testMarkChunkAsCompleted_Fail() { JobInstance instance = createInstance(); String instanceId = mySvc.storeNewInstance(instance); - String chunkId = mySvc.storeWorkChunk("definition-chunkId", 1, "step-chunkId", instanceId, 1, null); + String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null); assertNotNull(chunkId); runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus())); @@ -344,7 +351,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { sleepUntilTimeChanges(); WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(() -> new IllegalArgumentException()); - assertEquals(1, chunk.getSequence()); + assertEquals(SEQUENCE_NUMBER, chunk.getSequence()); assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus()); sleepUntilTimeChanges(); @@ -413,9 +420,9 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test { @Nonnull private JobInstance createInstance() { JobInstance instance = new JobInstance(); - instance.setJobDefinitionId("definition-id"); + instance.setJobDefinitionId(JOB_DEFINITION_ID); instance.setStatus(StatusEnum.QUEUED); - instance.setJobDefinitionVersion(1); + instance.setJobDefinitionVersion(JOB_DEF_VER); instance.setParameters(CHUNK_DATA); return instance; } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java index b5f282680d7..3e802c40976 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.batch2.api; * #L% */ +import ca.uhn.fhir.batch2.impl.BatchWorkChunk; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.WorkChunk; @@ -34,20 +35,16 @@ public interface IJobPersistence { *

* Chunk should be stored with a status of {@link ca.uhn.fhir.batch2.model.StatusEnum#QUEUED} * - * @param theJobDefinitionId The job definition ID - * @param theJobDefinitionVersion The job definition version - * @param theTargetStepId The step ID that will be responsible for consuming this chunk - * @param theInstanceId The instance ID associated with this chunk - * @param theDataSerialized The data. This will be in the form of a map where the values may be strings, lists, and other maps (i.e. JSON) - * @return Returns a globally unique identifier for this chunk. This should be a sequentially generated ID, a UUID, or something like that which is guaranteed to never overlap across jobs or instances. + * @param theBatchWorkChunk the batch work chunk to be stored + * @return a globally unique identifier for this chunk. This should be a sequentially generated ID, a UUID, or something like that which is guaranteed to never overlap across jobs or instances. */ - String storeWorkChunk(String theJobDefinitionId, int theJobDefinitionVersion, String theTargetStepId, String theInstanceId, int theSequence, String theDataSerialized); + String storeWorkChunk(BatchWorkChunk theBatchWorkChunk); /** * Fetches a chunk of work from storage, and update the stored status * to {@link ca.uhn.fhir.batch2.model.StatusEnum#IN_PROGRESS} * - * @param theChunkId The ID, as returned by {@link #storeWorkChunk(String, int, String, String, int, String)} + * @param theChunkId The ID, as returned by {@link #storeWorkChunk(BatchWorkChunk theBatchWorkChunk)} * @return The chunk of work */ Optional fetchWorkChunkSetStartTimeAndMarkInProgress(String theChunkId); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java index 9ff90465498..30e8f035801 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java @@ -23,6 +23,7 @@ package ca.uhn.fhir.batch2.config; import ca.uhn.fhir.batch2.api.IJobCleanerService; import ca.uhn.fhir.batch2.api.IJobCoordinator; import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.impl.BatchJobSender; import ca.uhn.fhir.batch2.impl.JobCleanerServiceImpl; import ca.uhn.fhir.batch2.impl.JobCoordinatorImpl; import ca.uhn.fhir.batch2.impl.JobDefinitionRegistry; @@ -33,7 +34,6 @@ import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -48,9 +48,9 @@ public abstract class BaseBatch2Config { } @Bean - public IJobCoordinator batch2JobCoordinator(@Autowired IChannelFactory theChannelFactory, IJobPersistence theJobInstancePersister, JobDefinitionRegistry theJobDefinitionRegistry) { + public IJobCoordinator batch2JobCoordinator(IChannelFactory theChannelFactory, IJobPersistence theJobInstancePersister, JobDefinitionRegistry theJobDefinitionRegistry) { return new JobCoordinatorImpl( - batch2ProcessingChannelProducer(theChannelFactory), + new BatchJobSender(batch2ProcessingChannelProducer(theChannelFactory)), batch2ProcessingChannelReceiver(theChannelFactory), theJobInstancePersister, theJobDefinitionRegistry @@ -58,19 +58,19 @@ public abstract class BaseBatch2Config { } @Bean - public IJobCleanerService batch2JobCleaner(@Autowired ISchedulerService theSchedulerService, @Autowired IJobPersistence theJobPersistence) { + public IJobCleanerService batch2JobCleaner(ISchedulerService theSchedulerService, IJobPersistence theJobPersistence) { return new JobCleanerServiceImpl(theSchedulerService, theJobPersistence); } @Bean - public IChannelProducer batch2ProcessingChannelProducer(@Autowired IChannelFactory theChannelFactory) { + public IChannelProducer batch2ProcessingChannelProducer(IChannelFactory theChannelFactory) { ChannelProducerSettings settings = new ChannelProducerSettings() .setConcurrentConsumers(getConcurrentConsumers()); return theChannelFactory.getOrCreateProducer(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, settings); } @Bean - public IChannelReceiver batch2ProcessingChannelReceiver(@Autowired IChannelFactory theChannelFactory) { + public IChannelReceiver batch2ProcessingChannelReceiver(IChannelFactory theChannelFactory) { ChannelConsumerSettings settings = new ChannelConsumerSettings() .setConcurrentConsumers(getConcurrentConsumers()); return theChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, settings); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BaseDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BaseDataSink.java new file mode 100644 index 00000000000..a469e493dbe --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BaseDataSink.java @@ -0,0 +1,35 @@ +package ca.uhn.fhir.batch2.impl; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.model.api.IModelJson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class BaseDataSink implements IJobDataSink { + private static final Logger ourLog = LoggerFactory.getLogger(BaseDataSink.class); + + private final String myInstanceId; + private final String myCurrentStepId; + private int myRecoveredErrorCount; + + protected BaseDataSink(String theInstanceId, String theCurrentStepId) { + myInstanceId = theInstanceId; + myCurrentStepId = theCurrentStepId; + } + + public String getInstanceId() { + return myInstanceId; + } + + @Override + public void recoveredError(String theMessage) { + ourLog.error("Error during job[{}] step[{}]: {}", myInstanceId, myCurrentStepId, theMessage); + myRecoveredErrorCount++; + } + + public int getRecoveredErrorCount() { + return myRecoveredErrorCount; + } + + public abstract int getWorkChunkCount(); +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchJobSender.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchJobSender.java new file mode 100644 index 00000000000..f20b45251b2 --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchJobSender.java @@ -0,0 +1,24 @@ +package ca.uhn.fhir.batch2.impl; + +import ca.uhn.fhir.batch2.model.JobWorkNotification; +import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BatchJobSender { + private static final Logger ourLog = LoggerFactory.getLogger(BatchJobSender.class); + private final IChannelProducer myWorkChannelProducer; + + public BatchJobSender(IChannelProducer theWorkChannelProducer) { + myWorkChannelProducer = theWorkChannelProducer; + } + + void sendWorkChannelMessage(JobWorkNotification theJobWorkNotification) { + JobWorkNotificationJsonMessage message = new JobWorkNotificationJsonMessage(); + message.setPayload(theJobWorkNotification); + + ourLog.info("Sending work notification for {}", theJobWorkNotification); + myWorkChannelProducer.send(message); + } +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchWorkChunk.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchWorkChunk.java new file mode 100644 index 00000000000..2aa789122b8 --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/BatchWorkChunk.java @@ -0,0 +1,56 @@ +package ca.uhn.fhir.batch2.impl; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +public class BatchWorkChunk { + + public final String jobDefinitionId; + public final int jobDefinitionVersion; + public final String targetStepId; + public final String instanceId; + public final int sequence; + public final String serializedData; + + /** + * Constructor + * + * @param theJobDefinitionId The job definition ID + * @param theJobDefinitionVersion The job definition version + * @param theTargetStepId The step ID that will be responsible for consuming this chunk + * @param theInstanceId The instance ID associated with this chunk + * @param theSerializedData The data. This will be in the form of a map where the values may be strings, lists, and other maps (i.e. JSON) + */ + + public BatchWorkChunk(String theJobDefinitionId, int theJobDefinitionVersion, String theTargetStepId, String theInstanceId, int theSequence, String theSerializedData) { + jobDefinitionId = theJobDefinitionId; + jobDefinitionVersion = theJobDefinitionVersion; + targetStepId = theTargetStepId; + instanceId = theInstanceId; + sequence = theSequence; + serializedData = theSerializedData; + } + + @Override + public boolean equals(Object theO) { + if (this == theO) return true; + + if (theO == null || getClass() != theO.getClass()) return false; + + BatchWorkChunk that = (BatchWorkChunk) theO; + + return new EqualsBuilder() + .append(jobDefinitionVersion, that.jobDefinitionVersion) + .append(sequence, that.sequence) + .append(jobDefinitionId, that.jobDefinitionId) + .append(targetStepId, that.targetStepId) + .append(instanceId, that.instanceId) + .append(serializedData, that.serializedData) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37).append(jobDefinitionId).append(jobDefinitionVersion).append(targetStepId).append(instanceId).append(sequence).append(serializedData).toHashCode(); + } +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/FinalStepDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/FinalStepDataSink.java new file mode 100644 index 00000000000..331b17ffce4 --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/FinalStepDataSink.java @@ -0,0 +1,34 @@ +package ca.uhn.fhir.batch2.impl; + +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.model.WorkChunkData; +import ca.uhn.fhir.i18n.Msg; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class FinalStepDataSink extends BaseDataSink { + private static final Logger ourLog = LoggerFactory.getLogger(FinalStepDataSink.class); + + private final String myJobDefinitionId; + + /** + * Constructor + */ + FinalStepDataSink(String theJobDefinitionId, String theInstanceId, String theCurrentStepId) { + super(theInstanceId, theCurrentStepId); + myJobDefinitionId = theJobDefinitionId; + } + + @Override + public void accept(WorkChunkData theData) { + String msg = "Illegal attempt to store data during final step of job " + myJobDefinitionId; + ourLog.error(msg); + throw new JobExecutionFailedException(Msg.code(2045) + msg); + } + + @Override + public int getWorkChunkCount() { + return 0; + } +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java index 4d3c271a624..639503fcfe5 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImpl.java @@ -21,7 +21,6 @@ package ca.uhn.fhir.batch2.impl; */ import ca.uhn.fhir.batch2.api.IJobCoordinator; -import ca.uhn.fhir.batch2.api.IJobDataSink; import ca.uhn.fhir.batch2.api.IJobParametersValidator; import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.api.IJobStepWorker; @@ -37,9 +36,7 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; -import ca.uhn.fhir.batch2.model.WorkChunkData; import ca.uhn.fhir.i18n.Msg; -import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.annotation.PasswordField; @@ -68,7 +65,6 @@ import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; @@ -77,7 +73,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinator { private static final Logger ourLog = LoggerFactory.getLogger(JobCoordinatorImpl.class); - private final IChannelProducer myWorkChannelProducer; + private final BatchJobSender myBatchJobSender; private final IChannelReceiver myWorkChannelReceiver; private final JobDefinitionRegistry myJobDefinitionRegistry; private final MessageHandler myReceiverHandler = new WorkChannelMessageHandler(); @@ -87,12 +83,12 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato * Constructor */ public JobCoordinatorImpl( - @Nonnull IChannelProducer theWorkChannelProducer, + @Nonnull BatchJobSender theBatchJobSender, @Nonnull IChannelReceiver theWorkChannelReceiver, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) { super(theJobPersistence); - myWorkChannelProducer = theWorkChannelProducer; + myBatchJobSender = theBatchJobSender; myWorkChannelReceiver = theWorkChannelReceiver; myJobDefinitionRegistry = theJobDefinitionRegistry; } @@ -120,9 +116,12 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato instance.setParameters(theStartRequest.getParameters()); String instanceId = myJobPersistence.storeNewInstance(instance); - String chunkId = myJobPersistence.storeWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, instanceId, 0, null); - sendWorkChannelMessage(jobDefinitionId, jobDefinitionVersion, instanceId, firstStepId, chunkId); + BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, firstStepId, instanceId, 0, null); + String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk); + + JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, firstStepId, chunkId); + myBatchJobSender.sendWorkChannelMessage(workNotification); return instanceId; } @@ -245,20 +244,6 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato myWorkChannelReceiver.unsubscribe(myReceiverHandler); } - private void sendWorkChannelMessage(String theJobDefinitionId, int jobDefinitionVersion, String theInstanceId, String theTargetStepId, String theChunkId) { - JobWorkNotificationJsonMessage message = new JobWorkNotificationJsonMessage(); - JobWorkNotification workNotification = new JobWorkNotification(); - workNotification.setJobDefinitionId(theJobDefinitionId); - workNotification.setJobDefinitionVersion(jobDefinitionVersion); - workNotification.setChunkId(theChunkId); - workNotification.setInstanceId(theInstanceId); - workNotification.setTargetStepId(theTargetStepId); - message.setPayload(workNotification); - - ourLog.info("Sending work notification for job[{}] instance[{}] step[{}] chunk[{}]", theJobDefinitionId, theInstanceId, theTargetStepId, theChunkId); - myWorkChannelProducer.send(message); - } - private void handleWorkChannelMessage(JobWorkNotificationJsonMessage theMessage) { JobWorkNotification payload = theMessage.getPayload(); @@ -321,7 +306,7 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato BaseDataSink dataSink; if (theSubsequentStep != null) { - dataSink = new JobDataSink<>(jobDefinitionId, jobDefinitionVersion, theSubsequentStep, instanceId, theStep.getStepId()); + dataSink = new JobDataSink<>(myBatchJobSender, myJobPersistence, jobDefinitionId, jobDefinitionVersion, theSubsequentStep, instanceId, theStep.getStepId()); } else { dataSink = (BaseDataSink) new FinalStepDataSink(jobDefinitionId, instanceId, theStep.getStepId()); } @@ -382,90 +367,4 @@ public class JobCoordinatorImpl extends BaseJobService implements IJobCoordinato handleWorkChannelMessage((JobWorkNotificationJsonMessage) theMessage); } } - - private abstract class BaseDataSink implements IJobDataSink { - - private final String myInstanceId; - private final String myCurrentStepId; - private int myRecoveredErrorCount; - - protected BaseDataSink(String theInstanceId, String theCurrentStepId) { - myInstanceId = theInstanceId; - myCurrentStepId = theCurrentStepId; - } - - public String getInstanceId() { - return myInstanceId; - } - - @Override - public void recoveredError(String theMessage) { - ourLog.error("Error during job[{}] step[{}]: {}", myInstanceId, myCurrentStepId, theMessage); - myRecoveredErrorCount++; - } - - public int getRecoveredErrorCount() { - return myRecoveredErrorCount; - } - - public abstract int getWorkChunkCount(); - } - - private class JobDataSink extends BaseDataSink { - private final String myJobDefinitionId; - private final int myJobDefinitionVersion; - private final JobDefinitionStep myTargetStep; - private final AtomicInteger myChunkCounter = new AtomicInteger(0); - - public JobDataSink(String theJobDefinitionId, int theJobDefinitionVersion, JobDefinitionStep theTargetStep, String theInstanceId, String theCurrentStepId) { - super(theInstanceId, theCurrentStepId); - myJobDefinitionId = theJobDefinitionId; - myJobDefinitionVersion = theJobDefinitionVersion; - myTargetStep = theTargetStep; - } - - @Override - public void accept(WorkChunkData theData) { - String jobDefinitionId = myJobDefinitionId; - int jobDefinitionVersion = myJobDefinitionVersion; - String instanceId = getInstanceId(); - String targetStepId = myTargetStep.getStepId(); - int sequence = myChunkCounter.getAndIncrement(); - OT dataValue = theData.getData(); - String dataValueString = JsonUtil.serialize(dataValue, false); - String chunkId = myJobPersistence.storeWorkChunk(jobDefinitionId, jobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString); - - sendWorkChannelMessage(jobDefinitionId, jobDefinitionVersion, instanceId, targetStepId, chunkId); - } - - @Override - public int getWorkChunkCount() { - return myChunkCounter.get(); - } - - } - - private class FinalStepDataSink extends BaseDataSink { - private final String myJobDefinitionId; - - /** - * Constructor - */ - private FinalStepDataSink(String theJobDefinitionId, String theInstanceId, String theCurrentStepId) { - super(theInstanceId, theCurrentStepId); - myJobDefinitionId = theJobDefinitionId; - } - - @Override - public void accept(WorkChunkData theData) { - String msg = "Illegal attempt to store data during final step of job " + myJobDefinitionId; - ourLog.error(msg); - throw new JobExecutionFailedException(Msg.code(2045) + msg); - } - - @Override - public int getWorkChunkCount() { - return 0; - } - } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java new file mode 100644 index 00000000000..4fa2a8c521b --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/JobDataSink.java @@ -0,0 +1,51 @@ +package ca.uhn.fhir.batch2.impl; + +import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.model.JobDefinitionStep; +import ca.uhn.fhir.batch2.model.JobWorkNotification; +import ca.uhn.fhir.batch2.model.WorkChunkData; +import ca.uhn.fhir.model.api.IModelJson; +import ca.uhn.fhir.util.JsonUtil; + +import java.util.concurrent.atomic.AtomicInteger; + +class JobDataSink extends BaseDataSink { + private final BatchJobSender myBatchJobSender; + private final IJobPersistence myJobPersistence; + private final String myJobDefinitionId; + private final int myJobDefinitionVersion; + private final JobDefinitionStep myTargetStep; + private final AtomicInteger myChunkCounter = new AtomicInteger(0); + + JobDataSink(BatchJobSender theBatchJobSender, IJobPersistence theJobPersistence, String theJobDefinitionId, int theJobDefinitionVersion, JobDefinitionStep theTargetStep, String theInstanceId, String theCurrentStepId) { + super(theInstanceId, theCurrentStepId); + myBatchJobSender = theBatchJobSender; + myJobPersistence = theJobPersistence; + myJobDefinitionId = theJobDefinitionId; + myJobDefinitionVersion = theJobDefinitionVersion; + myTargetStep = theTargetStep; + } + + @Override + public void accept(WorkChunkData theData) { + String jobDefinitionId = myJobDefinitionId; + int jobDefinitionVersion = myJobDefinitionVersion; + String instanceId = getInstanceId(); + String targetStepId = myTargetStep.getStepId(); + int sequence = myChunkCounter.getAndIncrement(); + OT dataValue = theData.getData(); + String dataValueString = JsonUtil.serialize(dataValue, false); + + BatchWorkChunk batchWorkChunk = new BatchWorkChunk(jobDefinitionId, jobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString); + String chunkId = myJobPersistence.storeWorkChunk(batchWorkChunk); + + JobWorkNotification workNotification = new JobWorkNotification(jobDefinitionId, jobDefinitionVersion, instanceId, targetStepId, chunkId); + myBatchJobSender.sendWorkChannelMessage(workNotification); + } + + @Override + public int getWorkChunkCount() { + return myChunkCounter.get(); + } + +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java index 7b468c1af4c..0276a24daf2 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/impl/SynchronizedJobPersistenceWrapper.java @@ -24,7 +24,6 @@ import ca.uhn.fhir.batch2.api.IJobPersistence; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.WorkChunk; -import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -40,8 +39,8 @@ public class SynchronizedJobPersistenceWrapper implements IJobPersistence { } @Override - public synchronized String storeWorkChunk(String theJobDefinitionId, int theJobDefinitionVersion, String theTargetStepId, String theInstanceId, int theSequence, String theDataSerialized) { - return myWrap.storeWorkChunk(theJobDefinitionId, theJobDefinitionVersion, theTargetStepId, theInstanceId, theSequence, theDataSerialized); + public synchronized String storeWorkChunk(BatchWorkChunk theBatchWorkChunk) { + return myWrap.storeWorkChunk(theBatchWorkChunk); } @Override diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkNotification.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkNotification.java index 96699fce296..5f948a8e2d2 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkNotification.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobWorkNotification.java @@ -23,6 +23,8 @@ package ca.uhn.fhir.batch2.model; import ca.uhn.fhir.model.api.IModelJson; import com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nonnull; + public class JobWorkNotification implements IModelJson { @JsonProperty(value = "jobDefinitionId") @@ -40,6 +42,17 @@ public class JobWorkNotification implements IModelJson { @JsonProperty(value = "instanceId") private String myInstanceId; + public JobWorkNotification() { + } + + public JobWorkNotification(@Nonnull String theJobDefinitionId, int jobDefinitionVersion, @Nonnull String theInstanceId, @Nonnull String theTargetStepId, @Nonnull String theChunkId) { + setJobDefinitionId(theJobDefinitionId); + setJobDefinitionVersion(jobDefinitionVersion); + setChunkId(theChunkId); + setInstanceId(theInstanceId); + setTargetStepId(theTargetStepId); + } + public String getJobDefinitionId() { return myJobDefinitionId; } @@ -79,4 +92,9 @@ public class JobWorkNotification implements IModelJson { public String getInstanceId() { return myInstanceId; } + + @Override + public String toString() { + return String.format("job[%s] instance[%s] step[%s] chunk[%s]", myJobDefinitionId, myInstanceId, myTargetStepId, myChunkId); + } } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java index 0b0f613eba5..889bc569589 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobCoordinatorImplTest.java @@ -14,7 +14,6 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; -import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel; import ca.uhn.fhir.model.api.IModelJson; @@ -43,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -70,7 +68,7 @@ public class JobCoordinatorImplTest { private final IChannelReceiver myWorkChannelReceiver = LinkedBlockingChannel.newSynchronous("receiver"); private JobCoordinatorImpl mySvc; @Mock - private IChannelProducer myWorkChannelProducer; + private BatchJobSender myBatchJobSender; @Mock private IJobPersistence myJobInstancePersister; @Mock @@ -88,7 +86,7 @@ public class JobCoordinatorImplTest { @Captor private ArgumentCaptor> myStep3ExecutionDetailsCaptor; @Captor - private ArgumentCaptor myMessageCaptor; + private ArgumentCaptor myJobWorkNotificationCaptor; @Captor private ArgumentCaptor myJobInstanceCaptor; @Captor @@ -96,7 +94,7 @@ public class JobCoordinatorImplTest { @BeforeEach public void beforeEach() { - mySvc = new JobCoordinatorImpl(myWorkChannelProducer, myWorkChannelReceiver, myJobInstancePersister, myJobDefinitionRegistry); + mySvc = new JobCoordinatorImpl(myBatchJobSender, myWorkChannelReceiver, myJobInstancePersister, myJobDefinitionRegistry); } @Test @@ -427,13 +425,14 @@ public class JobCoordinatorImplTest { assertEquals(PASSWORD_VALUE, myJobInstanceCaptor.getValue().getParameters(TestJobParameters.class).getPassword()); assertEquals(StatusEnum.QUEUED, myJobInstanceCaptor.getValue().getStatus()); - verify(myWorkChannelProducer, times(1)).send(myMessageCaptor.capture()); - assertNull(myMessageCaptor.getAllValues().get(0).getPayload().getChunkId()); - assertEquals(JOB_DEFINITION_ID, myMessageCaptor.getAllValues().get(0).getPayload().getJobDefinitionId()); - assertEquals(1, myMessageCaptor.getAllValues().get(0).getPayload().getJobDefinitionVersion()); - assertEquals(STEP_1, myMessageCaptor.getAllValues().get(0).getPayload().getTargetStepId()); + verify(myBatchJobSender, times(1)).sendWorkChannelMessage(myJobWorkNotificationCaptor.capture()); + assertNull(myJobWorkNotificationCaptor.getAllValues().get(0).getChunkId()); + assertEquals(JOB_DEFINITION_ID, myJobWorkNotificationCaptor.getAllValues().get(0).getJobDefinitionId()); + assertEquals(1, myJobWorkNotificationCaptor.getAllValues().get(0).getJobDefinitionVersion()); + assertEquals(STEP_1, myJobWorkNotificationCaptor.getAllValues().get(0).getTargetStepId()); - verify(myJobInstancePersister, times(1)).storeWorkChunk(eq(JOB_DEFINITION_ID), eq(1), eq(STEP_1), eq(INSTANCE_ID), eq(0), isNull()); + BatchWorkChunk expectedWorkChunk = new BatchWorkChunk(JOB_DEFINITION_ID, 1, STEP_1, INSTANCE_ID, 0, null); + verify(myJobInstancePersister, times(1)).storeWorkChunk(eq(expectedWorkChunk)); verifyNoMoreInteractions(myJobInstancePersister); verifyNoMoreInteractions(myStep1Worker); diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java new file mode 100644 index 00000000000..1b05f82b61b --- /dev/null +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDataSinkTest.java @@ -0,0 +1,139 @@ +package ca.uhn.fhir.batch2.impl; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobPersistence; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.batch2.model.JobWorkNotification; +import ca.uhn.fhir.model.api.IModelJson; +import ca.uhn.fhir.util.JsonUtil; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class JobDataSinkTest { + private static final String JOB_DEF_ID = "Jeff"; + private static final String JOB_DESC = "Jeff is curious"; + private static final int JOB_DEF_VERSION = 1; + private static final int PID_COUNT = 729; + private static final String JOB_INSTANCE_ID = "17"; + private static final String CHUNK_ID = "289"; + public static final String FIRST_STEP_ID = "firstStep"; + public static final String LAST_STEP_ID = "lastStep"; + + @Mock + private BatchJobSender myBatchJobSender; + @Mock + private IJobPersistence myJobPersistence; + @Captor + private ArgumentCaptor myJobWorkNotificationCaptor; + @Captor + private ArgumentCaptor myBatchWorkChunkCaptor; + + @Test + public void test_sink_accept() { + // setup + + IJobStepWorker firstStepWorker = new IJobStepWorker() { + @NotNull + @Override + public RunOutcome run(@NotNull StepExecutionDetails theStepExecutionDetails, @NotNull IJobDataSink theDataSink) throws JobExecutionFailedException { + TestJobParameters params = theStepExecutionDetails.getParameters(); + int numPidsToGenerate = Integer.parseInt(params.getParam1()); + Step1Output output = new Step1Output(); + for (long i = 0; i < numPidsToGenerate; ++i) { + output.addPid(i); + } + theDataSink.accept(output); + return new RunOutcome(numPidsToGenerate); + } + }; + + IJobStepWorker lastStepWorker = (details, sink) -> { + // Our test does not call this worker + fail(); + return null; + }; + + JobDefinition job = JobDefinition.newBuilder() + .setJobDefinitionId(JOB_DEF_ID) + .setJobDescription(JOB_DESC) + .setJobDefinitionVersion(JOB_DEF_VERSION) + .setParametersType(TestJobParameters.class) + .addFirstStep(FIRST_STEP_ID, "s1desc", Step1Output.class, firstStepWorker) + .addLastStep(LAST_STEP_ID, "s2desc", lastStepWorker) + .build(); + + // execute + // Let's test our first step worker by calling run on it: + when(myJobPersistence.storeWorkChunk(myBatchWorkChunkCaptor.capture())).thenReturn(CHUNK_ID); + StepExecutionDetails details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, JOB_INSTANCE_ID, CHUNK_ID); + JobDataSink sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, JOB_DEF_ID, JOB_DEF_VERSION, job.getSteps().get(1), JOB_INSTANCE_ID, FIRST_STEP_ID); + + RunOutcome result = firstStepWorker.run(details, sink); + + // verify + assertEquals(PID_COUNT, result.getRecordsProcessed()); + + // theDataSink.accept(output) called by firstStepWorker above calls two services. Let's validate them both. + + verify(myBatchJobSender).sendWorkChannelMessage(myJobWorkNotificationCaptor.capture()); + + JobWorkNotification notification = myJobWorkNotificationCaptor.getValue(); + assertEquals(JOB_DEF_ID, notification.getJobDefinitionId()); + assertEquals(JOB_INSTANCE_ID, notification.getInstanceId()); + assertEquals(CHUNK_ID, notification.getChunkId()); + assertEquals(JOB_DEF_VERSION, notification.getJobDefinitionVersion()); + assertEquals(LAST_STEP_ID, notification.getTargetStepId()); + + BatchWorkChunk batchWorkChunk = myBatchWorkChunkCaptor.getValue(); + assertEquals(JOB_DEF_VERSION, batchWorkChunk.jobDefinitionVersion); + assertEquals(0, batchWorkChunk.sequence); + assertEquals(JOB_DEF_ID, batchWorkChunk.jobDefinitionId); + assertEquals(JOB_INSTANCE_ID, batchWorkChunk.instanceId); + assertEquals(LAST_STEP_ID, batchWorkChunk.targetStepId); + Step1Output stepOutput = JsonUtil.deserialize(batchWorkChunk.serializedData, Step1Output.class); + assertThat(stepOutput.getPids(), hasSize(PID_COUNT)); + } + + private static class Step1Output implements IModelJson { + @JsonProperty("pids") + private List myPids; + + public List getPids() { + if (myPids == null) { + myPids = new ArrayList<>(); + } + return myPids; + } + + public Step1Output setPids(List thePids) { + myPids = thePids; + return this; + } + + public void addPid(long thePid) { + getPids().add(thePid); + } + } +} diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionTest.java new file mode 100644 index 00000000000..07123ea0fd8 --- /dev/null +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/impl/JobDefinitionTest.java @@ -0,0 +1,38 @@ +package ca.uhn.fhir.batch2.impl; + +import ca.uhn.fhir.batch2.model.JobDefinition; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + + +class JobDefinitionTest { + private static final String JOB_DEF_ID = "Jeff"; + private static final String JOB_DESC = "Jeff is curious"; + + @Test + public void emptyBuilder_fails() { + try { + JobDefinition.newBuilder().build(); + fail(); + } catch (NullPointerException e) { + assertEquals("No job parameters type was supplied", e.getMessage()); + } + } + + @Test + public void builder_no_steps() { + try { + JobDefinition.newBuilder() + .setJobDefinitionId(JOB_DEF_ID) + .setJobDescription(JOB_DESC) + .setJobDefinitionVersion(1) + .setParametersType(TestJobParameters.class) + .build(); + fail(); + } catch (IllegalArgumentException e) { + assertEquals("At least 2 steps must be supplied", e.getMessage()); + } + } +}