Add Payload Search Mode To Message Channel (#4325)

* Added possible solution - needs to be refined

* Small changes

* Refactoring

* Refactoring + Test

* Changelog

* Small edits to changelog

* Small changes

* Changes made based on comments

* Clean up

* Changes made based on comments

* Changes made based on comments

Co-authored-by: qingyixia <cherry.xia@smilecdr.com>
This commit is contained in:
karneet1212 2022-12-15 09:46:10 -05:00 committed by GitHub
parent 7b86eda8a9
commit 0035677563
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 131 additions and 32 deletions

View File

@ -0,0 +1,7 @@
---
type: add
issue: 4323
title: "Previously, the feature of Payload Search Result Mode only applied to Rest Hook subscriptions.
Now, this feature also applies to Message channel type. Message delivery module will perform a search using
user-defined criteria and the search results will be delivered to the user-specified endpoint channel as a bundle
resource when Payload Search Result Mode is used."

View File

@ -21,15 +21,25 @@ package ca.uhn.fhir.jpa.subscription.match.deliver;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.util.BundleBuilder;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.text.StringSubstitutor;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -37,6 +47,11 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.HashMap;
import java.util.Map;
import static ca.uhn.fhir.jpa.subscription.util.SubscriptionUtil.createRequestDetailForPartitionedRequest;
public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandler {
private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class);
@ -46,6 +61,10 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
protected SubscriptionRegistry mySubscriptionRegistry;
@Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private MatchUrlService myMatchUrlService;
@Override
public void handleMessage(Message theMessage) throws MessagingException {
@ -100,6 +119,26 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception;
protected IBaseBundle createDeliveryBundleForPayloadSearchCriteria(CanonicalSubscription theSubscription, IBaseResource thePayloadResource) {
String resType = theSubscription.getPayloadSearchCriteria().substring(0, theSubscription.getPayloadSearchCriteria().indexOf('?'));
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resType);
RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(resType);
String payloadUrl = theSubscription.getPayloadSearchCriteria();
Map<String, String> valueMap = new HashMap<>(1);
valueMap.put("matched_resource_id", thePayloadResource.getIdElement().toUnqualifiedVersionless().getValue());
payloadUrl = new StringSubstitutor(valueMap).replace(payloadUrl);
SearchParameterMap payloadSearchMap = myMatchUrlService.translateMatchUrl(payloadUrl, resourceDefinition, MatchUrlService.processIncludes());
payloadSearchMap.setLoadSynchronous(true);
IBundleProvider searchResults = dao.search(payloadSearchMap, createRequestDetailForPartitionedRequest(theSubscription));
BundleBuilder builder = new BundleBuilder(myFhirContext);
for (IBaseResource next : searchResults.getAllResources()) {
builder.addTransactionUpdateEntry(next);
}
return builder.getBundle();
}
@VisibleForTesting
public void setFhirContextForUnitTest(FhirContext theCtx) {
myFhirContext = theCtx;
@ -115,6 +154,16 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
mySubscriptionRegistry = theSubscriptionRegistry;
}
@VisibleForTesting
public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
myDaoRegistry = theDaoRegistry;
}
@VisibleForTesting
public void setMatchUrlServiceForUnitTest(MatchUrlService theMatchUrlService) {
myMatchUrlService = theMatchUrlService;
}
public IInterceptorBroadcaster getInterceptorBroadcaster() {
return myInterceptorBroadcaster;
}

View File

