R4B SubscriptionTopic support (#4724)

* add tests for r4b subscriptions

* begin with failing test

* prepare for SubscriptionTopicLoader

* backwards compatibility

* subscription topic registry done

* topic matching is working

* all but delivery is working now

* yay test passes with FIXMEs

* FIXME -> WIP

* switch notification to bundle

* message codes

* fixme

* disable services for fhir versions below R4B

* fix regression

* fix intermittent

* this change will likely break some other tests

* try a safer option

* fix tests

* fix intermittent (I hope)

* unit test

* improve logic around topic subscription categorization

* moar test

* moar test

* changed to support both r4b and r5

* moar test

* cleanup for test

* moar test

* moar test

* moar test

* changelog

* comment

* Msg.code

* fix mock

* add update test

* fix test cleanup

* tracking link for version converter issue

* review

* fix test

* fix test

---------

Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2023-04-11 20:44:08 -04:00 committed by GitHub
parent 450ccb5599
commit b88ccbc7b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 3747 additions and 318 deletions

View File

@ -1031,6 +1031,43 @@ public enum Pointcut implements IPointcut {
"org.hl7.fhir.instance.model.api.IBaseResource"
),
/**
* <b>Subscription Topic Hook:</b>
* Invoked whenever a persisted resource (a resource that has just been stored in the
* database via a create/update/patch/etc.) is about to be checked for whether any subscription topics
* were triggered as a result of the operation.
* <p>
* Hooks may accept the following parameters:
* <ul>
* <li>ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage - Hooks may modify this parameter. This will affect the checking process.</li>
* </ul>
* </p>
* <p>
* Hooks may return <code>void</code> or may return a <code>boolean</code>. If the method returns
* <code>void</code> or <code>true</code>, processing will continue normally. If the method
* returns <code>false</code>, processing will be aborted.
* </p>
*/
SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED(boolean.class, "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage"),
/**
* <b>Subscription Topic Hook:</b>
* Invoked whenever a persisted resource (a resource that has just been stored in the
* database via a create/update/patch/etc.) has been checked for whether any subscription topics
* were triggered as a result of the operation.
* <p>
* Hooks may accept the following parameters:
* <ul>
* <li>ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage - This parameter should not be modified as processing is complete when this hook is invoked.</li>
* </ul>
* </p>
* <p>
* Hooks should return <code>void</code>.
* </p>
*/
SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED(void.class, "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage"),
/**
* <b>Storage Hook:</b>

View File

@ -0,0 +1,5 @@
---
type: add
issue: 4724
title: "Preliminary support for R5 and R4B SubscriptionTopic matching has been added. This is not yet complete, but the
simplest use cases now work. Comments in the code with prefix 'WIP STR5' indicate areas that need to be extended."

View File

@ -24,13 +24,13 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.mdm.api.IMdmSettings;
import ca.uhn.fhir.mdm.api.MdmConstants;
import ca.uhn.fhir.mdm.log.Logs;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.HapiExtensions;
@ -88,7 +88,7 @@ public class MdmSubscriptionLoader {
}
//After loading all the subscriptions, sync the subscriptions to the registry.
if (subscriptions != null && subscriptions.size() > 0) {
mySubscriptionLoader.syncSubscriptions();
mySubscriptionLoader.syncDatabaseToCache();
}
}

View File

@ -19,6 +19,8 @@
*/
package ca.uhn.fhir.jpa.searchparam.matcher;
import java.util.List;
public class InMemoryMatchResult {
public static final String PARSE_FAIL = "Failed to translate parse query string";
public static final String STANDARD_PARAMETER = "Standard parameters not supported";
@ -76,6 +78,10 @@ public class InMemoryMatchResult {
return new InMemoryMatchResult(theUnsupportedParameter, theUnsupportedReason);
}
public static InMemoryMatchResult noMatch() {
return new InMemoryMatchResult(false);
}
public boolean supported() {
return mySupported;
}
@ -98,4 +104,43 @@ public class InMemoryMatchResult {
public void setInMemory(boolean theInMemory) {
myInMemory = theInMemory;
}
public static InMemoryMatchResult and(InMemoryMatchResult theLeft, InMemoryMatchResult theRight) {
if (theLeft == null) {
return theRight;
}
if (theRight == null) {
return theLeft;
}
if (theLeft.supported() && theRight.supported()) {
return InMemoryMatchResult.fromBoolean(theLeft.matched() && theRight.matched());
}
if (!theLeft.supported() && !theRight.supported()) {
return InMemoryMatchResult.unsupportedFromReason(List.of(theLeft.getUnsupportedReason(), theRight.getUnsupportedReason()).toString());
}
if (!theLeft.supported()) {
return theLeft;
}
return theRight;
}
public static InMemoryMatchResult or(InMemoryMatchResult theLeft, InMemoryMatchResult theRight) {
if (theLeft == null) {
return theRight;
}
if (theRight == null) {
return theLeft;
}
if (theLeft.matched() || theRight.matched()) {
return InMemoryMatchResult.successfulMatch();
}
if (!theLeft.supported() && !theRight.supported()) {
return InMemoryMatchResult.unsupportedFromReason(List.of(theLeft.getUnsupportedReason(), theRight.getUnsupportedReason()).toString());
}
if (!theLeft.supported()) {
return theLeft;
}
return theRight;
}
}

View File

@ -0,0 +1,59 @@
package ca.uhn.fhir.jpa.searchparam.matcher;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class InMemoryMatchResultTest {
InMemoryMatchResult success = InMemoryMatchResult.successfulMatch();
InMemoryMatchResult noMatch = InMemoryMatchResult.noMatch();
InMemoryMatchResult unsupported1 = InMemoryMatchResult.unsupportedFromParameterAndReason("param1", "reason1");
InMemoryMatchResult unsupported2 = InMemoryMatchResult.unsupportedFromParameterAndReason("param2", "reason2");
@Test
public void testMergeAnd() {
assertMatch(InMemoryMatchResult.and(success, success));
assertNoMatch(InMemoryMatchResult.and(success, noMatch));
assertNoMatchWithReason(InMemoryMatchResult.and(success, unsupported1), unsupported1.getUnsupportedReason());
assertNoMatch(InMemoryMatchResult.and(noMatch, success));
assertNoMatch(InMemoryMatchResult.and(noMatch, noMatch));
assertNoMatchWithReason(InMemoryMatchResult.and(noMatch, unsupported1), unsupported1.getUnsupportedReason());
assertNoMatchWithReason(InMemoryMatchResult.and(unsupported1, success), unsupported1.getUnsupportedReason());
assertNoMatchWithReason(InMemoryMatchResult.and(unsupported1, noMatch), unsupported1.getUnsupportedReason());
assertNoMatchWithReason(InMemoryMatchResult.and(unsupported1, unsupported2), List.of(unsupported1.getUnsupportedReason(), unsupported2.getUnsupportedReason()).toString());
}
@Test
public void testMergeOr() {
assertMatch(InMemoryMatchResult.or(success, success));
assertMatch(InMemoryMatchResult.or(success, noMatch));
assertMatch(InMemoryMatchResult.or(success, unsupported1));
assertMatch(InMemoryMatchResult.or(noMatch, success));
assertNoMatch(InMemoryMatchResult.or(noMatch, noMatch));
assertNoMatchWithReason(InMemoryMatchResult.or(noMatch, unsupported1), unsupported1.getUnsupportedReason());
assertMatch(InMemoryMatchResult.or(unsupported1, success));
assertNoMatchWithReason(InMemoryMatchResult.or(unsupported1, noMatch), unsupported1.getUnsupportedReason());
assertNoMatchWithReason(InMemoryMatchResult.or(unsupported1, unsupported2), List.of(unsupported1.getUnsupportedReason(), unsupported2.getUnsupportedReason()).toString());
}
private void assertNoMatchWithReason(InMemoryMatchResult theMerged, String theExpectedUnsupportedReason) {
assertNoMatch(theMerged);
assertEquals(theExpectedUnsupportedReason, theMerged.getUnsupportedReason());
}
private void assertMatch(InMemoryMatchResult theMerged) {
assertTrue(theMerged.matched());
}
private void assertNoMatch(InMemoryMatchResult theMerged) {
assertFalse(theMerged.matched());
}
}

View File

@ -128,7 +128,7 @@
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
</dependencies>
</dependencies>
<build>
<pluginManagement>
<plugins>

View File

@ -38,6 +38,7 @@ import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionRegiste
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
@ -47,7 +48,7 @@ import org.springframework.context.annotation.Scope;
* This Spring config should be imported by a system that pulls messages off of the
* matching queue for processing, and handles delivery
*/
@Import(SubscriptionModelConfig.class)
@Import({SubscriptionModelConfig.class, SubscriptionTopicConfig.class})
public class SubscriptionProcessorConfig {
@Bean

View File

@ -26,13 +26,12 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.RequestTypeEnum;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.client.api.Header;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IHttpClient;
@ -70,9 +69,6 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private MatchUrlService myMatchUrlService;
/**
* Constructor
*/
@ -90,7 +86,9 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient, IBaseResource thePayloadResource) {
IClientExecutable<?, ?> operation;
if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
if (theSubscription.isTopicSubscription()) {
operation = createDeliveryRequestTopic((IBaseBundle) theMsg.getPayload(myFhirContext), theClient, thePayloadResource);
} else if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
operation = createDeliveryRequestTransaction(theSubscription, theClient, thePayloadResource);
} else if (thePayloadType != null) {
operation = createDeliveryRequestNormal(theMsg, theClient, thePayloadResource);
@ -143,6 +141,10 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
return theClient.transaction().withBundle(bundle);
}
private IClientExecutable<?, ?> createDeliveryRequestTopic(IBaseBundle theBundle, IGenericClient theClient, IBaseResource thePayloadResource) {
return theClient.transaction().withBundle(theBundle);
}
public IBaseResource getResource(IIdType payloadId, RequestPartitionId thePartitionId, boolean theDeletedOK) throws ResourceGoneException {
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(payloadId.getResourceType());
SystemRequestDetails systemRequestDetails = new SystemRequestDetails().setRequestPartitionId(thePartitionId);

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.matching;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import org.springframework.beans.factory.annotation.Autowired;
public class SubscriptionStrategyEvaluator {
@ -36,18 +37,28 @@ public class SubscriptionStrategyEvaluator {
super();
}
public SubscriptionMatchingStrategy determineStrategy(String theCriteria) {
SubscriptionCriteriaParser.SubscriptionCriteria criteria = SubscriptionCriteriaParser.parse(theCriteria);
if (criteria != null) {
if (criteria.getCriteria() != null) {
InMemoryMatchResult result = myInMemoryResourceMatcher.canBeEvaluatedInMemory(theCriteria);
if (result.supported()) {
return SubscriptionMatchingStrategy.IN_MEMORY;
}
} else {
public SubscriptionMatchingStrategy determineStrategy(CanonicalSubscription theSubscription) {
if (theSubscription.isTopicSubscription()) {
return SubscriptionMatchingStrategy.TOPIC;
}
String criteriaString = theSubscription.getCriteriaString();
return determineStrategy(criteriaString);
}
public SubscriptionMatchingStrategy determineStrategy(String criteriaString) {
SubscriptionCriteriaParser.SubscriptionCriteria criteria = SubscriptionCriteriaParser.parse(criteriaString);
if (criteria == null) {
return SubscriptionMatchingStrategy.DATABASE;
}
if (criteria.getCriteria() == null) {
return SubscriptionMatchingStrategy.IN_MEMORY;
} else {
InMemoryMatchResult result = myInMemoryResourceMatcher.canBeEvaluatedInMemory(criteriaString);
if (result.supported()) {
return SubscriptionMatchingStrategy.IN_MEMORY;
} else {
return SubscriptionMatchingStrategy.DATABASE;
}
}
return SubscriptionMatchingStrategy.DATABASE;
}
}

View File

@ -20,10 +20,13 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.IHapiBootOrder;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicMatchingSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -39,8 +42,12 @@ public class MatchingQueueSubscriberLoader {
protected IChannelReceiver myMatchingChannel;
private static final Logger ourLog = LoggerFactory.getLogger(MatchingQueueSubscriberLoader.class);
@Autowired
FhirContext myFhirContext;
@Autowired
private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;
@Autowired
private SubscriptionTopicMatchingSubscriber mySubscriptionTopicMatchingSubscriber;
@Autowired
private SubscriptionChannelFactory mySubscriptionChannelFactory;
@Autowired
private SubscriptionRegisteringSubscriber mySubscriptionRegisteringSubscriber;
@ -60,6 +67,10 @@ public class MatchingQueueSubscriberLoader {
myMatchingChannel.subscribe(mySubscriptionActivatingSubscriber);
myMatchingChannel.subscribe(mySubscriptionRegisteringSubscriber);
ourLog.info("Subscription Matching Subscriber subscribed to Matching Channel {} with name {}", myMatchingChannel.getClass().getName(), SUBSCRIPTION_MATCHING_CHANNEL_NAME);
if (myFhirContext.getVersion().getVersion().isEqualOrNewerThan(FhirVersionEnum.R4B)) {
ourLog.info("Starting SubscriptionTopic Matching Subscriber");
myMatchingChannel.subscribe(mySubscriptionTopicMatchingSubscriber);
}
}
}

View File

@ -0,0 +1,96 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class SubscriptionMatchDeliverer {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchDeliverer.class);
private final FhirContext myFhirContext;
private final IInterceptorBroadcaster myInterceptorBroadcaster;
private final SubscriptionChannelRegistry mySubscriptionChannelRegistry;
public SubscriptionMatchDeliverer(FhirContext theFhirContext, IInterceptorBroadcaster theInterceptorBroadcaster, SubscriptionChannelRegistry theSubscriptionChannelRegistry) {
myFhirContext = theFhirContext;
myInterceptorBroadcaster = theInterceptorBroadcaster;
mySubscriptionChannelRegistry = theSubscriptionChannelRegistry;
}
public boolean deliverPayload(IBaseResource thePayload, ResourceModifiedMessage theMsg, ActiveSubscription theActiveSubscription, InMemoryMatchResult matchResult) {
EncodingEnum encoding = null;
CanonicalSubscription subscription = theActiveSubscription.getSubscription();
String subscriptionId = theActiveSubscription.getId();;
if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString());
}
encoding = defaultIfNull(encoding, EncodingEnum.JSON);
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());
if (thePayload != null) {
deliveryMsg.setPayload(myFhirContext, thePayload, encoding);
} else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext));
}
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(CanonicalSubscription.class, theActiveSubscription.getSubscription())
.add(ResourceDeliveryMessage.class, deliveryMsg)
.add(InMemoryMatchResult.class, matchResult);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
ourLog.info("Interceptor has decided to abort processing of subscription {}", subscriptionId);
return false;
}
return sendToDeliveryChannel(theActiveSubscription, deliveryMsg);
}
private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
boolean retVal = false;
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg);
MessageChannel deliveryChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(nextActiveSubscription.getChannelName());
if (deliveryChannel != null) {
retVal = true;
trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
} else {
ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId());
}
return retVal;
}
private void trySendToDeliveryChannel(ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) {
try {
boolean success = theDeliveryChannel.send(theWrappedMsg);
if (!success) {
ourLog.warn("Failed to send message to Delivery Channel.");
}
} catch (RuntimeException e) {
ourLog.error("Failed to send message to Delivery Channel", e);
throw new RuntimeException(Msg.code(7) + "Failed to send message to Delivery Channel", e);
}
}
}

