SubscriptionTopicDispatcher (#4911)

* Extract SubscriptionTopicBundleDispatcher into a separate service for use by custom interceptors that need to dispatch their own notifications

* rename

* changelog

* rename

* review feedback

* Msg.code

* review feedback

---------

Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2023-05-18 18:46:00 -04:00 committed by GitHub
parent d94627c382
commit a722cc46a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 286 additions and 95 deletions

View File

@ -0,0 +1,4 @@
---
type: add
issue: 4911
title: "Added a new SubscriptionTopicDispatcher service for use by java extensions that need to dispatch their own subscription topic notifications"

View File

@ -0,0 +1,87 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class SubscriptionDeliveryRequest {
// One of these two will be populated
private final IBaseResource myPayload;
private final IIdType myPayloadId;
private final ActiveSubscription myActiveSubscription;
private final RestOperationTypeEnum myRestOperationType;
private final RequestPartitionId myRequestPartitionId;
private final String myTransactionId;
public SubscriptionDeliveryRequest(@Nonnull IBaseBundle theBundlePayload, @Nonnull ActiveSubscription theActiveSubscription, @Nonnull RestOperationTypeEnum theOperationType, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theTransactionId) {
myPayload = theBundlePayload;
myPayloadId = null;
myActiveSubscription = theActiveSubscription;
myRestOperationType = theOperationType;
myRequestPartitionId = theRequestPartitionId;
myTransactionId = theTransactionId;
}
public SubscriptionDeliveryRequest(@Nonnull IBaseResource thePayload, @Nonnull ResourceModifiedMessage theMsg, @Nonnull ActiveSubscription theActiveSubscription) {
myPayload = thePayload;
myPayloadId = null;
myActiveSubscription = theActiveSubscription;
myRestOperationType = theMsg.getOperationType().asRestOperationType();
myRequestPartitionId = theMsg.getPartitionId();
myTransactionId = theMsg.getTransactionId();
}
public SubscriptionDeliveryRequest(@Nonnull IIdType thePayloadId, @Nonnull ResourceModifiedMessage theMsg, @Nonnull ActiveSubscription theActiveSubscription) {
myPayload = null;
myPayloadId = thePayloadId;
myActiveSubscription = theActiveSubscription;
myRestOperationType = theMsg.getOperationType().asRestOperationType();
myRequestPartitionId = theMsg.getPartitionId();
myTransactionId = theMsg.getTransactionId();
}
public IBaseResource getPayload() {
return myPayload;
}
public ActiveSubscription getActiveSubscription() {
return myActiveSubscription;
}
public RestOperationTypeEnum getRestOperationType() {
return myRestOperationType;
}
public BaseResourceModifiedMessage.OperationTypeEnum getOperationType() {
return BaseResourceModifiedMessage.OperationTypeEnum.from(myRestOperationType);
}
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
public String getTransactionId() {
return myTransactionId;
}
public CanonicalSubscription getSubscription() {
return myActiveSubscription.getSubscription();
}
public IIdType getPayloadId() {
return myPayloadId;
}
public boolean hasPayload() {
return myPayload != null;
}
}

View File

@ -37,6 +37,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class SubscriptionMatchDeliverer { public class SubscriptionMatchDeliverer {
@ -51,11 +54,46 @@ public class SubscriptionMatchDeliverer {
mySubscriptionChannelRegistry = theSubscriptionChannelRegistry; mySubscriptionChannelRegistry = theSubscriptionChannelRegistry;
} }
public boolean deliverPayload(IBaseResource thePayload, ResourceModifiedMessage theMsg, ActiveSubscription theActiveSubscription, InMemoryMatchResult matchResult) { public boolean deliverPayload(@Nullable IBaseResource thePayload, @Nonnull ResourceModifiedMessage theMsg, @Nonnull ActiveSubscription theActiveSubscription, @Nullable InMemoryMatchResult theInMemoryMatchResult) {
SubscriptionDeliveryRequest subscriptionDeliveryRequest;
if (thePayload != null) {
subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(thePayload, theMsg, theActiveSubscription);
} else {
subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(theMsg.getPayloadId(myFhirContext), theMsg, theActiveSubscription);
}
ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest);
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
return sendToDeliveryChannel(theActiveSubscription, theInMemoryMatchResult, deliveryMsg);
}
public boolean deliverPayload(@Nonnull SubscriptionDeliveryRequest subscriptionDeliveryRequest, @Nullable InMemoryMatchResult theInMemoryMatchResult) {
ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest);
return sendToDeliveryChannel(subscriptionDeliveryRequest.getActiveSubscription(), theInMemoryMatchResult, deliveryMsg);
}
private boolean sendToDeliveryChannel(@Nonnull ActiveSubscription theActiveSubscription, @Nullable InMemoryMatchResult theInMemoryMatchResult, @Nonnull ResourceDeliveryMessage deliveryMsg) {
if (!callHooks(theActiveSubscription, theInMemoryMatchResult, deliveryMsg)) {
return false;
}
boolean retVal = false;
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
MessageChannel deliveryChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(theActiveSubscription.getChannelName());
if (deliveryChannel != null) {
retVal = true;
trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
} else {
ourLog.warn("Do not have delivery channel for subscription {}", theActiveSubscription.getId());
}
return retVal;
}
private ResourceDeliveryMessage buildResourceDeliveryMessage(@Nonnull SubscriptionDeliveryRequest theRequest) {
EncodingEnum encoding = null; EncodingEnum encoding = null;
CanonicalSubscription subscription = theActiveSubscription.getSubscription(); CanonicalSubscription subscription = theRequest.getSubscription();
String subscriptionId = theActiveSubscription.getId();;
if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) { if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString()); encoding = EncodingEnum.forContentType(subscription.getPayloadString());
@ -63,42 +101,30 @@ public class SubscriptionMatchDeliverer {
encoding = defaultIfNull(encoding, EncodingEnum.JSON); encoding = defaultIfNull(encoding, EncodingEnum.JSON);
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage(); ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId()); deliveryMsg.setPartitionId(theRequest.getRequestPartitionId());
if (thePayload != null) { if (theRequest.hasPayload()) {
deliveryMsg.setPayload(myFhirContext, thePayload, encoding); deliveryMsg.setPayload(myFhirContext, theRequest.getPayload(), encoding);
} else { } else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext)); deliveryMsg.setPayloadId(theRequest.getPayloadId());
} }
deliveryMsg.setSubscription(subscription); deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType()); deliveryMsg.setOperationType(theRequest.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId()); deliveryMsg.setTransactionId(theRequest.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg); return deliveryMsg;
}
private boolean callHooks(ActiveSubscription theActiveSubscription, InMemoryMatchResult theInMemoryMatchResult, ResourceDeliveryMessage deliveryMsg) {
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams() HookParams params = new HookParams()
.add(CanonicalSubscription.class, theActiveSubscription.getSubscription()) .add(CanonicalSubscription.class, theActiveSubscription.getSubscription())
.add(ResourceDeliveryMessage.class, deliveryMsg) .add(ResourceDeliveryMessage.class, deliveryMsg)
.add(InMemoryMatchResult.class, matchResult); .add(InMemoryMatchResult.class, theInMemoryMatchResult);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) { if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
ourLog.info("Interceptor has decided to abort processing of subscription {}", subscriptionId); ourLog.info("Interceptor has decided to abort processing of subscription {}", theActiveSubscription.getId());
return false; return false;
} }
return true;
return sendToDeliveryChannel(theActiveSubscription, deliveryMsg);
}
private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
boolean retVal = false;
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg);
MessageChannel deliveryChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(nextActiveSubscription.getChannelName());
if (deliveryChannel != null) {
retVal = true;
trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
} else {
ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId());
}
return retVal;
} }
private void trySendToDeliveryChannel(ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) { private void trySendToDeliveryChannel(ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) {

View File

@ -22,6 +22,8 @@ package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher; import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionQueryValidator; import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionQueryValidator;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -60,4 +62,9 @@ public class SubscriptionTopicConfig {
SubscriptionTopicValidatingInterceptor subscriptionTopicValidatingInterceptor(FhirContext theFhirContext, SubscriptionQueryValidator theSubscriptionQueryValidator) { SubscriptionTopicValidatingInterceptor subscriptionTopicValidatingInterceptor(FhirContext theFhirContext, SubscriptionQueryValidator theSubscriptionQueryValidator) {
return new SubscriptionTopicValidatingInterceptor(theFhirContext, theSubscriptionQueryValidator); return new SubscriptionTopicValidatingInterceptor(theFhirContext, theSubscriptionQueryValidator);
} }
@Bean
SubscriptionTopicDispatcher subscriptionTopicDispatcher(SubscriptionRegistry theSubscriptionRegistry, SubscriptionMatchDeliverer theSubscriptionMatchDeliverer, SubscriptionTopicPayloadBuilder theSubscriptionTopicPayloadBuilder) {
return new SubscriptionTopicDispatcher(theSubscriptionRegistry, theSubscriptionMatchDeliverer, theSubscriptionTopicPayloadBuilder);
}
} }

View File

@ -0,0 +1,71 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionDeliveryRequest;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.UUID;
/**
* Subscription topic notifications are natively supported in R5, R4B. They are also partially supported and in R4
* via the subscription backport spec <a href="http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/components.html">Subscription Backport</a>.
* In all versions, it is possible for a FHIR Repository to submit topic subscription notifications triggered by some
* arbitrary "business event". In R5 and R4B most subscription topic notifications will be triggered by a SubscriptionTopic
* match. However, in the R4 backport, the SubscriptionTopic is not supported and the SubscriptionTopicDispatcher service
* is provided to generate those notifications instead. Any custom java extension to the FHIR repository can @Autowire this service to
* send topic notifications to all Subscription resources subscribed to that topic.
*/
public class SubscriptionTopicDispatcher {
private final SubscriptionRegistry mySubscriptionRegistry;
private final SubscriptionMatchDeliverer mySubscriptionMatchDeliverer;
private final SubscriptionTopicPayloadBuilder mySubscriptionTopicPayloadBuilder;
public SubscriptionTopicDispatcher(SubscriptionRegistry theSubscriptionRegistry, SubscriptionMatchDeliverer theSubscriptionMatchDeliverer, SubscriptionTopicPayloadBuilder theSubscriptionTopicPayloadBuilder) {
mySubscriptionRegistry = theSubscriptionRegistry;
mySubscriptionMatchDeliverer = theSubscriptionMatchDeliverer;
mySubscriptionTopicPayloadBuilder = theSubscriptionTopicPayloadBuilder;
}
/**
* Deliver a Subscription topic notification to all subscriptions for the given topic.
*
* @param theTopicUrl Deliver to subscriptions for this topic
* @param theResources The list of resources to deliver. The first resource will be the primary "focus" resource per the Subscription documentation.
* This list should _not_ include the SubscriptionStatus. The SubscriptionStatus will be added as the first element to
* the delivered bundle. The reason for this is that the SubscriptionStatus needs to reference the subscription ID, which is
* not known until the bundle is delivered.
* @param theInMemoryMatchResult Information about the match event that led to this dispatch that is sent to SUBSCRIPTION_RESOURCE_MATCHED
* @param theRequestPartitionId The request partitions of the request, if any. This is used by subscriptions that need to perform repository
* operations as a part of their delivery. Those repository operations will be performed on the supplied request partitions
* @param theTransactionId The transaction ID of the request, if any. This is used for logging.
* @return The number of subscription notifications that were successfully queued for delivery
*/
public int dispatch(@Nonnull String theTopicUrl, @Nonnull List<IBaseResource> theResources, @Nonnull RestOperationTypeEnum theRequestType, @Nullable InMemoryMatchResult theInMemoryMatchResult, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theTransactionId) {
int count = 0;
List<ActiveSubscription> topicSubscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(theTopicUrl);
if (!topicSubscriptions.isEmpty()) {
for (ActiveSubscription activeSubscription : topicSubscriptions) {
// WIP STR5 apply subscription filters
IBaseBundle bundlePayload = mySubscriptionTopicPayloadBuilder.buildPayload(theResources, activeSubscription, theTopicUrl, theRequestType);
// WIP STR5 do we need to add a total? If so can do that with R5BundleFactory
bundlePayload.setId(UUID.randomUUID().toString());
SubscriptionDeliveryRequest subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(bundlePayload, activeSubscription, theRequestType, theRequestPartitionId, theTransactionId);
boolean success = mySubscriptionMatchDeliverer.deliverPayload(subscriptionDeliveryRequest, theInMemoryMatchResult);
if (success) {
count++;
}
}
}
return count;
}
}

View File

@ -25,12 +25,11 @@ import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.util.Logs; import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.SubscriptionTopic; import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -41,8 +40,8 @@ import org.springframework.messaging.MessagingException;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID;
public class SubscriptionTopicMatchingSubscriber implements MessageHandler { public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
private static final Logger ourLog = Logs.getSubscriptionTopicLog(); private static final Logger ourLog = Logs.getSubscriptionTopicLog();
@ -60,6 +59,8 @@ public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
SubscriptionTopicPayloadBuilder mySubscriptionTopicPayloadBuilder; SubscriptionTopicPayloadBuilder mySubscriptionTopicPayloadBuilder;
@Autowired @Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster; private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private SubscriptionTopicDispatcher mySubscriptionTopicDispatcher;
public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext) { public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext) {
myFhirContext = theFhirContext; myFhirContext = theFhirContext;
@ -97,24 +98,17 @@ public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
SubscriptionTopicMatcher matcher = new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic); SubscriptionTopicMatcher matcher = new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic);
InMemoryMatchResult result = matcher.match(theMsg); InMemoryMatchResult result = matcher.match(theMsg);
if (result.matched()) { if (result.matched()) {
ourLog.info("Matched topic {} to message {}", topic.getUrl(), theMsg); int deliveries = deliverToTopicSubscriptions(theMsg, topic, result);
deliverToTopicSubscriptions(theMsg, topic, result); ourLog.info("Matched topic {} to message {}. Notifications sent to {} subscriptions for delivery.", topic.getUrl(), theMsg, deliveries);
} }
} }
} }
private void deliverToTopicSubscriptions(ResourceModifiedMessage theMsg, SubscriptionTopic topic, InMemoryMatchResult result) { private int deliverToTopicSubscriptions(ResourceModifiedMessage theMsg, SubscriptionTopic theSubscriptionTopic, InMemoryMatchResult theInMemoryMatchResult) {
List<ActiveSubscription> topicSubscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(topic.getUrl()); String topicUrl = theSubscriptionTopic.getUrl();
if (!topicSubscriptions.isEmpty()) { List<IBaseResource> matchedResource = Collections.singletonList(theMsg.getNewPayload(myFhirContext));
IBaseResource matchedResource = theMsg.getNewPayload(myFhirContext); RestOperationTypeEnum restOperationType = theMsg.getOperationType().asRestOperationType();
for (ActiveSubscription activeSubscription : topicSubscriptions) { return mySubscriptionTopicDispatcher.dispatch(topicUrl, matchedResource, restOperationType, theInMemoryMatchResult, theMsg.getPartitionId(), theMsg.getTransactionId());
// WIP STR5 apply subscription filters
IBaseBundle bundlePayload = mySubscriptionTopicPayloadBuilder.buildPayload(matchedResource, theMsg, activeSubscription, topic);
// WIP STR5 do we need to add a total? If so can do that with R5BundleFactory
bundlePayload.setId(UUID.randomUUID().toString());
mySubscriptionMatchDeliverer.deliverPayload(bundlePayload, theMsg, activeSubscription, result);
}
}
} }
} }

