Canonicalize R4 topic subscriptions (#4913)

* WIP

* add canonicalizer for R4 topic subscriptions

* test R4 topic subscription registration

* Add support for R4 Subscription Topic Backport

* FIXME

* add filter matching support

* add simpler signature

* add resource type matching to filter

* review feedback

* fixed

* changelog

* test both ways

* add bundle test

* change how toplevel is detected since in some contexts the parent state can be a non-null pre-resource state

* added logs

* changelog

* fix v2 issues

* Clean up WIP comments

* Switch to in-memory event numbers (instead of always 1)

* Final WIP cleanup

* Msg.code

* review feedback

* review feedback

* review feedback

* review feedback

* back out import changes

* back out import changes

---------

Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2023-05-29 01:32:22 -04:00 committed by GitHub
parent 46857711c9
commit b84e8c0bcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 1217 additions and 388 deletions

View File

@ -0,0 +1,4 @@
---
type: add
issue: 4913
title: "Added support for Subscriptions R5 Backport to R4 as documented in http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/components.html."

View File

@ -0,0 +1,8 @@
---
type: add
issue: 4913
title: "Added support for Topic Subscription Filters. In R4B and R5, SubscriptionTopic notifications
will interpret subscription filters as FHIR Search Parameters and perform an in-memory match of the focus resource
against the search parameters. For R4 or user-generated event notifications, the caller can specify a custom
implementation of ISubscriptionTopicFilterMatcher on the call to SubscriptionTopicDispatcher.dispatch() to provide
custom filter matching logic."

View File

@ -10,11 +10,9 @@ import ca.uhn.fhir.model.primitive.StringDt;
import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
import ca.uhn.hapi.converters.canonical.VersionCanonicalizer;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.BaseResource;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.SearchParameter;
import org.hl7.fhir.r4.model.StringType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -27,7 +25,6 @@ import static ca.uhn.fhir.util.HapiExtensions.EXTENSION_SEARCHPARAM_CUSTOM_TARGE
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.subscription.match.config;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer;
@ -41,9 +42,13 @@ import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionRegiste
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicDispatcher;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicPayloadBuilder;
import ca.uhn.fhir.jpa.topic.filter.InMemoryTopicFilterMatcher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
@ -138,4 +143,27 @@ public class SubscriptionProcessorConfig {
return new CompositeInMemoryDaoSubscriptionMatcher(theDaoSubscriptionMatcher, theInMemorySubscriptionMatcher);
}
@Lazy
@Bean
SubscriptionTopicPayloadBuilder subscriptionTopicPayloadBuilder(FhirContext theFhirContext) {
switch(theFhirContext.getVersion().getVersion()) {
case R4:
case R4B:
case R5:
return new SubscriptionTopicPayloadBuilder(theFhirContext);
default:
return null;
}
}
@Lazy
@Bean
SubscriptionTopicDispatcher subscriptionTopicDispatcher(FhirContext theFhirContext, SubscriptionRegistry theSubscriptionRegistry, SubscriptionMatchDeliverer theSubscriptionMatchDeliverer, SubscriptionTopicPayloadBuilder theSubscriptionTopicPayloadBuilder) {
return new SubscriptionTopicDispatcher(theFhirContext, theSubscriptionRegistry, theSubscriptionMatchDeliverer, theSubscriptionTopicPayloadBuilder);
}
@Bean
InMemoryTopicFilterMatcher inMemoryTopicFilterMatcher(SearchParamMatcher theSearchParamMatcher) {
return new InMemoryTopicFilterMatcher(theSearchParamMatcher);
}
}

View File

