From fa4bbe368517b2255c96544e6574619bb59deabb Mon Sep 17 00:00:00 2001 From: James Agnew Date: Tue, 30 Jun 2020 14:29:09 -0400 Subject: [PATCH] Allow search criteria as subscription delivery mechanism (#1951) * Terser should create correct Enumeration on create * Start work * Work on subscriptions * Work on seed bundles * Bundle transmission * Add changelog * Test fix * Fix LGTM warning --- .../ca/uhn/fhir/rest/param/ParameterUtil.java | 7 ++ .../ca/uhn/fhir/util/TransactionBuilder.java | 110 ++++++++++++++++++ .../1951-subscription-search-criteria.yaml | 5 + .../subscription/BaseSubscriptionsR4Test.java | 64 ++++++---- ...SubscriptionValidatingInterceptorTest.java | 4 + .../resthook/RestHookTestR4Test.java | 35 ++++++ .../uhn/fhir/jpa/model/util/JpaConstants.java | 4 + .../fhir/jpa/searchparam/MatchUrlService.java | 54 ++++++++- ...bscriptionDeliveringMessageSubscriber.java | 4 +- ...scriptionDeliveringRestHookSubscriber.java | 107 +++++++++++------ .../registry/SubscriptionCanonicalizer.java | 3 + .../model/CanonicalSubscription.java | 53 +++++---- .../SubscriptionValidatingInterceptor.java | 46 +++++--- .../rest/server/method/IncludeParameter.java | 23 ++-- .../uhn/fhir/util/TransactionBuilderTest.java | 40 +++++++ 15 files changed, 446 insertions(+), 113 deletions(-) create mode 100644 hapi-fhir-base/src/main/java/ca/uhn/fhir/util/TransactionBuilder.java create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_1_0/1951-subscription-search-criteria.yaml create mode 100644 hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/util/TransactionBuilderTest.java diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/param/ParameterUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/param/ParameterUtil.java index 086c6d9e624..8d6a9d8a6d4 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/param/ParameterUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/rest/param/ParameterUtil.java @@ -9,6 +9,7 @@ import ca.uhn.fhir.model.api.IQueryParameterType; import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.model.primitive.IntegerDt; import ca.uhn.fhir.rest.annotation.IdParam; +import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.QualifiedParamList; import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; import ca.uhn.fhir.rest.param.binder.QueryParameterAndBinder; @@ -386,4 +387,10 @@ public class ParameterUtil { return b.toString(); } + /** + * Returns true if the value is :iterate or :recurse (the former name of :iterate) for an _include parameter + */ + public static boolean isIncludeIterate(String theQualifier) { + return Constants.PARAM_INCLUDE_QUALIFIER_RECURSE.equals(theQualifier) || Constants.PARAM_INCLUDE_QUALIFIER_ITERATE.equals(theQualifier); + } } diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/TransactionBuilder.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/TransactionBuilder.java new file mode 100644 index 00000000000..80202430033 --- /dev/null +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/TransactionBuilder.java @@ -0,0 +1,110 @@ +package ca.uhn.fhir.util; + +import ca.uhn.fhir.context.BaseRuntimeChildDefinition; +import ca.uhn.fhir.context.BaseRuntimeElementDefinition; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.context.RuntimeResourceDefinition; +import org.hl7.fhir.instance.model.api.IBase; +import org.hl7.fhir.instance.model.api.IBaseBundle; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.thymeleaf.util.Validate; + +/** + * This class can be used to build a Bundle resource to be used as a FHIR transaction. + * + * This is not yet complete, and doesn't support all FHIR features. USE WITH CAUTION as the API + * may change. + * + * @since 5.1.0 + */ +public class TransactionBuilder { + + private final FhirContext myContext; + private final IBaseBundle myBundle; + private final RuntimeResourceDefinition myBundleDef; + private final BaseRuntimeChildDefinition myEntryChild; + private final BaseRuntimeElementDefinition myEntryDef; + private final BaseRuntimeChildDefinition myEntryResourceChild; + private final BaseRuntimeChildDefinition myEntryFullUrlChild; + private final BaseRuntimeChildDefinition myEntryRequestChild; + private final BaseRuntimeElementDefinition myEntryRequestDef; + private final BaseRuntimeChildDefinition myEntryRequestUrlChild; + private final BaseRuntimeChildDefinition myEntryRequestMethodChild; + private final BaseRuntimeElementDefinition myEntryRequestMethodDef; + + /** + * Constructor + */ + public TransactionBuilder(FhirContext theContext) { + myContext = theContext; + + myBundleDef = myContext.getResourceDefinition("Bundle"); + myBundle = (IBaseBundle) myBundleDef.newInstance(); + + BaseRuntimeChildDefinition typeChild = myBundleDef.getChildByName("type"); + IPrimitiveType type = (IPrimitiveType) typeChild.getChildByName("type").newInstance(typeChild.getInstanceConstructorArguments()); + type.setValueAsString("transaction"); + typeChild.getMutator().setValue(myBundle, type); + + myEntryChild = myBundleDef.getChildByName("entry"); + myEntryDef = myEntryChild.getChildByName("entry"); + + myEntryResourceChild = myEntryDef.getChildByName("resource"); + myEntryFullUrlChild = myEntryDef.getChildByName("fullUrl"); + + myEntryRequestChild = myEntryDef.getChildByName("request"); + myEntryRequestDef = myEntryRequestChild.getChildByName("request"); + + myEntryRequestUrlChild = myEntryRequestDef.getChildByName("url"); + + myEntryRequestMethodChild = myEntryRequestDef.getChildByName("method"); + myEntryRequestMethodDef = myEntryRequestMethodChild.getChildByName("method"); + + + } + + /** + * Adds an entry containing an update (PUT) request + * + * @param theResource The resource to update + */ + public TransactionBuilder addUpdateEntry(IBaseResource theResource) { + Validate.notNull(theResource, "theResource must not be null"); + Validate.notEmpty(theResource.getIdElement().getValue(), "theResource must have an ID"); + + IBase entry = myEntryDef.newInstance(); + myEntryChild.getMutator().addValue(myBundle, entry); + + // Bundle.entry.fullUrl + IPrimitiveType fullUrl = (IPrimitiveType) myContext.getElementDefinition("uri").newInstance(); + fullUrl.setValueAsString(theResource.getIdElement().getValue()); + myEntryFullUrlChild.getMutator().setValue(entry, fullUrl); + + // Bundle.entry.resource + myEntryResourceChild.getMutator().setValue(entry, theResource); + + // Bundle.entry.request + IBase request = myEntryRequestDef.newInstance(); + myEntryRequestChild.getMutator().setValue(entry, request); + + // Bundle.entry.request.url + IPrimitiveType url = (IPrimitiveType) myContext.getElementDefinition("uri").newInstance(); + url.setValueAsString(theResource.getIdElement().toUnqualifiedVersionless().getValue()); + myEntryRequestUrlChild.getMutator().setValue(request, url); + + // Bundle.entry.request.url + IPrimitiveType method = (IPrimitiveType) myEntryRequestMethodDef.newInstance(myEntryRequestMethodChild.getInstanceConstructorArguments()); + method.setValueAsString("PUT"); + myEntryRequestMethodChild.getMutator().setValue(request, method); + + return this; + } + + + + public IBaseBundle getBundle() { + return myBundle; + } + +} diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_1_0/1951-subscription-search-criteria.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_1_0/1951-subscription-search-criteria.yaml new file mode 100644 index 00000000000..7adf974c426 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_1_0/1951-subscription-search-criteria.yaml @@ -0,0 +1,5 @@ +--- +type: add +issue: 1951 +title: "REST Hook subscriptions may now specify a search expression to be used to fetch a collection of + resources to deliver when a subscription matches." diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java index fe28b223e84..4f3f6c0b38a 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java @@ -7,13 +7,15 @@ import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel; import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor; import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.ResourceParam; +import ca.uhn.fhir.rest.annotation.Transaction; +import ca.uhn.fhir.rest.annotation.TransactionParam; import ca.uhn.fhir.rest.annotation.Update; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.RestfulServer; -import ca.uhn.fhir.util.BundleUtil; import ca.uhn.fhir.test.utilities.JettyUtil; +import ca.uhn.fhir.util.BundleUtil; import com.google.common.collect.Lists; import net.ttddyy.dsproxy.QueryCount; import net.ttddyy.dsproxy.listener.SingleQueryCountHolder; @@ -22,9 +24,19 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; -import org.hl7.fhir.r4.model.*; -import org.junit.*; +import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.CodeableConcept; +import org.hl7.fhir.r4.model.Coding; +import org.hl7.fhir.r4.model.IdType; +import org.hl7.fhir.r4.model.Observation; +import org.hl7.fhir.r4.model.Subscription; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RequestParam; import javax.annotation.PostConstruct; import javax.servlet.http.HttpServletRequest; @@ -36,28 +48,23 @@ import java.util.List; @Ignore public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseSubscriptionsR4Test.class); - - private static Server ourListenerServer; - protected static int ourListenerPort; + protected static int ourListenerPort; protected static List ourContentTypes = Collections.synchronizedList(new ArrayList<>()); protected static List ourHeaders = Collections.synchronizedList(new ArrayList<>()); + protected static List ourTransactions = Collections.synchronizedList(Lists.newArrayList()); + protected static List ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList()); + protected static List ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); + private static Server ourListenerServer; private static SingleQueryCountHolder ourCountHolder; - - @Autowired - private SingleQueryCountHolder myCountHolder; + private static String ourListenerServerBase; @Autowired protected SubscriptionTestUtil mySubscriptionTestUtil; @Autowired protected SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; - protected CountingInterceptor myCountingInterceptor; - - protected static List ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList()); - protected static List ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); - private static String ourListenerServerBase; - protected List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); - + @Autowired + private SingleQueryCountHolder myCountHolder; @After public void afterUnregisterRestHookListener() { @@ -87,6 +94,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test public void beforeReset() throws Exception { ourCreatedObservations.clear(); ourUpdatedObservations.clear(); + ourTransactions.clear(); ourContentTypes.clear(); ourHeaders.clear(); @@ -162,8 +170,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test } - - public static class ObservationListener implements IResourceProvider { + public static class ObservationResourceProvider implements IResourceProvider { @Create public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { @@ -202,6 +209,17 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test } + public static class PlainProvider { + + @Transaction + public Bundle transaction(@TransactionParam Bundle theInput) { + ourLog.info("Received transaction update"); + ourTransactions.add(theInput); + return theInput; + } + + } + @AfterClass public static void reportTotalSelects() { ourLog.info("Total database select queries: {}", getQueryCount().getSelect()); @@ -214,9 +232,9 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test @BeforeClass public static void startListenerServer() throws Exception { RestfulServer ourListenerRestServer = new RestfulServer(FhirContext.forR4()); - - ObservationListener obsListener = new ObservationListener(); - ourListenerRestServer.setResourceProviders(obsListener); + + ourListenerRestServer.registerProvider(new ObservationResourceProvider()); + ourListenerRestServer.registerProvider(new PlainProvider()); ourListenerServer = new Server(0); @@ -229,8 +247,8 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test ourListenerServer.setHandler(proxyHandler); JettyUtil.startServer(ourListenerServer); - ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); - ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; + ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); + ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; } @AfterClass diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionValidatingInterceptorTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionValidatingInterceptorTest.java index b15e699ce7d..bf8ee7f83c3 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionValidatingInterceptorTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionValidatingInterceptorTest.java @@ -85,6 +85,8 @@ public class SubscriptionValidatingInterceptorTest { @Test public void testValidate_RestHook_NoEndpoint() { + when(myDaoRegistry.isResourceTypeSupported(eq("Patient"))).thenReturn(true); + Subscription subscription = new Subscription(); subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE); subscription.setCriteria("Patient?identifier=foo"); @@ -102,6 +104,8 @@ public class SubscriptionValidatingInterceptorTest { @Test public void testValidate_RestHook_NoType() { + when(myDaoRegistry.isResourceTypeSupported(eq("Patient"))).thenReturn(true); + Subscription subscription = new Subscription(); subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE); subscription.setCriteria("Patient?identifier=foo"); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java index 2f5e5fc32c2..308b5ac638c 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java @@ -1013,4 +1013,39 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test { } + @Test + public void testDeliverSearchResult() throws Exception { + { + Subscription subscription = newSubscription("Observation?", "application/json"); + subscription.addExtension(JpaConstants.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_RESULT, new StringType("Observation?_id=${matched_resource_id}&_include=*")); + ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(subscription)); + MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); + mySubscriptionIds.add(methodOutcome.getId()); + waitForActivatedSubscriptionCount(1); + } + + { + Patient patient = new Patient(); + patient.setActive(true); + IIdType patientId = ourClient.create().resource(patient).execute().getId(); + + Observation observation = new Observation(); + observation.addExtension().setUrl("Observation#accessType").setValue(new Coding().setCode("Catheter")); + observation.getSubject().setReferenceElement(patientId.toUnqualifiedVersionless()); + MethodOutcome methodOutcome = ourClient.create().resource(observation).execute(); + assertEquals(true, methodOutcome.getCreated()); + + waitForQueueToDrain(); + waitForSize(1, ourTransactions); + + ourLog.info("Received transaction: {}", myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(ourTransactions.get(0))); + + Bundle xact = ourTransactions.get(0); + assertEquals(2, xact.getEntry().size()); + + ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(ourTransactions.get(0))); + } + + } + } diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java index 671e0fbfc1b..ab7e5c45324 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/util/JpaConstants.java @@ -214,6 +214,10 @@ public class JpaConstants { * Extension ID for external binary references */ public static final String EXT_EXTERNALIZED_BINARY_ID = "http://hapifhir.io/fhir/StructureDefinition/externalized-binary-id"; + /** + * For subscription, deliver a bundle containing a search result instead of just a single resource + */ + public static final String EXT_SUBSCRIPTION_PAYLOAD_SEARCH_RESULT = "http://hapifhir.io/fhir/StructureDefinition/subscription-payload-search-result"; /** * Placed in system-generated extensions */ diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/MatchUrlService.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/MatchUrlService.java index 6ef228f30c1..45ed8c78fd7 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/MatchUrlService.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/MatchUrlService.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry; import ca.uhn.fhir.model.api.IQueryParameterAnd; import ca.uhn.fhir.model.api.IQueryParameterType; +import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.QualifiedParamList; import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; @@ -50,7 +51,7 @@ public class MatchUrlService { @Autowired private ISearchParamRegistry mySearchParamRegistry; - public SearchParameterMap translateMatchUrl(String theMatchUrl, RuntimeResourceDefinition theResourceDefinition) { + public SearchParameterMap translateMatchUrl(String theMatchUrl, RuntimeResourceDefinition theResourceDefinition, Flag... theFlags) { SearchParameterMap paramMap = new SearchParameterMap(); List parameters = translateMatchUrl(theMatchUrl); @@ -79,6 +80,13 @@ public class MatchUrlService { for (String nextParamName : nameToParamLists.keySet()) { List paramList = nameToParamLists.get(nextParamName); + + if (theFlags != null && theFlags.length > 0) { + for (Flag next : theFlags) { + next.process(nextParamName, paramList, paramMap); + } + } + if (Constants.PARAM_LASTUPDATED.equals(nextParamName)) { if (paramList != null && paramList.size() > 0) { if (paramList.size() > 2) { @@ -131,8 +139,8 @@ public class MatchUrlService { return UrlUtil.translateMatchUrl(theMatchUrl); } - private IQueryParameterAnd newInstanceAnd(String theParamType) { - Class clazz = ResourceMetaParams.RESOURCE_META_AND_PARAMS.get(theParamType); + private IQueryParameterAnd newInstanceAnd(String theParamType) { + Class> clazz = ResourceMetaParams.RESOURCE_META_AND_PARAMS.get(theParamType); return ReflectionUtil.newInstance(clazz); } @@ -140,4 +148,44 @@ public class MatchUrlService { Class clazz = ResourceMetaParams.RESOURCE_META_PARAMS.get(theParamType); return ReflectionUtil.newInstance(clazz); } + + public abstract static class Flag { + + /** + * Constructor + */ + Flag() { + // nothing + } + + abstract void process(String theParamName, List theValues, SearchParameterMap theMapToPopulate); + + } + + /** + * Indicates that the parser should process _include and _revinclude (by default these are not handled) + */ + public static Flag processIncludes() { + return new Flag() { + + @Override + void process(String theParamName, List theValues, SearchParameterMap theMapToPopulate) { + if (Constants.PARAM_INCLUDE.equals(theParamName)) { + for (QualifiedParamList nextQualifiedList : theValues) { + for (String nextValue : nextQualifiedList) { + theMapToPopulate.addInclude(new Include(nextValue, ParameterUtil.isIncludeIterate(nextQualifiedList.getQualifier()))); + } + } + } else if (Constants.PARAM_REVINCLUDE.equals(theParamName)) { + for (QualifiedParamList nextQualifiedList : theValues) { + for (String nextValue : nextQualifiedList) { + theMapToPopulate.addInclude(new Include(nextValue, ParameterUtil.isIncludeIterate(nextQualifiedList.getQualifier()))); + } + } + } + + } + }; + } + } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java index 9c27cd42584..561904a423f 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/message/SubscriptionDeliveringMessageSubscriber.java @@ -74,7 +74,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException { CanonicalSubscription subscription = theMessage.getSubscription(); - // Interceptor call: SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY + // Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY HookParams params = new HookParams() .add(CanonicalSubscription.class, subscription) .add(ResourceDeliveryMessage.class, theMessage); @@ -104,7 +104,7 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel deliverPayload(theMessage, subscription, channelProducer); - // Interceptor call: SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY + // Interceptor call: SUBSCRIPTION_AFTER_MESSAGE_DELIVERY params = new HookParams() .add(CanonicalSubscription.class, subscription) .add(ResourceDeliveryMessage.class, theMessage); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/resthook/SubscriptionDeliveringRestHookSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/resthook/SubscriptionDeliveringRestHookSubscriber.java index cc5dfc428af..0b7283aa05f 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/resthook/SubscriptionDeliveringRestHookSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/deliver/resthook/SubscriptionDeliveringRestHookSubscriber.java @@ -25,11 +25,14 @@ import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.searchparam.MatchUrlService; +import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.RequestTypeEnum; +import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.client.api.Header; import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.api.IHttpClient; @@ -40,7 +43,9 @@ import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor; import ca.uhn.fhir.rest.gclient.IClientExecutable; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; +import ca.uhn.fhir.util.TransactionBuilder; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.text.StringSubstitutor; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; @@ -49,6 +54,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.messaging.MessagingException; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -65,6 +71,9 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe @Autowired private DaoRegistry myDaoRegistry; + @Autowired + private MatchUrlService myMatchUrlService; + /** * Constructor */ @@ -81,55 +90,82 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient, IBaseResource thePayloadResource) { IClientExecutable operation; + + if (isNotBlank(theSubscription.getPayloadSearchResult())) { + operation = createDeliveryRequestTransaction(theSubscription, theClient, thePayloadResource); + } else if (thePayloadType != null) { + operation = createDeliveryRequestNormal(theMsg, theClient, thePayloadResource); + } else { + sendNotification(theMsg); + operation = null; + } + + if (operation != null) { + + if (thePayloadType != null) { + operation.encoded(thePayloadType); + } + + String payloadId = thePayloadResource.getIdElement().toUnqualified().getValue(); + ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadId, theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue()); + + try { + operation.execute(); + } catch (ResourceNotFoundException e) { + ourLog.error("Cannot reach {} ", theMsg.getSubscription().getEndpointUrl()); + ourLog.error("Exception: ", e); + throw e; + } + + } + } + + @Nullable + private IClientExecutable createDeliveryRequestNormal(ResourceDeliveryMessage theMsg, IGenericClient theClient, IBaseResource thePayloadResource) { + IClientExecutable operation; switch (theMsg.getOperationType()) { case CREATE: case UPDATE: - if (thePayloadResource == null || thePayloadResource.isEmpty()) { - if (thePayloadType != null) { - operation = theClient.create().resource(thePayloadResource); - } else { - sendNotification(theMsg); - return; - } - } else { - if (thePayloadType != null) { - operation = theClient.update().resource(thePayloadResource); - } else { - sendNotification(theMsg); - return; - } - } + operation = theClient.update().resource(thePayloadResource); break; case DELETE: operation = theClient.delete().resourceById(theMsg.getPayloadId(myFhirContext)); break; default: ourLog.warn("Ignoring delivery message of type: {}", theMsg.getOperationType()); - return; + operation = null; + break; + } + return operation; + } + + private IClientExecutable createDeliveryRequestTransaction(CanonicalSubscription theSubscription, IGenericClient theClient, IBaseResource thePayloadResource) { + IClientExecutable operation; + String resType = theSubscription.getPayloadSearchResult().substring(0, theSubscription.getPayloadSearchResult().indexOf('?')); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(resType); + RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(resType); + + String payloadUrl = theSubscription.getPayloadSearchResult(); + Map valueMap = new HashMap<>(1); + valueMap.put("matched_resource_id", thePayloadResource.getIdElement().toUnqualifiedVersionless().getValue()); + payloadUrl = new StringSubstitutor(valueMap).replace(payloadUrl); + SearchParameterMap payloadSearchMap = myMatchUrlService.translateMatchUrl(payloadUrl, resourceDefinition, MatchUrlService.processIncludes()); + payloadSearchMap.setLoadSynchronous(true); + + IBundleProvider searchResults = dao.search(payloadSearchMap); + + TransactionBuilder builder = new TransactionBuilder(myFhirContext); + for (IBaseResource next : searchResults.getResources(0, searchResults.size())) { + builder.addUpdateEntry(next); } - if (thePayloadType != null) { - operation.encoded(thePayloadType); - } - - String payloadId = null; - if (thePayloadResource != null) { - payloadId = thePayloadResource.getIdElement().toUnqualified().getValue(); - } - ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadId, theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue()); - - try { - operation.execute(); - } catch (ResourceNotFoundException e) { - ourLog.error("Cannot reach {} ", theMsg.getSubscription().getEndpointUrl()); - ourLog.error("Exception: ", e); - throw e; - } + operation = theClient.transaction().withBundle(builder.getBundle()); + return operation; } public IBaseResource getResource(IIdType payloadId) throws ResourceGoneException { RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(payloadId.getResourceType()); - IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceDef.getImplementingClass()); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceDef.getImplementingClass()); return dao.read(payloadId.toVersionless()); } @@ -237,8 +273,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe // close connection in order to return a possible cached connection to the connection pool response.close(); } catch (IOException e) { - ourLog.error("Error trying to reach " + theMsg.getSubscription().getEndpointUrl()); - e.printStackTrace(); + ourLog.error("Error trying to reach {}: {}", theMsg.getSubscription().getEndpointUrl(), e.toString()); throw new ResourceNotFoundException(e.getMessage()); } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionCanonicalizer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionCanonicalizer.java index ea6b9b3ff52..8440c9f7e6e 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionCanonicalizer.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionCanonicalizer.java @@ -113,6 +113,7 @@ public class SubscriptionCanonicalizer { retVal.setChannelExtensions(extractExtension(subscription)); retVal.setIdElement(subscription.getIdElement()); retVal.setPayloadString(subscription.getChannel().getPayload()); + retVal.setPayloadSearchResult(getExtensionString(subscription, JpaConstants.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_RESULT)); if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) { String from; @@ -208,6 +209,7 @@ public class SubscriptionCanonicalizer { retVal.setChannelExtensions(extractExtension(subscription)); retVal.setIdElement(subscription.getIdElement()); retVal.setPayloadString(subscription.getChannel().getPayload()); + retVal.setPayloadSearchResult(getExtensionString(subscription, JpaConstants.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_RESULT)); if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) { String from; @@ -261,6 +263,7 @@ public class SubscriptionCanonicalizer { retVal.setChannelExtensions(extractExtension(subscription)); retVal.setIdElement(subscription.getIdElement()); retVal.setPayloadString(subscription.getContentType()); + retVal.setPayloadSearchResult(getExtensionString(subscription, JpaConstants.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_RESULT)); if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) { String from; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/CanonicalSubscription.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/CanonicalSubscription.java index a8ba6d41f77..c78780153e4 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/CanonicalSubscription.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/CanonicalSubscription.java @@ -34,7 +34,11 @@ import org.hl7.fhir.r4.model.Subscription; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.Serializable; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -64,6 +68,8 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso private RestHookDetails myRestHookDetails; @JsonProperty("extensions") private Map> myChannelExtensions; + @JsonProperty("payloadSearchResult") + private String myPayloadSearchResult; /** * Constructor @@ -72,6 +78,14 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso super(); } + public String getPayloadSearchResult() { + return myPayloadSearchResult; + } + + public void setPayloadSearchResult(String thePayloadSearchResult) { + myPayloadSearchResult = thePayloadSearchResult; + } + /** * For now we're using the R4 TriggerDefinition, but this * may change in the future when things stabilize @@ -80,7 +94,6 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso myTrigger = theTrigger; } - public CanonicalSubscriptionChannelType getChannelType() { return myChannelType; } @@ -136,7 +149,7 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso public String getChannelExtension(String theUrl) { String retVal = null; List strings = myChannelExtensions.get(theUrl); - if (strings != null && strings.isEmpty()==false) { + if (strings != null && strings.isEmpty() == false) { retVal = strings.get(0); } return retVal; @@ -276,6 +289,23 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso } } + @Override + public String toString() { + return new ToStringBuilder(this) + .append("myIdElement", myIdElement) + .append("myStatus", myStatus) + .append("myCriteriaString", myCriteriaString) + .append("myEndpointUrl", myEndpointUrl) + .append("myPayloadString", myPayloadString) +// .append("myHeaders", myHeaders) + .append("myChannelType", myChannelType) +// .append("myTrigger", myTrigger) +// .append("myEmailDetails", myEmailDetails) +// .append("myRestHookDetails", myRestHookDetails) +// .append("myChannelExtensions", myChannelExtensions) + .toString(); + } + public static class EmailDetails implements IModelJson { @JsonProperty("from") @@ -394,21 +424,4 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso } } - - @Override - public String toString() { - return new ToStringBuilder(this) - .append("myIdElement", myIdElement) - .append("myStatus", myStatus) - .append("myCriteriaString", myCriteriaString) - .append("myEndpointUrl", myEndpointUrl) - .append("myPayloadString", myPayloadString) -// .append("myHeaders", myHeaders) - .append("myChannelType", myChannelType) -// .append("myTrigger", myTrigger) -// .append("myEmailDetails", myEmailDetails) -// .append("myRestHookDetails", myRestHookDetails) -// .append("myChannelExtensions", myChannelExtensions) - .toString(); - } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java index 267308f9b4a..3c5b7a1dc91 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java @@ -25,6 +25,7 @@ import ca.uhn.fhir.interceptor.api.Hook; import ca.uhn.fhir.interceptor.api.Interceptor; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; @@ -96,32 +97,19 @@ public class SubscriptionValidatingInterceptor { if (!finished) { - String query = subscription.getCriteriaString(); - if (isBlank(query)) { - throw new UnprocessableEntityException("Subscription.criteria must be populated"); - } + validateQuery(subscription.getCriteriaString(), "Subscription.criteria"); - int sep = query.indexOf('?'); - if (sep <= 1) { - throw new UnprocessableEntityException("Subscription.criteria must be in the form \"{Resource Type}?[params]\""); - } - - String resType = query.substring(0, sep); - if (resType.contains("/")) { - throw new UnprocessableEntityException("Subscription.criteria must be in the form \"{Resource Type}?[params]\""); + if (subscription.getPayloadSearchResult() != null) { + validateQuery(subscription.getPayloadSearchResult(), "Subscription.extension(url='" + JpaConstants.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_RESULT + "')"); } validateChannelType(subscription); - if (!myDaoRegistry.isResourceTypeSupported(resType)) { - throw new UnprocessableEntityException("Subscription.criteria contains invalid/unsupported resource type: " + resType); - } - try { - SubscriptionMatchingStrategy strategy = mySubscriptionStrategyEvaluator.determineStrategy(query); + SubscriptionMatchingStrategy strategy = mySubscriptionStrategyEvaluator.determineStrategy(subscription.getCriteriaString()); mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, strategy); } catch (InvalidRequestException | DataFormatException e) { - throw new UnprocessableEntityException("Invalid subscription criteria submitted: " + query + " " + e.getMessage()); + throw new UnprocessableEntityException("Invalid subscription criteria submitted: " + subscription.getCriteriaString() + " " + e.getMessage()); } if (subscription.getChannelType() == null) { @@ -129,6 +117,28 @@ public class SubscriptionValidatingInterceptor { } else if (subscription.getChannelType() == CanonicalSubscriptionChannelType.MESSAGE) { validateMessageSubscriptionEndpoint(subscription.getEndpointUrl()); } + + + } + } + + public void validateQuery(String theQuery, String theFieldName) { + if (isBlank(theQuery)) { + throw new UnprocessableEntityException(theFieldName + " must be populated"); + } + + int sep = theQuery.indexOf('?'); + if (sep <= 1) { + throw new UnprocessableEntityException(theFieldName + " must be in the form \"{Resource Type}?[params]\""); + } + + String resType = theQuery.substring(0, sep); + if (resType.contains("/")) { + throw new UnprocessableEntityException(theFieldName + " must be in the form \"{Resource Type}?[params]\""); + } + + if (!myDaoRegistry.isResourceTypeSupported(resType)) { + throw new UnprocessableEntityException(theFieldName + " contains invalid/unsupported resource type: " + resType); } } diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/method/IncludeParameter.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/method/IncludeParameter.java index 2dfc38d4fc7..2d91ec631b7 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/method/IncludeParameter.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/method/IncludeParameter.java @@ -20,14 +20,6 @@ package ca.uhn.fhir.rest.server.method; * #L% */ -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - import ca.uhn.fhir.context.ConfigurationException; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.model.api.Include; @@ -35,9 +27,18 @@ import ca.uhn.fhir.rest.annotation.IncludeParam; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.QualifiedParamList; import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; +import ca.uhn.fhir.rest.param.ParameterUtil; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + class IncludeParameter extends BaseQueryParameter { private Set myAllow; @@ -142,7 +143,7 @@ class IncludeParameter extends BaseQueryParameter { } String qualifier = nextParamList.getQualifier(); - boolean recurse = Constants.PARAM_INCLUDE_QUALIFIER_RECURSE.equals(qualifier) || Constants.PARAM_INCLUDE_QUALIFIER_ITERATE.equals(qualifier); + boolean iterate = ParameterUtil.isIncludeIterate(qualifier); String value = nextParamList.get(0); if (myAllow != null && !myAllow.isEmpty()) { @@ -157,10 +158,10 @@ class IncludeParameter extends BaseQueryParameter { if (mySpecType == String.class) { return value; } - return new Include(value, recurse); + return new Include(value, iterate); } - retValCollection.add(new Include(value, recurse)); + retValCollection.add(new Include(value, iterate)); } return retValCollection; diff --git a/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/util/TransactionBuilderTest.java b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/util/TransactionBuilderTest.java new file mode 100644 index 00000000000..2d7887131a2 --- /dev/null +++ b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/util/TransactionBuilderTest.java @@ -0,0 +1,40 @@ +package ca.uhn.fhir.util; + +import ca.uhn.fhir.context.FhirContext; +import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.Patient; +import org.hl7.fhir.r4.model.codesystems.HttpVerb; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +public class TransactionBuilderTest { + private static final Logger ourLog = LoggerFactory.getLogger(TransactionBuilderTest.class); + private FhirContext myFhirContext = FhirContext.forR4(); + + @Test + public void testAddEntryUpdate() { + TransactionBuilder builder = new TransactionBuilder(myFhirContext); + + Patient patient = new Patient(); + patient.setId("http://foo/Patient/123"); + patient.setActive(true); + builder.addUpdateEntry(patient); + + Bundle bundle = (Bundle) builder.getBundle(); + ourLog.info("Bundle:\n{}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(bundle)); + + assertEquals(Bundle.BundleType.TRANSACTION, bundle.getType()); + assertEquals(1, bundle.getEntry().size()); + assertSame(patient, bundle.getEntry().get(0).getResource()); + assertEquals("http://foo/Patient/123", bundle.getEntry().get(0).getFullUrl()); + assertEquals("Patient/123", bundle.getEntry().get(0).getRequest().getUrl()); + assertEquals(Bundle.HTTPVerb.PUT, bundle.getEntry().get(0).getRequest().getMethod()); + + + } + +}