@ -41,6 +41,8 @@ import org.springframework.messaging.MessagingException;
import java.net.URI;
import java.net.URISyntaxException;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Scope("prototype")
public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDeliverySubscriber {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringMessageSubscriber.class);
@ -56,12 +58,18 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
}
protected void doDelivery(ResourceDeliveryMessage theSourceMessage, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, ResourceModifiedJsonMessage theWrappedMessageToSend) {
String payloadId = theSourceMessage.getPayloadId();
if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
IBaseResource payloadResource = createDeliveryBundleForPayloadSearchCriteria(theSubscription, theWrappedMessageToSend.getPayload().getPayload(myFhirContext));
ResourceModifiedJsonMessage newWrappedMessageToSend = convertDeliveryMessageToResourceModifiedMessage(theSourceMessage, payloadResource);
theWrappedMessageToSend.setPayload(newWrappedMessageToSend.getPayload());
payloadId = payloadResource.getIdElement().toUnqualifiedVersionless().getValue();
}
theChannelProducer.send(theWrappedMessageToSend);
ourLog.debug("Delivering {} message payload {} for {}", theSourceMessage.getOperationType(), theSourceMessage.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
ourLog.debug("Delivering {} message payload {} for {}", theSourceMessage.getOperationType(), payloadId, theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
}
private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedMessage(ResourceDeliveryMessage theMsg) {
IBaseResource thePayloadResource = theMsg.getPayload(myFhirContext);
private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedMessage(ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource) {
ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
payload.setMessageKey(theMsg.getMessageKeyOrNull());
payload.setTransactionId(theMsg.getTransactionId());
@ -72,7 +80,8 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
@Override
public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException {
CanonicalSubscription subscription = theMessage.getSubscription();
ResourceModifiedJsonMessage messageWrapperToSend = convertDeliveryMessageToResourceModifiedMessage(theMessage);
IBaseResource payloadResource = theMessage.getPayload(myFhirContext);
ResourceModifiedJsonMessage messageWrapperToSend = convertDeliveryMessageToResourceModifiedMessage(theMessage, payloadResource);
// Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY
HookParams params = new HookParams()

View File

@ -29,13 +29,11 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.RequestTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.client.api.Header;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IHttpClient;
@ -47,8 +45,7 @@ import ca.uhn.fhir.rest.gclient.IClientExecutable;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import ca.uhn.fhir.util.BundleBuilder;
import org.apache.commons.text.StringSubstitutor;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
@ -65,7 +62,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static ca.uhn.fhir.jpa.subscription.util.SubscriptionUtil.createRequestDetailForPartitionedRequest;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Scope("prototype")
@ -144,27 +140,8 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
}
private IClientExecutable<?, ?> createDeliveryRequestTransaction(CanonicalSubscription theSubscription, IGenericClient theClient, IBaseResource thePayloadResource) {
IClientExecutable<?, ?> operation;
String resType = theSubscription.getPayloadSearchCriteria().substring(0, theSubscription.getPayloadSearchCriteria().indexOf('?'));
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resType);
RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(resType);
String payloadUrl = theSubscription.getPayloadSearchCriteria();
Map<String, String> valueMap = new HashMap<>(1);
valueMap.put("matched_resource_id", thePayloadResource.getIdElement().toUnqualifiedVersionless().getValue());
payloadUrl = new StringSubstitutor(valueMap).replace(payloadUrl);
SearchParameterMap payloadSearchMap = myMatchUrlService.translateMatchUrl(payloadUrl, resourceDefinition, MatchUrlService.processIncludes());
payloadSearchMap.setLoadSynchronous(true);
IBundleProvider searchResults = dao.search(payloadSearchMap, createRequestDetailForPartitionedRequest(theSubscription));
BundleBuilder builder = new BundleBuilder(myFhirContext);
for (IBaseResource next : searchResults.getAllResources()) {
builder.addTransactionUpdateEntry(next);
}
operation = theClient.transaction().withBundle(builder.getBundle());
return operation;
IBaseBundle bundle = createDeliveryBundleForPayloadSearchCriteria(theSubscription, thePayloadResource);
return theClient.transaction().withBundle(bundle);
}
public IBaseResource getResource(IIdType payloadId, RequestPartitionId thePartitionId, boolean theDeletedOK) throws ResourceGoneException {

View File

@ -6,6 +6,10 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.match.deliver.message.SubscriptionDeliveringMessageSubscriber;
@ -17,10 +21,14 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.BeforeEach;
@ -41,7 +49,6 @@ import javax.annotation.Nonnull;
import java.net.URISyntaxException;
import java.time.LocalDate;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
@ -53,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -80,6 +88,15 @@ public class BaseSubscriptionDeliverySubscriberTest {
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private IGenericClient myGenericClient;
@Mock
private DaoRegistry myDaoRegistry;
@Mock
private IFhirResourceDao myResourceDao;
@Mock
private MatchUrlService myMatchUrlService;
@BeforeEach
public void before() {
mySubscriber = new SubscriptionDeliveringRestHookSubscriber();
@ -91,7 +108,8 @@ public class BaseSubscriptionDeliverySubscriberTest {
myMessageSubscriber.setFhirContextForUnitTest(myCtx);
myMessageSubscriber.setInterceptorBroadcasterForUnitTest(myInterceptorBroadcaster);
myMessageSubscriber.setSubscriptionRegistryForUnitTest(mySubscriptionRegistry);
myMessageSubscriber.setDaoRegistryForUnitTest(myDaoRegistry);
myMessageSubscriber.setMatchUrlServiceForUnitTest(myMatchUrlService);
myCtx.setRestfulClientFactory(myRestfulClientFactory);
when(myRestfulClientFactory.newGenericClient(any())).thenReturn(myGenericClient);
}
@ -213,6 +231,45 @@ public class BaseSubscriptionDeliverySubscriberTest {
assertThat(foo, containsInAnyOrder("bar", "bar2"));
}
@Test
public void testMessageSubscriptionWithPayloadSearchMode() throws URISyntaxException {
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY), ArgumentMatchers.any(HookParams.class))).thenReturn(true);
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY), any())).thenReturn(false);
when(myChannelFactory.getOrCreateProducer(any(), any(), any())).thenReturn(myChannelProducer);
when(myDaoRegistry.getResourceDao(anyString())).thenReturn(myResourceDao);
when(myMatchUrlService.translateMatchUrl(any(), any(), any())).thenReturn(new SearchParameterMap());
Patient p1 = generatePatient();
Patient p2 = generatePatient();
IBundleProvider bundleProvider = new SimpleBundleProvider(List.of(p1,p2));
when(myResourceDao.search(any(), any())).thenReturn(bundleProvider);
CanonicalSubscription subscription = generateSubscription();
subscription.setPayloadSearchCriteria("Patient?_include=*");
ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
payload.setSubscription(subscription);
payload.setPayload(myCtx, p1, EncodingEnum.JSON);
payload.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);
myMessageSubscriber.handleMessage(payload);
ArgumentCaptor<ResourceModifiedJsonMessage> captor = ArgumentCaptor.forClass(ResourceModifiedJsonMessage.class);
verify(myChannelProducer).send(captor.capture());
final List<ResourceModifiedJsonMessage> messages = captor.getAllValues();
assertThat(messages, hasSize(1));
ResourceModifiedMessage receivedMessage = messages.get(0).getPayload();
assertEquals(receivedMessage.getPayloadId(), "Bundle");
Bundle receivedBundle = (Bundle) receivedMessage.getPayload(myCtx);
assertThat(receivedBundle.getEntry(), hasSize(2));
assertEquals(p1.getIdElement().getValue(), receivedBundle.getEntry().get(0).getResource().getIdElement().getValue());
assertEquals(p2.getIdElement().getValue(), receivedBundle.getEntry().get(1).getResource().getIdElement().getValue());
}
@Test
public void testRestHookDeliveryAbortedByInterceptor() {
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY), any())).thenReturn(true);