@ -24,13 +24,13 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.util.SubscriptionUtil;
import org.hl7.fhir.dstu2.model.Subscription;
import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -23,6 +23,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicDispatchRequest;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import org.hl7.fhir.instance.model.api.IBaseBundle;
@ -30,7 +31,6 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class SubscriptionDeliveryRequest {
// One of these two will be populated
@ -41,13 +41,13 @@ public class SubscriptionDeliveryRequest {
private final RequestPartitionId myRequestPartitionId;
private final String myTransactionId;
public SubscriptionDeliveryRequest(@Nonnull IBaseBundle theBundlePayload, @Nonnull ActiveSubscription theActiveSubscription, @Nonnull RestOperationTypeEnum theOperationType, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theTransactionId) {
public SubscriptionDeliveryRequest(IBaseBundle theBundlePayload, ActiveSubscription theActiveSubscription, SubscriptionTopicDispatchRequest theSubscriptionTopicDispatchRequest) {
myPayload = theBundlePayload;
myPayloadId = null;
myActiveSubscription = theActiveSubscription;
myRestOperationType = theOperationType;
myRequestPartitionId = theRequestPartitionId;
myTransactionId = theTransactionId;
myRestOperationType = theSubscriptionTopicDispatchRequest.getRequestType();
myRequestPartitionId = theSubscriptionTopicDispatchRequest.getRequestPartitionId();
myTransactionId = theSubscriptionTopicDispatchRequest.getTransactionId();
}
public SubscriptionDeliveryRequest(@Nonnull IBaseResource thePayload, @Nonnull ResourceModifiedMessage theMsg, @Nonnull ActiveSubscription theActiveSubscription) {
@ -68,6 +68,8 @@ public class SubscriptionDeliveryRequest {
myTransactionId = theMsg.getTransactionId();
}
public IBaseResource getPayload() {
return myPayload;
}

View File

@ -24,6 +24,8 @@ import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
import java.util.concurrent.atomic.AtomicLong;
public class ActiveSubscription {
private SubscriptionCriteriaParser.SubscriptionCriteria myCriteria;
@ -34,6 +36,7 @@ public class ActiveSubscription {
private boolean flagForDeletion;
private ChannelRetryConfiguration myRetryConfigurationParameters;
private final AtomicLong myDeliveriesCount = new AtomicLong();
public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) {
myChannelName = theChannelName;
@ -81,4 +84,12 @@ public class ActiveSubscription {
public ChannelRetryConfiguration getRetryConfigurationParameters() {
return myRetryConfigurationParameters;
}
public long getDeliveriesCount() {
return myDeliveriesCount.get();
}
public long incrementDeliveriesCount() {
return myDeliveriesCount.incrementAndGet();
}
}

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Subscription;

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
@ -147,10 +148,12 @@ public class SubscriptionValidatingInterceptor {
if (!finished) {
if (subscription.isTopicSubscription()) {
if (myFhirContext.getVersion().getVersion() != FhirVersionEnum.R4) { // In R4 topic subscriptions exist without a corresponidng SubscriptionTopic resource
Optional<IBaseResource> oTopic = findSubscriptionTopicByUrl(subscription.getTopic());
if (!oTopic.isPresent()) {
throw new UnprocessableEntityException(Msg.code(2322) + "No SubscriptionTopic exists with topic: " + subscription.getTopic());
}
}
} else {
validateQuery(subscription.getCriteriaString(), "Subscription.criteria");

View File

@ -22,8 +22,6 @@ package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionQueryValidator;
import org.springframework.context.annotation.Bean;
@ -33,11 +31,6 @@ public class SubscriptionTopicConfig {
return new SubscriptionTopicMatchingSubscriber(theFhirContext);
}
@Bean
SubscriptionTopicPayloadBuilder subscriptionTopicPayloadBuilder(FhirContext theFhirContext) {
return new SubscriptionTopicPayloadBuilder(theFhirContext);
}
@Bean
SubscriptionTopicRegistry subscriptionTopicRegistry() {
return new SubscriptionTopicRegistry();
@ -62,9 +55,4 @@ public class SubscriptionTopicConfig {
SubscriptionTopicValidatingInterceptor subscriptionTopicValidatingInterceptor(FhirContext theFhirContext, SubscriptionQueryValidator theSubscriptionQueryValidator) {
return new SubscriptionTopicValidatingInterceptor(theFhirContext, theSubscriptionQueryValidator);
}
@Bean
SubscriptionTopicDispatcher subscriptionTopicDispatcher(SubscriptionRegistry theSubscriptionRegistry, SubscriptionMatchDeliverer theSubscriptionMatchDeliverer, SubscriptionTopicPayloadBuilder theSubscriptionTopicPayloadBuilder) {
return new SubscriptionTopicDispatcher(theSubscriptionRegistry, theSubscriptionMatchDeliverer, theSubscriptionTopicPayloadBuilder);
}
}

View File

@ -0,0 +1,80 @@
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.topic.filter.ISubscriptionTopicFilterMatcher;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import org.hl7.fhir.instance.model.api.IBaseResource;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
public class SubscriptionTopicDispatchRequest {
@Nonnull
private final String myTopicUrl;
@Nonnull
private final List<IBaseResource> myResources;
@Nonnull
private final ISubscriptionTopicFilterMatcher mySubscriptionTopicFilterMatcher;
@Nonnull
private final RestOperationTypeEnum myRequestType;
@Nullable
private final InMemoryMatchResult myInMemoryMatchResult;
@Nullable
private final RequestPartitionId myRequestPartitionId;
@Nullable
private final String myTransactionId;
/**
* @param theTopicUrl Deliver to subscriptions for this topic
* @param theResources The list of resources to deliver. The first resource will be the primary "focus" resource per the Subscription documentation.
* This list should _not_ include the SubscriptionStatus. The SubscriptionStatus will be added as the first element to
* the delivered bundle. The reason for this is that the SubscriptionStatus needs to reference the subscription ID, which is
* not known until the bundle is delivered.
* @param theSubscriptionTopicFilterMatcher is used to match the primary "focus" resource against the subscription filters
* @param theRequestType The type of request that led to this dispatch. This determines the request type of the bundle entries
* @param theInMemoryMatchResult Information about the match event that led to this dispatch that is sent to SUBSCRIPTION_RESOURCE_MATCHED
* @param theRequestPartitionId The request partitions of the request, if any. This is used by subscriptions that need to perform repository
* operations as a part of their delivery. Those repository operations will be performed on the supplied request partitions
* @param theTransactionId The transaction ID of the request, if any. This is used for logging.
*
*/
public SubscriptionTopicDispatchRequest(@Nonnull String theTopicUrl, @Nonnull List<IBaseResource> theResources, @Nonnull ISubscriptionTopicFilterMatcher theSubscriptionTopicFilterMatcher, @Nonnull RestOperationTypeEnum theRequestType, @Nullable InMemoryMatchResult theInMemoryMatchResult, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theTransactionId) {
myTopicUrl = theTopicUrl;
myResources = theResources;
mySubscriptionTopicFilterMatcher = theSubscriptionTopicFilterMatcher;
myRequestType = theRequestType;
myInMemoryMatchResult = theInMemoryMatchResult;
myRequestPartitionId = theRequestPartitionId;
myTransactionId = theTransactionId;
}
public String getTopicUrl() {
return myTopicUrl;
}
public List<IBaseResource> getResources() {
return myResources;
}
public ISubscriptionTopicFilterMatcher getSubscriptionTopicFilterMatcher() {
return mySubscriptionTopicFilterMatcher;
}
public RestOperationTypeEnum getRequestType() {
return myRequestType;
}
public InMemoryMatchResult getInMemoryMatchResult() {
return myInMemoryMatchResult;
}
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
public String getTransactionId() {
return myTransactionId;
}
}

View File

@ -19,18 +19,22 @@
*/
package ca.uhn.fhir.jpa.topic;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionDeliveryRequest;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscription;
import ca.uhn.fhir.jpa.topic.filter.ISubscriptionTopicFilterMatcher;
import ca.uhn.fhir.jpa.topic.filter.SubscriptionTopicFilterUtil;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.UUID;
@ -44,11 +48,14 @@ import java.util.UUID;
* send topic notifications to all Subscription resources subscribed to that topic.
*/
public class SubscriptionTopicDispatcher {
private static final Logger ourLog = Logs.getSubscriptionTopicLog();
private final FhirContext myFhirContext;
private final SubscriptionRegistry mySubscriptionRegistry;
private final SubscriptionMatchDeliverer mySubscriptionMatchDeliverer;
private final SubscriptionTopicPayloadBuilder mySubscriptionTopicPayloadBuilder;
public SubscriptionTopicDispatcher(SubscriptionRegistry theSubscriptionRegistry, SubscriptionMatchDeliverer theSubscriptionMatchDeliverer, SubscriptionTopicPayloadBuilder theSubscriptionTopicPayloadBuilder) {
public SubscriptionTopicDispatcher(FhirContext theFhirContext, SubscriptionRegistry theSubscriptionRegistry, SubscriptionMatchDeliverer theSubscriptionMatchDeliverer, SubscriptionTopicPayloadBuilder theSubscriptionTopicPayloadBuilder) {
myFhirContext = theFhirContext;
mySubscriptionRegistry = theSubscriptionRegistry;
mySubscriptionMatchDeliverer = theSubscriptionMatchDeliverer;
mySubscriptionTopicPayloadBuilder = theSubscriptionTopicPayloadBuilder;
@ -62,24 +69,28 @@ public class SubscriptionTopicDispatcher {
* This list should _not_ include the SubscriptionStatus. The SubscriptionStatus will be added as the first element to
* the delivered bundle. The reason for this is that the SubscriptionStatus needs to reference the subscription ID, which is
* not known until the bundle is delivered.
* @param theInMemoryMatchResult Information about the match event that led to this dispatch that is sent to SUBSCRIPTION_RESOURCE_MATCHED
* @param theRequestPartitionId The request partitions of the request, if any. This is used by subscriptions that need to perform repository
* operations as a part of their delivery. Those repository operations will be performed on the supplied request partitions
* @param theTransactionId The transaction ID of the request, if any. This is used for logging.
* @param theRequestType The type of request that led to this dispatch. This determines the request type of the bundle entries
* @return The number of subscription notifications that were successfully queued for delivery
*/
public int dispatch(@Nonnull String theTopicUrl, @Nonnull List<IBaseResource> theResources, @Nonnull RestOperationTypeEnum theRequestType, @Nullable InMemoryMatchResult theInMemoryMatchResult, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theTransactionId) {
public int dispatch(String theTopicUrl, List<IBaseResource> theResources, RestOperationTypeEnum theRequestType) {
SubscriptionTopicDispatchRequest subscriptionTopicDispatchRequest = new SubscriptionTopicDispatchRequest(theTopicUrl, theResources, (f, r) -> InMemoryMatchResult.successfulMatch(), theRequestType, null, null, null);
return dispatch(subscriptionTopicDispatchRequest);
}
/**
* Deliver a Subscription topic notification to all subscriptions for the given topic.
*
* @param theSubscriptionTopicDispatchRequest contains the topic URL, the list of resources to deliver, and the request type
* @return The number of subscription notifications that were successfully queued for delivery
*/
public int dispatch(SubscriptionTopicDispatchRequest theSubscriptionTopicDispatchRequest) {
int count = 0;
List<ActiveSubscription> topicSubscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(theTopicUrl);
List<ActiveSubscription> topicSubscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(theSubscriptionTopicDispatchRequest.getTopicUrl());
if (!topicSubscriptions.isEmpty()) {
for (ActiveSubscription activeSubscription : topicSubscriptions) {
// WIP STR5 apply subscription filters
IBaseBundle bundlePayload = mySubscriptionTopicPayloadBuilder.buildPayload(theResources, activeSubscription, theTopicUrl, theRequestType);
// WIP STR5 do we need to add a total? If so can do that with R5BundleFactory
bundlePayload.setId(UUID.randomUUID().toString());
SubscriptionDeliveryRequest subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(bundlePayload, activeSubscription, theRequestType, theRequestPartitionId, theTransactionId);
boolean success = mySubscriptionMatchDeliverer.deliverPayload(subscriptionDeliveryRequest, theInMemoryMatchResult);
boolean success = matchFiltersAndDeliver(theSubscriptionTopicDispatchRequest, activeSubscription);
if (success) {
count++;
}
@ -87,4 +98,33 @@ public class SubscriptionTopicDispatcher {
}
return count;
}
private boolean matchFiltersAndDeliver(SubscriptionTopicDispatchRequest theSubscriptionTopicDispatchRequest, ActiveSubscription theActiveSubscription) {
String topicUrl = theSubscriptionTopicDispatchRequest.getTopicUrl();
List<IBaseResource> resources = theSubscriptionTopicDispatchRequest.getResources();
ISubscriptionTopicFilterMatcher subscriptionTopicFilterMatcher = theSubscriptionTopicDispatchRequest.getSubscriptionTopicFilterMatcher();
if (resources.size() > 0) {
IBaseResource firstResource = resources.get(0);
String resourceType = myFhirContext.getResourceType(firstResource);
CanonicalSubscription subscription = theActiveSubscription.getSubscription();
CanonicalTopicSubscription topicSubscription = subscription.getTopicSubscription();
if (topicSubscription.hasFilters()) {
ourLog.debug("Checking if resource {} matches {} subscription filters on {}", firstResource.getIdElement().toUnqualifiedVersionless().getValue(),
topicSubscription.getFilters().size(),
subscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
if (!SubscriptionTopicFilterUtil.matchFilters(firstResource, resourceType, subscriptionTopicFilterMatcher, topicSubscription)) {
return false;
}
}
}
theActiveSubscription.incrementDeliveriesCount();
IBaseBundle bundlePayload = mySubscriptionTopicPayloadBuilder.buildPayload(resources, theActiveSubscription, topicUrl, theSubscriptionTopicDispatchRequest.getRequestType());
bundlePayload.setId(UUID.randomUUID().toString());
SubscriptionDeliveryRequest subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(bundlePayload, theActiveSubscription, theSubscriptionTopicDispatchRequest);
return mySubscriptionMatchDeliverer.deliverPayload(subscriptionDeliveryRequest, theSubscriptionTopicDispatchRequest.getInMemoryMatchResult());
}
}

View File

@ -24,8 +24,8 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.Enumerations;

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDe
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.topic.filter.InMemoryTopicFilterMatcher;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -61,6 +62,8 @@ public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private SubscriptionTopicDispatcher mySubscriptionTopicDispatcher;
@Autowired
private InMemoryTopicFilterMatcher myInMemoryTopicFilterMatcher;
public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
@ -106,9 +109,10 @@ public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
private int deliverToTopicSubscriptions(ResourceModifiedMessage theMsg, SubscriptionTopic theSubscriptionTopic, InMemoryMatchResult theInMemoryMatchResult) {
String topicUrl = theSubscriptionTopic.getUrl();
List<IBaseResource> matchedResource = Collections.singletonList(theMsg.getNewPayload(myFhirContext));
IBaseResource matchedResource = theMsg.getNewPayload(myFhirContext);
List<IBaseResource> matchedResourceList = Collections.singletonList(matchedResource);
RestOperationTypeEnum restOperationType = theMsg.getOperationType().asRestOperationType();
return mySubscriptionTopicDispatcher.dispatch(topicUrl, matchedResource, restOperationType, theInMemoryMatchResult, theMsg.getPartitionId(), theMsg.getTransactionId());
return mySubscriptionTopicDispatcher.dispatch(new SubscriptionTopicDispatchRequest(topicUrl, matchedResourceList, myInMemoryTopicFilterMatcher, restOperationType, theInMemoryMatchResult, theMsg.getPartitionId(), theMsg.getTransactionId()));
}
}

View File

@ -23,48 +23,65 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.topic.status.INotificationStatusBuilder;
import ca.uhn.fhir.jpa.topic.status.R4BNotificationStatusBuilder;
import ca.uhn.fhir.jpa.topic.status.R4NotificationStatusBuilder;
import ca.uhn.fhir.jpa.topic.status.R5NotificationStatusBuilder;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.util.BundleBuilder;
import org.hl7.fhir.convertors.factory.VersionConvertorFactory_43_50;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.Bundle;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.Reference;
import org.hl7.fhir.r5.model.SubscriptionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.UUID;
public class SubscriptionTopicPayloadBuilder {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicPayloadBuilder.class);
private final FhirContext myFhirContext;
private final FhirVersionEnum myFhirVersion;
private final INotificationStatusBuilder<? extends IBaseResource> myNotificationStatusBuilder;
public SubscriptionTopicPayloadBuilder(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
myFhirVersion = myFhirContext.getVersion().getVersion();
switch (myFhirVersion) {
case R4:
myNotificationStatusBuilder = new R4NotificationStatusBuilder(myFhirContext);
break;
case R4B:
myNotificationStatusBuilder = new R4BNotificationStatusBuilder(myFhirContext);
break;
case R5:
myNotificationStatusBuilder = new R5NotificationStatusBuilder(myFhirContext);
break;
default:
throw unsupportedFhirVersionException();
}
}
public IBaseBundle buildPayload(List<IBaseResource> theResources, ActiveSubscription theActiveSubscription, String theTopicUrl, RestOperationTypeEnum theRestOperationType) {
BundleBuilder bundleBuilder = new BundleBuilder(myFhirContext);
// WIP STR5 set eventsSinceSubscriptionStart from the database
int eventsSinceSubscriptionStart = 1;
IBaseResource subscriptionStatus = buildSubscriptionStatus(theResources, theActiveSubscription, theTopicUrl, eventsSinceSubscriptionStart);
FhirVersionEnum fhirVersion = myFhirContext.getVersion().getVersion();
IBaseResource notificationStatus = myNotificationStatusBuilder.buildNotificationStatus(theResources, theActiveSubscription, theTopicUrl);
bundleBuilder.addCollectionEntry(notificationStatus);
addResources(bundleBuilder, theResources, theRestOperationType);
// WIP STR5 add support for notificationShape include, revinclude
if (fhirVersion == FhirVersionEnum.R4B) {
bundleBuilder.setType(Bundle.BundleType.HISTORY.toCode());
subscriptionStatus = VersionConvertorFactory_43_50.convertResource((org.hl7.fhir.r5.model.SubscriptionStatus) subscriptionStatus);
} else if (fhirVersion == FhirVersionEnum.R5) {
bundleBuilder.setType(Bundle.BundleType.SUBSCRIPTIONNOTIFICATION.toCode());
} else {
throw new IllegalStateException(Msg.code(2331) + "SubscriptionTopic subscriptions are not supported on FHIR version: " + fhirVersion);
// Note we need to set the bundle type after we add the resources since adding the resources automatically sets the bundle type
setBundleType(bundleBuilder);
IBaseBundle retval = bundleBuilder.getBundle();
if (ourLog.isDebugEnabled()) {
String bundle = myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(retval);
ourLog.debug("Bundle: {}", bundle);
}
// WIP STR5 is this the right type of entry? see http://hl7.org/fhir/subscriptionstatus-examples.html
// WIP STR5 Also see http://hl7.org/fhir/R4B/notification-full-resource.json.html need to conform to these
bundleBuilder.addCollectionEntry(subscriptionStatus);
return retval;
}
private static void addResources(BundleBuilder bundleBuilder, List<IBaseResource> theResources, RestOperationTypeEnum theRestOperationType) {
for (IBaseResource resource : theResources) {
switch (theRestOperationType) {
case CREATE:
@ -78,27 +95,23 @@ public class SubscriptionTopicPayloadBuilder {
break;
}
}
return bundleBuilder.getBundle();
}
private SubscriptionStatus buildSubscriptionStatus(List<IBaseResource> theResources, ActiveSubscription theActiveSubscription, String theTopicUrl, int theEventsSinceSubscriptionStart) {
SubscriptionStatus subscriptionStatus = new SubscriptionStatus();
subscriptionStatus.setId(UUID.randomUUID().toString());
subscriptionStatus.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE);
subscriptionStatus.setType(SubscriptionStatus.SubscriptionNotificationType.EVENTNOTIFICATION);
// WIP STR5 count events since subscription start and set eventsSinceSubscriptionStart
// store counts by subscription id
subscriptionStatus.setEventsSinceSubscriptionStart(theEventsSinceSubscriptionStart);
SubscriptionStatus.SubscriptionStatusNotificationEventComponent event = subscriptionStatus.addNotificationEvent();
event.setEventNumber(theEventsSinceSubscriptionStart);
if (theResources.size() > 0) {
event.setFocus(new Reference(theResources.get(0).getIdElement()));
private void setBundleType(BundleBuilder bundleBuilder) {
switch (myFhirVersion) {
case R4:
case R4B:
bundleBuilder.setType(Bundle.BundleType.HISTORY.toCode());
break;
case R5:
bundleBuilder.setType(Bundle.BundleType.SUBSCRIPTIONNOTIFICATION.toCode());
break;
default:
throw unsupportedFhirVersionException();
}
subscriptionStatus.setSubscription(new Reference(theActiveSubscription.getSubscription().getIdElement(myFhirContext)));
subscriptionStatus.setTopic(theTopicUrl);
return subscriptionStatus;
}
private IllegalStateException unsupportedFhirVersionException() {
return new IllegalStateException(Msg.code(2331) + "SubscriptionTopic subscriptions are not supported on FHIR version: " + myFhirVersion);
}
}

View File

@ -87,7 +87,8 @@ public class SubscriptionTopicValidatingInterceptor {
// WIP STR5 add cross-partition support like in SubscriptionValidatingInterceptor
// WIP STR5 warn if can't be evaluated in memory?
// WIP STR5 warn if the SubscriptionTopic criteria can't be evaluated in memory? Do we want to annotate the
// strategy with an extension like Subscription?
if (!finished) {
subscriptionTopic.getResourceTrigger().stream()

View File

@ -0,0 +1,15 @@
package ca.uhn.fhir.jpa.topic.filter;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import org.hl7.fhir.instance.model.api.IBaseResource;
public interface ISubscriptionTopicFilterMatcher {
/**
* Match a resource against a single subscription topic filter
* @param theCanonicalTopicSubscriptionFilter
* @param theIBaseResource
* @return
*/
InMemoryMatchResult match(CanonicalTopicSubscriptionFilter theCanonicalTopicSubscriptionFilter, IBaseResource theIBaseResource);
}

View File

@ -0,0 +1,20 @@
package ca.uhn.fhir.jpa.topic.filter;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;
public class InMemoryTopicFilterMatcher implements ISubscriptionTopicFilterMatcher {
private final SearchParamMatcher mySearchParamMatcher;
public InMemoryTopicFilterMatcher(SearchParamMatcher theSearchParamMatcher) {
mySearchParamMatcher = theSearchParamMatcher;
}
@Override
public InMemoryMatchResult match(CanonicalTopicSubscriptionFilter theCanonicalTopicSubscriptionFilter, IBaseResource theResource) {
return mySearchParamMatcher.match(theCanonicalTopicSubscriptionFilter.asCriteriaString(), theResource, new SystemRequestDetails());
}
}

View File

@ -0,0 +1,31 @@
package ca.uhn.fhir.jpa.topic.filter;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import javax.annotation.Nonnull;
public final class SubscriptionTopicFilterUtil {
private static final Logger ourLog = Logs.getSubscriptionTopicLog();
private SubscriptionTopicFilterUtil() {
}
public static boolean matchFilters(@Nonnull IBaseResource theResource, @Nonnull String theResourceType, @Nonnull ISubscriptionTopicFilterMatcher theSubscriptionTopicFilterMatcher, @Nonnull CanonicalTopicSubscription topicSubscription) {
boolean match = true;
for (CanonicalTopicSubscriptionFilter filter : topicSubscription.getFilters()) {
if (filter.getResourceType() == null || "Resource".equals(filter.getResourceType()) || !filter.getResourceType().equals(theResourceType)) {
continue;
}
if (!theSubscriptionTopicFilterMatcher.match(filter, theResource).matched()) {
match = false;
ourLog.debug("Resource {} did not match filter {}. Skipping remaining filters.", theResource.getIdElement().toUnqualifiedVersionless().getValue(), filter.asCriteriaString());
break;
}
ourLog.debug("Resource {} matches filter {}", theResource.getIdElement().toUnqualifiedVersionless().getValue(), filter.asCriteriaString());
}
return match;
}
}

View File

@ -0,0 +1,18 @@
package ca.uhn.fhir.jpa.topic.status;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import org.hl7.fhir.instance.model.api.IBaseResource;
import java.util.List;
public interface INotificationStatusBuilder<T extends IBaseResource> {
/**
* Build a notification status resource to include as the first element in a topic subscription notification bundle
* @param theResources The resources to include in the notification bundle. It should _NOT_ include the
* notification status resource. The first resource will be the "focus" resource.
* @param theActiveSubscription The active subscription that triggered the notification
* @param theTopicUrl The topic URL of the topic subscription
* @return the notification status resource. The resource type varies depending on the FHIR version.
*/
T buildNotificationStatus(List<IBaseResource> theResources, ActiveSubscription theActiveSubscription, String theTopicUrl);
}

View File

@ -0,0 +1,23 @@
package ca.uhn.fhir.jpa.topic.status;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import org.hl7.fhir.convertors.factory.VersionConvertorFactory_43_50;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4b.model.SubscriptionStatus;
import java.util.List;
public class R4BNotificationStatusBuilder implements INotificationStatusBuilder<SubscriptionStatus> {
private final R5NotificationStatusBuilder myR5NotificationStatusBuilder;
public R4BNotificationStatusBuilder(FhirContext theFhirContext) {
myR5NotificationStatusBuilder = new R5NotificationStatusBuilder(theFhirContext);
}
@Override
public SubscriptionStatus buildNotificationStatus(List<IBaseResource> theResources, ActiveSubscription theActiveSubscription, String theTopicUrl) {
org.hl7.fhir.r5.model.SubscriptionStatus subscriptionStatus = myR5NotificationStatusBuilder.buildNotificationStatus(theResources, theActiveSubscription, theTopicUrl);
return (SubscriptionStatus) VersionConvertorFactory_43_50.convertResource(subscriptionStatus);
}
}

View File

@ -0,0 +1,52 @@
package ca.uhn.fhir.jpa.topic.status;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.CanonicalType;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.DateType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Reference;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r5.model.SubscriptionStatus;
import java.util.Date;
import java.util.List;
import java.util.UUID;
public class R4NotificationStatusBuilder implements INotificationStatusBuilder<Parameters> {
private final FhirContext myFhirContext;
public R4NotificationStatusBuilder(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
}
public Parameters buildNotificationStatus(List<IBaseResource> theResources, ActiveSubscription theActiveSubscription, String theTopicUrl) {
Long eventNumber = theActiveSubscription.getDeliveriesCount();
// See http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/Parameters-r4-notification-status.json.html
// and http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/StructureDefinition-backport-subscription-status-r4.html
Parameters parameters = new Parameters();
parameters.getMeta().addProfile(SubscriptionConstants.SUBSCRIPTION_TOPIC_STATUS);
parameters.setId(UUID.randomUUID().toString());
parameters.addParameter("subscription", new Reference(theActiveSubscription.getSubscription().getIdElement(myFhirContext)));
parameters.addParameter("topic", new CanonicalType(theTopicUrl));
parameters.addParameter("status", new CodeType(Subscription.SubscriptionStatus.ACTIVE.toCode()));
parameters.addParameter("type", new CodeType(SubscriptionStatus.SubscriptionNotificationType.EVENTNOTIFICATION.toCode()));
// WIP STR5 events-since-subscription-start should be read from the database
parameters.addParameter("events-since-subscription-start", eventNumber.toString());
Parameters.ParametersParameterComponent notificationEvent = parameters.addParameter();
notificationEvent.setName("notification-event");
notificationEvent.addPart().setName("event-number").setValue(new StringType(eventNumber.toString()));
notificationEvent.addPart().setName("timestamp").setValue(new DateType(new Date()));
if (theResources.size() > 0) {
IBaseResource firstResource = theResources.get(0);
notificationEvent.addPart().setName("focus").setValue(new Reference(firstResource.getIdElement().toUnqualifiedVersionless()));
}
return parameters;
}
}

View File

@ -0,0 +1,39 @@
package ca.uhn.fhir.jpa.topic.status;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.Reference;
import org.hl7.fhir.r5.model.SubscriptionStatus;
import java.util.List;
import java.util.UUID;
public class R5NotificationStatusBuilder implements INotificationStatusBuilder<SubscriptionStatus> {
private final FhirContext myFhirContext;
public R5NotificationStatusBuilder(FhirContext theFhirContext) {
myFhirContext = theFhirContext;
}
@Override
public SubscriptionStatus buildNotificationStatus(List<IBaseResource> theResources, ActiveSubscription theActiveSubscription, String theTopicUrl) {
long eventNumber = theActiveSubscription.getDeliveriesCount();
SubscriptionStatus subscriptionStatus = new SubscriptionStatus();
subscriptionStatus.setId(UUID.randomUUID().toString());
subscriptionStatus.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE);
subscriptionStatus.setType(SubscriptionStatus.SubscriptionNotificationType.EVENTNOTIFICATION);
// WIP STR5 events-since-subscription-start should be read from the database
subscriptionStatus.setEventsSinceSubscriptionStart(eventNumber);
SubscriptionStatus.SubscriptionStatusNotificationEventComponent event = subscriptionStatus.addNotificationEvent();
event.setEventNumber(eventNumber);
if (theResources.size() > 0) {
event.setFocus(new Reference(theResources.get(0).getIdElement()));
}
subscriptionStatus.setSubscription(new Reference(theActiveSubscription.getSubscription().getIdElement(myFhirContext)));
subscriptionStatus.setTopic(theTopicUrl);
return subscriptionStatus;
}
}

View File

@ -4,13 +4,13 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;

View File

@ -12,6 +12,7 @@ import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;

View File

@ -1,11 +1,11 @@
package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.subscription.SubscriptionTestDataHelper;
import ca.uhn.fhir.util.HapiExtensions;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.IntegerType;
@ -13,37 +13,151 @@ import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(MockitoExtension.class)
public class SubscriptionRegistryTest {
public static final String CHANNEL_NAME = "subscription-test";
public static final String SUBSCRIPTION_ID = "123";
static FhirContext ourFhirContext = FhirContext.forR4Cached();
@Spy
SubscriptionCanonicalizer mySubscriptionCanonicalizer = new SubscriptionCanonicalizer(ourFhirContext);
@Spy
ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer = new TestChannelNamer();
@Mock
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Mock
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
@Mock
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
@Mock
private IInterceptorBroadcaster myInterceptorBroadcaster;
IInterceptorBroadcaster myInterceptorBroadcaster;
@InjectMocks
private SubscriptionRegistry mySubscriptionRegistry;
SubscriptionRegistry mySubscriptionRegistry;
@Test
public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithRetry_createsAsExpected() {
// init
String channelName = CHANNEL_NAME;
int retryCount = 2;
Extension retryExtension = new Extension();
retryExtension.setUrl(HapiExtensions.EX_RETRY_COUNT);
retryExtension.setValue(new IntegerType(retryCount));
Subscription subscription = createSubscription(retryExtension);
// test
boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
// verify
assertTrue(registered);
ActiveSubscription activeSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
Assertions.assertNotNull(activeSubscription.getRetryConfigurationParameters());
assertEquals(channelName, activeSubscription.getChannelName());
assertEquals(retryCount, activeSubscription.getRetryConfigurationParameters().getRetryCount());
}
@Test
public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithoutRetry_createsAsExpected() {
// init
String channelName = CHANNEL_NAME;
Subscription subscription = createSubscription();
// test
boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
// verify
assertTrue(registered);
ActiveSubscription activeSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
Assertions.assertNull(activeSubscription.getRetryConfigurationParameters());
}
@Test
public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithBadRetry_createsAsExpected() {
// init
int retryCount = -1; // invalid retry count -> no retries created
Extension retryExtension = new Extension();
retryExtension.setUrl(HapiExtensions.EX_RETRY_COUNT);
retryExtension.setValue(new IntegerType(retryCount));
Subscription subscription = createSubscription(retryExtension);
// test
boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
// verify
assertTrue(registered);
ActiveSubscription activeSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
Assertions.assertNull(activeSubscription.getRetryConfigurationParameters());
assertEquals(CHANNEL_NAME, activeSubscription.getChannelName());
}
@Test
void R4TopicSubscription() {
// setup
Subscription topicSubscription1 = SubscriptionTestDataHelper.buildR4TopicSubscription();
topicSubscription1.setId("topicSubscription1");
// execute
boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(topicSubscription1);
// verify
assertTrue(registered);
List<ActiveSubscription> subscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(SubscriptionTestDataHelper.TEST_TOPIC);
assertThat(subscriptions, hasSize(1));
Subscription topicSubscription2 = SubscriptionTestDataHelper.buildR4TopicSubscription();
topicSubscription2.setId("topicSubscription2");
registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(topicSubscription2);
assertTrue(registered);
subscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(SubscriptionTestDataHelper.TEST_TOPIC);
assertThat(subscriptions, hasSize(2));
// Repeat registration does not register
Subscription topicSubscription3 = SubscriptionTestDataHelper.buildR4TopicSubscription();
topicSubscription3.setId("topicSubscription2");
registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(topicSubscription3);
assertFalse(registered);
assertThat(subscriptions, hasSize(2));
// Now register a subscription with a different topic
Subscription topicSubscription4 = SubscriptionTestDataHelper.buildR4TopicSubscription();
String topicSubscription4Id = "topicSubscription4";
topicSubscription4.setId(topicSubscription4Id);
String testTopic4 = "test-topic-4";
topicSubscription4.setCriteria(testTopic4);
registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(topicSubscription4);
assertTrue(registered);
// Still 2 subs with the first topic
subscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(SubscriptionTestDataHelper.TEST_TOPIC);
assertThat(subscriptions, hasSize(2));
// Now also 1 sub with a different topic
subscriptions = mySubscriptionRegistry.getTopicSubscriptionsByTopic(testTopic4);
assertThat(subscriptions, hasSize(1));
assertEquals(topicSubscription4Id, subscriptions.get(0).getId());
}
private Subscription createSubscription(Extension... theExtensions) {
Subscription subscription = new Subscription();
subscription.setId("123");
subscription.setId(SUBSCRIPTION_ID);
subscription.setCriteria("Patient");
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
Subscription.SubscriptionChannelComponent channel
@ -61,120 +175,10 @@ public class SubscriptionRegistryTest {
return subscription;
}
private CanonicalSubscription getCanonicalSubscriptionFromSubscription(Subscription theSubscription) {
CanonicalSubscription subscription = new CanonicalSubscription();
subscription.setStatus(theSubscription.getStatus());
subscription.setCriteriaString(theSubscription.getCriteria());
Subscription.SubscriptionChannelComponent channel = theSubscription.getChannel();
HashMap<String, List<String>> extensions = new HashMap<String, List<String>>();
for (Extension ex : channel.getExtension()) {
if (!extensions.containsKey(ex.getUrl())) {
extensions.put(ex.getUrl(), new ArrayList<>());
}
extensions.get(ex.getUrl()).add(ex.getValueAsPrimitive().getValueAsString());
}
subscription.setChannelExtensions(extensions);
return subscription;
}
/**
* Will mock the subscription canonicalizer with the provided subscription
* and the channel namer with the provided name.
*
* @param theSubscription
* @param theName
*/
private void mockSubscriptionCanonicalizerAndChannelNamer(Subscription theSubscription, String theName) {
Mockito.when(mySubscriptionCanonicalizer.canonicalize(Mockito.any(Subscription.class)))
.thenReturn(getCanonicalSubscriptionFromSubscription(theSubscription));
Mockito.when(mySubscriptionDeliveryChannelNamer.nameFromSubscription(Mockito.any(CanonicalSubscription.class)))
.thenReturn(theName);
}
/**
* Verifies an ActiveSubscription was registered, and passes it back
* for further verification.
* Also verifies that the interceptor was called.
*/
private ActiveSubscription verifySubscriptionIsRegistered() {
ArgumentCaptor<ActiveSubscription> subscriptionArgumentCaptor = ArgumentCaptor.forClass(ActiveSubscription.class);
Mockito.verify(mySubscriptionChannelRegistry)
.add(subscriptionArgumentCaptor.capture());
Mockito.verify(myInterceptorBroadcaster)
.callHooks(Mockito.any(Pointcut.class), Mockito.any(HookParams.class));
return subscriptionArgumentCaptor.getValue();
}
@Test
public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithRetry_createsAsExpected() {
// init
String channelName = "subscription-test";
int retryCount = 2;
Extension retryExtension = new Extension();
retryExtension.setUrl(HapiExtensions.EX_RETRY_COUNT);
retryExtension.setValue(new IntegerType(retryCount));
Subscription subscription = createSubscription(retryExtension);
// when
mockSubscriptionCanonicalizerAndChannelNamer(subscription, channelName);
// test
boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
// verify
Assertions.assertTrue(registered);
ActiveSubscription activeSubscription = verifySubscriptionIsRegistered();
Assertions.assertNotNull(activeSubscription.getRetryConfigurationParameters());
Assertions.assertEquals(channelName, activeSubscription.getChannelName());
Assertions.assertEquals(retryCount, activeSubscription.getRetryConfigurationParameters().getRetryCount());
}
@Test
public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithoutRetry_createsAsExpected() {
// init
String channelName = "subscription-test";
Subscription subscription = createSubscription();
// when
mockSubscriptionCanonicalizerAndChannelNamer(subscription, channelName);
// test
boolean created = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
// verify
Assertions.assertTrue(created);
ActiveSubscription activeSubscription = verifySubscriptionIsRegistered();
Assertions.assertNull(activeSubscription.getRetryConfigurationParameters());
}
@Test
public void registerSubscriptionUnlessAlreadyRegistered_subscriptionWithBadRetry_createsAsExpected() {
// init
String channelName = "subscription-test";
int retryCount = -1; // invalid retry count -> no retries created
Extension retryExtension = new Extension();
retryExtension.setUrl(HapiExtensions.EX_RETRY_COUNT);
retryExtension.setValue(new IntegerType(retryCount));
Subscription subscription = createSubscription(retryExtension);
// when
mockSubscriptionCanonicalizerAndChannelNamer(subscription, channelName);
// test
boolean created = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
// verify
Assertions.assertTrue(created);
ActiveSubscription activeSubscription = verifySubscriptionIsRegistered();
Assertions.assertNull(activeSubscription.getRetryConfigurationParameters());
Assertions.assertEquals(channelName, activeSubscription.getChannelName());
private class TestChannelNamer implements ISubscriptionDeliveryChannelNamer {
@Override
public String nameFromSubscription(CanonicalSubscription theCanonicalSubscription) {
return CHANNEL_NAME;
}
}
}

View File

@ -9,9 +9,9 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import org.hl7.fhir.r4b.model.CanonicalType;
import org.hl7.fhir.r4b.model.Enumerations;
import org.hl7.fhir.r4b.model.Subscription;

View File

@ -0,0 +1,111 @@
package ca.uhn.fhir.jpa.topic.filter;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import org.hl7.fhir.r4.model.Encounter;
import org.hl7.fhir.r4.model.Observation;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class SubscriptionTopicFilterUtilTest {
private final ISubscriptionTopicFilterMatcher myFalseMatcher = (f, r) -> InMemoryMatchResult.noMatch();
private final ISubscriptionTopicFilterMatcher myTrueMatcher = (f, r) -> InMemoryMatchResult.successfulMatch();
private final AtomicInteger myCounter = new AtomicInteger();
private final ISubscriptionTopicFilterMatcher myTrueFalseMatcher = (f, r) -> {
if (myCounter.getAndIncrement() == 0) {
return InMemoryMatchResult.successfulMatch();
} else {
return InMemoryMatchResult.noMatch();
}
};
private final ISubscriptionTopicFilterMatcher myFalseTrueMatcher = (f, r) -> {
if (myCounter.getAndIncrement() == 0) {
return InMemoryMatchResult.noMatch();
} else {
return InMemoryMatchResult.successfulMatch();
}
};
private Observation myObservation = new Observation();
@Test
void testFalseMatchNoFilters() {
myObservation = new Observation();
CanonicalTopicSubscription topicSubscription = new CanonicalTopicSubscription();
boolean result = SubscriptionTopicFilterUtil.matchFilters(myObservation, "Observation", myFalseMatcher, topicSubscription);
assertTrue(result);
}
@Test
void testFalseMatchOneFilter() {
CanonicalTopicSubscription topicSubscription = buildTopicSubscriptionWithFilter("Observation?code=123");
boolean result = SubscriptionTopicFilterUtil.matchFilters(myObservation, "Observation", myFalseMatcher, topicSubscription);
assertFalse(result);
}
@Test
void testFalseMatchOneFilterTypeMismatch() {
CanonicalTopicSubscription topicSubscription = buildTopicSubscriptionWithFilter("Observation?code=123");
boolean result = SubscriptionTopicFilterUtil.matchFilters(new Encounter(), "Encounter", myFalseMatcher, topicSubscription);
assertTrue(result);
}
@Test
void testFalseMatchTwoFilters() {
CanonicalTopicSubscription topicSubscription = buildTopicSubscriptionWithFilter("Observation?code=123", "Observation?code=456");
boolean result = SubscriptionTopicFilterUtil.matchFilters(myObservation, "Observation", myFalseMatcher, topicSubscription);
assertFalse(result);
}
@Test
void testTrueMatchNoFilters() {
myObservation = new Observation();
CanonicalTopicSubscription topicSubscription = new CanonicalTopicSubscription();
boolean result = SubscriptionTopicFilterUtil.matchFilters(myObservation, "Observation", myTrueMatcher, topicSubscription);
assertTrue(result);
}
@Test
void testTrueMatchOneFilter() {
CanonicalTopicSubscription topicSubscription = buildTopicSubscriptionWithFilter("Observation?code=123");
boolean result = SubscriptionTopicFilterUtil.matchFilters(myObservation, "Observation", myTrueMatcher, topicSubscription);
assertTrue(result);
}
@Test
void testTrueMatchTwoFilters() {
CanonicalTopicSubscription topicSubscription = buildTopicSubscriptionWithFilter("Observation?code=123", "Observation?code=456");
boolean result = SubscriptionTopicFilterUtil.matchFilters(myObservation, "Observation", myTrueMatcher, topicSubscription);
assertTrue(result);
}
@Test
void testTrueFalseMatchTwoFilters() {
CanonicalTopicSubscription topicSubscription = buildTopicSubscriptionWithFilter("Observation?code=123", "Observation?code=456");
boolean result = SubscriptionTopicFilterUtil.matchFilters(myObservation, "Observation", myTrueFalseMatcher, topicSubscription);
assertFalse(result);
}
@Test
void testFalseTrueMatchTwoFilters() {
CanonicalTopicSubscription topicSubscription = buildTopicSubscriptionWithFilter("Observation?code=123", "Observation?code=456");
boolean result = SubscriptionTopicFilterUtil.matchFilters(myObservation, "Observation", myFalseTrueMatcher, topicSubscription);
assertFalse(result);
}
@NotNull
private static CanonicalTopicSubscription buildTopicSubscriptionWithFilter(String... theQueryUrls) {
CanonicalTopicSubscription topicSubscription = new CanonicalTopicSubscription();
for (String queryUrl : theQueryUrls) {
List<CanonicalTopicSubscriptionFilter> filters = CanonicalTopicSubscriptionFilter.fromQueryUrl(queryUrl);
filters.forEach(topicSubscription::addFilter);
}
return topicSubscription;
}
}

View File

@ -4,12 +4,15 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicDispatcher;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.subscription.SubscriptionTestDataHelper;
import ca.uhn.fhir.util.HapiExtensions;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IIdType;
@ -22,6 +25,7 @@ import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Meta;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.SearchParameter;
import org.hl7.fhir.r4.model.StringType;
@ -34,10 +38,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static ca.uhn.fhir.rest.api.Constants.CT_FHIR_JSON_NEW;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@ -62,6 +68,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
@Autowired(required = false)
SubscriptionTopicRegistry mySubscriptionTopicRegistry;
@Autowired
SubscriptionTopicDispatcher mySubscriptionTopicDispatcher;
@AfterEach
public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() {
@ -125,7 +133,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
}
@Test
@ -149,7 +157,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals("http://source-system.com", ourObservationProvider.getStoredResources().get(0).getMeta().getSource());
@ -170,7 +178,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(2);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals("http://other-source", ourObservationProvider.getStoredResources().get(0).getMeta().getSource());
@ -241,7 +249,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
@ -266,7 +274,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(2);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
@ -354,7 +362,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
// Send a meta-add
ourLog.info("Sending a meta-add");
@ -407,7 +415,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
// Send a meta-add
obs.setId(obs.getIdElement().toUnqualifiedVersionless());
@ -457,7 +465,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
// Send an update with no changes
obs.setId(obs.getIdElement().toUnqualifiedVersionless());
@ -490,7 +498,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
IdType idElement = ourObservationProvider.getStoredResources().get(0).getIdElement();
assertEquals(observation1.getIdElement().getIdPart(), idElement.getIdPart());
@ -512,7 +520,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(2);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(1));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(1));
idElement = ourObservationProvider.getResourceUpdates().get(1).getIdElement();
assertEquals(observation2.getIdElement().getIdPart(), idElement.getIdPart());
@ -655,7 +663,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
@ -735,7 +743,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
@ -887,10 +895,10 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification for each type
ourObservationProvider.waitForCreateCount(0);
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
ourPatientProvider.waitForCreateCount(0);
ourPatientProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(1));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(1));
}
@ -914,10 +922,10 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification for each type
ourObservationProvider.waitForCreateCount(0);
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
ourPatientProvider.waitForCreateCount(0);
ourPatientProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(1));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(1));
ourOrganizationProvider.waitForCreateCount(0);
ourOrganizationProvider.waitForUpdateCount(0);
@ -1077,7 +1085,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals(CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertThat(ourRestfulServer.getRequestHeaders().get(0), hasItem("X-Foo: FOO"));
assertThat(ourRestfulServer.getRequestHeaders().get(0), hasItem("X-Bar: BAR"));
}
@ -1292,4 +1300,39 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
}
@Test
public void testTopicSubscription() throws Exception {
Subscription subscription = SubscriptionTestDataHelper.buildR4TopicSubscription();
subscription.setIdElement(null);
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
channel.setPayload(CT_FHIR_JSON_NEW);
channel.setEndpoint(ourRestfulServer.getBaseUrl());
MethodOutcome methodOutcome = myClient.create().resource(subscription).execute();
mySubscriptionIds.add(methodOutcome.getId());
waitForActivatedSubscriptionCount(1);
String patientId = "topic-test-patient-id";
Patient patient = new Patient();
patient.setId(patientId);
patient.setActive(true);
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
int count = mySubscriptionTopicDispatcher.dispatch(SubscriptionTestDataHelper.TEST_TOPIC, List.of(patient), RestOperationTypeEnum.CREATE);
assertEquals(1, count);
waitForQueueToDrain();
ourTransactionProvider.waitForTransactionCount(1);
Bundle bundle = ourTransactionProvider.getTransactions().get(0);
assertEquals(2, bundle.getEntry().size());
Parameters parameters = (Parameters) bundle.getEntry().get(0).getResource();
// WIP STR5 assert parameters contents
Patient bundlePatient = (Patient) bundle.getEntry().get(1).getResource();
assertTrue(bundlePatient.getActive());
assertEquals(Enumerations.AdministrativeGender.FEMALE, bundlePatient.getGender());
}
}

View File

@ -3,12 +3,12 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicLoader;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry;
import ca.uhn.fhir.rest.annotation.Transaction;
import ca.uhn.fhir.rest.annotation.TransactionParam;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.test.concurrency.PointcutLatch;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -22,8 +22,6 @@ import org.hl7.fhir.r4b.model.SubscriptionTopic;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
@ -34,8 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR4BTest.class);
public static final String SUBSCRIPTION_TOPIC_TEST_URL = "http://example.com/topic/test";
public static final String SUBSCRIPTION_TOPIC_TEST_URL = "https://example.com/topic/test";
@Autowired
protected SubscriptionTopicRegistry mySubscriptionTopicRegistry;
@ -80,11 +77,11 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
@Test
public void testCreate() throws Exception {
// WIP SR4B test update, delete, etc
createEncounterSubscriptionTopic(Encounter.EncounterStatus.PLANNED, Encounter.EncounterStatus.FINISHED, SubscriptionTopic.InteractionTrigger.CREATE);
createEncounterSubscriptionTopic(SubscriptionTopic.InteractionTrigger.CREATE);
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
waitForRegisteredSubscriptionTopicCount(1);
waitForRegisteredSubscriptionTopicCount();
Subscription subscription = createTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL);
Subscription subscription = createTopicSubscription();
waitForActivatedSubscriptionCount(1);
assertEquals(0, ourTestSystemProvider.getCount());
@ -107,11 +104,11 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
@Test
public void testUpdate() throws Exception {
// WIP SR4B test update, delete, etc
createEncounterSubscriptionTopic(Encounter.EncounterStatus.PLANNED, Encounter.EncounterStatus.FINISHED, SubscriptionTopic.InteractionTrigger.CREATE, SubscriptionTopic.InteractionTrigger.UPDATE);
createEncounterSubscriptionTopic(SubscriptionTopic.InteractionTrigger.CREATE, SubscriptionTopic.InteractionTrigger.UPDATE);
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
waitForRegisteredSubscriptionTopicCount(1);
waitForRegisteredSubscriptionTopicCount();
Subscription subscription = createTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL);
Subscription subscription = createTopicSubscription();
waitForActivatedSubscriptionCount(1);
assertEquals(0, ourTestSystemProvider.getCount());
@ -119,7 +116,7 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
assertEquals(0, ourTestSystemProvider.getCount());
sentEncounter.setStatus(Encounter.EncounterStatus.FINISHED);
updateEncounter(sentEncounter, true);
updateEncounter(sentEncounter);
assertEquals(1, ourTestSystemProvider.getCount());
@ -151,8 +148,8 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
assertEquals(SUBSCRIPTION_TOPIC_TEST_URL, ss.getTopic());
}
private Subscription createTopicSubscription(String theTopicUrl) throws InterruptedException {
Subscription subscription = newSubscription(theTopicUrl, Constants.CT_FHIR_JSON_NEW);
private Subscription createTopicSubscription() throws InterruptedException {
Subscription subscription = newSubscription(SubscriptionTopicR4BTest.SUBSCRIPTION_TOPIC_TEST_URL, Constants.CT_FHIR_JSON_NEW);
subscription.getMeta().addProfile(SubscriptionConstants.SUBSCRIPTION_TOPIC_PROFILE_URL);
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
@ -162,20 +159,20 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
return retval;
}
private void waitForRegisteredSubscriptionTopicCount(int theTarget) throws Exception {
await().until(() -> subscriptionTopicRegistryHasSize(theTarget));
private void waitForRegisteredSubscriptionTopicCount() {
await().until(this::subscriptionTopicRegistryHasOneEntry);
}
private boolean subscriptionTopicRegistryHasSize(int theTarget) {
private boolean subscriptionTopicRegistryHasOneEntry() {
int size = mySubscriptionTopicRegistry.size();
if (size == theTarget) {
if (size == 1) {
return true;
}
mySubscriptionTopicLoader.doSyncResourcessForUnitTest();
return mySubscriptionTopicRegistry.size() == theTarget;
return mySubscriptionTopicRegistry.size() == 1;
}
private SubscriptionTopic createEncounterSubscriptionTopic(Encounter.EncounterStatus theFrom, Encounter.EncounterStatus theCurrent, SubscriptionTopic.InteractionTrigger... theInteractionTriggers) throws InterruptedException {
private void createEncounterSubscriptionTopic(SubscriptionTopic.InteractionTrigger... theInteractionTriggers) throws InterruptedException {
SubscriptionTopic retval = new SubscriptionTopic();
retval.setUrl(SUBSCRIPTION_TOPIC_TEST_URL);
retval.setStatus(Enumerations.PublicationStatus.ACTIVE);
@ -185,13 +182,12 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
trigger.addSupportedInteraction(interactionTrigger);
}
SubscriptionTopic.SubscriptionTopicResourceTriggerQueryCriteriaComponent queryCriteria = trigger.getQueryCriteria();
queryCriteria.setPrevious("Encounter?status=" + theFrom.toCode());
queryCriteria.setCurrent("Encounter?status=" + theCurrent.toCode());
queryCriteria.setPrevious("Encounter?status=" + Encounter.EncounterStatus.PLANNED.toCode());
queryCriteria.setCurrent("Encounter?status=" + Encounter.EncounterStatus.FINISHED.toCode());
queryCriteria.setRequireBoth(true);
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
mySubscriptionTopicDao.create(retval, mySrd);
mySubscriptionTopicsCheckedLatch.awaitExpected();
return retval;
}
private Encounter sendEncounterWithStatus(Encounter.EncounterStatus theStatus, boolean theExpectDelivery) throws InterruptedException {
@ -211,18 +207,13 @@ public class SubscriptionTopicR4BTest extends BaseSubscriptionsR4BTest {
return encounter;
}
private Encounter updateEncounter(Encounter theEncounter, boolean theExpectDelivery) throws InterruptedException {
if (theExpectDelivery) {
private void updateEncounter(Encounter theEncounter) throws InterruptedException {
mySubscriptionDeliveredLatch.setExpectedCount(1);
}
mySubscriptionTopicsCheckedLatch.setExpectedCount(1);
Encounter retval = (Encounter) myEncounterDao.update(theEncounter, mySrd).getResource();
myEncounterDao.update(theEncounter, mySrd);
mySubscriptionTopicsCheckedLatch.awaitExpected();
if (theExpectDelivery) {
mySubscriptionDeliveredLatch.awaitExpected();
}
return retval;
}
static class TestSystemProvider {
final AtomicInteger myCount = new AtomicInteger(0);

View File

@ -10,6 +10,7 @@ import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.provider.r5.BaseResourceProviderR5Test;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
@ -53,6 +54,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -183,13 +185,17 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
return subscription;
}
protected Subscription newTopicSubscription(String theTopicUrl, String thePayload) {
protected Subscription newTopicSubscription(String theTopicUrl, String thePayload, String... theFilters) {
Subscription subscription = new Subscription();
subscription.setTopic(theTopicUrl);
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE);
for (String nextFilter : theFilters) {
filterComponentFromQueryString(nextFilter).forEach(subscription::addFilterBy);
}
subscription.getChannelType()
.setSystem(CanonicalSubscriptionChannelType.RESTHOOK.getSystem())
.setCode(CanonicalSubscriptionChannelType.RESTHOOK.toCode());
@ -198,6 +204,10 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
return subscription;
}
private Stream<Subscription.SubscriptionFilterByComponent> filterComponentFromQueryString(String theNextFilter) {
return CanonicalTopicSubscriptionFilter.fromQueryUrl(theNextFilter).stream().map(CanonicalTopicSubscriptionFilter::toSubscriptionFilterByComponent);
}
@PostConstruct
public void initializeOurCountHolder() {
ourCountHolder = myCountHolder;
@ -207,7 +217,7 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
return createResource(theResource, theExpectDelivery, 1);
}
// WIP STR5 consolidate with lambda
// TODO KHS consolidate with lambda
protected IIdType createResource(IBaseResource theResource, boolean theExpectDelivery, int theCount) throws InterruptedException {
IFhirResourceDao dao = myDaoRegistry.getResourceDao(theResource.getClass());
if (theExpectDelivery) {
@ -284,15 +294,15 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
return retval;
}
protected static void validateSubscriptionStatus(Subscription subscription, IBaseResource sentResource, SubscriptionStatus ss) {
protected static void validateSubscriptionStatus(Subscription subscription, IBaseResource sentResource, SubscriptionStatus ss, Long theExpectedEventNumber) {
assertEquals(Enumerations.SubscriptionStatusCodes.ACTIVE, ss.getStatus());
assertEquals(SubscriptionStatus.SubscriptionNotificationType.EVENTNOTIFICATION, ss.getType());
assertEquals("1", ss.getEventsSinceSubscriptionStartElement().getValueAsString());
assertEquals(theExpectedEventNumber.toString(), ss.getEventsSinceSubscriptionStartElement().getValueAsString());
List<SubscriptionStatus.SubscriptionStatusNotificationEventComponent> notificationEvents = ss.getNotificationEvent();
assertEquals(1, notificationEvents.size());
SubscriptionStatus.SubscriptionStatusNotificationEventComponent notificationEvent = notificationEvents.get(0);
assertEquals(1, notificationEvent.getEventNumber());
assertEquals(theExpectedEventNumber, notificationEvent.getEventNumber());
assertEquals(sentResource.getIdElement().toUnqualifiedVersionless(), notificationEvent.getFocus().getReferenceElement());
assertEquals(subscription.getIdElement().toUnqualifiedVersionless(), ss.getSubscription().getReferenceElement());

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.fhir.util.Logs;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r5.model.Bundle;
@ -12,7 +13,6 @@ import org.hl7.fhir.r5.model.SubscriptionStatus;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
@ -20,7 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicR5Test.class);
private static final Logger ourLog = Logs.getSubscriptionTopicLog();
@Test
public void testSubscriptionTopicRegistryBean() {
@ -28,19 +28,21 @@ public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test {
}
@Test
public void testRestHookSubscriptionTopicApplicationFhirJson() throws Exception {
public void testFilteredTopicSubscription() throws Exception {
//setup
// WIP SR4B test update, delete, etc
createEncounterSubscriptionTopic(Enumerations.EncounterStatus.PLANNED, Enumerations.EncounterStatus.COMPLETED, SubscriptionTopic.InteractionTrigger.CREATE);
waitForRegisteredSubscriptionTopicCount(1);
Subscription subscription = createTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL);
Subscription subscription = createTopicSubscription("Encounter?participant-type=PRPF");
waitForActivatedSubscriptionCount(1);
assertEquals(0, getSystemProviderCount());
// execute
Encounter sentEncounter = sendEncounterWithStatus(Enumerations.EncounterStatus.COMPLETED, true);
Encounter badSentEncounter = sendEncounterWithStatus(Enumerations.EncounterStatus.COMPLETED, false);
Encounter goodSentEncounter = sendEncounterWithStatusAndParticipationType(Enumerations.EncounterStatus.COMPLETED, "PRPF", true);
// verify
Bundle receivedBundle = getLastSystemProviderBundle();
@ -48,17 +50,15 @@ public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test {
assertEquals(2, resources.size());
SubscriptionStatus ss = (SubscriptionStatus) resources.get(0);
validateSubscriptionStatus(subscription, sentEncounter, ss);
validateSubscriptionStatus(subscription, goodSentEncounter, ss, 1L);
Encounter encounter = (Encounter) resources.get(1);
assertEquals(Enumerations.EncounterStatus.COMPLETED, encounter.getStatus());
assertEquals(sentEncounter.getIdElement(), encounter.getIdElement());
assertEquals(goodSentEncounter.getIdElement(), encounter.getIdElement());
}
private Subscription createTopicSubscription(String theTopicUrl) throws InterruptedException {
Subscription subscription = newTopicSubscription(theTopicUrl, Constants.CT_FHIR_JSON_NEW);
private Subscription createTopicSubscription(String... theFilters) throws InterruptedException {
Subscription subscription = newTopicSubscription(BaseSubscriptionsR5Test.SUBSCRIPTION_TOPIC_TEST_URL, Constants.CT_FHIR_JSON_NEW, theFilters);
return postSubscription(subscription);
}
@ -89,4 +89,15 @@ public class SubscriptionTopicR5Test extends BaseSubscriptionsR5Test {
}
private Encounter sendEncounterWithStatusAndParticipationType(Enumerations.EncounterStatus theStatus, String theParticipantType, boolean theExpectDelivery) throws InterruptedException {
Encounter encounter = new Encounter();
encounter.setStatus(theStatus);
encounter.addParticipant().addType().addCoding().setCode(theParticipantType);
IIdType id = createResource(encounter, theExpectDelivery);
encounter.setId(id);
return encounter;
}
}

View File

@ -87,7 +87,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
// Should see 1 subscription notification
awaitUntilReceivedTransactionCount(1);
Observation obs = assertBundleAndGetObservation(subscription, sentObservation);
Observation obs = assertBundleAndGetObservation(subscription, sentObservation, 1L);
assertEquals(Enumerations.ObservationStatus.FINAL, obs.getStatus());
assertEquals(sentObservation.getIdElement(), obs.getIdElement());
}
@ -119,7 +119,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
// Should see 1 subscription notification
awaitUntilReceivedTransactionCount(1);
Observation receivedObs = assertBundleAndGetObservation(subscription, sentObservation);
Observation receivedObs = assertBundleAndGetObservation(subscription, sentObservation, 1L);
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, getLastSystemProviderContentType());
Assertions.assertEquals("1", receivedObs.getIdElement().getVersionIdPart());
@ -139,7 +139,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
// Should see a second subscription notification
awaitUntilReceivedTransactionCount(2);
receivedObs = assertBundleAndGetObservation(subscription, sentObservation);
receivedObs = assertBundleAndGetObservation(subscription, sentObservation, 2L);
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, getLastSystemProviderContentType());
Assertions.assertEquals("2", receivedObs.getIdElement().getVersionIdPart());
@ -181,7 +181,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
// Send the transaction
sendTransaction(bundle, true);
Observation receivedObs = assertBundleAndGetObservation(subscription, sentObservation);
Observation receivedObs = assertBundleAndGetObservation(subscription, sentObservation, 1L);
MatcherAssert.assertThat(receivedObs.getSubject().getReference(), matchesPattern("Patient/[0-9]+"));
}
@ -207,7 +207,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
Bundle responseBundle = sendTransaction(bundle, true);
awaitUntilReceivedTransactionCount(1);
Observation receivedObs = assertBundleAndGetObservation(subscription, sentObservation);
Observation receivedObs = assertBundleAndGetObservation(subscription, sentObservation, 1L);
Observation obs = myObservationDao.read(new IdType(responseBundle.getEntry().get(0).getResponse().getLocation()), mySrd);
@ -233,7 +233,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
sendTransaction(bundle, true);
awaitUntilReceivedTransactionCount(2);
receivedObs = assertBundleAndGetObservation(subscription, sentObservation);
receivedObs = assertBundleAndGetObservation(subscription, sentObservation, 2L);
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless(), mySrd);
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, getLastSystemProviderContentType());
@ -252,7 +252,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
mySubscriptionTopicsCheckedLatch.setExpectedCount(100);
mySubscriptionDeliveredLatch.setExpectedCount(100);
// WIP STR5 I don't know the answer to this, but should we be bunching these up into a single delivery?
// WIP STR5 I don't know the answer to this, but should the server be bunching these up into a single delivery?
for (int i = 0; i < 100; i++) {
Observation observation = new Observation();
observation.getIdentifierFirstRep().setSystem("foo").setValue("ID" + i);
@ -279,7 +279,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
@NotNull
private Subscription createTopicSubscription() throws InterruptedException {
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_JSON_NEW);
return postSubscription(subscription);
@ -303,7 +302,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
awaitUntilReceivedTransactionCount(1);
Observation obs = assertBundleAndGetObservation(subscription, sentObservation);
Observation obs = assertBundleAndGetObservation(subscription, sentObservation, 1L);
// Should see 1 subscription notification
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, getLastSystemProviderContentType());
@ -332,7 +331,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
awaitUntilReceivedTransactionCount(1);
Observation obs = assertBundleAndGetObservation(subscription, sentObservation1);
Observation obs = assertBundleAndGetObservation(subscription, sentObservation1, 1L);
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, getLastSystemProviderContentType());
@ -353,7 +352,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
awaitUntilReceivedTransactionCount(2);
Observation obs2 = assertBundleAndGetObservation(subscription, sentObservation2);
Observation obs2 = assertBundleAndGetObservation(subscription, sentObservation2, 2L);
Assertions.assertEquals(Constants.CT_FHIR_JSON_NEW, getLastSystemProviderContentType());
@ -453,7 +452,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
ourLog.info(">>>2 Creating subscriptions");
Subscription subscription1 = createTopicSubscription();
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_JSON_NEW);
Subscription subscription2 = postSubscription(subscription);
@ -462,7 +460,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
ourLog.info(">>>3 Send obs");
Observation sentObservation1 = sendObservationExpectDelivery();
awaitUntilReceivedTransactionCount(1);
Observation receivedObs = assertBundleAndGetObservation(subscription1, sentObservation1);
Observation receivedObs = assertBundleAndGetObservation(subscription1, sentObservation1, 1L);
assertEquals(Constants.CT_FHIR_JSON_NEW, getLastSystemProviderContentType());
Assertions.assertEquals("1", receivedObs.getIdElement().getVersionIdPart());
@ -543,7 +541,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
@Nonnull
private Subscription createTopicSubscription(String theTopicUrlSuffix) throws InterruptedException {
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + theTopicUrlSuffix, Constants.CT_FHIR_JSON_NEW);
return postSubscription(subscription);
@ -553,7 +550,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
public void testSubscriptionTriggerViaSubscription() throws Exception {
createSubscriptionTopic();
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription1 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_XML_NEW);
Subscription subscription = postSubscription(subscription1);
@ -594,7 +590,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
// Should see 1 subscription notification
awaitUntilReceivedTransactionCount(1);
Observation receivedObs = assertBundleAndGetObservation(subscription, sentObservation);
Observation receivedObs = assertBundleAndGetObservation(subscription, sentObservation, 1L);
assertEquals(Constants.CT_FHIR_XML_NEW, getLastSystemProviderContentType());
ourLog.debug("Observation content: {}", myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(receivedObs));
@ -607,7 +603,6 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
ourLog.info("** About to create non-matching subscription");
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription1 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_XML_NEW);
Subscription subscription = postSubscription(subscription1);
@ -648,11 +643,9 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
createObservationSubscriptionTopic(OBS_CODE2);
waitForRegisteredSubscriptionTopicCount(2);
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription3 = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE, Constants.CT_FHIR_XML_NEW);
postSubscription(subscription3);
// WIP STR5 will likely require matching TopicSubscription
Subscription subscription = newTopicSubscription(SUBSCRIPTION_TOPIC_TEST_URL + OBS_CODE2, Constants.CT_FHIR_XML_NEW);
postSubscription(subscription);
@ -702,7 +695,7 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
// Should see 1 subscription notification
awaitUntilReceivedTransactionCount(1);
assertBundleAndGetObservation(subscription, sentObservation);
assertBundleAndGetObservation(subscription, sentObservation, 1L);
// Disable
subscription.setStatus(Enumerations.SubscriptionStatusCodes.OFF);
@ -805,14 +798,14 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
observation.addExtension().setUrl("Observation#accessType").setValue(new Coding().setCode("Catheter"));
createResource(observation, true);
awaitUntilReceivedTransactionCount(1);
assertBundleAndGetObservation(subscription, observation);
assertBundleAndGetObservation(subscription, observation, 1L);
}
{
Observation observation = new Observation();
observation.addExtension().setUrl("Observation#accessType").setValue(new Coding().setCode("PD Catheter"));
createResource(observation, true);
awaitUntilReceivedTransactionCount(2);
assertBundleAndGetObservation(subscription, observation);
assertBundleAndGetObservation(subscription, observation, 2L);
}
{
Observation observation = new Observation();
@ -827,13 +820,13 @@ public class RestHookTestR5IT extends BaseSubscriptionsR5Test {
}
}
private Observation assertBundleAndGetObservation(Subscription subscription, Observation sentObservation) {
private Observation assertBundleAndGetObservation(Subscription subscription, Observation sentObservation, Long theExpectedEventNumber) {
Bundle receivedBundle = getLastSystemProviderBundle();
List<IBaseResource> resources = BundleUtil.toListOfResources(myFhirCtx, receivedBundle);
assertEquals(2, resources.size());
SubscriptionStatus ss = (SubscriptionStatus) resources.get(0);
validateSubscriptionStatus(subscription, sentObservation, ss);
validateSubscriptionStatus(subscription, sentObservation, ss, theExpectedEventNumber);
return (Observation) resources.get(1);
}

View File

@ -6,8 +6,8 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.matchesPattern;
public class BaseBinaryStorageSvcImplTest {

View File

@ -24,7 +24,6 @@ import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.dao.expunge.IExpungeEverythingService;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.mdm.api.IMdmSettings;
import ca.uhn.fhir.mdm.api.MdmConstants;
import ca.uhn.fhir.mdm.model.CanonicalEID;
@ -32,6 +31,7 @@ import ca.uhn.fhir.mdm.svc.MdmLinkDeleteSvc;
import ca.uhn.fhir.mdm.util.EIDHelper;
import ca.uhn.fhir.mdm.util.MdmResourceUtil;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ForbiddenOperationException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -17,9 +17,7 @@
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.subscription.match.registry;
import org.hl7.fhir.dstu2.model.Subscription;
package ca.uhn.fhir.subscription;
public class SubscriptionConstants {
@ -43,8 +41,18 @@ public class SubscriptionConstants {
public static final int DELIVERY_EXECUTOR_QUEUE_SIZE = 1000;
public static final String SUBSCRIPTION_STATUS = "Subscription.status";
public static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
public static final String REQUESTED_STATUS = Subscription.SubscriptionStatus.REQUESTED.toCode();
public static final String ACTIVE_STATUS = Subscription.SubscriptionStatus.ACTIVE.toCode();
public static final String ERROR_STATUS = Subscription.SubscriptionStatus.ERROR.toCode();
// These STATUS codes are unchanged from DSTU2 Subscription onwards
public static final String REQUESTED_STATUS = "requested";
public static final String ACTIVE_STATUS = "active";
public static final String ERROR_STATUS = "error";
public static final String SUBSCRIPTION_TOPIC_PROFILE_URL = "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-subscription";
public static final String SUBSCRIPTION_TOPIC_FILTER_URL = "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-filter-criteria";
public static final String SUBSCRIPTION_TOPIC_CHANNEL_HEARTBEAT_PERIOD_URL = "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-heartbeat-period";
public static final String SUBSCRIPTION_TOPIC_CHANNEL_TIMEOUT_URL = "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-timeout";
public static final String SUBSCRIPTION_TOPIC_CHANNEL_MAX_COUNT = "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-max-count";
public static final String SUBSCRIPTION_TOPIC_CHANNEL_PAYLOAD_CONTENT = "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-payload-content";
public static final String SUBSCRIPTION_TOPIC_STATUS = "http://hl7.org/fhir/uv/subscriptions-backport/StructureDefinition/backport-subscription-status-r4";
}

View File

@ -13,9 +13,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;

View File

@ -27,10 +27,10 @@ import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -26,7 +26,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.util.ThreadPoolUtil;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

View File

@ -24,9 +24,9 @@ import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import org.apache.commons.lang3.Validate;
public class SubscriptionChannelFactory {

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.model.api.BasePrimitive;
import ca.uhn.fhir.model.api.ExtensionDt;
@ -34,6 +35,7 @@ import ca.uhn.fhir.model.primitive.BooleanDt;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.SubscriptionUtil;
import org.hl7.fhir.exceptions.FHIRException;
@ -97,11 +99,12 @@ public class SubscriptionCanonicalizer {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus()));
retVal.setChannelType(getChannelType(theSubscription));
retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
Subscription.Channel channel = subscription.getChannel();
retVal.setEndpointUrl(channel.getEndpoint());
retVal.setHeaders(channel.getHeader());
retVal.setChannelExtensions(extractExtension(subscription));
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
retVal.setPayloadString(channel.getPayload());
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
retVal.setSendDeleteMessages(extractDeleteExtensionDstu2(subscription));
@ -148,11 +151,12 @@ public class SubscriptionCanonicalizer {
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setChannelType(getChannelType(theSubscription));
retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
org.hl7.fhir.dstu3.model.Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
retVal.setEndpointUrl(channel.getEndpoint());
retVal.setHeaders(channel.getHeader());
retVal.setChannelExtensions(extractExtension(subscription));
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
retVal.setPayloadString(channel.getPayload());
retVal.setPayloadSearchCriteria(getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
@ -162,8 +166,8 @@ public class SubscriptionCanonicalizer {
String subjectTemplate;
try {
from = subscription.getChannel().getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_EMAIL_FROM);
subjectTemplate = subscription.getChannel().getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
from = channel.getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_EMAIL_FROM);
subjectTemplate = channel.getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
} catch (FHIRException theE) {
throw new ConfigurationException(Msg.code(558) + "Failed to extract subscription extension(s): " + theE.getMessage(), theE);
}
@ -176,8 +180,8 @@ public class SubscriptionCanonicalizer {
String stripVersionIds;
String deliverLatestVersion;
try {
stripVersionIds = subscription.getChannel().getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
deliverLatestVersion = subscription.getChannel().getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
stripVersionIds = channel.getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
deliverLatestVersion = channel.getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
} catch (FHIRException theE) {
throw new ConfigurationException(Msg.code(559) + "Failed to extract subscription extension(s): " + theE.getMessage(), theE);
}
@ -266,24 +270,68 @@ public class SubscriptionCanonicalizer {
org.hl7.fhir.r4.model.Subscription subscription = (org.hl7.fhir.r4.model.Subscription) theSubscription;
CanonicalSubscription retVal = new CanonicalSubscription();
retVal.setStatus(subscription.getStatus());
retVal.setChannelType(getChannelType(theSubscription));
retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setHeaders(subscription.getChannel().getHeader());
org.hl7.fhir.r4.model.Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
retVal.setHeaders(channel.getHeader());
retVal.setChannelExtensions(extractExtension(subscription));
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
retVal.setPayloadString(channel.getPayload());
retVal.setPayloadSearchCriteria(getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
List<org.hl7.fhir.r4.model.CanonicalType> profiles = subscription.getMeta().getProfile();
for (org.hl7.fhir.r4.model.CanonicalType next : profiles) {
if (SubscriptionConstants.SUBSCRIPTION_TOPIC_PROFILE_URL.equals(next.getValueAsString())) {
retVal.setTopicSubscription(true);
}
}
if (retVal.isTopicSubscription()) {
CanonicalTopicSubscription topicSubscription = retVal.getTopicSubscription();
topicSubscription.setTopic(getCriteria(theSubscription));
// WIP STR5 support other content types
topicSubscription.setContent(org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.FULLRESOURCE);
retVal.setEndpointUrl(channel.getEndpoint());
retVal.setChannelType(getChannelType(subscription));
for (org.hl7.fhir.r4.model.Extension next : subscription.getCriteriaElement().getExtension()) {
if (SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL.equals(next.getUrl())) {
List<CanonicalTopicSubscriptionFilter> filters = CanonicalTopicSubscriptionFilter.fromQueryUrl(next.getValue().primitiveValue());
filters.forEach(topicSubscription::addFilter);
}
}
if (channel.hasExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_HEARTBEAT_PERIOD_URL)) {
org.hl7.fhir.r4.model.Extension timeoutExtension = channel.getExtensionByUrl(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_HEARTBEAT_PERIOD_URL);
topicSubscription.setHeartbeatPeriod(Integer.valueOf(timeoutExtension.getValue().primitiveValue()));
}
if (channel.hasExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_TIMEOUT_URL)) {
org.hl7.fhir.r4.model.Extension timeoutExtension = channel.getExtensionByUrl(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_TIMEOUT_URL);
topicSubscription.setTimeout(Integer.valueOf(timeoutExtension.getValue().primitiveValue()));
}
if (channel.hasExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_MAX_COUNT)) {
org.hl7.fhir.r4.model.Extension timeoutExtension = channel.getExtensionByUrl(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_MAX_COUNT);
topicSubscription.setMaxCount(Integer.valueOf(timeoutExtension.getValue().primitiveValue()));
}
if (channel.getPayloadElement().hasExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_PAYLOAD_CONTENT)) {
org.hl7.fhir.r4.model.Extension timeoutExtension = channel.getPayloadElement().getExtensionByUrl(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_PAYLOAD_CONTENT);
topicSubscription.setContent(org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.fromCode(timeoutExtension.getValue().primitiveValue()));
}
} else {
retVal.setCriteriaString(getCriteria(theSubscription));
retVal.setEndpointUrl(channel.getEndpoint());
retVal.setChannelType(getChannelType(subscription));
}
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from;
String subjectTemplate;
try {
from = subscription.getChannel().getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_EMAIL_FROM);
subjectTemplate = subscription.getChannel().getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
from = channel.getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_EMAIL_FROM);
subjectTemplate = channel.getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
} catch (FHIRException theE) {
throw new ConfigurationException(Msg.code(561) + "Failed to extract subscription extension(s): " + theE.getMessage(), theE);
}
@ -295,8 +343,8 @@ public class SubscriptionCanonicalizer {
String stripVersionIds;
String deliverLatestVersion;
try {
stripVersionIds = subscription.getChannel().getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
deliverLatestVersion = subscription.getChannel().getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
stripVersionIds = channel.getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
deliverLatestVersion = channel.getExtensionString(HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
} catch (FHIRException theE) {
throw new ConfigurationException(Msg.code(562) + "Failed to extract subscription extension(s): " + theE.getMessage(), theE);
}
@ -312,7 +360,7 @@ public class SubscriptionCanonicalizer {
}
}
Extension extension = subscription.getChannel().getExtensionByUrl(EX_SEND_DELETE_MESSAGES);
Extension extension = channel.getExtensionByUrl(EX_SEND_DELETE_MESSAGES);
if (extension != null && extension.hasValue() && extension.getValue() instanceof BooleanType) {
retVal.setSendDeleteMessages(((BooleanType) extension.getValue()).booleanValue());
}
@ -328,10 +376,11 @@ public class SubscriptionCanonicalizer {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(status.toCode()));
}
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setHeaders(subscription.getChannel().getHeader());
org.hl7.fhir.r4b.model.Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
retVal.setHeaders(channel.getHeader());
retVal.setChannelExtensions(extractExtension(subscription));
retVal.setIdElement(subscription.getIdElement());
retVal.setPayloadString(subscription.getChannel().getPayload());
retVal.setPayloadString(channel.getPayload());
retVal.setPayloadSearchCriteria(getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription));
@ -344,13 +393,14 @@ public class SubscriptionCanonicalizer {
if (retVal.isTopicSubscription()) {
retVal.getTopicSubscription().setTopic(getCriteria(theSubscription));
// WIP STR5 support other content types
retVal.getTopicSubscription().setContent(org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.FULLRESOURCE);
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setEndpointUrl(channel.getEndpoint());
retVal.setChannelType(getChannelType(subscription));
// WIP STR5 set other topic subscription fields
} else {
retVal.setCriteriaString(getCriteria(theSubscription));
retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
retVal.setEndpointUrl(channel.getEndpoint());
retVal.setChannelType(getChannelType(subscription));
}
@ -371,8 +421,8 @@ public class SubscriptionCanonicalizer {
String stripVersionIds;
String deliverLatestVersion;
try {
stripVersionIds = getExtensionString(subscription.getChannel(), HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
deliverLatestVersion = getExtensionString(subscription.getChannel(), HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
stripVersionIds = getExtensionString(channel, HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
deliverLatestVersion = getExtensionString(channel, HapiExtensions.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
} catch (FHIRException theE) {
throw new ConfigurationException(Msg.code(565) + "Failed to extract subscription extension(s): " + theE.getMessage(), theE);
}
@ -388,7 +438,7 @@ public class SubscriptionCanonicalizer {
}
}
org.hl7.fhir.r4b.model.Extension extension = subscription.getChannel().getExtensionByUrl(EX_SEND_DELETE_MESSAGES);
org.hl7.fhir.r4b.model.Extension extension = channel.getExtensionByUrl(EX_SEND_DELETE_MESSAGES);
if (extension != null && extension.hasValue() && extension.hasValueBooleanType()) {
retVal.setSendDeleteMessages(extension.getValueBooleanType().booleanValue());
}
@ -401,7 +451,6 @@ public class SubscriptionCanonicalizer {
CanonicalSubscription retVal = new CanonicalSubscription();
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setChannelExtensions(extractExtension(subscription));
retVal.setIdElement(subscription.getIdElement());
@ -422,8 +471,25 @@ public class SubscriptionCanonicalizer {
Enumerations.SubscriptionStatusCodes status = subscription.getStatus();
if (status != null) {
// WIP STR5 do all the codes map?
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(status.toCode()));
switch(status) {
case REQUESTED:
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.REQUESTED);
break;
case ACTIVE:
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.ACTIVE);
break;
case ERROR:
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.ERROR);
break;
case OFF:
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.OFF);
break;
case NULL:
case ENTEREDINERROR:
default:
ourLog.warn("Converting R5 Subscription status from {} to ERROR", status);
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.ERROR);
}
}
retVal.getTopicSubscription().setContent(subscription.getContent());
retVal.setEndpointUrl(subscription.getEndpoint());
@ -475,8 +541,7 @@ public class SubscriptionCanonicalizer {
retVal.setResourceType(theFilter.getResourceType());
retVal.setFilterParameter(theFilter.getFilterParameter());
retVal.setModifier(theFilter.getModifier());
// WIP STR5 add this once it's available
// retVal.setComparator(theFilter.getComparator());
retVal.setComparator(theFilter.getComparator());
retVal.setValue(theFilter.getValue());
return retVal;
}

View File

@ -405,6 +405,11 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
return myTopicSubscription.getHeartbeatPeriod();
}
public int getTimeout() {
assert isTopicSubscription();
return myTopicSubscription.getTimeout();
}
public int getMaxCount() {
assert isTopicSubscription();
return myTopicSubscription.getMaxCount();

View File

@ -132,4 +132,8 @@ public class CanonicalTopicSubscription {
public int hashCode() {
return new HashCodeBuilder(17, 37).append(myTopic).toHashCode();
}
public boolean hasFilters() {
return myFilters != null && !myFilters.isEmpty();
}
}

View File

@ -19,17 +19,26 @@
*/
package ca.uhn.fhir.jpa.subscription.model;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.UrlUtil;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.hl7.fhir.r5.model.Enumerations;
import org.hl7.fhir.r5.model.Subscription;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class CanonicalTopicSubscriptionFilter {
private static final Logger ourLog = Logs.getSubscriptionTopicLog();
@JsonProperty("resourceType")
String myResourceType;
@JsonProperty("filterParameter")
String myFilterParameter;
@JsonProperty("comparator")
Enumerations.SearchComparator myComparator;
@ -78,4 +87,52 @@ public class CanonicalTopicSubscriptionFilter {
public void setValue(String theValue) {
myValue = theValue;
}
public static List<CanonicalTopicSubscriptionFilter> fromQueryUrl(String theQueryUrl) {
UrlUtil.UrlParts urlParts = UrlUtil.parseUrl(theQueryUrl);
String resourceName = urlParts.getResourceType();
Map<String, String[]> params = UrlUtil.parseQueryString(urlParts.getParams());
List<CanonicalTopicSubscriptionFilter> retval = new ArrayList<>();
params.forEach((key, valueList) -> {
for (String value : valueList) {
CanonicalTopicSubscriptionFilter filter = new CanonicalTopicSubscriptionFilter();
filter.setResourceType(resourceName);
filter.setFilterParameter(key);
// WIP STR5 set modifier and comparator properly. This may be tricky without access to searchparameters,
// But this method cannot assume searchparameters exist on the server.
filter.setComparator(Enumerations.SearchComparator.EQ);
filter.setValue(value);
retval.add(filter);
}
});
return retval;
}
public Subscription.SubscriptionFilterByComponent toSubscriptionFilterByComponent() {
Subscription.SubscriptionFilterByComponent retval = new Subscription.SubscriptionFilterByComponent();
retval.setResourceType(myResourceType);
retval.setFilterParameter(myFilterParameter);
retval.setComparator(myComparator);
retval.setModifier(myModifier);
retval.setValue(myValue);
return retval;
}
public String asCriteriaString() {
String comparator = "=";
if (myComparator != null) {
switch (myComparator) {
case EQ:
comparator = "=";
break;
case NE:
comparator = ":not=";
break;
default:
ourLog.warn("Unsupported comparator: {}", myComparator);
}
}
return myResourceType + "?" + myFilterParameter + comparator + myValue;
}
}

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.model.api.ExtensionDt;
import ca.uhn.fhir.model.primitive.BooleanDt;
import ca.uhn.fhir.subscription.SubscriptionTestDataHelper;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Subscription;
@ -20,11 +21,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class SubscriptionCanonicalizerTest {
private static final String TEST_TOPIC = "http://test.topic";
FhirContext r4Context = FhirContext.forR4();
private final SubscriptionCanonicalizer testedSC = new SubscriptionCanonicalizer(r4Context);
@ -51,40 +52,49 @@ class SubscriptionCanonicalizerTest {
@Test
void testCanonicalizeR4SendDeleteMessagesSetsExtensionValue() {
// setup
Subscription subscription = new Subscription();
Extension sendDeleteMessagesExtension = new Extension()
.setUrl(EX_SEND_DELETE_MESSAGES)
.setValue(new BooleanType(true));
subscription.getChannel().addExtension(sendDeleteMessagesExtension);
// execute
CanonicalSubscription canonicalSubscription = testedSC.canonicalize(subscription);
// verify
assertTrue(canonicalSubscription.getSendDeleteMessages());
}
@Test
public void testCanonicalizeDstu2SendDeleteMessages() {
SubscriptionCanonicalizer dstu2Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu2());
//setup
SubscriptionCanonicalizer dstu2Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forDstu2Cached());
ca.uhn.fhir.model.dstu2.resource.Subscription dstu2Sub = new ca.uhn.fhir.model.dstu2.resource.Subscription();
ExtensionDt extensionDt = new ExtensionDt();
extensionDt.setUrl(EX_SEND_DELETE_MESSAGES);
extensionDt.setValue(new BooleanDt(true));
dstu2Sub.getChannel().addUndeclaredExtension(extensionDt);
// execute
CanonicalSubscription canonicalize = dstu2Canonicalizer.canonicalize(dstu2Sub);
// verify
assertTrue(canonicalize.getSendDeleteMessages());
}
@Test
public void testR5() {
// setup
SubscriptionCanonicalizer r5Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR5());
SubscriptionCanonicalizer r5Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR5Cached());
org.hl7.fhir.r5.model.Subscription subscription = new org.hl7.fhir.r5.model.Subscription();
subscription.setStatus(Enumerations.SubscriptionStatusCodes.ACTIVE);
subscription.setContentType(CT_FHIR_JSON_NEW);
// WIP STR5 support different content types
subscription.setContent(org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.FULLRESOURCE);
subscription.setEndpoint("http://foo");
subscription.setTopic(TEST_TOPIC);
subscription.setTopic(SubscriptionTestDataHelper.TEST_TOPIC);
Coding channelType = new Coding().setSystem("http://terminology.hl7.org/CodeSystem/subscription-channel-type").setCode("rest-hook");
subscription.setChannelType(channelType);
subscription.addFilterBy(buildFilter("Observation", "param1", "value1"));
@ -93,32 +103,84 @@ class SubscriptionCanonicalizerTest {
subscription.setMaxCount(456);
// execute
CanonicalSubscription canonicalize = r5Canonicalizer.canonicalize(subscription);
CanonicalSubscription canonical = r5Canonicalizer.canonicalize(subscription);
// verify
assertEquals(Subscription.SubscriptionStatus.ACTIVE, canonicalize.getStatus());
assertEquals(CT_FHIR_JSON_NEW, canonicalize.getContentType());
assertEquals(org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.FULLRESOURCE, canonicalize.getContent());
assertEquals("http://foo", canonicalize.getEndpointUrl());
assertEquals(TEST_TOPIC, canonicalize.getTopic());
assertEquals(CanonicalSubscriptionChannelType.RESTHOOK, canonicalize.getChannelType());
assertThat(canonicalize.getFilters(), hasSize(2));
assertEquals(Subscription.SubscriptionStatus.ACTIVE, canonical.getStatus());
assertEquals(CT_FHIR_JSON_NEW, canonical.getContentType());
assertEquals(org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.FULLRESOURCE, canonical.getContent());
assertEquals("http://foo", canonical.getEndpointUrl());
assertEquals(SubscriptionTestDataHelper.TEST_TOPIC, canonical.getTopic());
assertEquals(CanonicalSubscriptionChannelType.RESTHOOK, canonical.getChannelType());
assertThat(canonical.getFilters(), hasSize(2));
CanonicalTopicSubscriptionFilter filter1 = canonicalize.getFilters().get(0);
CanonicalTopicSubscriptionFilter filter1 = canonical.getFilters().get(0);
assertEquals("Observation", filter1.getResourceType());
assertEquals("param1", filter1.getFilterParameter());
// WIP STR5 assert comparator once core libs are updated
assertEquals(Enumerations.SearchComparator.EQ, filter1.getComparator());
assertEquals(Enumerations.SearchModifierCode.EXACT, filter1.getModifier());
assertEquals("value1", filter1.getValue());
CanonicalTopicSubscriptionFilter filter2 = canonicalize.getFilters().get(1);
CanonicalTopicSubscriptionFilter filter2 = canonical.getFilters().get(1);
assertEquals("CarePlan", filter2.getResourceType());
assertEquals("param2", filter2.getFilterParameter());
// WIP STR5 assert comparator once core libs are updated
assertEquals(Enumerations.SearchModifierCode.EXACT, filter1.getModifier());
assertEquals(Enumerations.SearchComparator.EQ, filter2.getComparator());
assertEquals(Enumerations.SearchModifierCode.EXACT, filter2.getModifier());
assertEquals("value2", filter2.getValue());
assertEquals(123, canonicalize.getHeartbeatPeriod());
assertEquals(456, canonicalize.getMaxCount());
assertEquals(123, canonical.getHeartbeatPeriod());
assertEquals(456, canonical.getMaxCount());
}
@Test
void testR4Backport() {
// Example drawn from http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/Subscription-subscription-zulip.json.html
// setup
SubscriptionCanonicalizer r4Canonicalizer = new SubscriptionCanonicalizer(FhirContext.forR4Cached());
// execute
CanonicalSubscription canonical = r4Canonicalizer.canonicalize(SubscriptionTestDataHelper.buildR4TopicSubscription());
// verify
// Standard R4 stuff
assertEquals(2, canonical.getTags().size());
assertEquals("b", canonical.getTags().get("http://a"));
assertEquals("e", canonical.getTags().get("http://d"));
assertEquals("testId", canonical.getIdPart());
assertEquals("testId", canonical.getIdElementString());
assertEquals(SubscriptionTestDataHelper.TEST_ENDPOINT, canonical.getEndpointUrl());
assertEquals(CT_FHIR_JSON_NEW, canonical.getContentType());
assertThat(canonical.getHeaders(), hasSize(2));
assertEquals(SubscriptionTestDataHelper.TEST_HEADER1, canonical.getHeaders().get(0));
assertEquals(SubscriptionTestDataHelper.TEST_HEADER2, canonical.getHeaders().get(1));
assertEquals(Subscription.SubscriptionStatus.ACTIVE, canonical.getStatus());
assertEquals(CT_FHIR_JSON_NEW, canonical.getContentType());
assertEquals(org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.FULLRESOURCE, canonical.getContent());
assertEquals(SubscriptionTestDataHelper.TEST_ENDPOINT, canonical.getEndpointUrl());
assertEquals(SubscriptionTestDataHelper.TEST_TOPIC, canonical.getTopic());
assertEquals(CanonicalSubscriptionChannelType.RESTHOOK, canonical.getChannelType());
assertThat(canonical.getFilters(), hasSize(2));
CanonicalTopicSubscriptionFilter filter1 = canonical.getFilters().get(0);
assertEquals("Encounter", filter1.getResourceType());
assertEquals("patient", filter1.getFilterParameter());
assertEquals(Enumerations.SearchComparator.EQ, filter1.getComparator());
assertNull(filter1.getModifier());
assertEquals("Patient/123", filter1.getValue());
CanonicalTopicSubscriptionFilter filter2 = canonical.getFilters().get(1);
assertEquals("Encounter", filter2.getResourceType());
assertEquals("status", filter2.getFilterParameter());
assertEquals(Enumerations.SearchComparator.EQ, filter2.getComparator());
assertNull(filter2.getModifier());
assertEquals("finished", filter2.getValue());
assertEquals(86400, canonical.getHeartbeatPeriod());
assertEquals(60, canonical.getTimeout());
assertEquals(20, canonical.getMaxCount());
}
@NotNull
@ -127,7 +189,7 @@ class SubscriptionCanonicalizerTest {
filter.setResourceType(theResourceType);
filter.setFilterParameter(theParam);
filter.setModifier(Enumerations.SearchModifierCode.EXACT);
// WIP STR5 add comparator once core libs are updated
filter.setComparator(Enumerations.SearchComparator.EQ);
filter.setValue(theValue);
return filter;
}

View File

@ -0,0 +1,28 @@
package ca.uhn.fhir.jpa.subscription.model;
import org.hamcrest.Matchers;
import org.hl7.fhir.r5.model.Enumerations;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertTrue;
class CanonicalTopicSubscriptionFilterTest {
@Test
void fromQueryUrl() {
String queryUrl = "/Patient?family=smith&given=stevie,elisha&family=carpenter";
List<CanonicalTopicSubscriptionFilter> filters = CanonicalTopicSubscriptionFilter.fromQueryUrl(queryUrl);
assertThat(filters, hasSize(3));
assertTrue(filters.stream().map(CanonicalTopicSubscriptionFilter::getComparator).allMatch(Enumerations.SearchComparator.EQ::equals));
assertTrue(filters.stream().map(CanonicalTopicSubscriptionFilter::getModifier).allMatch(Objects::isNull));
assertTrue(filters.stream().map(CanonicalTopicSubscriptionFilter::getResourceType).allMatch("Patient"::equals));
assertThat(filters.stream().map(CanonicalTopicSubscriptionFilter::getFilterParameter).collect(Collectors.toSet()), Matchers.containsInAnyOrder("family", "given"));
assertThat(filters.stream().map(CanonicalTopicSubscriptionFilter::getValue).collect(Collectors.toSet()), Matchers.containsInAnyOrder("smith", "stevie,elisha", "carpenter"));
}
}

View File

@ -0,0 +1,46 @@
package ca.uhn.fhir.subscription;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.PositiveIntType;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4.model.UnsignedIntType;
import static ca.uhn.fhir.rest.api.Constants.CT_FHIR_JSON_NEW;
public class SubscriptionTestDataHelper {
public static final String TEST_TOPIC = "http://test.topic";
public static final String TEST_FILTER1 = "Encounter?patient=Patient/123";
public static final String TEST_FILTER2 = "Encounter?status=finished";
public static final String TEST_ENDPOINT = "http://rest.endpoint/path";
public static final String TEST_HEADER1 = "X-Foo: FOO";
public static final String TEST_HEADER2 = "X-Bar: BAR";
public static Subscription buildR4TopicSubscription() {
Subscription subscription = new Subscription();
// Standard R4 stuff
subscription.getMeta().addTag("http://a", "b", "c");
subscription.getMeta().addTag("http://d", "e", "f");
subscription.setId("testId");
subscription.getChannel().setType(Subscription.SubscriptionChannelType.RESTHOOK);
subscription.getChannel().setEndpoint(TEST_ENDPOINT);
subscription.getChannel().setPayload(CT_FHIR_JSON_NEW);
subscription.getChannel().addHeader(TEST_HEADER1);
subscription.getChannel().addHeader(TEST_HEADER2);
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
// Subscription Topic Extensions:
subscription.getMeta().addProfile(SubscriptionConstants.SUBSCRIPTION_TOPIC_PROFILE_URL);
subscription.setCriteria(TEST_TOPIC);
subscription.getCriteriaElement().addExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL, new StringType(TEST_FILTER1));
subscription.getCriteriaElement().addExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL, new StringType(TEST_FILTER2));
subscription.getChannel().addExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_HEARTBEAT_PERIOD_URL, new UnsignedIntType(86400));
subscription.getChannel().addExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_TIMEOUT_URL, new UnsignedIntType(60));
subscription.getChannel().addExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_MAX_COUNT, new PositiveIntType(20));
subscription.getChannel().getPayloadElement().addExtension(SubscriptionConstants.SUBSCRIPTION_TOPIC_CHANNEL_PAYLOAD_CONTENT, new CodeType("full-resource"));
return subscription;
}
}

View File

@ -26,7 +26,12 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.util.FhirTerser;
import ca.uhn.fhir.util.MetaUtil;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.*;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.ICompositeType;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.InstantType;
import org.slf4j.Logger;

View File

@ -41,8 +41,7 @@ public class TransactionCapturingProviderExtension<T extends IBaseBundle> implem
private static final Logger ourLog = LoggerFactory.getLogger(TransactionCapturingProviderExtension.class);
private final RestfulServerExtension myRestfulServerExtension;
private final List<T> myInputBundles = Collections.synchronizedList(new ArrayList<>());
private PlainProvider myProvider;
private final PlainProvider myProvider = new PlainProvider();
/**
* Constructor
@ -53,26 +52,26 @@ public class TransactionCapturingProviderExtension<T extends IBaseBundle> implem
@Override
public void afterEach(ExtensionContext context) throws Exception {
myProvider = new PlainProvider();
myRestfulServerExtension.getRestfulServer().unregisterProvider(myProvider);
myProvider.clear();
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
myRestfulServerExtension.getRestfulServer().registerProvider(myProvider);
myInputBundles.clear();
}
public void waitForTransactionCount(int theCount) {
assertThat(theCount, greaterThanOrEqualTo(myInputBundles.size()));
await().until(()->myInputBundles.size(), equalTo(theCount));
assertThat(theCount, greaterThanOrEqualTo(myProvider.size()));
await().until(()->myProvider.size(), equalTo(theCount));
}
public List<T> getTransactions() {
return Collections.unmodifiableList(myInputBundles);
return myProvider.getTransactions();
}
private class PlainProvider {
private final List<T> myInputBundles = Collections.synchronizedList(new ArrayList<>());
@Transaction
public T transaction(@TransactionParam T theInput) {
@ -81,6 +80,17 @@ public class TransactionCapturingProviderExtension<T extends IBaseBundle> implem
return theInput;
}
public void clear() {
myInputBundles.clear();
}
public int size() {
return myInputBundles.size();
}
public List<T> getTransactions() {
return Collections.unmodifiableList(myInputBundles);
}
}