Adds queue submitter class, for simplicity of manual triggers of Empi Runs

This commit is contained in:
Tadgh 2020-07-15 15:20:26 -07:00
parent f965b26f0d
commit acd881ca76
5 changed files with 80 additions and 12 deletions

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.empi.api.IEmpiLinkSvc;
import ca.uhn.fhir.empi.api.IEmpiLinkUpdaterSvc; import ca.uhn.fhir.empi.api.IEmpiLinkUpdaterSvc;
import ca.uhn.fhir.empi.api.IEmpiMatchFinderSvc; import ca.uhn.fhir.empi.api.IEmpiMatchFinderSvc;
import ca.uhn.fhir.empi.api.IEmpiPersonMergerSvc; import ca.uhn.fhir.empi.api.IEmpiPersonMergerSvc;
import ca.uhn.fhir.empi.api.IEmpiQueueSubmitterSvc;
import ca.uhn.fhir.empi.api.IEmpiSettings; import ca.uhn.fhir.empi.api.IEmpiSettings;
import ca.uhn.fhir.empi.log.Logs; import ca.uhn.fhir.empi.log.Logs;
import ca.uhn.fhir.empi.provider.EmpiProviderLoader; import ca.uhn.fhir.empi.provider.EmpiProviderLoader;
@ -51,6 +52,7 @@ import ca.uhn.fhir.jpa.empi.svc.EmpiMatchFinderSvcImpl;
import ca.uhn.fhir.jpa.empi.svc.EmpiMatchLinkSvc; import ca.uhn.fhir.jpa.empi.svc.EmpiMatchLinkSvc;
import ca.uhn.fhir.jpa.empi.svc.EmpiPersonFindingSvc; import ca.uhn.fhir.jpa.empi.svc.EmpiPersonFindingSvc;
import ca.uhn.fhir.jpa.empi.svc.EmpiPersonMergerSvcImpl; import ca.uhn.fhir.jpa.empi.svc.EmpiPersonMergerSvcImpl;
import ca.uhn.fhir.jpa.empi.svc.EmpiQueueSubmitterSvcImpl;
import ca.uhn.fhir.jpa.empi.svc.EmpiResourceDaoSvc; import ca.uhn.fhir.jpa.empi.svc.EmpiResourceDaoSvc;
import ca.uhn.fhir.rest.server.util.ISearchParamRetriever; import ca.uhn.fhir.rest.server.util.ISearchParamRetriever;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -100,6 +102,11 @@ public class EmpiConsumerConfig {
return new EmpiBatchSvcImpl(); return new EmpiBatchSvcImpl();
} }
@Bean
IEmpiQueueSubmitterSvc empiQueueSubmitterSvc() {
return new EmpiQueueSubmitterSvcImpl();
}
@Bean @Bean
EmpiEidUpdateService eidUpdateService() { EmpiEidUpdateService eidUpdateService() {
return new EmpiEidUpdateService(); return new EmpiEidUpdateService();

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.empi.svc;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.empi.api.IEmpiBatchService; import ca.uhn.fhir.empi.api.IEmpiBatchService;
import ca.uhn.fhir.empi.api.IEmpiQueueSubmitterSvc;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.entity.EmpiTargetType; import ca.uhn.fhir.jpa.entity.EmpiTargetType;
@ -10,7 +11,6 @@ import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.provider.ProviderConstants; import ca.uhn.fhir.rest.server.provider.ProviderConstants;
@ -40,8 +40,12 @@ public class EmpiBatchSvcImpl implements IEmpiBatchService {
@Autowired @Autowired
private EmpiSearchParamSvc myEmpiSearchParamSvc; private EmpiSearchParamSvc myEmpiSearchParamSvc;
@Autowired
private IEmpiQueueSubmitterSvc myEmpiQueueSubmitterSvc;
@Autowired @Autowired
private IChannelFactory myChannelFactory; private IChannelFactory myChannelFactory;
private static final int queueAddingPageSize = 100;
@Override @Override
@ -55,16 +59,16 @@ public class EmpiBatchSvcImpl implements IEmpiBatchService {
getTargetTypeOrThrowException(theTargetType); getTargetTypeOrThrowException(theTargetType);
SearchParameterMap spMap = getSearchParameterMapFromCriteria(theTargetType, theCriteria); SearchParameterMap spMap = getSearchParameterMapFromCriteria(theTargetType, theCriteria);
IFhirResourceDao patientDao = myDaoRegistry.getResourceDao(theTargetType); IFhirResourceDao patientDao = myDaoRegistry.getResourceDao(theTargetType);
IBundleProvider search = patientDao.search(spMap.setLoadSynchronous(true)); IBundleProvider search = patientDao.search(spMap);
List<IBaseResource> resources = search.getResources(0, search.size());
int lowIndex = 0;
for (IBaseResource resource : resources) { List<IBaseResource> resources = search.getResources(lowIndex, lowIndex + queueAddingPageSize);
ResourceModifiedJsonMessage rmjm = new ResourceModifiedJsonMessage(); while(!resources.isEmpty()) {
ResourceModifiedMessage resourceModifiedMessage = new ResourceModifiedMessage(myFhirContext, resource, ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED); for (IBaseResource resource : resources) {
resourceModifiedMessage.setOperationType(ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED); myEmpiQueueSubmitterSvc.manuallySubmitResourceToEmpi(resource);
rmjm.setPayload(resourceModifiedMessage); }
myEmpiChannelProducer.send(rmjm); lowIndex += queueAddingPageSize;
resources = search.getResources(lowIndex, lowIndex + queueAddingPageSize);
} }
} }

View File

@ -0,0 +1,49 @@
package ca.uhn.fhir.jpa.empi.svc;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.empi.api.IEmpiQueueSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import javax.annotation.PostConstruct;
import static ca.uhn.fhir.empi.api.IEmpiSettings.EMPI_CHANNEL_NAME;
/**
* This class is responsible for manual submissions of {@link IAnyResource} resources onto the Empi Queue.
*/
public class EmpiQueueSubmitterSvcImpl implements IEmpiQueueSubmitterSvc {
@Autowired
private IChannelNamer myChannelNamer;
private MessageChannel myEmpiChannelProducer;
@Autowired
private FhirContext myFhirContext;
@Autowired
private IChannelFactory myChannelFactory;
@Override
public void manuallySubmitResourceToEmpi(IBaseResource theResource) {
ResourceModifiedJsonMessage resourceModifiedJsonMessage = new ResourceModifiedJsonMessage();
ResourceModifiedMessage resourceModifiedMessage = new ResourceModifiedMessage(myFhirContext, theResource, ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED);
resourceModifiedMessage.setOperationType(ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED);
resourceModifiedJsonMessage.setPayload(resourceModifiedMessage);
myEmpiChannelProducer.send(resourceModifiedJsonMessage);
}
@PostConstruct
private void init() {
ChannelProducerSettings channelSettings = new ChannelProducerSettings();
String channelName = myChannelNamer.getChannelName(EMPI_CHANNEL_NAME, channelSettings);
myEmpiChannelProducer= myChannelFactory.getOrCreateProducer(channelName, ResourceModifiedJsonMessage.class, channelSettings);
}
}

View File

@ -58,14 +58,14 @@ class EmpiBatchSvcImplTest extends BaseEmpiR4Test {
public void testEmpiBatchOnPatientType() throws Exception { public void testEmpiBatchOnPatientType() throws Exception {
for (int i =0; i < 10; i++) { for (int i =0; i < 10; i++) {
createPractitioner(buildPractitionerWithNameAndId("test", "id")); createPatient(buildPatientWithNameAndId("test", "id"));
} }
assertLinkCount(0); assertLinkCount(0);
afterEmpiLatch.setExpectedCount(10); afterEmpiLatch.setExpectedCount(10);
//SUT //SUT
myEmpiBatchSvc.runEmpiOnAllTargets(null); myEmpiBatchSvc.runEmpiOnTargetType("Patient", null);
afterEmpiLatch.awaitExpected(); afterEmpiLatch.awaitExpected();
assertLinkCount(10); assertLinkCount(10);

View File

@ -0,0 +1,8 @@
package ca.uhn.fhir.empi.api;
import org.hl7.fhir.instance.model.api.IBaseResource;
public interface IEmpiQueueSubmitterSvc {
void manuallySubmitResourceToEmpi(IBaseResource theResource);
}