View File

@ -23,7 +23,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.util.BundleBuilder; import ca.uhn.fhir.util.BundleBuilder;
import org.hl7.fhir.convertors.factory.VersionConvertorFactory_43_50; import org.hl7.fhir.convertors.factory.VersionConvertorFactory_43_50;
import org.hl7.fhir.instance.model.api.IBaseBundle; import org.hl7.fhir.instance.model.api.IBaseBundle;
@ -32,8 +32,8 @@ import org.hl7.fhir.r5.model.Bundle;
import org.hl7.fhir.r5.model.Enumerations; import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.Reference; import org.hl7.fhir.r5.model.Reference;
import org.hl7.fhir.r5.model.SubscriptionStatus; import org.hl7.fhir.r5.model.SubscriptionStatus;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import java.util.List;
import java.util.UUID; import java.util.UUID;
public class SubscriptionTopicPayloadBuilder { public class SubscriptionTopicPayloadBuilder {
@ -43,12 +43,12 @@ public class SubscriptionTopicPayloadBuilder {
myFhirContext = theFhirContext; myFhirContext = theFhirContext;
} }
public IBaseBundle buildPayload(IBaseResource theMatchedResource, ResourceModifiedMessage theMsg, ActiveSubscription theActiveSubscription, SubscriptionTopic theTopic) { public IBaseBundle buildPayload(List<IBaseResource> theResources, ActiveSubscription theActiveSubscription, String theTopicUrl, RestOperationTypeEnum theRestOperationType) {
BundleBuilder bundleBuilder = new BundleBuilder(myFhirContext); BundleBuilder bundleBuilder = new BundleBuilder(myFhirContext);
// WIP STR5 set eventsSinceSubscriptionStart from the database // WIP STR5 set eventsSinceSubscriptionStart from the database
int eventsSinceSubscriptionStart = 1; int eventsSinceSubscriptionStart = 1;
IBaseResource subscriptionStatus = buildSubscriptionStatus(theMatchedResource, theActiveSubscription, theTopic, eventsSinceSubscriptionStart); IBaseResource subscriptionStatus = buildSubscriptionStatus(theResources, theActiveSubscription, theTopicUrl, eventsSinceSubscriptionStart);
FhirVersionEnum fhirVersion = myFhirContext.getVersion().getVersion(); FhirVersionEnum fhirVersion = myFhirContext.getVersion().getVersion();
@ -65,21 +65,25 @@ public class SubscriptionTopicPayloadBuilder {
// WIP STR5 is this the right type of entry? see http://hl7.org/fhir/subscriptionstatus-examples.html // WIP STR5 is this the right type of entry? see http://hl7.org/fhir/subscriptionstatus-examples.html
// WIP STR5 Also see http://hl7.org/fhir/R4B/notification-full-resource.json.html need to conform to these // WIP STR5 Also see http://hl7.org/fhir/R4B/notification-full-resource.json.html need to conform to these
bundleBuilder.addCollectionEntry(subscriptionStatus); bundleBuilder.addCollectionEntry(subscriptionStatus);
switch (theMsg.getOperationType()) { for (IBaseResource resource : theResources) {
switch(theRestOperationType) {
case CREATE: case CREATE:
bundleBuilder.addTransactionCreateEntry(theMatchedResource); bundleBuilder.addTransactionCreateEntry(resource);
break; break;
case UPDATE: case UPDATE:
bundleBuilder.addTransactionUpdateEntry(theMatchedResource); bundleBuilder.addTransactionUpdateEntry(resource);
break; break;
case DELETE: case DELETE:
bundleBuilder.addTransactionDeleteEntry(theMatchedResource); bundleBuilder.addTransactionDeleteEntry(resource);
break; break;
} }
}
return bundleBuilder.getBundle(); return bundleBuilder.getBundle();
} }
private SubscriptionStatus buildSubscriptionStatus(IBaseResource theMatchedResource, ActiveSubscription theActiveSubscription, SubscriptionTopic theTopic, int theEventsSinceSubscriptionStart) {
private SubscriptionStatus buildSubscriptionStatus(List<IBaseResource> theResources, ActiveSubscription theActiveSubscription, String theTopicUrl, int theEventsSinceSubscriptionStart) {
SubscriptionStatus subscriptionStatus = new SubscriptionStatus(); SubscriptionStatus subscriptionStatus = new SubscriptionStatus();
subscriptionStatus.setId(UUID.randomUUID().toString()); subscriptionStatus.setId(UUID.randomUUID().toString());
subscriptionStatus.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE); subscriptionStatus.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE);
@ -87,9 +91,14 @@ public class SubscriptionTopicPayloadBuilder {
// WIP STR5 count events since subscription start and set eventsSinceSubscriptionStart // WIP STR5 count events since subscription start and set eventsSinceSubscriptionStart
// store counts by subscription id // store counts by subscription id
subscriptionStatus.setEventsSinceSubscriptionStart(theEventsSinceSubscriptionStart); subscriptionStatus.setEventsSinceSubscriptionStart(theEventsSinceSubscriptionStart);
subscriptionStatus.addNotificationEvent().setEventNumber(theEventsSinceSubscriptionStart).setFocus(new Reference(theMatchedResource.getIdElement())); SubscriptionStatus.SubscriptionStatusNotificationEventComponent event = subscriptionStatus.addNotificationEvent();
event.setEventNumber(theEventsSinceSubscriptionStart);
if (theResources.size() > 0) {
event.setFocus(new Reference(theResources.get(0).getIdElement()));
}
subscriptionStatus.setSubscription(new Reference(theActiveSubscription.getSubscription().getIdElement(myFhirContext))); subscriptionStatus.setSubscription(new Reference(theActiveSubscription.getSubscription().getIdElement(myFhirContext)));
subscriptionStatus.setTopic(theTopic.getUrl()); subscriptionStatus.setTopic(theTopicUrl);
return subscriptionStatus; return subscriptionStatus;
} }
} }