View File

@ -20,28 +20,22 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@ -49,7 +43,6 @@ import javax.annotation.Nonnull;
import java.util.Collection;
import static ca.uhn.fhir.rest.server.messaging.BaseResourceMessage.OperationTypeEnum.DELETE;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class SubscriptionMatchingSubscriber implements MessageHandler {
@ -65,7 +58,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
@Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
private SubscriptionMatchDeliverer mySubscriptionMatchDeliverer;
/**
* Constructor
@ -147,7 +140,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
!theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
return false;
}
String nextSubscriptionId = getId(theActiveSubscription);
String nextSubscriptionId = theActiveSubscription.getId();
if (isNotBlank(theMsg.getSubscriptionId())) {
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
@ -191,71 +184,13 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
}
IBaseResource payload = theMsg.getNewPayload(myFhirContext);
EncodingEnum encoding = null;
if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString());
}
encoding = defaultIfNull(encoding, EncodingEnum.JSON);
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());
if (payload != null) {
deliveryMsg.setPayload(myFhirContext, payload, encoding);
} else {
deliveryMsg.setPayloadId(theMsg.getPayloadId(myFhirContext));
}
deliveryMsg.setSubscription(subscription);
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setTransactionId(theMsg.getTransactionId());
deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
// Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
HookParams params = new HookParams()
.add(CanonicalSubscription.class, theActiveSubscription.getSubscription())
.add(ResourceDeliveryMessage.class, deliveryMsg)
.add(InMemoryMatchResult.class, matchResult);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
ourLog.info("Interceptor has decided to abort processing of subscription {}", nextSubscriptionId);
return false;
}
return sendToDeliveryChannel(theActiveSubscription, deliveryMsg);
return mySubscriptionMatchDeliverer.deliverPayload(payload, theMsg, theActiveSubscription, matchResult);
}
private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
boolean retVal = false;
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg);
MessageChannel deliveryChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(nextActiveSubscription.getChannelName());
if (deliveryChannel != null) {
retVal = true;
trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
} else {
ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId());
}
return retVal;
}
private void trySendToDeliveryChannel(ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) {
try {
boolean success = theDeliveryChannel.send(theWrappedMsg);
if (!success) {
ourLog.warn("Failed to send message to Delivery Channel.");
}
} catch (RuntimeException e) {
ourLog.error("Failed to send message to Delivery Channel", e);
throw new RuntimeException(Msg.code(7) + "Failed to send message to Delivery Channel", e);
}
}
private String getId(ActiveSubscription theActiveSubscription) {
return theActiveSubscription.getId();
}
private boolean resourceTypeIsAppropriateForSubscription(ActiveSubscription theActiveSubscription, IIdType theResourceId) {
SubscriptionCriteriaParser.SubscriptionCriteria criteria = theActiveSubscription.getCriteria();
String subscriptionId = getId(theActiveSubscription);
String subscriptionId = theActiveSubscription.getId();
String resourceType = theResourceId.getResourceType();
// see if the criteria matches the created object

View File

@ -23,8 +23,15 @@ import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
class ActiveSubscriptionCache {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscriptionCache.class);
@ -77,4 +84,12 @@ class ActiveSubscriptionCache {
}
return retval;
}
public List<ActiveSubscription> getTopicSubscriptionsForUrl(String theUrl) {
assert !isBlank(theUrl);
return getAll().stream()
.filter(as -> as.getSubscription().isTopicSubscription())
.filter(as -> theUrl.equals(as.getSubscription().getCriteriaString()))
.collect(Collectors.toList());
}
}

View File

@ -19,173 +19,50 @@
*/
package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.cache.BaseResourceCacheSynchronizer;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
public class SubscriptionLoader implements IResourceChangeListener {
public class SubscriptionLoader extends BaseResourceCacheSynchronizer {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class);
private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
private static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
private final Object mySyncSubscriptionsLock = new Object();
@Autowired
private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
DaoRegistry myDaoRegistry;
private Semaphore mySyncSubscriptionsSemaphore = new Semaphore(1);
@Autowired
private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor;
@Autowired
private ISearchParamRegistry mySearchParamRegistry;
@Autowired
private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
@Autowired
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
private SearchParameterMap mySearchParameterMap;
private SystemRequestDetails mySystemRequestDetails;
private boolean myStopping;
/**
* Constructor
*/
public SubscriptionLoader() {
super();
super("Subscription");
}
@PostConstruct
public void registerListener() {
mySearchParameterMap = getSearchParameterMap();
mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
IResourceChangeListenerCache subscriptionCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener("Subscription", mySearchParameterMap, this, REFRESH_INTERVAL);
subscriptionCache.forceRefresh();
}
@PreDestroy
public void unregisterListener() {
myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
}
private boolean subscriptionsDaoExists() {
return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported("Subscription");
}
/**
* Read the existing subscriptions from the database
*/
public void syncSubscriptions() {
if (!subscriptionsDaoExists()) {
return;
}
if (!mySyncSubscriptionsSemaphore.tryAcquire()) {
return;
}
try {
doSyncSubscriptionsWithRetry();
} finally {
mySyncSubscriptionsSemaphore.release();
}
}
@VisibleForTesting
public void acquireSemaphoreForUnitTest() throws InterruptedException {
mySyncSubscriptionsSemaphore.acquire();
}
@VisibleForTesting
public int doSyncSubscriptionsForUnitTest() {
// Two passes for delete flag to take effect
int first = doSyncSubscriptionsWithRetry();
int second = doSyncSubscriptionsWithRetry();
return first + second;
}
synchronized int doSyncSubscriptionsWithRetry() {
// retry runs MAX_RETRIES times
// and if errors result every time, it will fail
Retrier<Integer> syncSubscriptionRetrier = new Retrier<>(this::doSyncSubscriptions, MAX_RETRIES);
return syncSubscriptionRetrier.runWithRetry();
}
private int doSyncSubscriptions() {
if (isStopping()) {
return 0;
}
synchronized (mySyncSubscriptionsLock) {
ourLog.debug("Starting sync subscriptions");
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(mySearchParameterMap, mySystemRequestDetails);
Integer subscriptionCount = subscriptionBundleList.size();
assert subscriptionCount != null;
if (subscriptionCount >= SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS) {
ourLog.error("Currently over " + SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded.");
}
List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionCount);
return updateSubscriptionRegistry(resourceList);
}
}
@EventListener(ContextRefreshedEvent.class)
public void start() {
myStopping = false;
}
@EventListener(ContextClosedEvent.class)
public void shutdown() {
myStopping = true;
}
private boolean isStopping() {
return myStopping;
}
private IFhirResourceDao<?> getSubscriptionDao() {
return myDaoRegistry.getSubscriptionDao();
return super.doSyncResourcessForUnitTest();
}
@Override
@Nonnull
private SearchParameterMap getSearchParameterMap() {
protected SearchParameterMap getSearchParameterMap() {
SearchParameterMap map = new SearchParameterMap();
if (mySearchParamRegistry.getActiveSearchParam("Subscription", "status") != null) {
@ -197,6 +74,16 @@ public class SubscriptionLoader implements IResourceChangeListener {
return map;
}
@Override
protected void handleInit(List<IBaseResource> resourceList) {
updateSubscriptionRegistry(resourceList);
}
@Override
protected int syncResourcesIntoCache(List<IBaseResource> resourceList) {
return updateSubscriptionRegistry(resourceList);
}
private int updateSubscriptionRegistry(List<IBaseResource> theResourceList) {
Set<String> allIds = new HashSet<>();
int activatedCount = 0;
@ -271,23 +158,8 @@ public class SubscriptionLoader implements IResourceChangeListener {
);
}
@Override
public void handleInit(Collection<IIdType> theResourceIds) {
if (!subscriptionsDaoExists()) {
ourLog.warn("Subsriptions are enabled on this server, but there is no Subscription DAO configured.");
return;
}
IFhirResourceDao<?> subscriptionDao = getSubscriptionDao();
SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
List<IBaseResource> resourceList = theResourceIds.stream().map(n -> subscriptionDao.read(n, systemRequestDetails)).collect(Collectors.toList());
updateSubscriptionRegistry(resourceList);
}
@Override
public void handleChange(IResourceChangeEvent theResourceChangeEvent) {
// For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based on
// known subscriptions that have been created, updated & deleted
syncSubscriptions();
public void syncSubscriptions() {
super.syncDatabaseToCache();
}
}

View File

@ -74,6 +74,10 @@ public class SubscriptionRegistry {
return myActiveSubscriptionCache.getAll();
}
public synchronized List<ActiveSubscription> getTopicSubscriptionsByUrl(String theUrl) {
return myActiveSubscriptionCache.getTopicSubscriptionsForUrl(theUrl);
}
private Optional<CanonicalSubscription> hasSubscription(IIdType theId) {
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());

View File

@ -178,6 +178,7 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
@VisibleForTesting
public LinkedBlockingChannel getProcessingChannelForUnitTest() {
startIfNeeded();
return (LinkedBlockingChannel) myMatchingChannel;
}
}

View File

@ -27,9 +27,10 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
@ -38,7 +39,10 @@ import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
@ -46,10 +50,12 @@ import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.SubscriptionUtil;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import static org.apache.commons.lang3.StringUtils.isBlank;
@Interceptor
@ -139,16 +145,23 @@ public class SubscriptionValidatingInterceptor {
if (!finished) {
validateQuery(subscription.getCriteriaString(), "Subscription.criteria");
if (subscription.isTopicSubscription()) {
Optional<IBaseResource> oTopic = findSubscriptionTopicByUrl(subscription.getCriteriaString());
if (!oTopic.isPresent()) {
throw new UnprocessableEntityException(Msg.code(2322) + "No SubscriptionTopic exists with url: " + subscription.getCriteriaString());
}
} else {
validateQuery(subscription.getCriteriaString(), "Subscription.criteria");
if (subscription.getPayloadSearchCriteria() != null) {
validateQuery(subscription.getPayloadSearchCriteria(), "Subscription.extension(url='" + HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA + "')");
if (subscription.getPayloadSearchCriteria() != null) {
validateQuery(subscription.getPayloadSearchCriteria(), "Subscription.extension(url='" + HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA + "')");
}
}
validateChannelType(subscription);
try {
SubscriptionMatchingStrategy strategy = mySubscriptionStrategyEvaluator.determineStrategy(subscription.getCriteriaString());
SubscriptionMatchingStrategy strategy = mySubscriptionStrategyEvaluator.determineStrategy(subscription);
mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, strategy);
} catch (InvalidRequestException | DataFormatException e) {
throw new UnprocessableEntityException(Msg.code(9) + "Invalid subscription criteria submitted: " + subscription.getCriteriaString() + " " + e.getMessage());
@ -239,6 +252,15 @@ public class SubscriptionValidatingInterceptor {
}
private Optional<IBaseResource> findSubscriptionTopicByUrl(String theCriteria) {
myDaoRegistry.getResourceDao("SubscriptionTopic");
SearchParameterMap map = SearchParameterMap.newSynchronous();
map.add(SubscriptionTopic.SP_URL, new UriParam(theCriteria));
IFhirResourceDao subscriptionTopicDao = myDaoRegistry.getResourceDao("SubscriptionTopic");
IBundleProvider search = subscriptionTopicDao.search(map, new SystemRequestDetails());
return search.getResources(0, 1).stream().findFirst();
}
public void validateMessageSubscriptionEndpoint(String theEndpointUrl) {
if (theEndpointUrl == null) {
throw new UnprocessableEntityException(Msg.code(16) + "No endpoint defined for message subscription");

View File

@ -0,0 +1,47 @@
package ca.uhn.fhir.jpa.topic;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class ActiveSubscriptionTopicCache {
// We canonicalize on R5 SubscriptionTopic and convert back to R4B when necessary
private final Map<String, SubscriptionTopic> myCache = new ConcurrentHashMap<>();
public int size() {
return myCache.size();
}
/**
* @return true if the subscription topic was added, false if it was already present
*/
public boolean add(SubscriptionTopic theSubscriptionTopic) {
String key = theSubscriptionTopic.getIdElement().getIdPart();
SubscriptionTopic previousValue = myCache.put(key, theSubscriptionTopic);
return previousValue == null;
}
/**
* @return the number of entries removed
*/
public int removeIdsNotInCollection(Set<String> theIdsToRetain) {
int retval = 0;
HashSet<String> safeCopy = new HashSet<>(myCache.keySet());
for (String next : safeCopy) {
if (!theIdsToRetain.contains(next)) {
myCache.remove(next);
++retval;
}
}
return retval;
}
public Collection<SubscriptionTopic> getAll() {
return myCache.values();
}
}

View File

@ -0,0 +1,41 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import org.springframework.context.annotation.Bean;
public class SubscriptionTopicConfig {
@Bean
public SubscriptionMatchDeliverer subscriptionMatchDeliverer(FhirContext theFhirContext, IInterceptorBroadcaster theInterceptorBroadcaster, SubscriptionChannelRegistry theSubscriptionChannelRegistry) {
return new SubscriptionMatchDeliverer(theFhirContext, theInterceptorBroadcaster, theSubscriptionChannelRegistry);
}
@Bean
public SubscriptionTopicMatchingSubscriber subscriptionTopicMatchingSubscriber(FhirContext theFhirContext) {
return new SubscriptionTopicMatchingSubscriber(theFhirContext);
}
@Bean
public SubscriptionTopicPayloadBuilder subscriptionTopicPayloadBuilder(FhirContext theFhirContext) {
return new SubscriptionTopicPayloadBuilder(theFhirContext);
}
@Bean
public SubscriptionTopicRegistry subscriptionTopicRegistry() {
return new SubscriptionTopicRegistry();
}
@Bean
public SubscriptionTopicSupport subscriptionTopicSupport(FhirContext theFhirContext, DaoRegistry theDaoRegistry, SearchParamMatcher theSearchParamMatcher) {
return new SubscriptionTopicSupport(theFhirContext, theDaoRegistry, theSearchParamMatcher);
}
@Bean
public SubscriptionTopicLoader subscriptionTopicLoader() {
return new SubscriptionTopicLoader();
}
}

View File

@ -0,0 +1,119 @@
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.cache.BaseResourceCacheSynchronizer;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.rest.param.TokenParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class SubscriptionTopicLoader extends BaseResourceCacheSynchronizer {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicLoader.class);
@Autowired
private FhirContext myFhirContext;
@Autowired
private SubscriptionTopicRegistry mySubscriptionTopicRegistry;
/**
* Constructor
*/
public SubscriptionTopicLoader() {
super("SubscriptionTopic");
}
@Override
public void registerListener() {
if (!myFhirContext.getVersion().getVersion().isEqualOrNewerThan(FhirVersionEnum.R4B)) {
return;
}
super.registerListener();
}
@Override
@Nonnull
protected SearchParameterMap getSearchParameterMap() {
SearchParameterMap map = new SearchParameterMap();
if (mySearchParamRegistry.getActiveSearchParam("SubscriptionTopic", "status") != null) {
map.add(SubscriptionTopic.SP_STATUS, new TokenParam(null, Enumerations.PublicationStatus.ACTIVE.toCode()));
}
map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS);
return map;
}
@Override
protected void handleInit(List<IBaseResource> resourceList) {
updateSubscriptionTopicRegistry(resourceList);
}
@Override
protected int syncResourcesIntoCache(List<IBaseResource> resourceList) {
return updateSubscriptionTopicRegistry(resourceList);
}
private int updateSubscriptionTopicRegistry(List<IBaseResource> theResourceList) {
Set<String> allIds = new HashSet<>();
int registeredCount = 0;
for (IBaseResource resource : theResourceList) {
String nextId = resource.getIdElement().getIdPart();
allIds.add(nextId);
boolean registered = mySubscriptionTopicRegistry.register(normalizeToR5(resource));
if (registered) {
registeredCount++;
}
}
mySubscriptionTopicRegistry.unregisterAllIdsNotInCollection(allIds);
ourLog.debug("Finished sync subscription topics - registered {}", registeredCount);
return registeredCount;
}
private SubscriptionTopic normalizeToR5(IBaseResource theResource) {
if (theResource instanceof SubscriptionTopic) {
return (SubscriptionTopic) theResource;
} else if (theResource instanceof org.hl7.fhir.r4b.model.SubscriptionTopic) {
return myFhirContext.newJsonParser().parseResource(SubscriptionTopic.class, FhirContext.forR4BCached().newJsonParser().encodeResourceToString(theResource));
// WIP STR5 VersionConvertorFactory_43_50 when it supports SubscriptionTopic
// track here: https://github.com/hapifhir/org.hl7.fhir.core/issues/1212
// return (SubscriptionTopic) VersionConvertorFactory_43_50.convertResource((org.hl7.fhir.r4b.model.SubscriptionTopic) theResource);
} else {
throw new IllegalArgumentException(Msg.code(2332) + "Only R4B and R5 SubscriptionTopic is currently supported. Found " + theResource.getClass());
}
}
}

View File

@ -0,0 +1,37 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import java.util.List;
public class SubscriptionTopicMatcher {
private final SubscriptionTopicSupport mySubscriptionTopicSupport;
private final SubscriptionTopic myTopic;
public SubscriptionTopicMatcher(SubscriptionTopicSupport theSubscriptionTopicSupport, SubscriptionTopic theTopic) {
mySubscriptionTopicSupport = theSubscriptionTopicSupport;
myTopic = theTopic;
}
public InMemoryMatchResult match(ResourceModifiedMessage theMsg) {
IBaseResource resource = theMsg.getPayload(mySubscriptionTopicSupport.getFhirContext());
String resourceName = resource.fhirType();
List<SubscriptionTopic.SubscriptionTopicResourceTriggerComponent> triggers = myTopic.getResourceTrigger();
for (SubscriptionTopic.SubscriptionTopicResourceTriggerComponent next : triggers) {
if (resourceName.equals(next.getResource())) {
SubscriptionTriggerMatcher matcher = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, theMsg, next);
InMemoryMatchResult result = matcher.match();
if (result.matched()) {
return result;
}
// WIP STR5 should we check the other triggers?
}
}
// WIP STR5 add support for event triggers
return InMemoryMatchResult.noMatch();
}
}

View File

@ -0,0 +1,97 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.List;
public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicMatchingSubscriber.class);
private final FhirContext myFhirContext;
@Autowired
SubscriptionTopicSupport mySubscriptionTopicSupport;
@Autowired
SubscriptionTopicRegistry mySubscriptionTopicRegistry;
@Autowired
SubscriptionRegistry mySubscriptionRegistry;
@Autowired
SubscriptionMatchDeliverer mySubscriptionMatchDeliverer;
@Autowired
SubscriptionTopicPayloadBuilder mySubscriptionTopicPayloadBuilder;
@Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster;
public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
}
@Override
public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
ourLog.trace("Handling resource modified message: {}", theMessage);
if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
ourLog.warn("Unexpected message payload type: {}", theMessage);
return;
}
ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
// Interceptor call: SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED
HookParams params = new HookParams()
.add(ResourceModifiedMessage.class, msg);
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) {
return;
}
try {
matchActiveSubscriptionTopicsAndDeliver(msg);
} finally {
// Interceptor call: SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED, params);
}
}
private void matchActiveSubscriptionTopicsAndDeliver(ResourceModifiedMessage theMsg) {
Collection<SubscriptionTopic> topics = mySubscriptionTopicRegistry.getAll();
for (SubscriptionTopic topic : topics) {
SubscriptionTopicMatcher matcher = new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic);
InMemoryMatchResult result = matcher.match(theMsg);
if (result.matched()) {
ourLog.info("Matched topic {} to message {}", topic.getIdElement().toUnqualifiedVersionless(), theMsg);
deliverToTopicSubscriptions(theMsg, topic, result);
}
}
}
private void deliverToTopicSubscriptions(ResourceModifiedMessage theMsg, SubscriptionTopic topic, InMemoryMatchResult result) {
List<ActiveSubscription> topicSubscriptions = mySubscriptionRegistry.getTopicSubscriptionsByUrl(topic.getUrl());
if (!topicSubscriptions.isEmpty()) {
IBaseResource matchedResource = theMsg.getNewPayload(myFhirContext);
for (ActiveSubscription activeSubscription : topicSubscriptions) {
// WIP STR5 apply subscription filter
IBaseResource payload = mySubscriptionTopicPayloadBuilder.buildPayload(matchedResource, theMsg, activeSubscription, topic);
mySubscriptionMatchDeliverer.deliverPayload(payload, theMsg, activeSubscription, result);
}
}
}
}

