From acd881ca76d3883853977b1dc4493b06e3e8c8d7 Mon Sep 17 00:00:00 2001 From: Tadgh Date: Wed, 15 Jul 2020 15:20:26 -0700 Subject: [PATCH] Adds queue submitter class, for simplicity of manual triggers of Empi Runs --- .../jpa/empi/config/EmpiConsumerConfig.java | 7 +++ .../fhir/jpa/empi/svc/EmpiBatchSvcImpl.java | 24 +++++---- .../empi/svc/EmpiQueueSubmitterSvcImpl.java | 49 +++++++++++++++++++ .../jpa/empi/svc/EmpiBatchSvcImplTest.java | 4 +- .../fhir/empi/api/IEmpiQueueSubmitterSvc.java | 8 +++ 5 files changed, 80 insertions(+), 12 deletions(-) create mode 100644 hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/svc/EmpiQueueSubmitterSvcImpl.java create mode 100644 hapi-fhir-server-empi/src/main/java/ca/uhn/fhir/empi/api/IEmpiQueueSubmitterSvc.java diff --git a/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/config/EmpiConsumerConfig.java b/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/config/EmpiConsumerConfig.java index 613e9386473..d4af64abb54 100644 --- a/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/config/EmpiConsumerConfig.java +++ b/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/config/EmpiConsumerConfig.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.empi.api.IEmpiLinkSvc; import ca.uhn.fhir.empi.api.IEmpiLinkUpdaterSvc; import ca.uhn.fhir.empi.api.IEmpiMatchFinderSvc; import ca.uhn.fhir.empi.api.IEmpiPersonMergerSvc; +import ca.uhn.fhir.empi.api.IEmpiQueueSubmitterSvc; import ca.uhn.fhir.empi.api.IEmpiSettings; import ca.uhn.fhir.empi.log.Logs; import ca.uhn.fhir.empi.provider.EmpiProviderLoader; @@ -51,6 +52,7 @@ import ca.uhn.fhir.jpa.empi.svc.EmpiMatchFinderSvcImpl; import ca.uhn.fhir.jpa.empi.svc.EmpiMatchLinkSvc; import ca.uhn.fhir.jpa.empi.svc.EmpiPersonFindingSvc; import ca.uhn.fhir.jpa.empi.svc.EmpiPersonMergerSvcImpl; +import ca.uhn.fhir.jpa.empi.svc.EmpiQueueSubmitterSvcImpl; import ca.uhn.fhir.jpa.empi.svc.EmpiResourceDaoSvc; import ca.uhn.fhir.rest.server.util.ISearchParamRetriever; import org.slf4j.Logger; @@ -100,6 +102,11 @@ public class EmpiConsumerConfig { return new EmpiBatchSvcImpl(); } + @Bean + IEmpiQueueSubmitterSvc empiQueueSubmitterSvc() { + return new EmpiQueueSubmitterSvcImpl(); + } + @Bean EmpiEidUpdateService eidUpdateService() { return new EmpiEidUpdateService(); diff --git a/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/svc/EmpiBatchSvcImpl.java b/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/svc/EmpiBatchSvcImpl.java index a2f92f0c9eb..0489095b851 100644 --- a/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/svc/EmpiBatchSvcImpl.java +++ b/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/svc/EmpiBatchSvcImpl.java @@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.empi.svc; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.empi.api.IEmpiBatchService; +import ca.uhn.fhir.empi.api.IEmpiQueueSubmitterSvc; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.entity.EmpiTargetType; @@ -10,7 +11,6 @@ import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; -import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.provider.ProviderConstants; @@ -40,8 +40,12 @@ public class EmpiBatchSvcImpl implements IEmpiBatchService { @Autowired private EmpiSearchParamSvc myEmpiSearchParamSvc; + @Autowired + private IEmpiQueueSubmitterSvc myEmpiQueueSubmitterSvc; + @Autowired private IChannelFactory myChannelFactory; + private static final int queueAddingPageSize = 100; @Override @@ -55,16 +59,16 @@ public class EmpiBatchSvcImpl implements IEmpiBatchService { getTargetTypeOrThrowException(theTargetType); SearchParameterMap spMap = getSearchParameterMapFromCriteria(theTargetType, theCriteria); IFhirResourceDao patientDao = myDaoRegistry.getResourceDao(theTargetType); - IBundleProvider search = patientDao.search(spMap.setLoadSynchronous(true)); - List resources = search.getResources(0, search.size()); + IBundleProvider search = patientDao.search(spMap); - - for (IBaseResource resource : resources) { - ResourceModifiedJsonMessage rmjm = new ResourceModifiedJsonMessage(); - ResourceModifiedMessage resourceModifiedMessage = new ResourceModifiedMessage(myFhirContext, resource, ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED); - resourceModifiedMessage.setOperationType(ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED); - rmjm.setPayload(resourceModifiedMessage); - myEmpiChannelProducer.send(rmjm); + int lowIndex = 0; + List resources = search.getResources(lowIndex, lowIndex + queueAddingPageSize); + while(!resources.isEmpty()) { + for (IBaseResource resource : resources) { + myEmpiQueueSubmitterSvc.manuallySubmitResourceToEmpi(resource); + } + lowIndex += queueAddingPageSize; + resources = search.getResources(lowIndex, lowIndex + queueAddingPageSize); } } diff --git a/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/svc/EmpiQueueSubmitterSvcImpl.java b/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/svc/EmpiQueueSubmitterSvcImpl.java new file mode 100644 index 00000000000..b1b04302b9f --- /dev/null +++ b/hapi-fhir-jpaserver-empi/src/main/java/ca/uhn/fhir/jpa/empi/svc/EmpiQueueSubmitterSvcImpl.java @@ -0,0 +1,49 @@ +package ca.uhn.fhir.jpa.empi.svc; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.empi.api.IEmpiQueueSubmitterSvc; +import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; +import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; +import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import org.hl7.fhir.instance.model.api.IAnyResource; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.MessageChannel; + +import javax.annotation.PostConstruct; + +import static ca.uhn.fhir.empi.api.IEmpiSettings.EMPI_CHANNEL_NAME; + +/** + * This class is responsible for manual submissions of {@link IAnyResource} resources onto the Empi Queue. + */ +public class EmpiQueueSubmitterSvcImpl implements IEmpiQueueSubmitterSvc { + @Autowired + private IChannelNamer myChannelNamer; + + private MessageChannel myEmpiChannelProducer; + + @Autowired + private FhirContext myFhirContext; + + @Autowired + private IChannelFactory myChannelFactory; + + @Override + public void manuallySubmitResourceToEmpi(IBaseResource theResource) { + ResourceModifiedJsonMessage resourceModifiedJsonMessage = new ResourceModifiedJsonMessage(); + ResourceModifiedMessage resourceModifiedMessage = new ResourceModifiedMessage(myFhirContext, theResource, ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED); + resourceModifiedMessage.setOperationType(ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED); + resourceModifiedJsonMessage.setPayload(resourceModifiedMessage); + myEmpiChannelProducer.send(resourceModifiedJsonMessage); + } + + @PostConstruct + private void init() { + ChannelProducerSettings channelSettings = new ChannelProducerSettings(); + String channelName = myChannelNamer.getChannelName(EMPI_CHANNEL_NAME, channelSettings); + myEmpiChannelProducer= myChannelFactory.getOrCreateProducer(channelName, ResourceModifiedJsonMessage.class, channelSettings); + } +} diff --git a/hapi-fhir-jpaserver-empi/src/test/java/ca/uhn/fhir/jpa/empi/svc/EmpiBatchSvcImplTest.java b/hapi-fhir-jpaserver-empi/src/test/java/ca/uhn/fhir/jpa/empi/svc/EmpiBatchSvcImplTest.java index e43f1ad446d..0285f6e6b6d 100644 --- a/hapi-fhir-jpaserver-empi/src/test/java/ca/uhn/fhir/jpa/empi/svc/EmpiBatchSvcImplTest.java +++ b/hapi-fhir-jpaserver-empi/src/test/java/ca/uhn/fhir/jpa/empi/svc/EmpiBatchSvcImplTest.java @@ -58,14 +58,14 @@ class EmpiBatchSvcImplTest extends BaseEmpiR4Test { public void testEmpiBatchOnPatientType() throws Exception { for (int i =0; i < 10; i++) { - createPractitioner(buildPractitionerWithNameAndId("test", "id")); + createPatient(buildPatientWithNameAndId("test", "id")); } assertLinkCount(0); afterEmpiLatch.setExpectedCount(10); //SUT - myEmpiBatchSvc.runEmpiOnAllTargets(null); + myEmpiBatchSvc.runEmpiOnTargetType("Patient", null); afterEmpiLatch.awaitExpected(); assertLinkCount(10); diff --git a/hapi-fhir-server-empi/src/main/java/ca/uhn/fhir/empi/api/IEmpiQueueSubmitterSvc.java b/hapi-fhir-server-empi/src/main/java/ca/uhn/fhir/empi/api/IEmpiQueueSubmitterSvc.java new file mode 100644 index 00000000000..d956207628f --- /dev/null +++ b/hapi-fhir-server-empi/src/main/java/ca/uhn/fhir/empi/api/IEmpiQueueSubmitterSvc.java @@ -0,0 +1,8 @@ +package ca.uhn.fhir.empi.api; + +import org.hl7.fhir.instance.model.api.IBaseResource; + +public interface IEmpiQueueSubmitterSvc { + + void manuallySubmitResourceToEmpi(IBaseResource theResource); +}