View File

@ -3,13 +3,11 @@ package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.util.BundleUtil; import ca.uhn.fhir.util.BundleUtil;
import org.hl7.fhir.r4b.model.Bundle; import org.hl7.fhir.r4b.model.Bundle;
import org.hl7.fhir.r4b.model.Encounter; import org.hl7.fhir.r4b.model.Encounter;
import org.hl7.fhir.r4b.model.Resource; import org.hl7.fhir.r4b.model.Resource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.List; import java.util.List;
@ -17,6 +15,7 @@ import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
class SubscriptionTopicPayloadBuilderR4BTest { class SubscriptionTopicPayloadBuilderR4BTest {
private static final String TEST_TOPIC_URL = "test-builder-topic-url";
FhirContext ourFhirContext = FhirContext.forR4BCached(); FhirContext ourFhirContext = FhirContext.forR4BCached();
@Test @Test
public void testBuildPayloadDelete() { public void testBuildPayloadDelete() {
@ -24,14 +23,11 @@ class SubscriptionTopicPayloadBuilderR4BTest {
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter(); var encounter = new Encounter();
encounter.setId("Encounter/1"); encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription(); CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test"); ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.DELETE);
// run // run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.DELETE);
// verify // verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
@ -47,14 +43,11 @@ class SubscriptionTopicPayloadBuilderR4BTest {
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter(); var encounter = new Encounter();
encounter.setId("Encounter/1"); encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription(); CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test"); ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.UPDATE);
// run // run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.UPDATE);
// verify // verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
@ -71,14 +64,11 @@ class SubscriptionTopicPayloadBuilderR4BTest {
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter(); var encounter = new Encounter();
encounter.setId("Encounter/1"); encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription(); CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test"); ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.CREATE);
// run // run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.CREATE);
// verify // verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);

View File

@ -3,13 +3,11 @@ package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.util.BundleUtil; import ca.uhn.fhir.util.BundleUtil;
import org.hl7.fhir.r5.model.Bundle; import org.hl7.fhir.r5.model.Bundle;
import org.hl7.fhir.r5.model.Encounter; import org.hl7.fhir.r5.model.Encounter;
import org.hl7.fhir.r5.model.Resource; import org.hl7.fhir.r5.model.Resource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.List; import java.util.List;
@ -17,6 +15,7 @@ import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
class SubscriptionTopicPayloadBuilderR5Test { class SubscriptionTopicPayloadBuilderR5Test {
private static final String TEST_TOPIC_URL = "test-builder-topic-url";
FhirContext ourFhirContext = FhirContext.forR5Cached(); FhirContext ourFhirContext = FhirContext.forR5Cached();
@Test @Test
public void testBuildPayloadDelete() { public void testBuildPayloadDelete() {
@ -24,14 +23,11 @@ class SubscriptionTopicPayloadBuilderR5Test {
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter(); var encounter = new Encounter();
encounter.setId("Encounter/1"); encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription(); CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test"); ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.DELETE);
// run // run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.DELETE);
// verify // verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
@ -47,14 +43,11 @@ class SubscriptionTopicPayloadBuilderR5Test {
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter(); var encounter = new Encounter();
encounter.setId("Encounter/1"); encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription(); CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test"); ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.UPDATE);
// run // run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.UPDATE);
// verify // verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
@ -71,14 +64,11 @@ class SubscriptionTopicPayloadBuilderR5Test {
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext); var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter(); var encounter = new Encounter();
encounter.setId("Encounter/1"); encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription(); CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test"); ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.CREATE);
// run // run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic); Bundle payload = (Bundle)svc.buildPayload(List.of(encounter), subscription, TEST_TOPIC_URL, RestOperationTypeEnum.CREATE);
// verify // verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class); List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);

View File

@ -20,7 +20,7 @@
package ca.uhn.fhir.rest.server.messaging; package ca.uhn.fhir.rest.server.messaging;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum; import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
@ -146,7 +146,7 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso
/** /**
* Adds a transaction ID to this message. This ID can be used for many purposes. For example, performing tracing * Adds a transaction ID to this message. This ID can be used for many purposes. For example, performing tracing
* across asynchronous hooks, tying data together, or downstream logging purposes. * across asynchronous hooks, tying data together, or downstream logging purposes.
* * <p>
* One current internal implementation uses this field to tie back MDM processing results (which are asynchronous) * One current internal implementation uses this field to tie back MDM processing results (which are asynchronous)
* to the original transaction log that caused the MDM processing to occur. * to the original transaction log that caused the MDM processing to occur.
* *
@ -195,7 +195,7 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso
* @return null by default * @return null by default
*/ */
@Nullable @Nullable
protected String getMessageKeyDefaultValue(){ protected String getMessageKeyDefaultValue() {
return null; return null;
} }
@ -212,6 +212,19 @@ public abstract class BaseResourceMessage implements IResourceMessage, IModelJso
myRestOperationTypeEnum = theRestOperationTypeEnum; myRestOperationTypeEnum = theRestOperationTypeEnum;
} }
public static OperationTypeEnum from(RestOperationTypeEnum theRestOperationType) {
switch (theRestOperationType) {
case CREATE:
return CREATE;
case UPDATE:
return UPDATE;
case DELETE:
return DELETE;
default:
throw new IllegalArgumentException(Msg.code(2348) + "Unsupported operation type: " + theRestOperationType);
}
}
public RestOperationTypeEnum asRestOperationType() { public RestOperationTypeEnum asRestOperationType() {
return myRestOperationTypeEnum; return myRestOperationTypeEnum;
} }