View File

@ -0,0 +1,71 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.util.BundleBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.Bundle;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.Reference;
import org.hl7.fhir.r5.model.SubscriptionStatus;
import org.hl7.fhir.r5.model.SubscriptionTopic;
public class SubscriptionTopicPayloadBuilder {
private final FhirContext myFhirContext;
public SubscriptionTopicPayloadBuilder(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
}
public IBaseResource buildPayload(IBaseResource theMatchedResource, ResourceModifiedMessage theMsg, ActiveSubscription theActiveSubscription, SubscriptionTopic theTopic) {
BundleBuilder bundleBuilder = new BundleBuilder(myFhirContext);
// WIP STR5 set eventsSinceSubscriptionStart from the database
int eventsSinceSubscriptionStart = 1;
IBaseResource subscriptionStatus = buildSubscriptionStatus(theMatchedResource, theActiveSubscription, theTopic, eventsSinceSubscriptionStart);
FhirVersionEnum fhirVersion = myFhirContext.getVersion().getVersion();
if (fhirVersion == FhirVersionEnum.R4B) {
bundleBuilder.setType(Bundle.BundleType.HISTORY.toCode());
String serializedSubscriptionStatus = FhirContext.forR5Cached().newJsonParser().encodeResourceToString(subscriptionStatus);
subscriptionStatus = myFhirContext.newJsonParser().parseResource(org.hl7.fhir.r4b.model.SubscriptionStatus.class, serializedSubscriptionStatus);
// WIP STR5 VersionConvertorFactory_43_50 when it supports SubscriptionStatus
// track here: https://github.com/hapifhir/org.hl7.fhir.core/issues/1212
// subscriptionStatus = (SubscriptionStatus) VersionConvertorFactory_43_50.convertResource((org.hl7.fhir.r4b.model.SubscriptionStatus) subscriptionStatus);
} else if (fhirVersion == FhirVersionEnum.R5) {
bundleBuilder.setType(Bundle.BundleType.SUBSCRIPTIONNOTIFICATION.toCode());
} else {
throw new IllegalStateException(Msg.code(2331) + "SubscriptionTopic subscriptions are not supported on FHIR version: " + fhirVersion);
}
// WIP STR5 is this the right type of entry?
bundleBuilder.addCollectionEntry(subscriptionStatus);
switch (theMsg.getOperationType()) {
case CREATE:
bundleBuilder.addTransactionCreateEntry(theMatchedResource);
break;
case UPDATE:
bundleBuilder.addTransactionUpdateEntry(theMatchedResource);
break;
case DELETE:
bundleBuilder.addTransactionDeleteEntry(theMatchedResource);
break;
}
return bundleBuilder.getBundle();
}
private SubscriptionStatus buildSubscriptionStatus(IBaseResource theMatchedResource, ActiveSubscription theActiveSubscription, SubscriptionTopic theTopic, int theEventsSinceSubscriptionStart) {
SubscriptionStatus subscriptionStatus = new SubscriptionStatus();
subscriptionStatus.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE);
subscriptionStatus.setType(SubscriptionStatus.SubscriptionNotificationType.EVENTNOTIFICATION);
// WIP STR5 count events since subscription start and set eventsSinceSubscriptionStart
subscriptionStatus.setEventsSinceSubscriptionStart(theEventsSinceSubscriptionStart);
subscriptionStatus.addNotificationEvent().setEventNumber(theEventsSinceSubscriptionStart).setFocus(new Reference(theMatchedResource.getIdElement()));
subscriptionStatus.setSubscription(new Reference(theActiveSubscription.getSubscription().getIdElement(myFhirContext)));
subscriptionStatus.setTopic(theTopic.getUrl());
return subscriptionStatus;
}
}

View File

@ -0,0 +1,29 @@
package ca.uhn.fhir.jpa.topic;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import java.util.Collection;
import java.util.Set;
public class SubscriptionTopicRegistry {
private final ActiveSubscriptionTopicCache myActiveSubscriptionTopicCache = new ActiveSubscriptionTopicCache();
public SubscriptionTopicRegistry() {
}
public int size() {
return myActiveSubscriptionTopicCache.size();
}
public boolean register(SubscriptionTopic resource) {
return myActiveSubscriptionTopicCache.add(resource);
}
public void unregisterAllIdsNotInCollection(Set<String> theIdsToRetain) {
myActiveSubscriptionTopicCache.removeIdsNotInCollection(theIdsToRetain);
}
public Collection<SubscriptionTopic> getAll() {
return myActiveSubscriptionTopicCache.getAll();
}
}

View File

@ -0,0 +1,29 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
public class SubscriptionTopicSupport {
private final FhirContext myFhirContext;
private final DaoRegistry myDaoRegistry;
private final SearchParamMatcher mySearchParamMatcher;
public SubscriptionTopicSupport(FhirContext theFhirContext, DaoRegistry theDaoRegistry, SearchParamMatcher theSearchParamMatcher) {
myFhirContext = theFhirContext;
myDaoRegistry = theDaoRegistry;
mySearchParamMatcher = theSearchParamMatcher;
}
public FhirContext getFhirContext() {
return myFhirContext;
}
public DaoRegistry getDaoRegistry() {
return myDaoRegistry;
}
public SearchParamMatcher getSearchParamMatcher() {
return mySearchParamMatcher;
}
}

View File

