diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java index 0cc22e0ceeb..8545b5f05cb 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java @@ -6,8 +6,8 @@ import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.interceptor.executor.InterceptorService; import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider; import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor; -import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl; import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider; +import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl; import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc; import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.graphql.JpaStorageServices; @@ -28,8 +28,8 @@ import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc; import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl; import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher; -import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher; import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java index ecc0dca6795..9a242ffe355 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingInterceptor.java @@ -22,13 +22,13 @@ package ca.uhn.fhir.jpa.subscription; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.RuntimeResourceDefinition; +import ca.uhn.fhir.interceptor.api.Hook; +import ca.uhn.fhir.interceptor.api.Interceptor; +import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.config.BaseConfig; import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.dao.IFhirResourceDao; -import ca.uhn.fhir.interceptor.api.Hook; -import ca.uhn.fhir.interceptor.api.Interceptor; -import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants; @@ -233,7 +233,7 @@ public class SubscriptionActivatingInterceptor { private void submitResourceModified(final ResourceModifiedMessage theMsg) { switch (theMsg.getOperationType()) { case DELETE: - mySubscriptionRegistry.unregisterSubscription(theMsg.getId(myFhirContext)); + mySubscriptionRegistry.unregisterSubscription(theMsg.getId(myFhirContext).getIdPart()); break; case CREATE: case UPDATE: diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionInterceptorLoader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionInterceptorLoader.java index d90c9b4818b..1e7e01934e5 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionInterceptorLoader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionInterceptorLoader.java @@ -20,10 +20,11 @@ package ca.uhn.fhir.jpa.subscription; * #L% */ -import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.interceptor.api.IInterceptorService; +import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; import com.google.common.annotations.VisibleForTesting; import org.hl7.fhir.dstu2.model.Subscription; import org.slf4j.Logger; @@ -47,6 +48,8 @@ public class SubscriptionInterceptorLoader { @Autowired private SubscriptionRegistry mySubscriptionRegistry; @Autowired + private SubscriptionChannelRegistry mySubscriptionChannelRegistry; + @Autowired private ApplicationContext myApplicationContext; @Autowired private IInterceptorService myInterceptorRegistry; @@ -68,9 +71,11 @@ public class SubscriptionInterceptorLoader { private void loadSubscriptions() { ourLog.info("Loading subscriptions into the SubscriptionRegistry..."); - // Activate scheduled subscription loads into the SubscriptionRegistry - myApplicationContext.getBean(SubscriptionLoader.class); + // Load active subscriptions into the SubscriptionRegistry and activate their channels + SubscriptionLoader loader = myApplicationContext.getBean(SubscriptionLoader.class); + loader.syncSubscriptions(); ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size()); + ourLog.info("...{} subscription channels started", mySubscriptionChannelRegistry.size()); } @VisibleForTesting diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java index 6424344be87..92d2e553fa4 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java @@ -4,7 +4,7 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.interceptor.api.*; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; -import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java index f17c89bd598..48bf8859030 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java @@ -143,7 +143,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc jobDetails.setRemainingResourceIds(resourceIds.stream().map(t->t.getValue()).collect(Collectors.toList())); jobDetails.setRemainingSearchUrls(searchUrls.stream().map(t->t.getValue()).collect(Collectors.toList())); if (theSubscriptionId != null) { - jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue()); + jobDetails.setSubscriptionId(theSubscriptionId.getIdPart()); } // Submit job for processing @@ -315,7 +315,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId); ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE); - msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue()); + msg.setSubscriptionId(theSubscriptionId); return myExecutorService.submit(() -> { for (int i = 0; ; i++) { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java index cac0a7fd5ac..bb3d30a6ce7 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java @@ -4,7 +4,8 @@ import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; -import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelWithHandlers; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.JavaMailEmailSender; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber; import org.hl7.fhir.dstu2.model.Subscription; @@ -24,6 +25,8 @@ public class SubscriptionTestUtil { private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; @Autowired private SubscriptionRegistry mySubscriptionRegistry; + @Autowired + private SubscriptionChannelRegistry mySubscriptionChannelRegistry; public int getExecutorQueueSize() { LinkedBlockingQueueSubscribableChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest(); @@ -76,7 +79,8 @@ public class SubscriptionTestUtil { public void setEmailSender(IIdType theIdElement) { ActiveSubscription activeSubscription = mySubscriptionRegistry.get(theIdElement.getIdPart()); - SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) activeSubscription.getDeliveryHandlerForUnitTest(); + SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(activeSubscription.getChannelName()); + SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) subscriptionChannelWithHandlers.getDeliveryHandlerForUnitTest(); subscriber.setEmailSender(myEmailSender); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/module/matcher/InMemorySubscriptionMatcherR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/module/matcher/InMemorySubscriptionMatcherR4Test.java index b3f0d8c839c..b5b014aace8 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/module/matcher/InMemorySubscriptionMatcherR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/module/matcher/InMemorySubscriptionMatcherR4Test.java @@ -399,7 +399,7 @@ public class InMemorySubscriptionMatcherR4Test { subscription.setCriteriaString(criteria); subscription.setIdElement(new IdType("Subscription", 123L)); ResourceModifiedMessage msg = new ResourceModifiedMessage(myContext, patient, ResourceModifiedMessage.OperationTypeEnum.CREATE); - msg.setSubscriptionId("Subscription/123"); + msg.setSubscriptionId("123"); msg.setId(new IdType("Patient/ABC")); InMemoryMatchResult result = myInMemorySubscriptionMatcher.match(subscription, msg); fail(); diff --git a/hapi-fhir-jpaserver-subscription/pom.xml b/hapi-fhir-jpaserver-subscription/pom.xml index 356a896078c..e6cd72a5acc 100644 --- a/hapi-fhir-jpaserver-subscription/pom.xml +++ b/hapi-fhir-jpaserver-subscription/pom.xml @@ -1,4 +1,5 @@ - + 4.0.0 @@ -112,12 +113,17 @@ jackson-databind test - - ca.uhn.hapi.fhir - hapi-fhir-test-utilities - ${project.version} - test - + + ca.uhn.hapi.fhir + hapi-fhir-test-utilities + ${project.version} + test + + + org.awaitility + awaitility + test + diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java index 51a1191fe82..5289910e437 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java @@ -20,65 +20,37 @@ package ca.uhn.fhir.jpa.subscription.module.cache; * #L% */ -import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; -import com.google.common.annotations.VisibleForTesting; -import org.hl7.fhir.instance.model.api.IIdType; +import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.SubscribableChannel; -import java.io.Closeable; -import java.util.Collection; -import java.util.HashSet; - -public class ActiveSubscription implements Closeable { +public class ActiveSubscription { private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); private CanonicalSubscription mySubscription; - private final SubscribableChannel mySubscribableChannel; - private final Collection myDeliveryHandlerSet = new HashSet<>(); + private final String myChannelName; + private final String myId; private boolean flagForDeletion; - public ActiveSubscription(CanonicalSubscription theSubscription, SubscribableChannel theSubscribableChannel) { + public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) { mySubscription = theSubscription; - mySubscribableChannel = theSubscribableChannel; + myChannelName = theChannelName; + myId = theSubscription.getIdPart(); } public CanonicalSubscription getSubscription() { return mySubscription; } - public SubscribableChannel getSubscribableChannel() { - return mySubscribableChannel; - } - - public void register(MessageHandler theHandler) { - mySubscribableChannel.subscribe(theHandler); - myDeliveryHandlerSet.add(theHandler); - } - - public void unregister(MessageHandler theMessageHandler) { - if (mySubscribableChannel != null) { - mySubscribableChannel.unsubscribe(theMessageHandler); - } - } - - public IIdType getIdElement(FhirContext theFhirContext) { - return mySubscription.getIdElement(theFhirContext); + public String getChannelName() { + return myChannelName; } public String getCriteriaString() { return mySubscription.getCriteriaString(); } - @VisibleForTesting - public MessageHandler getDeliveryHandlerForUnitTest() { - return myDeliveryHandlerSet.iterator().next(); - } - public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) { mySubscription = theCanonicalizedSubscription; } @@ -91,26 +63,11 @@ public class ActiveSubscription implements Closeable { flagForDeletion = theFlagForDeletion; } - @Override - public void close() { - for (MessageHandler messageHandler : myDeliveryHandlerSet) { - unregister(messageHandler); - } - if (mySubscribableChannel instanceof DisposableBean) { - try { - ((DisposableBean) mySubscribableChannel).destroy(); - } catch (Exception e) { - ourLog.error("Failed to destroy channel bean", e); - } - } + public String getId() { + return myId; } - /** - * Use close() instead - * KHS 15 Apr 2019 - */ - @Deprecated - public void unregisterAll() { - close(); + public CanonicalSubscriptionChannelType getChannelType() { + return mySubscription.getChannelType(); } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java index 73020feaf7e..e19bd4cb94a 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java @@ -24,10 +24,7 @@ import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; class ActiveSubscriptionCache { @@ -47,23 +44,24 @@ class ActiveSubscriptionCache { return myCache.size(); } - public void put(String theSubscriptionId, ActiveSubscription theValue) { - myCache.put(theSubscriptionId, theValue); + public void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) { + myCache.put(theSubscriptionId, theActiveSubscription); } - public synchronized void remove(String theSubscriptionId) { + public synchronized ActiveSubscription remove(String theSubscriptionId) { Validate.notBlank(theSubscriptionId); ActiveSubscription activeSubscription = myCache.get(theSubscriptionId); if (activeSubscription == null) { - return; + return null; } - activeSubscription.close(); myCache.remove(theSubscriptionId); + return activeSubscription; } - public void unregisterAllSubscriptionsNotInCollection(Collection theAllIds) { + List markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(Collection theAllIds) { + List retval = new ArrayList<>(); for (String next : new ArrayList<>(myCache.keySet())) { ActiveSubscription activeSubscription = myCache.get(next); if (theAllIds.contains(next)) { @@ -72,11 +70,12 @@ class ActiveSubscriptionCache { } else { if (activeSubscription.isFlagForDeletion()) { ourLog.info("Unregistering Subscription/{}", next); - remove(next); + retval.add(next); } else { activeSubscription.setFlagForDeletion(true); } } } + return retval; } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/LinkedBlockingQueueSubscribableChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/LinkedBlockingQueueSubscribableChannelFactory.java index 31f57cd0c22..435bea071f6 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/LinkedBlockingQueueSubscribableChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/LinkedBlockingQueueSubscribableChannelFactory.java @@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache; */ import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; import org.springframework.messaging.SubscribableChannel; import java.util.concurrent.LinkedBlockingQueue; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java index 05d0ab85d5b..accf81cb7d4 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java @@ -21,22 +21,24 @@ package ca.uhn.fhir.jpa.subscription.module.cache; */ import ca.uhn.fhir.interceptor.api.HookParams; -import ca.uhn.fhir.jpa.model.entity.ModelConfig; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.SubscribableChannel; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Optional; /** @@ -48,16 +50,14 @@ import java.util.Optional; // TODO KHS Does jpa need a subscription registry if matching is disabled? @Component public class SubscriptionRegistry { - private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionRegistry.class); + private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache(); @Autowired - SubscriptionCanonicalizer mySubscriptionCanonicalizer; + private SubscriptionCanonicalizer mySubscriptionCanonicalizer; @Autowired - SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; + private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; @Autowired - SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; - @Autowired - ModelConfig myModelConfig; + private SubscriptionChannelRegistry mySubscriptionChannelRegistry; @Autowired private IInterceptorBroadcaster myInterceptorBroadcaster; @@ -91,21 +91,12 @@ public class SubscriptionRegistry { Validate.notNull(theSubscription); CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); - SubscribableChannel deliveryChannel; - Optional deliveryHandler; - if (myModelConfig.isSubscriptionMatchingEnabled()) { - deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(subscriptionId, canonicalized.getChannelType().toCode().toLowerCase()); - deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(canonicalized); - } else { - deliveryChannel = null; - deliveryHandler = Optional.empty(); - } - - ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); - ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, deliveryChannel); - deliveryHandler.ifPresent(activeSubscription::register); + String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalized); + ourLog.info("Registering active subscription {}", subscriptionId); + ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, channelName); + mySubscriptionChannelRegistry.add(activeSubscription); myActiveSubscriptionCache.put(subscriptionId, activeSubscription); // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED @@ -116,12 +107,14 @@ public class SubscriptionRegistry { return canonicalized; } - public void unregisterSubscription(IIdType theId) { - Validate.notNull(theId); - String subscriptionId = theId.getIdPart(); + public void unregisterSubscription(String theSubscriptionId) { + Validate.notNull(theSubscriptionId); - ourLog.info("Unregistering active subscription {}", theId.toUnqualified().getValue()); - myActiveSubscriptionCache.remove(subscriptionId); + ourLog.info("Unregistering active subscription {}", theSubscriptionId); + ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId); + if (activeSubscription != null) { + mySubscriptionChannelRegistry.remove(activeSubscription); + } } @PreDestroy @@ -133,7 +126,11 @@ public class SubscriptionRegistry { } void unregisterAllSubscriptionsNotInCollection(Collection theAllIds) { - myActiveSubscriptionCache.unregisterAllSubscriptionsNotInCollection(theAllIds); + + List idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds); + for (String id : idsToDelete) { + unregisterSubscription(id); + } } public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { @@ -151,7 +148,7 @@ public class SubscriptionRegistry { updateSubscription(theSubscription); return true; } - unregisterSubscription(theSubscription.getIdElement()); + unregisterSubscription(theSubscription.getIdElement().getIdPart()); } if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) { registerSubscription(theSubscription.getIdElement(), theSubscription); @@ -183,7 +180,7 @@ public class SubscriptionRegistry { public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) { if (hasSubscription(theSubscription.getIdElement()).isPresent()) { ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue()); - unregisterSubscription(theSubscription.getIdElement()); + unregisterSubscription(theSubscription.getIdElement().getIdPart()); return true; } return false; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscribableChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscribableChannelFactory.java similarity index 94% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscribableChannelFactory.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscribableChannelFactory.java index 3a3848f7227..41cff5edd13 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscribableChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscribableChannelFactory.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.subscription.module.cache; +package ca.uhn.fhir.jpa.subscription.module.channel; /*- * #%L diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscriptionDeliveryChannelNamer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscriptionDeliveryChannelNamer.java new file mode 100644 index 00000000000..62ecf864bb5 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscriptionDeliveryChannelNamer.java @@ -0,0 +1,7 @@ +package ca.uhn.fhir.jpa.subscription.module.channel; + +import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; + +public interface ISubscriptionDeliveryChannelNamer { + String nameFromSubscription(CanonicalSubscription theCanonicalSubscription); +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java new file mode 100644 index 00000000000..89a5236802f --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java @@ -0,0 +1,49 @@ +package ca.uhn.fhir.jpa.subscription.module.channel; + +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class SubscriptionChannelCache { + private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); + + private final Map myCache = new ConcurrentHashMap<>(); + + public SubscriptionChannelWithHandlers get(String theChannelName) { + return myCache.get(theChannelName); + } + + public int size() { + return myCache.size(); + } + + public void put(String theChannelName, SubscriptionChannelWithHandlers theValue) { + myCache.put(theChannelName, theValue); + } + + synchronized void closeAndRemove(String theChannelName) { + Validate.notBlank(theChannelName); + + SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = myCache.get(theChannelName); + if (subscriptionChannelWithHandlers == null) { + return; + } + + subscriptionChannelWithHandlers.close(); + myCache.remove(theChannelName); + } + + public boolean containsKey(String theChannelName) { + return myCache.containsKey(theChannelName); + } + + void logForUnitTest() { + for (String key : myCache.keySet()) { + ourLog.info("SubscriptionChannelCache: {}", key); + } + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelFactory.java similarity index 81% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionChannelFactory.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelFactory.java index 4fba28bbe3c..c4a53b8d696 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelFactory.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.subscription.module.cache; +package ca.uhn.fhir.jpa.subscription.module.channel; /*- * #%L @@ -34,12 +34,8 @@ public class SubscriptionChannelFactory { mySubscribableChannelFactory = theSubscribableChannelFactory; } - public SubscribableChannel newDeliveryChannel(String theSubscriptionId, String theChannelType) { - String channelName = "subscription-delivery-" + - theChannelType + - "-" + - theSubscriptionId; - return mySubscribableChannelFactory.createSubscribableChannel(channelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers()); + public SubscribableChannel newDeliveryChannel(String theChannelName) { + return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers()); } public SubscribableChannel newMatchingChannel(String theChannelName) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java new file mode 100644 index 00000000000..4fe88e31788 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java @@ -0,0 +1,99 @@ +package ca.uhn.fhir.jpa.subscription.module.channel; + +import ca.uhn.fhir.jpa.model.entity.ModelConfig; +import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Multimap; +import com.google.common.collect.MultimapBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.Optional; + +@Component +public class SubscriptionChannelRegistry { + private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); + + private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache(); + // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it + // Key Channel Name, Value Subscription Id + private final Multimap myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build(); + + @Autowired + private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; + @Autowired + private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; + @Autowired + private ModelConfig myModelConfig; + + public synchronized void add(ActiveSubscription theActiveSubscription) { + if (!myModelConfig.isSubscriptionMatchingEnabled()) { + return; + } + String channelName = theActiveSubscription.getChannelName(); + ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName); + myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId()); + + if (mySubscriptionChannelCache.containsKey(channelName)) { + ourLog.info("Channel {} already exists. Not creating.", channelName); + return; + } + + SubscribableChannel deliveryChannel; + Optional deliveryHandler; + + deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName); + deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType()); + + SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel); + deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler); + mySubscriptionChannelCache.put(channelName, subscriptionChannelWithHandlers); + } + + public synchronized void remove(ActiveSubscription theActiveSubscription) { + if (!myModelConfig.isSubscriptionMatchingEnabled()) { + return; + } + String channelName = theActiveSubscription.getChannelName(); + ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName); + boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId()); + if (!removed) { + ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId() ,channelName); + } + + // This was the last one. Close and remove the channel + if (!myActiveSubscriptionByChannelName.containsKey(channelName)) { + SubscriptionChannelWithHandlers channel = mySubscriptionChannelCache.get(channelName); + if (channel != null) { + channel.close(); + } + mySubscriptionChannelCache.closeAndRemove(channelName); + } + } + + public synchronized SubscriptionChannelWithHandlers get(String theChannelName) { + return mySubscriptionChannelCache.get(theChannelName); + } + + public synchronized int size() { + return mySubscriptionChannelCache.size(); + } + + @VisibleForTesting + public void logForUnitTest() { + ourLog.info("{} Channels: {}", this, size()); + mySubscriptionChannelCache.logForUnitTest(); + for (String key : myActiveSubscriptionByChannelName.keySet()) { + Collection list = myActiveSubscriptionByChannelName.get(key); + for (String value : list) { + ourLog.info("ActiveSubscriptionByChannelName {}: {}", key, value); + } + } + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java new file mode 100644 index 00000000000..d7681f89adc --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java @@ -0,0 +1,66 @@ +package ca.uhn.fhir.jpa.subscription.module.channel; + +import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; + +import java.io.Closeable; +import java.util.Collection; +import java.util.HashSet; + +public class SubscriptionChannelWithHandlers implements Closeable { + private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); + + private final String myChannelName; + private final SubscribableChannel mySubscribableChannel; + private final Collection myDeliveryHandlerSet = new HashSet<>(); + + public SubscriptionChannelWithHandlers(String theChannelName, SubscribableChannel theSubscribableChannel) { + myChannelName = theChannelName; + mySubscribableChannel = theSubscribableChannel; + } + + public void addHandler(MessageHandler theHandler) { + mySubscribableChannel.subscribe(theHandler); + myDeliveryHandlerSet.add(theHandler); + } + + public void removeHandler(MessageHandler theMessageHandler) { + if (mySubscribableChannel != null) { + mySubscribableChannel.unsubscribe(theMessageHandler); + } + } + + @VisibleForTesting + public MessageHandler getDeliveryHandlerForUnitTest() { + return myDeliveryHandlerSet.iterator().next(); + } + + @Override + public void close() { + for (MessageHandler messageHandler : myDeliveryHandlerSet) { + removeHandler(messageHandler); + } + if (mySubscribableChannel instanceof DisposableBean) { + tryDestroyChannel((DisposableBean) mySubscribableChannel); + } + } + + private void tryDestroyChannel(DisposableBean theSubscribableChannel) { + try { + ourLog.info("Destroying channel {}", myChannelName); + theSubscribableChannel.destroy(); + } catch (Exception e) { + ourLog.error("Failed to destroy channel bean", e); + } + } + + public MessageChannel getChannel() { + return mySubscribableChannel; + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryChannelNamer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryChannelNamer.java new file mode 100644 index 00000000000..b404909013d --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryChannelNamer.java @@ -0,0 +1,17 @@ +package ca.uhn.fhir.jpa.subscription.module.channel; + +import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import org.springframework.stereotype.Service; + +@Service +public class SubscriptionDeliveryChannelNamer implements ISubscriptionDeliveryChannelNamer { + @Override + public String nameFromSubscription(CanonicalSubscription theCanonicalSubscription) { + String channelType = theCanonicalSubscription.getChannelType().toCode().toLowerCase(); + String subscriptionId = theCanonicalSubscription.getIdPart(); + return "subscription-delivery-" + + channelType + + "-" + + subscriptionId; + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionDeliveryHandlerFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryHandlerFactory.java similarity index 82% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionDeliveryHandlerFactory.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryHandlerFactory.java index eb350809984..d646786501b 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionDeliveryHandlerFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryHandlerFactory.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.subscription.module.cache; +package ca.uhn.fhir.jpa.subscription.module.channel; /*- * #%L @@ -20,16 +20,13 @@ package ca.uhn.fhir.jpa.subscription.module.cache; * #L% */ -import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionDeliveringRestHookSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber; -import org.hl7.fhir.r4.model.Subscription; import org.springframework.beans.factory.annotation.Lookup; import org.springframework.messaging.MessageHandler; import org.springframework.stereotype.Component; -import org.thymeleaf.util.Validate; import java.util.Optional; @@ -42,10 +39,10 @@ public abstract class SubscriptionDeliveryHandlerFactory { @Lookup protected abstract SubscriptionDeliveringRestHookSubscriber getSubscriptionDeliveringRestHookSubscriber(); - public Optional createDeliveryHandler(CanonicalSubscription theSubscription) { - if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) { + public Optional createDeliveryHandler(CanonicalSubscriptionChannelType theChannelType) { + if (theChannelType == CanonicalSubscriptionChannelType.EMAIL) { return Optional.of(getSubscriptionDeliveringEmailSubscriber(myEmailSender)); - } else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) { + } else if (theChannelType == CanonicalSubscriptionChannelType.RESTHOOK) { return Optional.of(getSubscriptionDeliveringRestHookSubscriber()); } else { return Optional.empty(); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/config/BaseSubscriptionConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/config/BaseSubscriptionConfig.java index fbcbd5a6194..b22a7e7267d 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/config/BaseSubscriptionConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/config/BaseSubscriptionConfig.java @@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.config; */ import ca.uhn.fhir.interceptor.executor.InterceptorService; -import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java index bd5605beb4a..1d3d05ab409 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java @@ -65,7 +65,7 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler { switch (theResourceModifiedMessage.getOperationType()) { case DELETE: if (isSubscription(theResourceModifiedMessage)) { - mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext)); + mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext).getIdPart()); } return; case CREATE: diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java index 28936db1428..e522120f7ac 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java @@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.EncodingEnum; @@ -61,6 +62,8 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { private SubscriptionRegistry mySubscriptionRegistry; @Autowired private IInterceptorBroadcaster myInterceptorBroadcaster; + @Autowired + private SubscriptionChannelRegistry mySubscriptionChannelRegistry; @Override public void handleMessage(Message theMessage) throws MessagingException { @@ -119,6 +122,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { if (isNotBlank(theMsg.getSubscriptionId())) { if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) { + // TODO KHS we should use a hash to look it up instead of this full table scan ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId()); continue; } @@ -133,7 +137,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { continue; } ourLog.debug("Subscription {} was matched by resource {} {}", - nextActiveSubscription.getSubscription().getIdElement(myFhirContext).getValue(), + nextActiveSubscription.getId(), resourceId.toUnqualifiedVersionless().getValue(), matchResult.isInMemory() ? "in-memory" : "by querying the repository"); @@ -177,12 +181,12 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) { boolean retval = false; ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg); - MessageChannel deliveryChannel = nextActiveSubscription.getSubscribableChannel(); + MessageChannel deliveryChannel = mySubscriptionChannelRegistry.get(nextActiveSubscription.getChannelName()).getChannel(); if (deliveryChannel != null) { retval = true; trySendToDeliveryChannel(wrappedMsg, deliveryChannel); } else { - ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getIdElement(myFhirContext)); + ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId()); } return retval; } @@ -200,7 +204,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { } private String getId(ActiveSubscription theActiveSubscription) { - return theActiveSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue(); + return theActiveSubscription.getId(); } private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java index d9dc2e8a2e7..d10cacfecea 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java @@ -22,6 +22,8 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber.websocket; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelWithHandlers; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.IdType; @@ -45,6 +47,8 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); @Autowired protected WebsocketConnectionValidator myWebsocketConnectionValidator; + @Autowired + SubscriptionChannelRegistry mySubscriptionChannelRegistry; @Autowired private FhirContext myCtx; @@ -102,26 +106,28 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement } - private class BoundStaticSubscipriptionState implements IState, MessageHandler { + private class BoundStaticSubscriptionState implements IState, MessageHandler { private final WebSocketSession mySession; private final ActiveSubscription myActiveSubscription; - public BoundStaticSubscipriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) { + public BoundStaticSubscriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) { mySession = theSession; myActiveSubscription = theActiveSubscription; - theActiveSubscription.register(this); + SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(theActiveSubscription.getChannelName()); + subscriptionChannelWithHandlers.addHandler(this); } @Override public void closing() { - myActiveSubscription.unregister(this); + SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(myActiveSubscription.getChannelName()); + subscriptionChannelWithHandlers.removeHandler(this); } private void deliver() { try { - String payload = "ping " + myActiveSubscription.getIdElement(myCtx).getIdPart(); + String payload = "ping " + myActiveSubscription.getId(); ourLog.info("Sending WebSocket message: {}", payload); mySession.sendMessage(new TextMessage(payload)); } catch (IOException e) { @@ -153,7 +159,6 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement handleFailure(e); } } - } private class InitialState implements IState { @@ -172,7 +177,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement return null; } - myState = new BoundStaticSubscipriptionState(theSession, response.getActiveSubscription()); + myState = new BoundStaticSubscriptionState(theSession, response.getActiveSubscription()); return id; } @@ -206,94 +211,3 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement } } - - -// private IIdType bingSearch(WebSocketSession theSession, String theRemaining) { -// Subscription subscription = new Subscription(); -// subscription.getChannel().setType(SubscriptionChannelType.WEBSOCKET); -// subscription.setStatus(SubscriptionStatus.ACTIVE); -// subscription.setCriteria(theRemaining); -// -// try { -// String params = theRemaining.substring(theRemaining.indexOf('?')+1); -// List paramValues = URLEncodedUtils.parse(params, Constants.CHARSET_UTF8, '&'); -// EncodingEnum encoding = EncodingEnum.JSON; -// for (NameValuePair nameValuePair : paramValues) { -// if (Constants.PARAM_FORMAT.equals(nameValuePair.getName())) { -// EncodingEnum nextEncoding = EncodingEnum.forContentType(nameValuePair.getValue()); -// if (nextEncoding != null) { -// encoding = nextEncoding; -// } -// } -// } -// -// IIdType id = ourSubscriptionDao.create(subscription).getId(); -// -// mySubscriptionPid = ourSubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id); -// mySubscriptionId = subscription.getIdElement(); -// myState = new BoundDynamicSubscriptionState(theSession, encoding); -// -// return id; -// } catch (UnprocessableEntityException e) { -// ourLog.warn("Failed to bind subscription: " + e.getMessage()); -// try { -// theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - " + e.getMessage())); -// } catch (IOException e2) { -// handleFailure(e2); -// } -// } catch (Exception e) { -// handleFailure(e); -// try { -// theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included")); -// } catch (IOException e2) { -// handleFailure(e2); -// } -// } -// return null; -// } - - -//private class BoundDynamicSubscriptionState implements SubscriptionWebsocketHandler.IState { -// -// private EncodingEnum myEncoding; -// private WebSocketSession mySession; -// -// public BoundDynamicSubscriptionState(WebSocketSession theSession, EncodingEnum theEncoding) { -// mySession = theSession; -// myEncoding = theEncoding; -// } -// -// @Override -// public void closing() { -// ourLog.info("Deleting subscription {}", mySubscriptionId); -// try { -// ourSubscriptionDao.delete(mySubscriptionId, null); -// } catch (Exception e) { -// handleFailure(e); -// } -// } -// -// @Override -// public void deliver(List theResults) { -// try { -// for (IBaseResource nextResource : theResults) { -// ourLog.info("Sending WebSocket message for resource: {}", nextResource.getIdElement()); -// String encoded = myEncoding.newParser(ourCtx).encodeResourceToString(nextResource); -// String payload = "add " + mySubscriptionId.getIdPart() + '\n' + encoded; -// mySession.sendMessage(new TextMessage(payload)); -// } -// } catch (IOException e) { -// handleFailure(e); -// } -// } -// -// @Override -// public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) { -// try { -// theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload())); -// } catch (IOException e) { -// handleFailure(e); -// } -// } -// -//} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionDstu3Test.java index 45bb72b0049..f74460fa338 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionDstu3Test.java @@ -1,18 +1,27 @@ package ca.uhn.fhir.jpa.subscription.module; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.module.config.TestSubscriptionDstu3Config; import ca.uhn.fhir.util.StopWatch; import org.hl7.fhir.dstu3.model.Subscription; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import java.util.List; import java.util.stream.Collectors; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @ContextConfiguration(classes = {TestSubscriptionDstu3Config.class}) public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest { + @Autowired + protected SubscriptionRegistry mySubscriptionRegistry; + @Autowired + protected SubscriptionChannelRegistry mySubscriptionChannelRegistry; private final SubscriptionTestHelper mySubscriptionTestHelper = new SubscriptionTestHelper(); @@ -53,4 +62,22 @@ public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest { protected Subscription makeSubscriptionWithStatus(String theCriteria, String thePayload, String theEndpoint, Subscription.SubscriptionStatus status) { return mySubscriptionTestHelper.makeSubscriptionWithStatus(theCriteria, thePayload, theEndpoint, status); } + + protected void clearRegistry() { + mySubscriptionRegistry.unregisterAllSubscriptions(); + await().until(this::registryEmpty); + } + + private boolean registryEmpty() { + return mySubscriptionRegistry.size() == 0 && mySubscriptionChannelRegistry.size() == 0; + } + + protected void assertRegistrySize(int theSize) { + assertRegistrySize(theSize, theSize); + } + + protected void assertRegistrySize(int theSubscriptionRegistrySize, int theSubscriptionChannelRegistrySize) { + assertEquals(theSubscriptionRegistrySize, mySubscriptionRegistry.size()); + assertEquals(theSubscriptionChannelRegistrySize, mySubscriptionChannelRegistry.size()); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCacheTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCacheTest.java index 9cccebca632..52fe9fab7c1 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCacheTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCacheTest.java @@ -1,90 +1,101 @@ package ca.uhn.fhir.jpa.subscription.module.cache; +import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import ca.uhn.fhir.model.primitive.IdDt; import org.junit.Test; import java.util.ArrayList; import java.util.List; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.*; public class ActiveSubscriptionCacheTest { + static final String ID1 = "id1"; + static final String ID2 = "id2"; + @Test public void twoPhaseDelete() { ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache(); - ActiveSubscription activeSub1 = new ActiveSubscription(null, null); - String id1 = "id1"; - activeSubscriptionCache.put(id1, activeSub1); + ActiveSubscription activeSub1 = buildActiveSubscription(ID1); + activeSubscriptionCache.put(ID1, activeSub1); assertFalse(activeSub1.isFlagForDeletion()); List saveIds = new ArrayList<>(); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + List idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); assertTrue(activeSub1.isFlagForDeletion()); - assertNotNull(activeSubscriptionCache.get(id1)); + assertNotNull(activeSubscriptionCache.get(ID1)); + assertEquals(0, idsToDelete.size()); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); - assertNull(activeSubscriptionCache.get(id1)); + idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); + assertThat(idsToDelete, containsInAnyOrder(ID1)); + } + + private ActiveSubscription buildActiveSubscription(String theId) { + CanonicalSubscription canonicalSubscription = new CanonicalSubscription(); + canonicalSubscription.setIdElement(new IdDt(theId)); + return new ActiveSubscription(canonicalSubscription, null); } @Test public void secondPassUnflags() { ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache(); - ActiveSubscription activeSub1 = new ActiveSubscription(null, null); - String id1 = "id1"; + ActiveSubscription activeSub1 = buildActiveSubscription(ID1); List saveIds = new ArrayList<>(); - activeSubscriptionCache.put(id1, activeSub1); + activeSubscriptionCache.put(ID1, activeSub1); assertFalse(activeSub1.isFlagForDeletion()); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + List idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); assertTrue(activeSub1.isFlagForDeletion()); - assertNotNull(activeSubscriptionCache.get(id1)); + assertNotNull(activeSubscriptionCache.get(ID1)); + assertEquals(0, idsToDelete.size()); - saveIds.add(id1); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + saveIds.add(ID1); + idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); assertFalse(activeSub1.isFlagForDeletion()); - assertNotNull(activeSubscriptionCache.get(id1)); + assertEquals(0, idsToDelete.size()); } @Test public void onlyFlaggedDeleted() { ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache(); - ActiveSubscription activeSub1 = new ActiveSubscription(null, null); - String id1 = "id1"; - ActiveSubscription activeSub2 = new ActiveSubscription(null, null); - String id2 = "id2"; - activeSubscriptionCache.put(id1, activeSub1); - activeSubscriptionCache.put(id2, activeSub2); + + ActiveSubscription activeSub1 = buildActiveSubscription(ID1); + ActiveSubscription activeSub2 = buildActiveSubscription(ID2); + activeSubscriptionCache.put(activeSub1.getId(), activeSub1); + activeSubscriptionCache.put(activeSub2.getId(), activeSub2); activeSub1.setFlagForDeletion(true); List saveIds = new ArrayList<>(); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + List idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); - assertNull(activeSubscriptionCache.get(id1)); - assertNotNull(activeSubscriptionCache.get(id2)); + assertThat(idsToDelete, containsInAnyOrder(ID1)); + assertNotNull(activeSubscriptionCache.get(ID2)); assertTrue(activeSub2.isFlagForDeletion()); } @Test public void onListSavesAndUnmarksFlag() { ActiveSubscriptionCache activeSubscriptionCache = new ActiveSubscriptionCache(); - ActiveSubscription activeSub1 = new ActiveSubscription(null, null); - String id1 = "id1"; - ActiveSubscription activeSub2 = new ActiveSubscription(null, null); - String id2 = "id2"; - activeSubscriptionCache.put(id1, activeSub1); - activeSubscriptionCache.put(id2, activeSub2); + ActiveSubscription activeSub1 = buildActiveSubscription(ID1); + + ActiveSubscription activeSub2 = buildActiveSubscription(ID2); + + activeSubscriptionCache.put(ID1, activeSub1); + activeSubscriptionCache.put(ID2, activeSub2); activeSub1.setFlagForDeletion(true); List saveIds = new ArrayList<>(); - saveIds.add(id1); - saveIds.add(id2); + saveIds.add(ID1); + saveIds.add(ID2); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); - assertNotNull(activeSubscriptionCache.get(id1)); + assertNotNull(activeSubscriptionCache.get(ID1)); assertFalse(activeSub1.isFlagForDeletion()); - assertNotNull(activeSubscriptionCache.get(id2)); + assertNotNull(activeSubscriptionCache.get(ID2)); assertFalse(activeSub2.isFlagForDeletion()); } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/BaseSubscriptionRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/BaseSubscriptionRegistryTest.java new file mode 100644 index 00000000000..7931f0864dd --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/BaseSubscriptionRegistryTest.java @@ -0,0 +1,33 @@ +package ca.uhn.fhir.jpa.subscription.module.cache; + +import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; +import org.hl7.fhir.dstu3.model.Subscription; +import org.junit.After; + +public abstract class BaseSubscriptionRegistryTest extends BaseSubscriptionDstu3Test { + public static final String SUBSCRIPTION_ID = "1"; + public static final String ORIG_CRITERIA = "Patient?"; + public static final String NEW_CRITERIA = "Observation?"; + + @After + public void clearRegistryAfter() { + super.clearRegistry(); + } + + protected Subscription createSubscription() { + Subscription subscription = new Subscription(); + subscription.setId(SUBSCRIPTION_ID); + subscription.setCriteria(ORIG_CRITERIA); + subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE); + setChannel(subscription, Subscription.SubscriptionChannelType.RESTHOOK); + return subscription; + } + + protected void setChannel(Subscription theSubscription, Subscription.SubscriptionChannelType theResthook) { + Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent(); + channel.setType(theResthook); + channel.setPayload("application/json"); + channel.setEndpoint("http://unused.test.endpoint/"); + theSubscription.setChannel(channel); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistrySharedTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistrySharedTest.java new file mode 100644 index 00000000000..0c5924b0651 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistrySharedTest.java @@ -0,0 +1,44 @@ +package ca.uhn.fhir.jpa.subscription.module.cache; + +import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer; +import org.hl7.fhir.dstu3.model.Subscription; +import org.junit.Test; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.test.annotation.DirtiesContext; + +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +public class SubscriptionRegistrySharedTest extends BaseSubscriptionRegistryTest { + + private static final String OTHER_ID = "OTHER_ID"; + + @Configuration + public static class SpringConfig { + @Primary + @Bean + ISubscriptionDeliveryChannelNamer subscriptionDeliveryChannelNamer() { + return new SharedNamer(); + } + + private class SharedNamer implements ISubscriptionDeliveryChannelNamer { + @Override + public String nameFromSubscription(CanonicalSubscription theCanonicalSubscription) { + return "shared"; + } + } + } + + @Test + public void testTwoSubscriptionsOneChannel() { + Subscription subscription = createSubscription(); + assertRegistrySize(0); + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); + assertRegistrySize(1); + Subscription otherSubscription = createSubscription(); + otherSubscription.setId(OTHER_ID); + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(otherSubscription); + assertRegistrySize(2, 1); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java index aa51e2fbf80..12c37ff025e 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java @@ -1,45 +1,24 @@ package ca.uhn.fhir.jpa.subscription.module.cache; - -import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; import org.hl7.fhir.dstu3.model.Subscription; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import org.springframework.beans.factory.annotation.Autowired; import static org.junit.Assert.*; -public class SubscriptionRegistryTest extends BaseSubscriptionDstu3Test { - public static final String SUBSCRIPTION_ID = "1"; - public static final String ORIG_CRITERIA = "Patient?"; - public static final String NEW_CRITERIA = "Observation?"; - @Autowired - SubscriptionRegistry mySubscriptionRegistry; - - @Before - public void clearRegistryBefore() { - mySubscriptionRegistry.unregisterAllSubscriptions(); - } - - @After - public void clearRegistryAfter() { - mySubscriptionRegistry.unregisterAllSubscriptions(); - } - +public class SubscriptionRegistryTest extends BaseSubscriptionRegistryTest { @Test public void updateSubscriptionReusesActiveSubscription() { Subscription subscription = createSubscription(); - assertEquals(0, mySubscriptionRegistry.size()); + assertRegistrySize(0); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); - assertEquals(1, mySubscriptionRegistry.size()); + assertRegistrySize(1); ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); subscription.setCriteria(NEW_CRITERIA); assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); - assertEquals(1, mySubscriptionRegistry.size()); + assertRegistrySize(1); ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); assertEquals(NEW_CRITERIA, newActiveSubscription.getCriteriaString()); // The same object @@ -49,9 +28,10 @@ public class SubscriptionRegistryTest extends BaseSubscriptionDstu3Test { @Test public void updateSubscriptionDoesntReusesActiveSubscriptionWhenChannelChanges() { Subscription subscription = createSubscription(); - assertEquals(0, mySubscriptionRegistry.size()); + assertRegistrySize(0); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); - assertEquals(1, mySubscriptionRegistry.size()); + assertRegistrySize(1); + ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); @@ -59,26 +39,21 @@ public class SubscriptionRegistryTest extends BaseSubscriptionDstu3Test { assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); - assertEquals(1, mySubscriptionRegistry.size()); + assertRegistrySize(1); + ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); // A new object assertFalse(newActiveSubscription == origActiveSubscription); } - private Subscription createSubscription() { - Subscription subscription = new Subscription(); - subscription.setId(SUBSCRIPTION_ID); - subscription.setCriteria(ORIG_CRITERIA); - subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE); - setChannel(subscription, Subscription.SubscriptionChannelType.RESTHOOK); - return subscription; + @Test + public void updateRemove() { + Subscription subscription = createSubscription(); + assertRegistrySize(0); + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); + assertRegistrySize(1); + mySubscriptionRegistry.unregisterSubscription(subscription.getId()); + assertRegistrySize(0); } - private void setChannel(Subscription theSubscription, Subscription.SubscriptionChannelType theResthook) { - Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent(); - channel.setType(theResthook); - channel.setPayload("application/json"); - channel.setEndpoint("http://unused.test.endpoint/"); - theSubscription.setChannel(channel); - } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java new file mode 100644 index 00000000000..3871d77ef3f --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java @@ -0,0 +1,60 @@ +package ca.uhn.fhir.jpa.subscription.module.channel; + +import ca.uhn.fhir.jpa.model.entity.ModelConfig; +import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; +import ca.uhn.fhir.model.primitive.IdDt; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +@RunWith(SpringRunner.class) +public class SubscriptionChannelRegistryTest { + private static final String TEST_CHANNEL_NAME = "TEST_CHANNEL"; + @Autowired + SubscriptionChannelRegistry mySubscriptionChannelRegistry; + @MockBean + SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; + @MockBean + SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; + @MockBean + ModelConfig myModelConfig; + + @Configuration + static class SpringConfig { + @Bean + SubscriptionChannelRegistry subscriptionChannelRegistry() { + return new SubscriptionChannelRegistry(); + } + } + + + @Test + public void testAddAddRemoveRemove() { + when(myModelConfig.isSubscriptionMatchingEnabled()).thenReturn(true); + + CanonicalSubscription cansubA = new CanonicalSubscription(); + cansubA.setIdElement(new IdDt("A")); + ActiveSubscription activeSubscriptionA = new ActiveSubscription(cansubA, TEST_CHANNEL_NAME); + CanonicalSubscription cansubB = new CanonicalSubscription(); + cansubB.setIdElement(new IdDt("B")); + ActiveSubscription activeSubscriptionB = new ActiveSubscription(cansubB, TEST_CHANNEL_NAME); + + assertNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME)); + mySubscriptionChannelRegistry.add(activeSubscriptionA); + assertNotNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME)); + mySubscriptionChannelRegistry.add(activeSubscriptionB); + mySubscriptionChannelRegistry.remove(activeSubscriptionB); + assertNotNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME)); + mySubscriptionChannelRegistry.remove(activeSubscriptionA); + assertNull(mySubscriptionChannelRegistry.get(TEST_CHANNEL_NAME)); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index 1cc7bb133b2..02572f42786 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -7,13 +7,17 @@ import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.test.concurrency.IPointcutLatch; import ca.uhn.test.concurrency.PointcutLatch; import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; +import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; -import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest; +import ca.uhn.fhir.model.primitive.IdDt; import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.ResourceParam; import ca.uhn.fhir.rest.annotation.Update; @@ -57,10 +61,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base IInterceptorService myInterceptorRegistry; @Autowired protected SubscriptionRegistry mySubscriptionRegistry; - @Autowired + @Autowired private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider; - @Autowired + @Autowired private SubscriptionLoader mySubscriptionLoader; + @Autowired + private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; protected String myCode = "1000000050"; @@ -80,9 +86,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base ourCreatedObservations.clear(); ourUpdatedObservations.clear(); ourContentTypes.clear(); + CanonicalSubscription canonicalSubscription = new CanonicalSubscription(); + canonicalSubscription.setIdElement(new IdDt("test")); + canonicalSubscription.setChannelType(CanonicalSubscriptionChannelType.RESTHOOK); mySubscriptionRegistry.unregisterAllSubscriptions(); - ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase()); - ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); + ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel(mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalSubscription)); + ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost); } @@ -93,6 +102,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base mySubscriptionMatchingPost.clear(); mySubscriptionActivatedPost.clear(); ourObservationListener.clear(); + super.clearRegistry(); } public T sendResource(T theResource) throws InterruptedException { @@ -104,11 +114,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base return theResource; } - protected void initSubscriptionLoader(List subscriptions, String uuid) throws InterruptedException { + protected void initSubscriptionLoader(List subscriptions, String uuid) throws InterruptedException { myMockFhirClientSubscriptionProvider.setBundleProvider(new SimpleBundleProvider(new ArrayList<>(subscriptions), uuid)); mySubscriptionLoader.doSyncSubscriptionsForUnitTest(); } - + protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { Subscription subscription = makeActiveSubscription(theCriteria, thePayload, theEndpoint); mySubscriptionActivatedPost.setExpectedCount(1); @@ -151,11 +161,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base ourListenerServer.setHandler(proxyHandler); JettyUtil.startServer(ourListenerServer); - ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); - ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; - FhirContext context = ourListenerRestServer.getFhirContext(); - //Preload structure definitions so the load doesn't happen during the test (first load can be a little slow) - context.getValidationSupport().fetchAllStructureDefinitions(context); + ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); + ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; + FhirContext context = ourListenerRestServer.getFhirContext(); + //Preload structure definitions so the load doesn't happen during the test (first load can be a little slow) + context.getValidationSupport().fetchAllStructureDefinitions(context); } @AfterClass diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SearchParamLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SearchParamLoaderTest.java index 926bd1f59e8..5ec3694bdbc 100755 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SearchParamLoaderTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SearchParamLoaderTest.java @@ -16,7 +16,7 @@ import java.util.Collections; import static org.junit.Assert.assertEquals; public class SearchParamLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test { - private static final int MOCK_FHIR_CLIENT_FAILURES = 5; + private static final int MOCK_FHIR_CLIENT_FAILURES = 3; @Autowired private MockFhirClientSearchParamProvider myMockFhirClientSearchParamProvider; @Autowired diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java index 31d0e46dec3..100f219de9f 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java @@ -10,7 +10,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test { - + @Test public void testSubscriptionLoaderFhirClient() throws InterruptedException { String payload = "application/fhir+json"; @@ -22,13 +22,13 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase)); subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase)); - mySubscriptionActivatedPost.setExpectedCount(2); + mySubscriptionActivatedPost.setExpectedCount(2); initSubscriptionLoader(subs, "uuid"); - mySubscriptionActivatedPost.awaitExpected(); + mySubscriptionActivatedPost.awaitExpected(); - ourObservationListener.setExpectedCount(1); + ourObservationListener.setExpectedCount(1); sendObservation(myCode, "SNOMED-CT"); - ourObservationListener.awaitExpected(); + ourObservationListener.awaitExpected(); waitForSize(0, ourCreatedObservations); waitForSize(1, ourUpdatedObservations); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java index 2848a55656e..929563099b7 100755 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java @@ -1,9 +1,6 @@ package ca.uhn.fhir.jpa.subscription.module.standalone; -import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider; -import ca.uhn.fhir.rest.api.server.IBundleProvider; -import ca.uhn.fhir.rest.server.SimpleBundleProvider; import org.hl7.fhir.dstu3.model.Subscription; import org.junit.After; import org.junit.Before; @@ -16,7 +13,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannelDstu3Test { - private static final int MOCK_FHIR_CLIENT_FAILURES = 5; + private static final int MOCK_FHIR_CLIENT_FAILURES = 3; @Autowired private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider; @@ -41,9 +38,9 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase)); subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase)); - mySubscriptionActivatedPost.setExpectedCount(2); + mySubscriptionActivatedPost.setExpectedCount(2); initSubscriptionLoader(subs, "uuid"); - mySubscriptionActivatedPost.awaitExpected(); + mySubscriptionActivatedPost.awaitExpected(); assertEquals(0, myMockFhirClientSubscriptionProvider.getFailCount()); } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/WebsocketConnectionValidatorTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/WebsocketConnectionValidatorTest.java index 0286388164f..236fd5f8448 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/WebsocketConnectionValidatorTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/WebsocketConnectionValidatorTest.java @@ -12,7 +12,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; -import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import static org.junit.Assert.*;