diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4911-subscription-topic-dispatcher.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4911-subscription-topic-dispatcher.yaml new file mode 100644 index 00000000000..7688060a534 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4911-subscription-topic-dispatcher.yaml @@ -0,0 +1,4 @@ +--- +type: add +issue: 4911 +title: "Added a new SubscriptionTopicDispatcher service for use by java extensions that need to dispatch their own subscription topic notifications" diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionDeliveryRequest.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionDeliveryRequest.java new file mode 100644 index 00000000000..b804105ded6 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionDeliveryRequest.java @@ -0,0 +1,87 @@ +package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; + +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; +import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; +import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage; +import org.hl7.fhir.instance.model.api.IBaseBundle; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +public class SubscriptionDeliveryRequest { + // One of these two will be populated + private final IBaseResource myPayload; + private final IIdType myPayloadId; + private final ActiveSubscription myActiveSubscription; + private final RestOperationTypeEnum myRestOperationType; + private final RequestPartitionId myRequestPartitionId; + private final String myTransactionId; + + public SubscriptionDeliveryRequest(@Nonnull IBaseBundle theBundlePayload, @Nonnull ActiveSubscription theActiveSubscription, @Nonnull RestOperationTypeEnum theOperationType, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theTransactionId) { + myPayload = theBundlePayload; + myPayloadId = null; + myActiveSubscription = theActiveSubscription; + myRestOperationType = theOperationType; + myRequestPartitionId = theRequestPartitionId; + myTransactionId = theTransactionId; + } + + public SubscriptionDeliveryRequest(@Nonnull IBaseResource thePayload, @Nonnull ResourceModifiedMessage theMsg, @Nonnull ActiveSubscription theActiveSubscription) { + myPayload = thePayload; + myPayloadId = null; + myActiveSubscription = theActiveSubscription; + myRestOperationType = theMsg.getOperationType().asRestOperationType(); + myRequestPartitionId = theMsg.getPartitionId(); + myTransactionId = theMsg.getTransactionId(); + } + + public SubscriptionDeliveryRequest(@Nonnull IIdType thePayloadId, @Nonnull ResourceModifiedMessage theMsg, @Nonnull ActiveSubscription theActiveSubscription) { + myPayload = null; + myPayloadId = thePayloadId; + myActiveSubscription = theActiveSubscription; + myRestOperationType = theMsg.getOperationType().asRestOperationType(); + myRequestPartitionId = theMsg.getPartitionId(); + myTransactionId = theMsg.getTransactionId(); + } + + public IBaseResource getPayload() { + return myPayload; + } + + public ActiveSubscription getActiveSubscription() { + return myActiveSubscription; + } + + public RestOperationTypeEnum getRestOperationType() { + return myRestOperationType; + } + + public BaseResourceModifiedMessage.OperationTypeEnum getOperationType() { + return BaseResourceModifiedMessage.OperationTypeEnum.from(myRestOperationType); + } + + public RequestPartitionId getRequestPartitionId() { + return myRequestPartitionId; + } + + public String getTransactionId() { + return myTransactionId; + } + + public CanonicalSubscription getSubscription() { + return myActiveSubscription.getSubscription(); + } + + public IIdType getPayloadId() { + return myPayloadId; + } + + public boolean hasPayload() { + return myPayload != null; + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchDeliverer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchDeliverer.java index 7bda78a60db..511678b2adb 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchDeliverer.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionMatchDeliverer.java @@ -37,6 +37,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.MessageChannel; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; public class SubscriptionMatchDeliverer { @@ -51,11 +54,46 @@ public class SubscriptionMatchDeliverer { mySubscriptionChannelRegistry = theSubscriptionChannelRegistry; } - public boolean deliverPayload(IBaseResource thePayload, ResourceModifiedMessage theMsg, ActiveSubscription theActiveSubscription, InMemoryMatchResult matchResult) { - EncodingEnum encoding = null; + public boolean deliverPayload(@Nullable IBaseResource thePayload, @Nonnull ResourceModifiedMessage theMsg, @Nonnull ActiveSubscription theActiveSubscription, @Nullable InMemoryMatchResult theInMemoryMatchResult) { + SubscriptionDeliveryRequest subscriptionDeliveryRequest; + if (thePayload != null) { + subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(thePayload, theMsg, theActiveSubscription); + } else { + subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(theMsg.getPayloadId(myFhirContext), theMsg, theActiveSubscription); + } + ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest); + deliveryMsg.copyAdditionalPropertiesFrom(theMsg); - CanonicalSubscription subscription = theActiveSubscription.getSubscription(); - String subscriptionId = theActiveSubscription.getId();; + return sendToDeliveryChannel(theActiveSubscription, theInMemoryMatchResult, deliveryMsg); + } + + public boolean deliverPayload(@Nonnull SubscriptionDeliveryRequest subscriptionDeliveryRequest, @Nullable InMemoryMatchResult theInMemoryMatchResult) { + ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest); + + return sendToDeliveryChannel(subscriptionDeliveryRequest.getActiveSubscription(), theInMemoryMatchResult, deliveryMsg); + } + + private boolean sendToDeliveryChannel(@Nonnull ActiveSubscription theActiveSubscription, @Nullable InMemoryMatchResult theInMemoryMatchResult, @Nonnull ResourceDeliveryMessage deliveryMsg) { + if (!callHooks(theActiveSubscription, theInMemoryMatchResult, deliveryMsg)) { + return false; + } + + boolean retVal = false; + ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg); + MessageChannel deliveryChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(theActiveSubscription.getChannelName()); + if (deliveryChannel != null) { + retVal = true; + trySendToDeliveryChannel(wrappedMsg, deliveryChannel); + } else { + ourLog.warn("Do not have delivery channel for subscription {}", theActiveSubscription.getId()); + } + return retVal; + } + + private ResourceDeliveryMessage buildResourceDeliveryMessage(@Nonnull SubscriptionDeliveryRequest theRequest) { + EncodingEnum encoding = null; + + CanonicalSubscription subscription = theRequest.getSubscription(); if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) { encoding = EncodingEnum.forContentType(subscription.getPayloadString()); @@ -63,42 +101,30 @@ public class SubscriptionMatchDeliverer { encoding = defaultIfNull(encoding, EncodingEnum.JSON); ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage(); - deliveryMsg.setPartitionId(theMsg.getPartitionId()); + deliveryMsg.setPartitionId(theRequest.getRequestPartitionId()); - if (thePayload != null) { - deliveryMsg.setPayload(myFhirContext, thePayload, encoding); + if (theRequest.hasPayload()) { + deliveryMsg.setPayload(myFhirContext, theRequest.getPayload(), encoding); } else { - deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext)); + deliveryMsg.setPayloadId(theRequest.getPayloadId()); } deliveryMsg.setSubscription(subscription); - deliveryMsg.setOperationType(theMsg.getOperationType()); - deliveryMsg.setTransactionId(theMsg.getTransactionId()); - deliveryMsg.copyAdditionalPropertiesFrom(theMsg); + deliveryMsg.setOperationType(theRequest.getOperationType()); + deliveryMsg.setTransactionId(theRequest.getTransactionId()); + return deliveryMsg; + } + private boolean callHooks(ActiveSubscription theActiveSubscription, InMemoryMatchResult theInMemoryMatchResult, ResourceDeliveryMessage deliveryMsg) { // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED HookParams params = new HookParams() .add(CanonicalSubscription.class, theActiveSubscription.getSubscription()) .add(ResourceDeliveryMessage.class, deliveryMsg) - .add(InMemoryMatchResult.class, matchResult); + .add(InMemoryMatchResult.class, theInMemoryMatchResult); if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) { - ourLog.info("Interceptor has decided to abort processing of subscription {}", subscriptionId); + ourLog.info("Interceptor has decided to abort processing of subscription {}", theActiveSubscription.getId()); return false; } - - return sendToDeliveryChannel(theActiveSubscription, deliveryMsg); - } - - private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) { - boolean retVal = false; - ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg); - MessageChannel deliveryChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(nextActiveSubscription.getChannelName()); - if (deliveryChannel != null) { - retVal = true; - trySendToDeliveryChannel(wrappedMsg, deliveryChannel); - } else { - ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId()); - } - return retVal; + return true; } private void trySendToDeliveryChannel(ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicConfig.java index 5d8019dd676..bc626dfa5be 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicConfig.java @@ -22,6 +22,8 @@ package ca.uhn.fhir.jpa.topic; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher; +import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer; +import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionQueryValidator; import org.springframework.context.annotation.Bean; @@ -60,4 +62,9 @@ public class SubscriptionTopicConfig { SubscriptionTopicValidatingInterceptor subscriptionTopicValidatingInterceptor(FhirContext theFhirContext, SubscriptionQueryValidator theSubscriptionQueryValidator) { return new SubscriptionTopicValidatingInterceptor(theFhirContext, theSubscriptionQueryValidator); } + + @Bean + SubscriptionTopicDispatcher subscriptionTopicDispatcher(SubscriptionRegistry theSubscriptionRegistry, SubscriptionMatchDeliverer theSubscriptionMatchDeliverer, SubscriptionTopicPayloadBuilder theSubscriptionTopicPayloadBuilder) { + return new SubscriptionTopicDispatcher(theSubscriptionRegistry, theSubscriptionMatchDeliverer, theSubscriptionTopicPayloadBuilder); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicDispatcher.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicDispatcher.java new file mode 100644 index 00000000000..d2fd7a56cd0 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicDispatcher.java @@ -0,0 +1,71 @@ +package ca.uhn.fhir.jpa.topic; + +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; +import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionDeliveryRequest; +import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer; +import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; +import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; +import org.hl7.fhir.instance.model.api.IBaseBundle; +import org.hl7.fhir.instance.model.api.IBaseResource; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.List; +import java.util.UUID; + +/** + * Subscription topic notifications are natively supported in R5, R4B. They are also partially supported and in R4 + * via the subscription backport spec Subscription Backport. + * In all versions, it is possible for a FHIR Repository to submit topic subscription notifications triggered by some + * arbitrary "business event". In R5 and R4B most subscription topic notifications will be triggered by a SubscriptionTopic + * match. However, in the R4 backport, the SubscriptionTopic is not supported and the SubscriptionTopicDispatcher service + * is provided to generate those notifications instead. Any custom java extension to the FHIR repository can @Autowire this service to + * send topic notifications to all Subscription resources subscribed to that topic. + */ +public class SubscriptionTopicDispatcher { + private final SubscriptionRegistry mySubscriptionRegistry; + private final SubscriptionMatchDeliverer mySubscriptionMatchDeliverer; + private final SubscriptionTopicPayloadBuilder mySubscriptionTopicPayloadBuilder; + + public SubscriptionTopicDispatcher(SubscriptionRegistry theSubscriptionRegistry, SubscriptionMatchDeliverer theSubscriptionMatchDeliverer, SubscriptionTopicPayloadBuilder theSubscriptionTopicPayloadBuilder) { + mySubscriptionRegistry = theSubscriptionRegistry; + mySubscriptionMatchDeliverer = theSubscriptionMatchDeliverer; + mySubscriptionTopicPayloadBuilder = theSubscriptionTopicPayloadBuilder; + } + + /** + * Deliver a Subscription topic notification to all subscriptions for the given topic. + * + * @param theTopicUrl Deliver to subscriptions for this topic + * @param theResources The list of resources to deliver. The first resource will be the primary "focus" resource per the Subscription documentation. + * This list should _not_ include the SubscriptionStatus. The SubscriptionStatus will be added as the first element to + * the delivered bundle. The reason for this is that the SubscriptionStatus needs to reference the subscription ID, which is + * not known until the bundle is delivered. + * @param theInMemoryMatchResult Information about the match event that led to this dispatch that is sent to SUBSCRIPTION_RESOURCE_MATCHED + * @param theRequestPartitionId The request partitions of the request, if any. This is used by subscriptions that need to perform repository + * operations as a part of their delivery. Those repository operations will be performed on the supplied request partitions + * @param theTransactionId The transaction ID of the request, if any. This is used for logging. + * @return The number of subscription notifications that were successfully queued for delivery + */ + public int dispatch(@Nonnull String theTopicUrl, @Nonnull List theResources, @Nonnull RestOperationTypeEnum theRequestType, @Nullable InMemoryMatchResult theInMemoryMatchResult, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theTransactionId) { + int count = 0; + + List topicSubscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(theTopicUrl); + if (!topicSubscriptions.isEmpty()) { + for (ActiveSubscription activeSubscription : topicSubscriptions) { + // WIP STR5 apply subscription filters + IBaseBundle bundlePayload = mySubscriptionTopicPayloadBuilder.buildPayload(theResources, activeSubscription, theTopicUrl, theRequestType); + // WIP STR5 do we need to add a total? If so can do that with R5BundleFactory + bundlePayload.setId(UUID.randomUUID().toString()); + SubscriptionDeliveryRequest subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(bundlePayload, activeSubscription, theRequestType, theRequestPartitionId, theTransactionId); + boolean success = mySubscriptionMatchDeliverer.deliverPayload(subscriptionDeliveryRequest, theInMemoryMatchResult); + if (success) { + count++; + } + } + } + return count; + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicMatchingSubscriber.java index 6014c018df5..4c2284d43ba 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicMatchingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicMatchingSubscriber.java @@ -25,12 +25,11 @@ import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer; -import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.util.Logs; -import org.hl7.fhir.instance.model.api.IBaseBundle; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.r5.model.SubscriptionTopic; import org.slf4j.Logger; @@ -41,8 +40,8 @@ import org.springframework.messaging.MessagingException; import javax.annotation.Nonnull; import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.UUID; public class SubscriptionTopicMatchingSubscriber implements MessageHandler { private static final Logger ourLog = Logs.getSubscriptionTopicLog(); @@ -60,6 +59,8 @@ public class SubscriptionTopicMatchingSubscriber implements MessageHandler { SubscriptionTopicPayloadBuilder mySubscriptionTopicPayloadBuilder; @Autowired private IInterceptorBroadcaster myInterceptorBroadcaster; + @Autowired + private SubscriptionTopicDispatcher mySubscriptionTopicDispatcher; public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext) { myFhirContext = theFhirContext; @@ -97,24 +98,17 @@ public class SubscriptionTopicMatchingSubscriber implements MessageHandler { SubscriptionTopicMatcher matcher = new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic); InMemoryMatchResult result = matcher.match(theMsg); if (result.matched()) { - ourLog.info("Matched topic {} to message {}", topic.getUrl(), theMsg); - deliverToTopicSubscriptions(theMsg, topic, result); + int deliveries = deliverToTopicSubscriptions(theMsg, topic, result); + ourLog.info("Matched topic {} to message {}. Notifications sent to {} subscriptions for delivery.", topic.getUrl(), theMsg, deliveries); } } } - private void deliverToTopicSubscriptions(ResourceModifiedMessage theMsg, SubscriptionTopic topic, InMemoryMatchResult result) { - List topicSubscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(topic.getUrl()); - if (!topicSubscriptions.isEmpty()) { - IBaseResource matchedResource = theMsg.getNewPayload(myFhirContext); + private int deliverToTopicSubscriptions(ResourceModifiedMessage theMsg, SubscriptionTopic theSubscriptionTopic, InMemoryMatchResult theInMemoryMatchResult) { + String topicUrl = theSubscriptionTopic.getUrl(); + List matchedResource = Collections.singletonList(theMsg.getNewPayload(myFhirContext)); + RestOperationTypeEnum restOperationType = theMsg.getOperationType().asRestOperationType(); - for (ActiveSubscription activeSubscription : topicSubscriptions) { - // WIP STR5 apply subscription filters - IBaseBundle bundlePayload = mySubscriptionTopicPayloadBuilder.buildPayload(matchedResource, theMsg, activeSubscription, topic); - // WIP STR5 do we need to add a total? If so can do that with R5BundleFactory - bundlePayload.setId(UUID.randomUUID().toString()); - mySubscriptionMatchDeliverer.deliverPayload(bundlePayload, theMsg, activeSubscription, result); - } - } + return mySubscriptionTopicDispatcher.dispatch(topicUrl, matchedResource, restOperationType, theInMemoryMatchResult, theMsg.getPartitionId(), theMsg.getTransactionId()); } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilder.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilder.java index 5be71ba84e1..4788d66e210 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilder.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilder.java @@ -23,7 +23,7 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; -import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.util.BundleBuilder; import org.hl7.fhir.convertors.factory.VersionConvertorFactory_43_50; import org.hl7.fhir.instance.model.api.IBaseBundle; @@ -32,8 +32,8 @@ import org.hl7.fhir.r5.model.Bundle; import org.hl7.fhir.r5.model.Enumerations; import org.hl7.fhir.r5.model.Reference; import org.hl7.fhir.r5.model.SubscriptionStatus; -import org.hl7.fhir.r5.model.SubscriptionTopic; +import java.util.List; import java.util.UUID; public class SubscriptionTopicPayloadBuilder { @@ -43,12 +43,12 @@ public class SubscriptionTopicPayloadBuilder { myFhirContext = theFhirContext; } - public IBaseBundle buildPayload(IBaseResource theMatchedResource, ResourceModifiedMessage theMsg, ActiveSubscription theActiveSubscription, SubscriptionTopic theTopic) { + public IBaseBundle buildPayload(List theResources, ActiveSubscription theActiveSubscription, String theTopicUrl, RestOperationTypeEnum theRestOperationType) { BundleBuilder bundleBuilder = new BundleBuilder(myFhirContext); // WIP STR5 set eventsSinceSubscriptionStart from the database int eventsSinceSubscriptionStart = 1; - IBaseResource subscriptionStatus = buildSubscriptionStatus(theMatchedResource, theActiveSubscription, theTopic, eventsSinceSubscriptionStart); + IBaseResource subscriptionStatus = buildSubscriptionStatus(theResources, theActiveSubscription, theTopicUrl, eventsSinceSubscriptionStart); FhirVersionEnum fhirVersion = myFhirContext.getVersion().getVersion(); @@ -65,21 +65,25 @@ public class SubscriptionTopicPayloadBuilder { // WIP STR5 is this the right type of entry? see http://hl7.org/fhir/subscriptionstatus-examples.html // WIP STR5 Also see http://hl7.org/fhir/R4B/notification-full-resource.json.html need to conform to these bundleBuilder.addCollectionEntry(subscriptionStatus); - switch (theMsg.getOperationType()) { - case CREATE: - bundleBuilder.addTransactionCreateEntry(theMatchedResource); - break; - case UPDATE: - bundleBuilder.addTransactionUpdateEntry(theMatchedResource); - break; - case DELETE: - bundleBuilder.addTransactionDeleteEntry(theMatchedResource); - break; + for (IBaseResource resource : theResources) { + switch(theRestOperationType) { + case CREATE: + bundleBuilder.addTransactionCreateEntry(resource); + break; + case UPDATE: + bundleBuilder.addTransactionUpdateEntry(resource); + break; + case DELETE: + bundleBuilder.addTransactionDeleteEntry(resource); + break; + } } + return bundleBuilder.getBundle(); } - private SubscriptionStatus buildSubscriptionStatus(IBaseResource theMatchedResource, ActiveSubscription theActiveSubscription, SubscriptionTopic theTopic, int theEventsSinceSubscriptionStart) { + + private SubscriptionStatus buildSubscriptionStatus(List theResources, ActiveSubscription theActiveSubscription, String theTopicUrl, int theEventsSinceSubscriptionStart) { SubscriptionStatus subscriptionStatus = new SubscriptionStatus(); subscriptionStatus.setId(UUID.randomUUID().toString()); subscriptionStatus.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE); @@ -87,9 +91,14 @@ public class SubscriptionTopicPayloadBuilder { // WIP STR5 count events since subscription start and set eventsSinceSubscriptionStart // store counts by subscription id subscriptionStatus.setEventsSinceSubscriptionStart(theEventsSinceSubscriptionStart); - subscriptionStatus.addNotificationEvent().setEventNumber(theEventsSinceSubscriptionStart).setFocus(new Reference(theMatchedResource.getIdElement())); + SubscriptionStatus.SubscriptionStatusNotificationEventComponent event = subscriptionStatus.addNotificationEvent(); + event.setEventNumber(theEventsSinceSubscriptionStart); + if (theResources.size() > 0) { + event.setFocus(new Reference(theResources.get(0).getIdElement())); + } subscriptionStatus.setSubscription(new Reference(theActiveSubscription.getSubscription().getIdElement(myFhirContext))); - subscriptionStatus.setTopic(theTopic.getUrl()); + subscriptionStatus.setTopic(theTopicUrl); return subscriptionStatus; } + } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilderR4BTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilderR4BTest.java index c6d86f2bda1..20a6dd42e45 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilderR4BTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilderR4BTest.java @@ -3,13 +3,11 @@ package ca.uhn.fhir.jpa.topic; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; -import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; -import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage; +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.util.BundleUtil; import org.hl7.fhir.r4b.model.Bundle; import org.hl7.fhir.r4b.model.Encounter; import org.hl7.fhir.r4b.model.Resource; -import org.hl7.fhir.r5.model.SubscriptionTopic; import org.junit.jupiter.api.Test; import java.util.List; @@ -17,6 +15,7 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; class SubscriptionTopicPayloadBuilderR4BTest { + private static final String TEST_TOPIC_URL = "test-builder-topic-url"; FhirContext ourFhirContext = FhirContext.forR4BCached(); @Test public void testBuildPayloadDelete() { @@ -24,14 +23,11 @@ class SubscriptionTopicPayloadBuilderR4BTest { var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var encounter = new Encounter(); encounter.setId("Encounter/1"); - ResourceModifiedMessage msg = new ResourceModifiedMessage(); CanonicalSubscription sub = new CanonicalSubscription(); ActiveSubscription subscription = new ActiveSubscription(sub, "test"); - SubscriptionTopic topic = new SubscriptionTopic(); - msg.setOperationType(BaseResourceMessage.OperationTypeEnum.DELETE); // run - Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); + Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.DELETE); // verify List resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); @@ -47,14 +43,11 @@ class SubscriptionTopicPayloadBuilderR4BTest { var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var encounter = new Encounter(); encounter.setId("Encounter/1"); - ResourceModifiedMessage msg = new ResourceModifiedMessage(); CanonicalSubscription sub = new CanonicalSubscription(); ActiveSubscription subscription = new ActiveSubscription(sub, "test"); - SubscriptionTopic topic = new SubscriptionTopic(); - msg.setOperationType(BaseResourceMessage.OperationTypeEnum.UPDATE); // run - Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); + Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.UPDATE); // verify List resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); @@ -71,14 +64,11 @@ class SubscriptionTopicPayloadBuilderR4BTest { var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var encounter = new Encounter(); encounter.setId("Encounter/1"); - ResourceModifiedMessage msg = new ResourceModifiedMessage(); CanonicalSubscription sub = new CanonicalSubscription(); ActiveSubscription subscription = new ActiveSubscription(sub, "test"); - SubscriptionTopic topic = new SubscriptionTopic(); - msg.setOperationType(BaseResourceMessage.OperationTypeEnum.CREATE); // run - Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); + Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.CREATE); // verify List resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilderR5Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilderR5Test.java index 486bebdb72e..808ed42d8e9 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilderR5Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/topic/SubscriptionTopicPayloadBuilderR5Test.java @@ -3,13 +3,11 @@ package ca.uhn.fhir.jpa.topic; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; -import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; -import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage; +import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.util.BundleUtil; import org.hl7.fhir.r5.model.Bundle; import org.hl7.fhir.r5.model.Encounter; import org.hl7.fhir.r5.model.Resource; -import org.hl7.fhir.r5.model.SubscriptionTopic; import org.junit.jupiter.api.Test; import java.util.List; @@ -17,6 +15,7 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; class SubscriptionTopicPayloadBuilderR5Test { + private static final String TEST_TOPIC_URL = "test-builder-topic-url"; FhirContext ourFhirContext = FhirContext.forR5Cached(); @Test public void testBuildPayloadDelete() { @@ -24,14 +23,11 @@ class SubscriptionTopicPayloadBuilderR5Test { var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var encounter = new Encounter(); encounter.setId("Encounter/1"); - ResourceModifiedMessage msg = new ResourceModifiedMessage(); CanonicalSubscription sub = new CanonicalSubscription(); ActiveSubscription subscription = new ActiveSubscription(sub, "test"); - SubscriptionTopic topic = new SubscriptionTopic(); - msg.setOperationType(BaseResourceMessage.OperationTypeEnum.DELETE); // run - Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); + Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.DELETE); // verify List resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); @@ -47,14 +43,11 @@ class SubscriptionTopicPayloadBuilderR5Test { var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var encounter = new Encounter(); encounter.setId("Encounter/1"); - ResourceModifiedMessage msg = new ResourceModifiedMessage(); CanonicalSubscription sub = new CanonicalSubscription(); ActiveSubscription subscription = new ActiveSubscription(sub, "test"); - SubscriptionTopic topic = new SubscriptionTopic(); - msg.setOperationType(BaseResourceMessage.OperationTypeEnum.UPDATE); // run - Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); + Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.UPDATE); // verify List resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); @@ -71,14 +64,11 @@ class SubscriptionTopicPayloadBuilderR5Test { var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var encounter = new Encounter(); encounter.setId("Encounter/1"); - ResourceModifiedMessage msg = new ResourceModifiedMessage(); CanonicalSubscription sub = new CanonicalSubscription(); ActiveSubscription subscription = new ActiveSubscription(sub, "test"); - SubscriptionTopic topic = new SubscriptionTopic(); - msg.setOperationType(BaseResourceMessage.OperationTypeEnum.CREATE); // run - Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); + Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.CREATE); // verify List resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceMessage.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceMessage.java index acb52bed436..9fe9c51606b 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceMessage.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/messaging/BaseResourceMessage.java @@ -20,7 +20,7 @@ package ca.uhn.fhir.rest.server.messaging; - +import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import com.fasterxml.jackson.annotation.JsonProperty; @@ -146,7 +146,7 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso /** * Adds a transaction ID to this message. This ID can be used for many purposes. For example, performing tracing * across asynchronous hooks, tying data together, or downstream logging purposes. - * + *

* One current internal implementation uses this field to tie back MDM processing results (which are asynchronous) * to the original transaction log that caused the MDM processing to occur. * @@ -195,7 +195,7 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso * @return null by default */ @Nullable - protected String getMessageKeyDefaultValue(){ + protected String getMessageKeyDefaultValue() { return null; } @@ -212,6 +212,19 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso myRestOperationTypeEnum = theRestOperationTypeEnum; } + public static OperationTypeEnum from(RestOperationTypeEnum theRestOperationType) { + switch (theRestOperationType) { + case CREATE: + return CREATE; + case UPDATE: + return UPDATE; + case DELETE: + return DELETE; + default: + throw new IllegalArgumentException(Msg.code(2348) + "Unsupported operation type: " + theRestOperationType); + } + } + public RestOperationTypeEnum asRestOperationType() { return myRestOperationTypeEnum; }