@ -0,0 +1,24 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import org.hl7.fhir.r5.model.Enumeration;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import java.util.List;
public class SubscriptionTopicUtil {
public static boolean matches(BaseResourceMessage.OperationTypeEnum theOperationType, List<Enumeration<SubscriptionTopic.InteractionTrigger>> theSupportedInteractions) {
for (Enumeration<SubscriptionTopic.InteractionTrigger> next : theSupportedInteractions) {
if (next.getValue() == SubscriptionTopic.InteractionTrigger.CREATE && theOperationType == BaseResourceMessage.OperationTypeEnum.CREATE) {
return true;
}
if (next.getValue() == SubscriptionTopic.InteractionTrigger.UPDATE && theOperationType == BaseResourceMessage.OperationTypeEnum.UPDATE) {
return true;
}
if (next.getValue() == SubscriptionTopic.InteractionTrigger.DELETE && theOperationType == BaseResourceMessage.OperationTypeEnum.DELETE) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,87 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r5.model.Enumeration;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class SubscriptionTriggerMatcher {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggerMatcher.class);
private final SubscriptionTopicSupport mySubscriptionTopicSupport;
private final BaseResourceMessage.OperationTypeEnum myOperation;
private final SubscriptionTopic.SubscriptionTopicResourceTriggerComponent myTrigger;
private final String myResourceName;
private final IBaseResource myResource;
private final IFhirResourceDao myDao;
private final SystemRequestDetails mySrd;
public SubscriptionTriggerMatcher(SubscriptionTopicSupport theSubscriptionTopicSupport, ResourceModifiedMessage theMsg, SubscriptionTopic.SubscriptionTopicResourceTriggerComponent theTrigger) {
mySubscriptionTopicSupport = theSubscriptionTopicSupport;
myOperation = theMsg.getOperationType();
myResource = theMsg.getPayload(theSubscriptionTopicSupport.getFhirContext());
myResourceName = myResource.fhirType();
myDao = mySubscriptionTopicSupport.getDaoRegistry().getResourceDao(myResourceName);
myTrigger = theTrigger;
mySrd = new SystemRequestDetails();
}
public InMemoryMatchResult match() {
List<Enumeration<SubscriptionTopic.InteractionTrigger>> supportedInteractions = myTrigger.getSupportedInteraction();
if (SubscriptionTopicUtil.matches(myOperation, supportedInteractions)) {
SubscriptionTopic.SubscriptionTopicResourceTriggerQueryCriteriaComponent queryCriteria = myTrigger.getQueryCriteria();
InMemoryMatchResult result = match(queryCriteria);
if (result.matched()) {
return result;
}
}
return InMemoryMatchResult.noMatch();
}
private InMemoryMatchResult match(SubscriptionTopic.SubscriptionTopicResourceTriggerQueryCriteriaComponent theQueryCriteria) {
InMemoryMatchResult previousMatches = InMemoryMatchResult.successfulMatch();
InMemoryMatchResult currentMatches = InMemoryMatchResult.successfulMatch();
String previousCriteria = theQueryCriteria.getPrevious();
String currentCriteria = theQueryCriteria.getCurrent();
if (previousCriteria != null) {
if (myOperation == ResourceModifiedMessage.OperationTypeEnum.UPDATE ||
myOperation == ResourceModifiedMessage.OperationTypeEnum.DELETE) {
Long currentVersion = myResource.getIdElement().getVersionIdPartAsLong();
if (currentVersion > 1) {
IIdType previousVersionId = myResource.getIdElement().withVersion("" + (currentVersion - 1));
// WIP STR5 should we use the partition id from the resource? Ideally we should have a "previous version" service we can use for this
IBaseResource previousVersion = myDao.read(previousVersionId, new SystemRequestDetails());
previousMatches = matchResource(previousVersion, previousCriteria);
} else {
ourLog.warn("Resource {} has a version of 1, which should not be the case for a create or delete operation", myResource.getIdElement().toUnqualifiedVersionless());
}
}
}
if (currentCriteria != null) {
currentMatches = matchResource(myResource, currentCriteria);
}
// WIP STR5 is this the correct interpretation of requireBoth?
if (theQueryCriteria.getRequireBoth()) {
return InMemoryMatchResult.and(previousMatches, currentMatches);
} else {
return InMemoryMatchResult.or(previousMatches, currentMatches);
}
}
private InMemoryMatchResult matchResource(IBaseResource theResource, String theCriteria) {
InMemoryMatchResult result = mySubscriptionTopicSupport.getSearchParamMatcher().match(theCriteria, theResource, mySrd);
if (!result.supported()) {
ourLog.warn("Subscription topic {} has a query criteria that is not supported in-memory: {}", myTrigger.getId(), theCriteria);
}
return result;
}
}

View File

@ -2,21 +2,26 @@ package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.model.primitive.IdDt;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ActiveSubscriptionCacheTest {
static final String ID1 = "id1";
static final String ID2 = "id2";
static final String ID3 = "id3";
public static final String TEST_TOPIC_URL = "http://test.topic";
public static final String TEST_TOPIC_URL_OTHER = "http://test.topic.other";
@Test
public void twoPhaseDelete() {
@ -103,4 +108,36 @@ public class ActiveSubscriptionCacheTest {
assertFalse(activeSub2.isFlagForDeletion());
}
@Test
public void getTopicSubscriptionsForUrl() {
ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache();
ActiveSubscription activeSub1 = buildActiveSubscription(ID1);
activeSubscriptionCache.put(ID1, activeSub1);
assertThat(activeSubscriptionCache.getTopicSubscriptionsForUrl(TEST_TOPIC_URL), hasSize(0));
ActiveSubscription activeSub2 = buildTopicSubscription(ID2, TEST_TOPIC_URL);
activeSubscriptionCache.put(ID2, activeSub2);
assertThat(activeSubscriptionCache.getTopicSubscriptionsForUrl(TEST_TOPIC_URL), hasSize(1));
ActiveSubscription match = activeSubscriptionCache.getTopicSubscriptionsForUrl(TEST_TOPIC_URL).get(0);
assertEquals(ID2, match.getId());
ActiveSubscription activeSub3 = buildTopicSubscription(ID3, TEST_TOPIC_URL_OTHER);
activeSubscriptionCache.put(ID3, activeSub3);
assertThat(activeSubscriptionCache.getTopicSubscriptionsForUrl(TEST_TOPIC_URL), hasSize(1));
match = activeSubscriptionCache.getTopicSubscriptionsForUrl(TEST_TOPIC_URL).get(0);
assertEquals(ID2, match.getId());
assertThat(activeSubscriptionCache.getTopicSubscriptionsForUrl(TEST_TOPIC_URL_OTHER), hasSize(1));
match = activeSubscriptionCache.getTopicSubscriptionsForUrl(TEST_TOPIC_URL_OTHER).get(0);
assertEquals(ID3, match.getId());
}
@NotNull
private ActiveSubscription buildTopicSubscription(String theId, String theTopicUrl) {
ActiveSubscription activeSub2 = buildActiveSubscription(theId);
activeSub2.getSubscription().setTopicSubscription(true);
activeSub2.getSubscription().setCriteriaString(theTopicUrl);
return activeSub2;
}
}

View File

@ -6,10 +6,10 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ch.qos.logback.classic.Level;
@ -36,7 +36,6 @@ import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -55,7 +54,7 @@ public class SubscriptionLoaderTest {
private SubscriptionRegistry mySubscriptionRegistry;
@Mock
private DaoRegistry myDaoRegistery;
private DaoRegistry myDaoRegistry;
@Mock
private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor;
@ -76,6 +75,8 @@ public class SubscriptionLoaderTest {
@Mock
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Mock
private IFhirResourceDao mySubscriptionDao;
@InjectMocks
private SubscriptionLoader mySubscriptionLoader;
@ -95,6 +96,8 @@ public class SubscriptionLoaderTest {
anyLong()
)).thenReturn(mySubscriptionCache);
when(myDaoRegistry.getResourceDaoOrNull("Subscription")).thenReturn(mySubscriptionDao);
mySubscriptionLoader.registerListener();
}
@ -117,16 +120,15 @@ public class SubscriptionLoaderTest {
Subscription subscription = new Subscription();
subscription.setId("Subscription/123");
subscription.setError("THIS IS AN ERROR");
IFhirResourceDao<Subscription> subscriptionDao = mock(IFhirResourceDao.class);
ourLogger.setLevel(Level.ERROR);
// when
when(myDaoRegistery.getSubscriptionDao())
.thenReturn(subscriptionDao);
when(myDaoRegistery.isResourceTypeSupported(anyString()))
when(myDaoRegistry.getResourceDao("Subscription"))
.thenReturn(mySubscriptionDao);
when(myDaoRegistry.isResourceTypeSupported("Subscription"))
.thenReturn(true);
when(subscriptionDao.search(any(SearchParameterMap.class), any(SystemRequestDetails.class)))
when(mySubscriptionDao.search(any(SearchParameterMap.class), any(SystemRequestDetails.class)))
.thenReturn(getSubscriptionList(
Collections.singletonList(subscription)
));
@ -140,10 +142,10 @@ public class SubscriptionLoaderTest {
when(mySubscriptionCanonicalizer.getSubscriptionStatus(any())).thenReturn(SubscriptionConstants.REQUESTED_STATUS);
// test
mySubscriptionLoader.syncSubscriptions();
mySubscriptionLoader.syncDatabaseToCache();
// verify
verify(subscriptionDao)
verify(mySubscriptionDao)
.search(any(SearchParameterMap.class), any(SystemRequestDetails.class));
ArgumentCaptor<ILoggingEvent> eventCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);

View File

@ -2,18 +2,11 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
@Test
public void testMultipleThreadsDontBlock() throws InterruptedException {
@ -29,6 +22,6 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel
}).start();
latch.await(10, TimeUnit.SECONDS);
svc.syncSubscriptions();
svc.syncDatabaseToCache();
}
}

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
@ -26,9 +27,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.Collections;
import java.util.List;
@ -405,7 +406,6 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
@Nested
public class TestDeleteMessages {
private final SubscriptionMatchingSubscriber subscriber = new SubscriptionMatchingSubscriber();
@Mock
ResourceModifiedMessage message;
@Mock
@ -422,11 +422,14 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
CanonicalSubscription myNonDeleteCanonicalSubscription;
@Mock
SubscriptionCriteriaParser.SubscriptionCriteria mySubscriptionCriteria;
@Mock
SubscriptionMatchDeliverer mySubscriptionMatchDeliverer;
@InjectMocks
SubscriptionMatchingSubscriber subscriber;
@Test
public void testAreNotIgnored() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);
when(message.getOperationType()).thenReturn(BaseResourceModifiedMessage.OperationTypeEnum.DELETE);
when(myInterceptorBroadcaster.callHooks(
@ -443,9 +446,6 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
@Test
public void matchActiveSubscriptionsChecksSendDeleteMessagesExtensionFlag() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);
when(message.getOperationType()).thenReturn(BaseResourceModifiedMessage.OperationTypeEnum.DELETE);
when(myInterceptorBroadcaster.callHooks(
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true);
@ -463,9 +463,6 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
@Test
public void testMultipleSubscriptionsDoNotEarlyReturn() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);
when(message.getOperationType()).thenReturn(BaseResourceModifiedMessage.OperationTypeEnum.DELETE);
when(myInterceptorBroadcaster.callHooks(
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true);
@ -488,9 +485,6 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
@Test
public void matchActiveSubscriptionsAndDeliverSetsPartitionId() {
ReflectionTestUtils.setField(subscriber, "myInterceptorBroadcaster", myInterceptorBroadcaster);
ReflectionTestUtils.setField(subscriber, "mySubscriptionRegistry", mySubscriptionRegistry);
when(message.getOperationType()).thenReturn(BaseResourceModifiedMessage.OperationTypeEnum.DELETE);
when(myInterceptorBroadcaster.callHooks(
eq(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED), any(HookParams.class))).thenReturn(true);

View File

@ -5,15 +5,21 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4b.model.CanonicalType;
import org.hl7.fhir.r4b.model.Enumerations;
import org.hl7.fhir.r4b.model.Subscription;
import org.hl7.fhir.r4b.model.SubscriptionTopic;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -23,6 +29,10 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Nonnull;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@ -33,6 +43,7 @@ import static org.mockito.Mockito.when;
@ExtendWith(SpringExtension.class)
public class SubscriptionValidatingInterceptorTest {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionValidatingInterceptorTest.class);
public static final String TEST_SUBSCRIPTION_TOPIC_URL = "http://test.topic";
@Autowired
private SubscriptionValidatingInterceptor mySubscriptionValidatingInterceptor;
@ -44,6 +55,8 @@ public class SubscriptionValidatingInterceptorTest {
private JpaStorageSettings myStorageSettings;
@MockBean
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private IFhirResourceDao<SubscriptionTopic> mySubscriptionTopicDao;
@BeforeEach
public void before() {
@ -66,7 +79,7 @@ public class SubscriptionValidatingInterceptorTest {
public void testEmptyStatus() {
try {
Subscription badSub = new Subscription();
badSub.setStatus(Subscription.SubscriptionStatus.ACTIVE);
badSub.setStatus(Enumerations.SubscriptionStatus.ACTIVE);
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
} catch (UnprocessableEntityException e) {
@ -78,7 +91,7 @@ public class SubscriptionValidatingInterceptorTest {
public void testBadCriteria() {
try {
Subscription badSub = new Subscription();
badSub.setStatus(Subscription.SubscriptionStatus.ACTIVE);
badSub.setStatus(Enumerations.SubscriptionStatus.ACTIVE);
badSub.setCriteria("Patient");
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
@ -91,7 +104,7 @@ public class SubscriptionValidatingInterceptorTest {
public void testBadChannel() {
try {
Subscription badSub = new Subscription();
badSub.setStatus(Subscription.SubscriptionStatus.ACTIVE);
badSub.setStatus(Enumerations.SubscriptionStatus.ACTIVE);
badSub.setCriteria("Patient?");
mySubscriptionValidatingInterceptor.resourcePreCreate(badSub, null, null);
fail();
@ -104,7 +117,7 @@ public class SubscriptionValidatingInterceptorTest {
public void testEmptyEndpoint() {
try {
Subscription badSub = new Subscription();
badSub.setStatus(Subscription.SubscriptionStatus.ACTIVE);
badSub.setStatus(Enumerations.SubscriptionStatus.ACTIVE);
badSub.setCriteria("Patient?");
Subscription.SubscriptionChannelComponent channel = badSub.getChannel();
channel.setType(Subscription.SubscriptionChannelType.MESSAGE);
@ -118,7 +131,7 @@ public class SubscriptionValidatingInterceptorTest {
@Test
public void testMalformedEndpoint() {
Subscription badSub = new Subscription();
badSub.setStatus(Subscription.SubscriptionStatus.ACTIVE);
badSub.setStatus(Enumerations.SubscriptionStatus.ACTIVE);
badSub.setCriteria("Patient?");
Subscription.SubscriptionChannelComponent channel = badSub.getChannel();
channel.setType(Subscription.SubscriptionChannelType.MESSAGE);
@ -170,11 +183,40 @@ public class SubscriptionValidatingInterceptorTest {
}
}
@Test
public void testInvalidTopic() throws URISyntaxException {
when(myDaoRegistry.getResourceDao("SubscriptionTopic")).thenReturn(mySubscriptionTopicDao);
SimpleBundleProvider emptyBundleProvider = new SimpleBundleProvider(Collections.emptyList());
when(mySubscriptionTopicDao.search(any(), any())).thenReturn(emptyBundleProvider);
org.hl7.fhir.r4b.model.Subscription badSub = new org.hl7.fhir.r4b.model.Subscription();
badSub.setStatus(Enumerations.SubscriptionStatus.ACTIVE);
badSub.getMeta().getProfile().add(new CanonicalType(new URI("http://other.profile")));
badSub.getMeta().getProfile().add(new CanonicalType(new URI(SubscriptionConstants.SUBSCRIPTION_TOPIC_PROFILE_URL)));
badSub.setCriteria("http://topic.url");
Subscription.SubscriptionChannelComponent channel = badSub.getChannel();
channel.setType(Subscription.SubscriptionChannelType.MESSAGE);
channel.setEndpoint("channel:my-queue-name");
try {
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub, null, null, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), is(Msg.code(2322) + "No SubscriptionTopic exists with url: http://topic.url"));
}
// Happy path
SubscriptionTopic topic = new SubscriptionTopic();
SimpleBundleProvider simpleBundleProvider = new SimpleBundleProvider(List.of(topic));
when(mySubscriptionTopicDao.search(any(), any())).thenReturn(simpleBundleProvider);
mySubscriptionValidatingInterceptor.validateSubmittedSubscription(badSub, null, null, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED);
}
@Configuration
public static class SpringConfig {
@Bean
FhirContext fhirContext() {
return FhirContext.forR4();
return FhirContext.forR4B();
}
@Bean
@ -191,7 +233,7 @@ public class SubscriptionValidatingInterceptorTest {
@Nonnull
private static Subscription createSubscription() {
final Subscription subscription = new Subscription();
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
subscription.setStatus(Enumerations.SubscriptionStatus.REQUESTED);
subscription.setCriteria("Patient?");
final Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);

View File

@ -0,0 +1,38 @@
package ca.uhn.fhir.jpa.topic;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.Test;
import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ActiveSubscriptionTopicCacheTest {
@Test
public void testOperations() {
var cache = new ActiveSubscriptionTopicCache();
SubscriptionTopic topic1 = new SubscriptionTopic();
topic1.setId("1");
cache.add(topic1);
assertThat(cache.getAll(), hasSize(1));
assertEquals(1, cache.size());
assertEquals("1", cache.getAll().iterator().next().getId());
SubscriptionTopic topic2 = new SubscriptionTopic();
topic2.setId("2");
cache.add(topic2);
SubscriptionTopic topic3 = new SubscriptionTopic();
topic3.setId("3");
cache.add(topic3);
assertEquals(3, cache.size());
Set<String> idsToKeep = Set.of("1", "3");
int removed = cache.removeIdsNotInCollection(idsToKeep);
assertEquals(1, removed);
assertEquals(2, cache.size());
}
}

View File

@ -0,0 +1,91 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.util.BundleUtil;
import org.hl7.fhir.r4b.model.Bundle;
import org.hl7.fhir.r4b.model.Encounter;
import org.hl7.fhir.r4b.model.Resource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
class SubscriptionTopicPayloadBuilderR4BTest {
FhirContext ourFhirContext = FhirContext.forR4BCached();
@Test
public void testBuildPayloadDelete() {
// setup
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter();
encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.DELETE);
// run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic);
// verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
assertEquals(1, resources.size());
assertEquals("SubscriptionStatus", resources.get(0).getResourceType().name());
assertEquals(Bundle.HTTPVerb.DELETE, payload.getEntry().get(1).getRequest().getMethod());
}
@Test
public void testBuildPayloadUpdate() {
// setup
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter();
encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.UPDATE);
// run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic);
// verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
assertEquals(2, resources.size());
assertEquals("SubscriptionStatus", resources.get(0).getResourceType().name());
assertEquals("Encounter", resources.get(1).getResourceType().name());
assertEquals(Bundle.HTTPVerb.PUT, payload.getEntry().get(1).getRequest().getMethod());
}
@Test
public void testBuildPayloadCreate() {
// setup
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter();
encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.CREATE);
// run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic);
// verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
assertEquals(2, resources.size());
assertEquals("SubscriptionStatus", resources.get(0).getResourceType().name());
assertEquals("Encounter", resources.get(1).getResourceType().name());
assertEquals(Bundle.HTTPVerb.POST, payload.getEntry().get(1).getRequest().getMethod());
}
}

View File

@ -0,0 +1,91 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.util.BundleUtil;
import org.hl7.fhir.r5.model.Bundle;
import org.hl7.fhir.r5.model.Encounter;
import org.hl7.fhir.r5.model.Resource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
class SubscriptionTopicPayloadBuilderR5Test {
FhirContext ourFhirContext = FhirContext.forR5Cached();
@Test
public void testBuildPayloadDelete() {
// setup
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter();
encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.DELETE);
// run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic);
// verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
assertEquals(1, resources.size());
assertEquals("SubscriptionStatus", resources.get(0).getResourceType().name());
assertEquals(Bundle.HTTPVerb.DELETE, payload.getEntry().get(1).getRequest().getMethod());
}
@Test
public void testBuildPayloadUpdate() {
// setup
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter();
encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.UPDATE);
// run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic);
// verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
assertEquals(2, resources.size());
assertEquals("SubscriptionStatus", resources.get(0).getResourceType().name());
assertEquals("Encounter", resources.get(1).getResourceType().name());
assertEquals(Bundle.HTTPVerb.PUT, payload.getEntry().get(1).getRequest().getMethod());
}
@Test
public void testBuildPayloadCreate() {
// setup
var svc = new SubscriptionTopicPayloadBuilder(ourFhirContext);
var encounter = new Encounter();
encounter.setId("Encounter/1");
ResourceModifiedMessage msg = new ResourceModifiedMessage();
CanonicalSubscription sub = new CanonicalSubscription();
ActiveSubscription subscription = new ActiveSubscription(sub, "test");
SubscriptionTopic topic = new SubscriptionTopic();
msg.setOperationType(BaseResourceMessage.OperationTypeEnum.CREATE);
// run
Bundle payload = (Bundle)svc.buildPayload(encounter, msg, subscription, topic);
// verify
List<Resource> resources = BundleUtil.toListOfResourcesOfType(ourFhirContext, payload, Resource.class);
assertEquals(2, resources.size());
assertEquals("SubscriptionStatus", resources.get(0).getResourceType().name());
assertEquals("Encounter", resources.get(1).getResourceType().name());
assertEquals(Bundle.HTTPVerb.POST, payload.getEntry().get(1).getRequest().getMethod());
}
}

View File

@ -0,0 +1,31 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import org.hl7.fhir.r5.model.Enumeration;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class SubscriptionTopicUtilTest {
@Test
public void testMatch() {
// I know this is gross. I haven't found a nicer way to do this
var create = new Enumeration<>(new SubscriptionTopic.InteractionTriggerEnumFactory());
create.setValue(SubscriptionTopic.InteractionTrigger.CREATE);
var delete = new Enumeration<>(new SubscriptionTopic.InteractionTriggerEnumFactory());
delete.setValue(SubscriptionTopic.InteractionTrigger.DELETE);
List<Enumeration<SubscriptionTopic.InteractionTrigger>> supportedTypes = List.of(create, delete);
assertTrue(SubscriptionTopicUtil.matches(BaseResourceMessage.OperationTypeEnum.CREATE, supportedTypes));
assertFalse(SubscriptionTopicUtil.matches(BaseResourceMessage.OperationTypeEnum.UPDATE, supportedTypes));
assertTrue(SubscriptionTopicUtil.matches(BaseResourceMessage.OperationTypeEnum.DELETE, supportedTypes));
assertFalse(SubscriptionTopicUtil.matches(BaseResourceMessage.OperationTypeEnum.MANUALLY_TRIGGERED, supportedTypes));
assertFalse(SubscriptionTopicUtil.matches(BaseResourceMessage.OperationTypeEnum.TRANSACTION, supportedTypes));
}
}

View File

@ -0,0 +1,133 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.hl7.fhir.r5.model.Encounter;
import org.hl7.fhir.r5.model.IdType;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class SubscriptionTriggerMatcherTest {
private static final FhirContext ourFhirContext = FhirContext.forR5();
@Mock
DaoRegistry myDaoRegistry;
@Mock
SearchParamMatcher mySearchParamMatcher;
private SubscriptionTopicSupport mySubscriptionTopicSupport;
private Encounter myEncounter;
@BeforeEach
public void before() {
mySubscriptionTopicSupport = new SubscriptionTopicSupport(ourFhirContext, myDaoRegistry, mySearchParamMatcher);
myEncounter = new Encounter();
myEncounter.setIdElement(new IdType("Encounter", "123", "2"));
}
@Test
public void testCreateEmptryTriggerNoMatch() {
// setup
ResourceModifiedMessage msg = new ResourceModifiedMessage(ourFhirContext, myEncounter, ResourceModifiedMessage.OperationTypeEnum.CREATE);
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = new SubscriptionTopic.SubscriptionTopicResourceTriggerComponent();
// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
InMemoryMatchResult result = svc.match();
// verify
assertFalse(result.matched());
}
@Test
public void testCreateSimpleTriggerMatches() {
// setup
ResourceModifiedMessage msg = new ResourceModifiedMessage(ourFhirContext, myEncounter, ResourceModifiedMessage.OperationTypeEnum.CREATE);
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = new SubscriptionTopic.SubscriptionTopicResourceTriggerComponent();
trigger.setResource("Encounter");
trigger.addSupportedInteraction(SubscriptionTopic.InteractionTrigger.CREATE);
// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
InMemoryMatchResult result = svc.match();
// verify
assertTrue(result.matched());
}
@Test
public void testCreateWrongOpNoMatch() {
ResourceModifiedMessage msg = new ResourceModifiedMessage(ourFhirContext, myEncounter, ResourceModifiedMessage.OperationTypeEnum.CREATE);
// setup
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = new SubscriptionTopic.SubscriptionTopicResourceTriggerComponent();
trigger.setResource("Encounter");
trigger.addSupportedInteraction(SubscriptionTopic.InteractionTrigger.UPDATE);
// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
InMemoryMatchResult result = svc.match();
// verify
assertFalse(result.matched());
}
@Test
public void testUpdateMatch() {
ResourceModifiedMessage msg = new ResourceModifiedMessage(ourFhirContext, myEncounter, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
// setup
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = new SubscriptionTopic.SubscriptionTopicResourceTriggerComponent();
trigger.setResource("Encounter");
trigger.addSupportedInteraction(SubscriptionTopic.InteractionTrigger.UPDATE);
// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
InMemoryMatchResult result = svc.match();
// verify
assertTrue(result.matched());
}
@Test
public void testUpdateWithPrevCriteriaMatch() {
ResourceModifiedMessage msg = new ResourceModifiedMessage(ourFhirContext, myEncounter, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
// setup
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = new SubscriptionTopic.SubscriptionTopicResourceTriggerComponent();
trigger.setResource("Encounter");
trigger.addSupportedInteraction(SubscriptionTopic.InteractionTrigger.UPDATE);
trigger.getQueryCriteria().setPrevious("Encounter?status=in-progress");
IFhirResourceDao mockEncounterDao = mock(IFhirResourceDao.class);
when(myDaoRegistry.getResourceDao("Encounter")).thenReturn(mockEncounterDao);
Encounter encounterPreviousVersion = new Encounter();
when(mockEncounterDao.read(any(), any())).thenReturn(encounterPreviousVersion);
when(mySearchParamMatcher.match(any(), any(), any())).thenReturn(InMemoryMatchResult.successfulMatch());
// run
SubscriptionTriggerMatcher svc = new SubscriptionTriggerMatcher(mySubscriptionTopicSupport, msg, trigger);
InMemoryMatchResult result = svc.match();
// verify
assertTrue(result.matched());
}
}

View File

@ -7,9 +7,9 @@ import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionValidatingInterceptor;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
@ -33,7 +33,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.nullable;
@ -299,6 +299,7 @@ public class SubscriptionValidatingInterceptorTest {
@Test
public void testSubscriptionUpdate() {
// setup
when(myDaoRegistry.isResourceTypeSupported(eq("Patient"))).thenReturn(true);
when(myStorageSettings.isCrossPartitionSubscriptionEnabled()).thenReturn(true);
lenient()
@ -318,9 +319,11 @@ public class SubscriptionValidatingInterceptorTest {
lenient()
.when(requestDetails.getRestOperationType()).thenReturn(RestOperationTypeEnum.UPDATE);
// execute
mySvc.resourceUpdated(subscription, subscription, requestDetails, null);
verify(mySubscriptionStrategyEvaluator).determineStrategy(anyString());
// verify
verify(mySubscriptionStrategyEvaluator).determineStrategy(any(CanonicalSubscription.class));
verify(mySubscriptionCanonicalizer, times(2)).setMatchingStrategyTag(eq(subscription), nullable(SubscriptionMatchingStrategy.class));
}
}

View File

@ -0,0 +1,244 @@
package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.provider.r4b.BaseResourceProviderR4BTest;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.test.utilities.server.HashMapResourceProviderExtension;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import ca.uhn.fhir.test.utilities.server.TransactionCapturingProviderExtension;
import ca.uhn.fhir.util.BundleUtil;
import com.apicatalog.jsonld.StringUtils;
import net.ttddyy.dsproxy.QueryCount;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4b.model.Bundle;
import org.hl7.fhir.r4b.model.CodeableConcept;
import org.hl7.fhir.r4b.model.Coding;
import org.hl7.fhir.r4b.model.Enumerations;
import org.hl7.fhir.r4b.model.Extension;
import org.hl7.fhir.r4b.model.Observation;
import org.hl7.fhir.r4b.model.Organization;
import org.hl7.fhir.r4b.model.Patient;
import org.hl7.fhir.r4b.model.Subscription;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public abstract class BaseSubscriptionsR4BTest extends BaseResourceProviderR4BTest {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseSubscriptionsR4BTest.class);
protected static int ourListenerPort;
@Order(0)
@RegisterExtension
protected static RestfulServerExtension ourRestfulServer = new RestfulServerExtension(FhirContext.forR4BCached());
@Order(1)
@RegisterExtension
protected static HashMapResourceProviderExtension<Patient> ourPatientProvider = new HashMapResourceProviderExtension<>(ourRestfulServer, Patient.class);
@Order(1)
@RegisterExtension
protected static HashMapResourceProviderExtension<Observation> ourObservationProvider = new HashMapResourceProviderExtension<>(ourRestfulServer, Observation.class);
@Order(1)
@RegisterExtension
protected static TransactionCapturingProviderExtension<Bundle> ourTransactionProvider = new TransactionCapturingProviderExtension<>(ourRestfulServer, Bundle.class);
protected static SingleQueryCountHolder ourCountHolder;
@Order(1)
@RegisterExtension
protected static HashMapResourceProviderExtension<Organization> ourOrganizationProvider = new HashMapResourceProviderExtension<>(ourRestfulServer, Organization.class);
@Autowired
protected SubscriptionTestUtil mySubscriptionTestUtil;
@Autowired
protected SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
protected CountingInterceptor myCountingInterceptor;
protected List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
@Autowired
private SingleQueryCountHolder myCountHolder;
@AfterEach
public void afterUnregisterRestHookListener() {
for (IIdType next : mySubscriptionIds) {
IIdType nextId = next.toUnqualifiedVersionless();
ourLog.info("Deleting: {}", nextId);
myClient.delete().resourceById(nextId).execute();
}
mySubscriptionIds.clear();
myStorageSettings.setAllowMultipleDelete(true);
ourLog.info("Deleting all subscriptions");
myClient.delete().resourceConditionalByUrl("Subscription?status=active").execute();
myClient.delete().resourceConditionalByUrl("Observation?code:missing=false").execute();
ourLog.info("Done deleting all subscriptions");
myStorageSettings.setAllowMultipleDelete(new JpaStorageSettings().isAllowMultipleDelete());
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
}
@BeforeEach
public void beforeRegisterRestHookListener() {
mySubscriptionTestUtil.registerRestHookInterceptor();
}
@BeforeEach
public void beforeReset() throws Exception {
// Delete all Subscriptions
if (myClient != null) {
Bundle allSubscriptions = myClient.search().forResource(Subscription.class).returnBundle(Bundle.class).execute();
for (IBaseResource next : BundleUtil.toListOfResources(myFhirContext, allSubscriptions)) {
myClient.delete().resource(next).execute();
}
waitForActivatedSubscriptionCount(0);
}
LinkedBlockingChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
if (processingChannel != null) {
processingChannel.clearInterceptorsForUnitTest();
}
myCountingInterceptor = new CountingInterceptor();
if (processingChannel != null) {
processingChannel.addInterceptor(myCountingInterceptor);
}
}
protected Subscription createSubscription(String theCriteria, String thePayload) {
return createSubscription(theCriteria, thePayload, null);
}
protected Subscription createSubscription(String theCriteria, String thePayload, Extension theExtension) {
String id = null;
return createSubscription(theCriteria, thePayload, theExtension, id);
}
@NotNull
protected Subscription createSubscription(String theCriteria, String thePayload, Extension theExtension, String id) {
Subscription subscription = newSubscription(theCriteria, thePayload);
if (theExtension != null) {
subscription.getChannel().addExtension(theExtension);
}
if (id != null) {
subscription.setId(id);
}
subscription = postOrPutSubscription(subscription);
return subscription;
}
protected Subscription postOrPutSubscription(IBaseResource theSubscription) {
MethodOutcome methodOutcome;
if (theSubscription.getIdElement().isEmpty()) {
methodOutcome = myClient.create().resource(theSubscription).execute();
} else {
methodOutcome = myClient.update().resource(theSubscription).execute();
}
theSubscription.setId(methodOutcome.getId().toUnqualifiedVersionless());
mySubscriptionIds.add(methodOutcome.getId());
return (Subscription) theSubscription;
}
protected Subscription newSubscription(String theCriteria, String thePayload) {
return newSubscriptionWithStatus(theCriteria, thePayload, Enumerations.SubscriptionStatus.ACTIVE);
}
@Nonnull
protected Subscription newSubscriptionWithStatus(String theCriteria, String thePayload, Enumerations.SubscriptionStatus theSubscriptionStatus) {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(theSubscriptionStatus);
subscription.setCriteria(theCriteria);
Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
channel.setPayload(thePayload);
channel.setEndpoint(ourRestfulServer.getBaseUrl());
return subscription;
}
protected void waitForQueueToDrain() throws InterruptedException {
mySubscriptionTestUtil.waitForQueueToDrain();
}
@PostConstruct
public void initializeOurCountHolder() {
ourCountHolder = myCountHolder;
}
protected Observation sendObservation(String theCode, String theSystem) {
return sendObservation(theCode, theSystem, null, null);
}
protected Observation sendObservation(String theCode, String theSystem, String theSource, String theRequestId) {
Observation observation = createBaseObservation(theCode, theSystem);
if (StringUtils.isNotBlank(theSource)) {
observation.getMeta().setSource(theSource);
}
SystemRequestDetails systemRequestDetails = new SystemRequestDetails();
if (StringUtils.isNotBlank(theRequestId)) {
systemRequestDetails.setRequestId(theRequestId);
}
IIdType id = myObservationDao.create(observation, systemRequestDetails).getId();
observation.setId(id);
return observation;
}
protected Observation createBaseObservation(String theCode, String theSystem) {
Observation observation = new Observation();
CodeableConcept codeableConcept = new CodeableConcept();
observation.setCode(codeableConcept);
observation.getIdentifierFirstRep().setSystem("foo").setValue("1");
Coding coding = codeableConcept.addCoding();
coding.setCode(theCode);
coding.setSystem(theSystem);
observation.setStatus(Enumerations.ObservationStatus.FINAL);
return observation;
}
protected Patient sendPatient() {
Patient patient = new Patient();
patient.setActive(true);
IIdType id = myPatientDao.create(patient).getId();
patient.setId(id);
return patient;
}
protected Organization sendOrganization() {
Organization org = new Organization();
org.setName("ORG");
IIdType id = myOrganizationDao.create(org).getId();
org.setId(id);
return org;
}
@AfterAll
public static void reportTotalSelects() {
ourLog.info("Total database select queries: {}", getQueryCount().getSelect());
}
private static QueryCount getQueryCount() {
return ourCountHolder.getQueryCountMap().get("");
}
}

View File

@ -0,0 +1,243 @@
package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicLoader;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry;
import ca.uhn.fhir.rest.annotation.Transaction;
import ca.uhn.fhir.rest.annotation.TransactionParam;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.test.concurrency.PointcutLatch;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4b.model.Bundle;
import org.hl7.fhir.r4b.model.Encounter;
import org.hl7.fhir.r4b.model.Enumerations;
import org.hl7.fhir.r4b.model.Subscription;
import org.hl7.fhir.r4b.model.SubscriptionStatus;
import org.hl7.fhir.r4b.model.SubscriptionTopic;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR4BTest.class);
public static final String SUBSCRIPTION_TOPIC_TEST_URL = "http://example.com/topic/test";
@Autowired
protected SubscriptionTopicRegistry mySubscriptionTopicRegistry;
@Autowired
protected SubscriptionTopicLoader mySubscriptionTopicLoader;
@Autowired
private IInterceptorService myInterceptorService;
protected IFhirResourceDao<SubscriptionTopic> mySubscriptionTopicDao;
private static final TestSystemProvider ourTestSystemProvider = new TestSystemProvider();
private final PointcutLatch mySubscriptionTopicsCheckedLatch = new PointcutLatch(Pointcut.SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED);
private final PointcutLatch mySubscriptionDeliveredLatch = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY);
@Override
@BeforeEach
protected void before() throws Exception {
super.before();
ourRestfulServer.unregisterProvider(mySystemProvider);
ourRestfulServer.registerProvider(ourTestSystemProvider);
mySubscriptionTopicDao = myDaoRegistry.getResourceDao(SubscriptionTopic.class);
myInterceptorService.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionTopicsCheckedLatch);
myInterceptorService.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY, mySubscriptionDeliveredLatch);
}
@Override
@AfterEach
public void after() throws Exception {
ourRestfulServer.unregisterProvider(ourTestSystemProvider);
ourRestfulServer.registerProvider(mySystemProvider);
myInterceptorService.unregisterAllAnonymousInterceptors();
mySubscriptionTopicsCheckedLatch.clear();
mySubscriptionDeliveredLatch.clear();
ourTestSystemProvider.clear();
super.after();
}
@Test
public void testCreate() throws Exception {
// WIP SR4B test update, delete, etc
createEncounterSubscriptionTopic(Encounter.EncounterStatus.PLANNED, Encounter.EncounterStatus.FINISHED, SubscriptionTopic.InteractionTrigger.CREATE);
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
waitForRegisteredSubscriptionTopicCount(1);
Subscription subscription = createTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL);
waitForActivatedSubscriptionCount(1);
assertEquals(0, ourTestSystemProvider.getCount());
Encounter sentEncounter = sendEncounterWithStatus(Encounter.EncounterStatus.FINISHED, true);
assertEquals(1, ourTestSystemProvider.getCount());
Bundle receivedBundle = ourTestSystemProvider.getLastInput();
List<IBaseResource> resources = BundleUtil.toListOfResources(myFhirCtx, receivedBundle);
assertEquals(2, resources.size());
SubscriptionStatus ss = (SubscriptionStatus) resources.get(0);
validateSubscriptionStatus(subscription, sentEncounter, ss);
Encounter encounter = (Encounter) resources.get(1);
assertEquals(Encounter.EncounterStatus.FINISHED, encounter.getStatus());
assertEquals(sentEncounter.getIdElement(), encounter.getIdElement());
}
@Test
public void testUpdate() throws Exception {
// WIP SR4B test update, delete, etc
createEncounterSubscriptionTopic(Encounter.EncounterStatus.PLANNED, Encounter.EncounterStatus.FINISHED, SubscriptionTopic.InteractionTrigger.CREATE, SubscriptionTopic.InteractionTrigger.UPDATE);
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
waitForRegisteredSubscriptionTopicCount(1);
Subscription subscription = createTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL);
waitForActivatedSubscriptionCount(1);
assertEquals(0, ourTestSystemProvider.getCount());
Encounter sentEncounter = sendEncounterWithStatus(Encounter.EncounterStatus.PLANNED, false);
assertEquals(0, ourTestSystemProvider.getCount());
sentEncounter.setStatus(Encounter.EncounterStatus.FINISHED);
updateEncounter(sentEncounter, true);
assertEquals(1, ourTestSystemProvider.getCount());
Bundle receivedBundle = ourTestSystemProvider.getLastInput();
List<IBaseResource> resources = BundleUtil.toListOfResources(myFhirCtx, receivedBundle);
assertEquals(2, resources.size());
SubscriptionStatus ss = (SubscriptionStatus) resources.get(0);
validateSubscriptionStatus(subscription, sentEncounter, ss);
Encounter encounter = (Encounter) resources.get(1);
assertEquals(Encounter.EncounterStatus.FINISHED, encounter.getStatus());
assertEquals(sentEncounter.getIdElement(), encounter.getIdElement());
}
private static void validateSubscriptionStatus(Subscription subscription, Encounter sentEncounter, SubscriptionStatus ss) {
assertEquals(Enumerations.SubscriptionStatus.ACTIVE, ss.getStatus());
assertEquals(SubscriptionStatus.SubscriptionNotificationType.EVENTNOTIFICATION, ss.getType());
assertEquals("1", ss.getEventsSinceSubscriptionStartElement().getValueAsString());
List<SubscriptionStatus.SubscriptionStatusNotificationEventComponent> notificationEvents = ss.getNotificationEvent();
assertEquals(1, notificationEvents.size());
SubscriptionStatus.SubscriptionStatusNotificationEventComponent notificationEvent = notificationEvents.get(0);
assertEquals("1", notificationEvent.getEventNumber());
assertEquals(sentEncounter.getIdElement().toUnqualifiedVersionless(), notificationEvent.getFocus().getReferenceElement());
assertEquals(subscription.getIdElement().toUnqualifiedVersionless(), ss.getSubscription().getReferenceElement());
assertEquals(SUBSCRIPTION_TOPIC_TEST_URL, ss.getTopic());
}
private Subscription createTopicSubscription(String theTopicUrl) throws InterruptedException {
Subscription subscription = newSubscription(theTopicUrl, Constants.CT_FHIR_JSON_NEW);
subscription.getMeta().addProfile(SubscriptionConstants.SUBSCRIPTION_TOPIC_PROFILE_URL);
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
Subscription retval = postOrPutSubscription(subscription);
mySubscriptionTopicsCheckedLatch.awaitExpected();
return retval;
}
private void waitForRegisteredSubscriptionTopicCount(int theTarget) throws Exception {
await().until(() -> subscriptionTopicRegistryHasSize(theTarget));
}
private boolean subscriptionTopicRegistryHasSize(int theTarget) {
int size = mySubscriptionTopicRegistry.size();
if (size == theTarget) {
return true;
}
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
return mySubscriptionTopicRegistry.size() == theTarget;
}
private SubscriptionTopic createEncounterSubscriptionTopic(Encounter.EncounterStatus theFrom, Encounter.EncounterStatus theCurrent, SubscriptionTopic.InteractionTrigger... theInteractionTriggers) {
SubscriptionTopic retval = new SubscriptionTopic();
retval.setUrl(SUBSCRIPTION_TOPIC_TEST_URL);
retval.setStatus(Enumerations.PublicationStatus.ACTIVE);
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = retval.addResourceTrigger();
trigger.setResource("Encounter");
for (SubscriptionTopic.InteractionTrigger interactionTrigger : theInteractionTriggers) {
trigger.addSupportedInteraction(interactionTrigger);
}
SubscriptionTopic.SubscriptionTopicResourceTriggerQueryCriteriaComponent queryCriteria = trigger.getQueryCriteria();
queryCriteria.setPrevious("Encounter?status=" + theFrom.toCode());
queryCriteria.setCurrent("Encounter?status=" + theCurrent.toCode());
queryCriteria.setRequireBoth(true);
mySubscriptionTopicDao.create(retval, mySrd);
return retval;
}
private Encounter sendEncounterWithStatus(Encounter.EncounterStatus theStatus, boolean theExpectDelivery) throws InterruptedException {
Encounter encounter = new Encounter();
encounter.setStatus(theStatus);
if (theExpectDelivery) {
mySubscriptionDeliveredLatch.setExpectedCount(1);
}
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
IIdType id = myEncounterDao.create(encounter, mySrd).getId();
mySubscriptionTopicsCheckedLatch.awaitExpected();
if (theExpectDelivery) {
mySubscriptionDeliveredLatch.awaitExpected();
}
encounter.setId(id);
return encounter;
}
private Encounter updateEncounter(Encounter theEncounter, boolean theExpectDelivery) throws InterruptedException {
if (theExpectDelivery) {
mySubscriptionDeliveredLatch.setExpectedCount(1);
}
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
Encounter retval = (Encounter) myEncounterDao.update(theEncounter, mySrd).getResource();
mySubscriptionTopicsCheckedLatch.awaitExpected();
if (theExpectDelivery) {
mySubscriptionDeliveredLatch.awaitExpected();
}
return retval;
}
static class TestSystemProvider {
final AtomicInteger myCount = new AtomicInteger(0);
Bundle myLastInput;
@Transaction
public Bundle transaction(@TransactionParam Bundle theInput) {
myCount.incrementAndGet();
myLastInput = theInput;
return theInput;
}
public int getCount() {
return myCount.get();
}
public Bundle getLastInput() {
return myLastInput;
}
public void clear() {
myCount.set(0);
myLastInput = null;
}
}
}

View File

@ -39,6 +39,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
@ -57,6 +58,7 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
private static Server ourListenerServer;
private static SingleQueryCountHolder ourCountHolder;
private static String ourListenerServerBase;
protected static RestfulServer ourListenerRestServer;
@Autowired
protected SubscriptionTestUtil mySubscriptionTestUtil;
@Autowired
@ -120,8 +122,13 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
protected Subscription createSubscription(String theCriteria, String thePayload) {
Subscription subscription = newSubscription(theCriteria, thePayload);
return postSubscription(subscription);
}
@Nonnull
protected Subscription postSubscription(Subscription subscription) {
MethodOutcome methodOutcome = myClient.create().resource(subscription).execute();
subscription.setId(methodOutcome.getId().getIdPart());
subscription.setId(methodOutcome.getId().toVersionless());
mySubscriptionIds.add(methodOutcome.getId());
return subscription;
@ -225,7 +232,7 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
@BeforeAll
public static void startListenerServer() throws Exception {
RestfulServer ourListenerRestServer = new RestfulServer(FhirContext.forR5Cached());
ourListenerRestServer = new RestfulServer(FhirContext.forR5Cached());
ObservationListener obsListener = new ObservationListener();
ourListenerRestServer.setResourceProviders(obsListener);

View File

@ -0,0 +1,169 @@
package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicLoader;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry;
import ca.uhn.fhir.rest.annotation.Transaction;
import ca.uhn.fhir.rest.annotation.TransactionParam;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.util.BundleUtil;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r5.model.Bundle;
import org.hl7.fhir.r5.model.Encounter;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.Subscription;
import org.hl7.fhir.r5.model.SubscriptionStatus;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR5Test.class);
public static final String SUBSCRIPTION_TOPIC_TEST_URL = "http://example.com/topic/test";
@Autowired
protected SubscriptionTopicRegistry mySubscriptionTopicRegistry;
@Autowired
protected SubscriptionTopicLoader mySubscriptionTopicLoader;
protected IFhirResourceDao<SubscriptionTopic> mySubscriptionTopicDao;
private static final TestSystemProvider ourTestSystemProvider = new TestSystemProvider();
@Override
@BeforeEach
protected void before() throws Exception {
super.before();
ourListenerRestServer.unregisterProvider(mySystemProvider);
ourListenerRestServer.registerProvider(ourTestSystemProvider);
mySubscriptionTopicDao = myDaoRegistry.getResourceDao(SubscriptionTopic.class);
}
@Override
@AfterEach
public void after() throws Exception {
ourListenerRestServer.unregisterProvider(ourTestSystemProvider);
ourListenerRestServer.registerProvider(mySystemProvider);
super.after();
}
@Test
public void testRestHookSubscriptionTopicApplicationFhirJson() throws Exception {
// WIP SR4B test update, delete, etc
createEncounterSubscriptionTopic(Encounter.EncounterStatus.PLANNED, Encounter.EncounterStatus.COMPLETED, SubscriptionTopic.InteractionTrigger.CREATE);
waitForRegisteredSubscriptionTopicCount(1);
Subscription subscription = createTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL);
waitForActivatedSubscriptionCount(1);
assertEquals(0, ourTestSystemProvider.getCount());
Encounter sentEncounter = sendEncounterWithStatus(Encounter.EncounterStatus.COMPLETED);
// Should see 1 subscription notification
waitForQueueToDrain();
await().until(() -> ourTestSystemProvider.getCount() > 0);
Bundle receivedBundle = ourTestSystemProvider.getLastInput();
List<IBaseResource> resources = BundleUtil.toListOfResources(myFhirCtx, receivedBundle);
assertEquals(2, resources.size());
SubscriptionStatus ss = (SubscriptionStatus) resources.get(0);
validateSubscriptionStatus(subscription, sentEncounter, ss);
Encounter encounter = (Encounter) resources.get(1);
assertEquals(Encounter.EncounterStatus.COMPLETED, encounter.getStatus());
assertEquals(sentEncounter.getIdElement(), encounter.getIdElement());
}
private static void validateSubscriptionStatus(Subscription subscription, Encounter sentEncounter, SubscriptionStatus ss) {
assertEquals(Enumerations.SubscriptionStatusCodes.ACTIVE, ss.getStatus());
assertEquals(SubscriptionStatus.SubscriptionNotificationType.EVENTNOTIFICATION, ss.getType());
assertEquals("1", ss.getEventsSinceSubscriptionStartElement().getValueAsString());
List<SubscriptionStatus.SubscriptionStatusNotificationEventComponent> notificationEvents = ss.getNotificationEvent();
assertEquals(1, notificationEvents.size());
SubscriptionStatus.SubscriptionStatusNotificationEventComponent notificationEvent = notificationEvents.get(0);
assertEquals(1, notificationEvent.getEventNumber());
assertEquals(sentEncounter.getIdElement().toUnqualifiedVersionless(), notificationEvent.getFocus().getReferenceElement());
assertEquals(subscription.getIdElement().toUnqualifiedVersionless(), ss.getSubscription().getReferenceElement());
assertEquals(SUBSCRIPTION_TOPIC_TEST_URL, ss.getTopic());
}
private Subscription createTopicSubscription(String theTopicUrl) {
Subscription subscription = newSubscription(theTopicUrl, Constants.CT_FHIR_JSON_NEW);
subscription.getMeta().addProfile(SubscriptionConstants.SUBSCRIPTION_TOPIC_PROFILE_URL);
return postSubscription(subscription);
}
private void waitForRegisteredSubscriptionTopicCount(int theTarget) throws Exception {
await().until(() -> subscriptionTopicRegistryHasSize(theTarget));
}
private boolean subscriptionTopicRegistryHasSize(int theTarget) {
int size = mySubscriptionTopicRegistry.size();
if (size == theTarget) {
return true;
}
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
return mySubscriptionTopicRegistry.size() == theTarget;
}
private SubscriptionTopic createEncounterSubscriptionTopic(Encounter.EncounterStatus theFrom, Encounter.EncounterStatus theCurrent, SubscriptionTopic.InteractionTrigger... theInteractionTriggers) {
SubscriptionTopic retval = new SubscriptionTopic();
retval.setUrl(SUBSCRIPTION_TOPIC_TEST_URL);
retval.setStatus(Enumerations.PublicationStatus.ACTIVE);
SubscriptionTopic.SubscriptionTopicResourceTriggerComponent trigger = retval.addResourceTrigger();
trigger.setResource("Encounter");
for (SubscriptionTopic.InteractionTrigger interactionTrigger : theInteractionTriggers) {
trigger.addSupportedInteraction(interactionTrigger);
}
SubscriptionTopic.SubscriptionTopicResourceTriggerQueryCriteriaComponent queryCriteria = trigger.getQueryCriteria();
queryCriteria.setPrevious("Encounter?status=" + theFrom.toCode());
queryCriteria.setCurrent("Encounter?status=" + theCurrent.toCode());
queryCriteria.setRequireBoth(true);
queryCriteria.setRequireBoth(true);
mySubscriptionTopicDao.create(retval, mySrd);
return retval;
}
private Encounter sendEncounterWithStatus(Encounter.EncounterStatus theStatus) {
Encounter encounter = new Encounter();
encounter.setStatus(theStatus);
IIdType id = myEncounterDao.create(encounter, mySrd).getId();
encounter.setId(id);
return encounter;
}
static class TestSystemProvider {
AtomicInteger myCount = new AtomicInteger(0);
Bundle myLastInput;
@Transaction
public Bundle transaction(@TransactionParam Bundle theInput) {
myCount.incrementAndGet();
myLastInput = theInput;
return theInput;
}
public int getCount() {
return myCount.get();
}
public Bundle getLastInput() {
return myLastInput;
}
}
}

View File

@ -129,6 +129,8 @@ import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -836,18 +838,10 @@ public abstract class BaseJpaTest extends BaseTest {
}
@SuppressWarnings("BusyWait")
public static void waitForSize(int theTarget, int theTimeout, Callable<Number> theCallable, Callable<String> theFailureMessage) throws Exception {
StopWatch sw = new StopWatch();
while (theCallable.call().intValue() != theTarget && sw.getMillis() < theTimeout) {
try {
Thread.sleep(50);
} catch (InterruptedException theE) {
throw new Error(theE);
}
}
if (sw.getMillis() >= theTimeout) {
fail("Size " + theCallable.call() + " is != target " + theTarget + " - " + theFailureMessage.call());
}
Thread.sleep(500);
public static void waitForSize(int theTarget, int theTimeoutMillis, Callable<Number> theCallable, Callable<String> theFailureMessage) throws Exception {
await()
.alias("Waiting for size " + theTarget + ". Current size is " + theCallable.call().intValue() + ": " + theFailureMessage.call())
.atMost(Duration.of(theTimeoutMillis, ChronoUnit.MILLIS))
.until(() -> theCallable.call().intValue() == theTarget);
}
}

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.rest.server;
import ca.uhn.fhir.model.primitive.InstantDt;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
@ -167,4 +168,10 @@ public class SimpleBundleProvider implements IBundleProvider {
return mySize;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("mySize", mySize)
.toString();
}
}

View File

@ -0,0 +1,204 @@
package ca.uhn.fhir.cache;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener {
private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class);
public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
private final String myResourceName;
@Autowired
protected ISearchParamRegistry mySearchParamRegistry;
@Autowired
private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
@Autowired
DaoRegistry myDaoRegistry;
private SearchParameterMap mySearchParameterMap;
private SystemRequestDetails mySystemRequestDetails;
private boolean myStopping;
private boolean myEnabled;
private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
private final Object mySyncResourcesLock = new Object();
public BaseResourceCacheSynchronizer(String theResourceName) {
myResourceName = theResourceName;
}
@PostConstruct
public void registerListener() {
if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) {
ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName);
return;
}
mySearchParameterMap = getSearchParameterMap();
mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
IResourceChangeListenerCache resourceCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener(myResourceName, mySearchParameterMap, this, REFRESH_INTERVAL);
resourceCache.forceRefresh();
myEnabled = true;
}
@PreDestroy
public void unregisterListener() {
myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
}
private boolean resourceDaoExists() {
return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported(myResourceName);
}
/**
* Read the existing resources from the database
*/
public void syncDatabaseToCache() {
if (!myEnabled) {
return;
}
if (!resourceDaoExists()) {
return;
}
if (!mySyncResourcesSemaphore.tryAcquire()) {
return;
}
try {
doSyncResourcesWithRetry();
} finally {
mySyncResourcesSemaphore.release();
}
}
@VisibleForTesting
public void acquireSemaphoreForUnitTest() throws InterruptedException {
if (!myEnabled) {
return;
}
mySyncResourcesSemaphore.acquire();
}
@VisibleForTesting
public int doSyncResourcessForUnitTest() {
if (!myEnabled) {
return 0;
}
// Two passes for delete flag to take effect
int first = doSyncResourcesWithRetry();
int second = doSyncResourcesWithRetry();
return first + second;
}
synchronized int doSyncResourcesWithRetry() {
// retry runs MAX_RETRIES times
// and if errors result every time, it will fail
Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, MAX_RETRIES);
return syncResourceRetrier.runWithRetry();
}
private int doSyncResources() {
if (isStopping()) {
return 0;
}
synchronized (mySyncResourcesLock) {
ourLog.debug("Starting sync {}s", myResourceName);
IBundleProvider resourceBundleList = getResourceDao().search(mySearchParameterMap, mySystemRequestDetails);
Integer resourceCount = resourceBundleList.size();
assert resourceCount != null;
if (resourceCount >= SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS) {
ourLog.error("Currently over {} {}s. Some {}s have not been loaded.", SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS, myResourceName, myResourceName);
}
List<IBaseResource> resourceList = resourceBundleList.getResources(0, resourceCount);
return syncResourcesIntoCache(resourceList);
}
}
protected abstract int syncResourcesIntoCache(List<IBaseResource> resourceList);
@EventListener(ContextRefreshedEvent.class)
public void start() {
myStopping = false;
}
@EventListener(ContextClosedEvent.class)
public void shutdown() {
myStopping = true;
}
private boolean isStopping() {
return myStopping;
}
private IFhirResourceDao<?> getResourceDao() {
return myDaoRegistry.getResourceDao(myResourceName);
}
@Override
public void handleInit(Collection<IIdType> theResourceIds) {
if (!myEnabled) {
return;
}
if (!resourceDaoExists()) {
ourLog.warn("The resource type {} is enabled on this server, but there is no {} DAO configured.", myResourceName, myResourceName);
return;
}
IFhirResourceDao<?> resourceDao = getResourceDao();
SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
List<IBaseResource> resourceList = theResourceIds.stream().map(n -> resourceDao.read(n, systemRequestDetails)).collect(Collectors.toList());
handleInit(resourceList);
}
protected abstract void handleInit(List<IBaseResource> resourceList);
@Override
public void handleChange(IResourceChangeEvent theResourceChangeEvent) {
if (!myEnabled) {
return;
}
// For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based on
// known resources that have been created, updated & deleted
syncDatabaseToCache();
}
@Nonnull
protected abstract SearchParameterMap getSearchParameterMap();
}

View File

@ -25,9 +25,14 @@ public enum SubscriptionMatchingStrategy {
*/
IN_MEMORY,
/**
/**
* Resources cannot be matched against this subscription in-memory. We need to make a call to a FHIR Repository to determine a match
*/
DATABASE
DATABASE,
/**
* This subscription uses a SubscriptionTopic for its matching
*/
TOPIC
}

View File

@ -78,6 +78,8 @@ public class SubscriptionCanonicalizer {
return canonicalizeDstu3(theSubscription);
case R4:
return canonicalizeR4(theSubscription);
case R4B:
return canonicalizeR4B(theSubscription);
case R5:
return canonicalizeR5(theSubscription);
case DSTU2_HL7ORG:
@ -231,11 +233,20 @@ public class SubscriptionCanonicalizer {
}, toList())));
}
case R5: {
org.hl7.fhir.r5.model.Subscription subscription = (org.hl7.fhir.r5.model.Subscription) theSubscription;
return subscription
.getExtension()
.stream()
.collect(Collectors.groupingBy(t -> t.getUrl(), mapping(t -> t.getValueAsPrimitive().getValueAsString(), toList())));
// TODO KHS fix org.hl7.fhir.r4b.model.BaseResource.getStructureFhirVersionEnum() for R4B
if (theSubscription instanceof org.hl7.fhir.r4b.model.Subscription) {
org.hl7.fhir.r4b.model.Subscription subscription = (org.hl7.fhir.r4b.model.Subscription) theSubscription;
return subscription
.getExtension()
.stream()
.collect(Collectors.groupingBy(t -> t.getUrl(), mapping(t -> t.getValueAsPrimitive().getValueAsString(), toList())));
} else if (theSubscription instanceof org.hl7.fhir.r5.model.Subscription) {
org.hl7.fhir.r5.model.Subscription subscription = (org.hl7.fhir.r5.model.Subscription) theSubscription;
return subscription
.getExtension()
.stream()
.collect(Collectors.groupingBy(t -> t.getUrl(), mapping(t -> t.getValueAsPrimitive().getValueAsString(), toList())));
}
}
case DSTU2_HL7ORG:
case DSTU2_1:
@ -307,25 +318,32 @@ public class SubscriptionCanonicalizer {
return retVal;
}
private CanonicalSubscription canonicalizeR5(IBaseResource theSubscription) {
org.hl7.fhir.r5.model.Subscription subscription = (org.hl7.fhir.r5.model.Subscription) theSubscription;
private CanonicalSubscription canonicalizeR4B(IBaseResource theSubscription) {
org.hl7.fhir.r4b.model.Subscription subscription = (org.hl7.fhir.r4b.model.Subscription) theSubscription;
CanonicalSubscription retVal = new CanonicalSubscription();
Enumerations.SubscriptionStatusCodes status = subscription.getStatus();
org.hl7.fhir.r4b.model.Enumerations.SubscriptionStatus status = subscription.getStatus();
if (status != null) {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(status.toCode()));
}
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setChannelType(getChannelType(subscription));
retVal.setCriteriaString(getCriteria(theSubscription));
retVal.setEndpointUrl(subscription.getEndpoint());
retVal.setHeaders(subscription.getHeader());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
retVal.setChannelExtensions(extractExtension(subscription));
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getContentType());
retVal.setPayloadString(subscription.getChannel().getPayload());
retVal.setPayloadSearchCriteria(getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
List<org.hl7.fhir.r4b.model.CanonicalType> profiles = subscription.getMeta().getProfile();
for (org.hl7.fhir.r4b.model.CanonicalType next : profiles) {
if (SubscriptionConstants.SUBSCRIPTION_TOPIC_PROFILE_URL.equals(next.getValueAsString())) {
retVal.setTopicSubscription(true);
}
}
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from;
String subjectTemplate;
@ -339,6 +357,78 @@ public class SubscriptionCanonicalizer {
retVal.getEmailDetails().setSubjectTemplate(subjectTemplate);
}
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) {
String stripVersionIds;
String deliverLatestVersion;
try {
stripVersionIds = getExtensionString(subscription.getChannel(), HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
deliverLatestVersion = getExtensionString(subscription.getChannel(), HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
} catch (FHIRException theE) {
throw new ConfigurationException(Msg.code(565) + "Failed to extract subscription extension(s): " + theE.getMessage(), theE);
}
retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds));
retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion));
}
List<org.hl7.fhir.r4b.model.Extension> topicExts = subscription.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics");
if (topicExts.size() > 0) {
IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive();
if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) {
throw new PreconditionFailedException(Msg.code(566) + "Topic reference must be an EventDefinition");
}
}
org.hl7.fhir.r4b.model.Extension extension = subscription.getChannel().getExtensionByUrl(EX_SEND_DELETE_MESSAGES);
if (extension != null && extension.hasValue() && extension.hasValueBooleanType()) {
retVal.setSendDeleteMessages(extension.getValueBooleanType().booleanValue());
}
return retVal;
}
private CanonicalSubscription canonicalizeR5(IBaseResource theSubscription) {
org.hl7.fhir.r5.model.Subscription subscription = (org.hl7.fhir.r5.model.Subscription) theSubscription;
// WIP STR5 now that we have SubscriptionTopic, rewrite this so that all R5 subscriptions are SubscriptionTopic
// subscriptions. This will require major rework of RestHookTestR5Test
CanonicalSubscription retVal = new CanonicalSubscription();
Enumerations.SubscriptionStatusCodes status = subscription.getStatus();
if (status != null) {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(status.toCode()));
}
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setChannelType(getChannelType(subscription));
retVal.setCriteriaString(getCriteria(theSubscription));
retVal.setEndpointUrl(subscription.getEndpoint());
retVal.setHeaders(subscription.getHeader());
retVal.setChannelExtensions(extractExtension(subscription));
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getContentType());
retVal.setPayloadSearchCriteria(getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
List<org.hl7.fhir.r5.model.CanonicalType> profiles = subscription.getMeta().getProfile();
for (org.hl7.fhir.r5.model.CanonicalType next : profiles) {
if (SubscriptionConstants.SUBSCRIPTION_TOPIC_PROFILE_URL.equals(next.getValueAsString())) {
retVal.setTopicSubscription(true);
}
}
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from;
String subjectTemplate;
try {
from = getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_EMAIL_FROM);
subjectTemplate = getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
} catch (FHIRException theE) {
throw new ConfigurationException(Msg.code(2323) + "Failed to extract subscription extension(s): " + theE.getMessage(), theE);
}
retVal.getEmailDetails().setFrom(from);
retVal.getEmailDetails().setSubjectTemplate(subjectTemplate);
}
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) {
String stripVersionIds;
String deliverLatestVersion;
@ -346,7 +436,7 @@ public class SubscriptionCanonicalizer {
stripVersionIds = getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
deliverLatestVersion = getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
} catch (FHIRException theE) {
throw new ConfigurationException(Msg.code(565) + "Failed to extract subscription extension(s): " + theE.getMessage(), theE);
throw new ConfigurationException(Msg.code(2324) + "Failed to extract subscription extension(s): " + theE.getMessage(), theE);
}
retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds));
retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion));
@ -356,7 +446,7 @@ public class SubscriptionCanonicalizer {
if (topicExts.size() > 0) {
IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive();
if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) {
throw new PreconditionFailedException(Msg.code(566) + "Topic reference must be an EventDefinition");
throw new PreconditionFailedException(Msg.code(2325) + "Topic reference must be an EventDefinition");
}
}
@ -408,6 +498,14 @@ public class SubscriptionCanonicalizer {
}
break;
}
case R4B: {
org.hl7.fhir.r4b.model.Subscription.SubscriptionChannelType type = ((org.hl7.fhir.r4b.model.Subscription) theSubscription).getChannel().getType();
if (type != null) {
String channelTypeCode = type.toCode();
retVal = CanonicalSubscriptionChannelType.fromCode(null, channelTypeCode);
}
break;
}
case R5: {
org.hl7.fhir.r5.model.Coding nextTypeCode = ((org.hl7.fhir.r5.model.Subscription) theSubscription).getChannelType();
CanonicalSubscriptionChannelType code = CanonicalSubscriptionChannelType.fromCode(nextTypeCode.getSystem(), nextTypeCode.getCode());
@ -416,6 +514,8 @@ public class SubscriptionCanonicalizer {
}
break;
}
default:
throw new IllegalStateException(Msg.code(2326) + "Unsupported Subscription FHIR version: " + myFhirContext.getVersion().getVersion());
}
return retVal;
@ -436,6 +536,9 @@ public class SubscriptionCanonicalizer {
case R4:
retVal = ((org.hl7.fhir.r4.model.Subscription) theSubscription).getCriteria();
break;
case R4B:
retVal = ((org.hl7.fhir.r4b.model.Subscription) theSubscription).getCriteria();
break;
case R5:
org.hl7.fhir.r5.model.Subscription subscription = (org.hl7.fhir.r5.model.Subscription) theSubscription;
String topicElement = subscription.getTopicElement().getValue();
@ -446,13 +549,16 @@ public class SubscriptionCanonicalizer {
}
retVal = topic.getResourceTriggerFirstRep().getQueryCriteria().getCurrent();
break;
default:
throw new IllegalStateException(Msg.code(2327) + "Subscription criteria is not supported for FHIR version: " + myFhirContext.getVersion().getVersion());
}
return retVal;
}
public void setMatchingStrategyTag(@Nonnull IBaseResource theSubscription, @Nullable SubscriptionMatchingStrategy theStrategy) {
public void setMatchingStrategyTag(@Nonnull IBaseResource theSubscription, @Nullable SubscriptionMatchingStrategy
theStrategy) {
IBaseMetaType meta = theSubscription.getMeta();
// Remove any existing strategy tag
@ -477,6 +583,8 @@ public class SubscriptionCanonicalizer {
display = "Database";
} else if (theStrategy == SubscriptionMatchingStrategy.IN_MEMORY) {
display = "In-memory";
} else if (theStrategy == SubscriptionMatchingStrategy.TOPIC) {
display = "SubscriptionTopic";
} else {
throw new IllegalStateException(Msg.code(567) + "Unknown " + SubscriptionMatchingStrategy.class.getSimpleName() + ": " + theStrategy);
}

View File

@ -46,4 +46,5 @@ public class SubscriptionConstants {
public static final String REQUESTED_STATUS = Subscription.SubscriptionStatus.REQUESTED.toCode();
public static final String ACTIVE_STATUS = Subscription.SubscriptionStatus.ACTIVE.toCode();
public static final String ERROR_STATUS = Subscription.SubscriptionStatus.ERROR.toCode();
public static final String SUBSCRIPTION_TOPIC_PROFILE_URL = "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-subscription";
}

View File

@ -77,6 +77,8 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
private boolean myCrossPartitionEnabled;
@JsonProperty("sendDeleteMessages")
private boolean mySendDeleteMessages;
private boolean myIsTopicSubscription;
/**
* Constructor
*/
@ -351,6 +353,14 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
.toString();
}
public void setTopicSubscription(boolean theTopicSubscription) {
myIsTopicSubscription = theTopicSubscription;
}
public boolean isTopicSubscription() {
return myIsTopicSubscription;
}
public static class EmailDetails implements IModelJson {
@JsonProperty("from")