From fd8b5206e769d16c70e3d2c19e1e2ad2b10547c8 Mon Sep 17 00:00:00 2001 From: Ken Stevens Date: Mon, 30 Sep 2019 15:01:47 -0400 Subject: [PATCH] Separated ChannelRegistry from ActiveSubscriptionRegistry so we can support a many-to-one relationship there All tests pass --- .../ca/uhn/fhir/jpa/config/BaseConfig.java | 2 +- .../SubscriptionMatcherInterceptor.java | 4 +- .../subscription/SubscriptionTestUtil.java | 5 +- ...inkedBlockingQueueSubscribableChannel.java | 2 +- .../module/cache/ActiveSubscription.java | 69 ++---------------- .../module/cache/ActiveSubscriptionCache.java | 12 ++-- ...ockingQueueSubscribableChannelFactory.java | 2 + .../module/cache/SubscriptionRegistry.java | 30 +++----- .../ISubscribableChannel.java | 2 +- .../ISubscribableChannelFactory.java | 2 +- .../ISubscriptionDeliveryChannelNamer.java | 2 +- .../channel/SubscriptionChannelCache.java | 47 +++++++++++++ .../SubscriptionChannelFactory.java | 13 ++-- .../channel/SubscriptionChannelRegistry.java | 62 ++++++++++++++++ .../SubscriptionChannelWithHandlers.java | 70 +++++++++++++++++++ .../SubscriptionDeliveryChannelNamer.java | 2 +- .../SubscriptionDeliveryHandlerFactory.java | 8 +-- .../module/config/BaseSubscriptionConfig.java | 2 +- .../SubscriptionMatchingSubscriber.java | 5 +- .../SubscriptionWebsocketHandler.java | 16 +++-- ...kingQueueSubscribableChannelDstu3Test.java | 33 +++++---- 21 files changed, 262 insertions(+), 128 deletions(-) rename hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/{cache => channel}/ISubscribableChannel.java (82%) rename hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/{cache => channel}/ISubscribableChannelFactory.java (94%) rename hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/{cache => channel}/ISubscriptionDeliveryChannelNamer.java (79%) create mode 100644 hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java rename hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/{cache => channel}/SubscriptionChannelFactory.java (76%) create mode 100644 hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java create mode 100644 hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java rename hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/{cache => channel}/SubscriptionDeliveryChannelNamer.java (91%) rename hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/{cache => channel}/SubscriptionDeliveryHandlerFactory.java (88%) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java index 0cc22e0ceeb..7df3756abe8 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java @@ -28,7 +28,7 @@ import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc; import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl; import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher; -import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java index 78090b464b1..5d052d52dad 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionMatcherInterceptor.java @@ -4,8 +4,8 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.interceptor.api.*; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; -import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannel; -import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster; diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java index cac0a7fd5ac..615f05d7049 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java @@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.JavaMailEmailSender; import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber; @@ -24,6 +25,8 @@ public class SubscriptionTestUtil { private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; @Autowired private SubscriptionRegistry mySubscriptionRegistry; + @Autowired + private SubscriptionChannelRegistry mySubscriptionChannelRegistry; public int getExecutorQueueSize() { LinkedBlockingQueueSubscribableChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest(); @@ -76,7 +79,7 @@ public class SubscriptionTestUtil { public void setEmailSender(IIdType theIdElement) { ActiveSubscription activeSubscription = mySubscriptionRegistry.get(theIdElement.getIdPart()); - SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) activeSubscription.getDeliveryHandlerForUnitTest(); + SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) mySubscriptionChannelRegistry.get(activeSubscription.getChannelName()).getDeliveryHandlerForUnitTest(); subscriber.setEmailSender(myEmailSender); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/LinkedBlockingQueueSubscribableChannel.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/LinkedBlockingQueueSubscribableChannel.java index 5d4592a7dac..fcf61de8f30 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/LinkedBlockingQueueSubscribableChannel.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/LinkedBlockingQueueSubscribableChannel.java @@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.subscription.module; * #L% */ -import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannel; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel; import ca.uhn.fhir.util.StopWatch; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.concurrent.BasicThreadFactory; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java index fb086300e76..ab2e464cc0c 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java @@ -22,47 +22,28 @@ package ca.uhn.fhir.jpa.subscription.module.cache; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; -import com.google.common.annotations.VisibleForTesting; import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.messaging.MessageHandler; -import java.io.Closeable; -import java.util.Collection; -import java.util.HashSet; - -public class ActiveSubscription implements Closeable { +public class ActiveSubscription { private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); private CanonicalSubscription mySubscription; - private final ISubscribableChannel mySubscribableChannel; - private final Collection myDeliveryHandlerSet = new HashSet<>(); + private final String myChannelName; private boolean flagForDeletion; - public ActiveSubscription(CanonicalSubscription theSubscription, ISubscribableChannel theSubscribableChannel) { + public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) { mySubscription = theSubscription; - mySubscribableChannel = theSubscribableChannel; + myChannelName = theChannelName; } public CanonicalSubscription getSubscription() { return mySubscription; } - public ISubscribableChannel getSubscribableChannel() { - return mySubscribableChannel; - } - - public void register(MessageHandler theHandler) { - mySubscribableChannel.subscribe(theHandler); - myDeliveryHandlerSet.add(theHandler); - } - - public void unregister(MessageHandler theMessageHandler) { - if (mySubscribableChannel != null) { - mySubscribableChannel.unsubscribe(theMessageHandler); - } + public String getChannelName() { + return myChannelName; } public IIdType getIdElement(FhirContext theFhirContext) { @@ -73,11 +54,6 @@ public class ActiveSubscription implements Closeable { return mySubscription.getCriteriaString(); } - @VisibleForTesting - public MessageHandler getDeliveryHandlerForUnitTest() { - return myDeliveryHandlerSet.iterator().next(); - } - public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) { mySubscription = theCanonicalizedSubscription; } @@ -89,37 +65,4 @@ public class ActiveSubscription implements Closeable { public void setFlagForDeletion(boolean theFlagForDeletion) { flagForDeletion = theFlagForDeletion; } - - @Override - public void close() { - for (MessageHandler messageHandler : myDeliveryHandlerSet) { - unregister(messageHandler); - } - if (mySubscribableChannel instanceof DisposableBean) { - int subscriberCount = mySubscribableChannel.getSubscriberCount(); - if (subscriberCount > 0) { - ourLog.info("Channel for subscription {} still has {} subscribers. Not destroying.", mySubscription.getIdPart(), subscriberCount); - } else { - ourLog.info("Channel for subscription {} has no subscribers. Destroying channel.", mySubscription.getIdPart()); - tryDestroyChannel((DisposableBean) mySubscribableChannel); - } - } - } - - private void tryDestroyChannel(DisposableBean theSubscribableChannel) { - try { - theSubscribableChannel.destroy(); - } catch (Exception e) { - ourLog.error("Failed to destroy channel bean", e); - } - } - - /** - * Use close() instead - * KHS 15 Apr 2019 - */ - @Deprecated - public void unregisterAll() { - close(); - } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java index 73020feaf7e..b65bf6f5da3 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java @@ -20,6 +20,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache; * #L% */ +import com.google.common.collect.Multimap; +import com.google.common.collect.MultimapBuilder; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,20 +49,20 @@ class ActiveSubscriptionCache { return myCache.size(); } - public void put(String theSubscriptionId, ActiveSubscription theValue) { - myCache.put(theSubscriptionId, theValue); + public void put(String theSubscriptionId, ActiveSubscription theActiveSubscription) { + myCache.put(theSubscriptionId, theActiveSubscription); } - public synchronized void remove(String theSubscriptionId) { + public synchronized ActiveSubscription remove(String theSubscriptionId) { Validate.notBlank(theSubscriptionId); ActiveSubscription activeSubscription = myCache.get(theSubscriptionId); if (activeSubscription == null) { - return; + return null; } - activeSubscription.close(); myCache.remove(theSubscriptionId); + return activeSubscription; } public void unregisterAllSubscriptionsNotInCollection(Collection theAllIds) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/LinkedBlockingQueueSubscribableChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/LinkedBlockingQueueSubscribableChannelFactory.java index 5cbb60bd5ec..1afe2d7acfa 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/LinkedBlockingQueueSubscribableChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/LinkedBlockingQueueSubscribableChannelFactory.java @@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache; */ import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; import java.util.concurrent.LinkedBlockingQueue; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java index ce943a23c2b..6121045b059 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java @@ -23,14 +23,14 @@ package ca.uhn.fhir.jpa.subscription.module.cache; import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.Pointcut; -import ca.uhn.fhir.jpa.model.entity.ModelConfig; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Subscription; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.MessageHandler; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; @@ -52,11 +52,9 @@ public class SubscriptionRegistry { @Autowired SubscriptionCanonicalizer mySubscriptionCanonicalizer; @Autowired - SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; + ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; @Autowired - SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; - @Autowired - ModelConfig myModelConfig; + SubscriptionChannelRegistry mySubscriptionChannelRegistry; @Autowired private IInterceptorBroadcaster myInterceptorBroadcaster; @@ -90,21 +88,12 @@ public class SubscriptionRegistry { Validate.notNull(theSubscription); CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); - ISubscribableChannel deliveryChannel; - Optional deliveryHandler; - if (myModelConfig.isSubscriptionMatchingEnabled()) { - deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(canonicalized); - deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(canonicalized); - } else { - deliveryChannel = null; - deliveryHandler = Optional.empty(); - } + String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalized); ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); - ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, deliveryChannel); - deliveryHandler.ifPresent(activeSubscription::register); - + ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, channelName); + mySubscriptionChannelRegistry.add(activeSubscription); myActiveSubscriptionCache.put(subscriptionId, activeSubscription); // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED @@ -120,7 +109,10 @@ public class SubscriptionRegistry { String subscriptionId = theId.getIdPart(); ourLog.info("Unregistering active subscription {}", theId.toUnqualified().getValue()); - myActiveSubscriptionCache.remove(subscriptionId); + ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(subscriptionId); + if (activeSubscription != null) { + mySubscriptionChannelRegistry.remove(activeSubscription); + } } @PreDestroy diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscribableChannel.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscribableChannel.java similarity index 82% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscribableChannel.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscribableChannel.java index 83a19824260..77ef2f38da1 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscribableChannel.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscribableChannel.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.subscription.module.cache; +package ca.uhn.fhir.jpa.subscription.module.channel; import org.springframework.integration.support.management.SubscribableChannelManagement; import org.springframework.messaging.SubscribableChannel; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscribableChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscribableChannelFactory.java similarity index 94% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscribableChannelFactory.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscribableChannelFactory.java index 5be58d7f922..4d447f1edd3 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscribableChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscribableChannelFactory.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.subscription.module.cache; +package ca.uhn.fhir.jpa.subscription.module.channel; /*- * #%L diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscriptionDeliveryChannelNamer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscriptionDeliveryChannelNamer.java similarity index 79% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscriptionDeliveryChannelNamer.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscriptionDeliveryChannelNamer.java index 3c118205cc8..62ecf864bb5 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ISubscriptionDeliveryChannelNamer.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/ISubscriptionDeliveryChannelNamer.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.subscription.module.cache; +package ca.uhn.fhir.jpa.subscription.module.channel; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java new file mode 100644 index 00000000000..b1d446e8577 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java @@ -0,0 +1,47 @@ +package ca.uhn.fhir.jpa.subscription.module.channel; + +import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; +import com.google.common.collect.Multimap; +import com.google.common.collect.MultimapBuilder; +import org.apache.commons.lang3.Validate; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class SubscriptionChannelCache { + private final Map myCache = new ConcurrentHashMap<>(); + + public SubscriptionChannelWithHandlers get(String theChannelName) { + return myCache.get(theChannelName); + } + + public Collection getAll() { + return Collections.unmodifiableCollection(myCache.values()); + } + + public int size() { + return myCache.size(); + } + + public void put(String theChannelName, SubscriptionChannelWithHandlers theValue) { + myCache.put(theChannelName, theValue); + } + + public synchronized void remove(String theChannelName) { + Validate.notBlank(theChannelName); + + SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = myCache.get(theChannelName); + if (subscriptionChannelWithHandlers == null) { + return; + } + + subscriptionChannelWithHandlers.close(); + myCache.remove(theChannelName); + } + + public boolean containsKey(String theChannelName) { + return myCache.containsKey(theChannelName); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelFactory.java similarity index 76% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionChannelFactory.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelFactory.java index d394dbf3427..0395b202bbe 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelFactory.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.subscription.module.cache; +package ca.uhn.fhir.jpa.subscription.module.channel; /*- * #%L @@ -21,6 +21,9 @@ package ca.uhn.fhir.jpa.subscription.module.cache; */ import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -29,17 +32,13 @@ public class SubscriptionChannelFactory { private ISubscribableChannelFactory mySubscribableChannelFactory; - @Autowired - ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; - @Autowired public SubscriptionChannelFactory(ISubscribableChannelFactory theSubscribableChannelFactory) { mySubscribableChannelFactory = theSubscribableChannelFactory; } - public ISubscribableChannel newDeliveryChannel(CanonicalSubscription theCanonicalSubscription) { - String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription); - return mySubscribableChannelFactory.createSubscribableChannel(channelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers()); + public ISubscribableChannel newDeliveryChannel(String theChannelName) { + return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers()); } public ISubscribableChannel newMatchingChannel(String theChannelName) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java new file mode 100644 index 00000000000..34d0258fd05 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java @@ -0,0 +1,62 @@ +package ca.uhn.fhir.jpa.subscription.module.channel; + +import ca.uhn.fhir.jpa.model.entity.ModelConfig; +import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; +import com.google.common.collect.Multimap; +import com.google.common.collect.MultimapBuilder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.MessageHandler; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +@Component +public class SubscriptionChannelRegistry { + private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache(); + // This map is a reference count so we know to destroy the channel if there are no more active subscriptions using it + private final Multimap myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build(); + + @Autowired + SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; + @Autowired + SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; + @Autowired + ModelConfig myModelConfig; + + public void add(ActiveSubscription theActiveSubscription) { + if (!myModelConfig.isSubscriptionMatchingEnabled()) { + return; + } + String channelName = theActiveSubscription.getChannelName(); + myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription); + + if (mySubscriptionChannelCache.containsKey(channelName)) { + return; + } + + ISubscribableChannel deliveryChannel; + Optional deliveryHandler; + + deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName); + deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getSubscription().getChannelType()); + + SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel); + deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler); + mySubscriptionChannelCache.put(channelName, subscriptionChannelWithHandlers); + } + + public void remove(ActiveSubscription theActiveSubscription) { + String channelName = theActiveSubscription.getChannelName(); + myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription); + // FIXME KHS test + // This was the last one. Shut down the channel + if (!myActiveSubscriptionByChannelName.containsKey(channelName)) { + SubscriptionChannelWithHandlers channel = mySubscriptionChannelCache.get(channelName); + channel.close(); + } + } + + public SubscriptionChannelWithHandlers get(String theChannelName) { + return mySubscriptionChannelCache.get(theChannelName); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java new file mode 100644 index 00000000000..2ed6c40e9f6 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java @@ -0,0 +1,70 @@ +package ca.uhn.fhir.jpa.subscription.module.channel; + +import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import java.io.Closeable; +import java.util.Collection; +import java.util.HashSet; + +public class SubscriptionChannelWithHandlers implements Closeable { + private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); + + private final String myChannelName; + private final ISubscribableChannel mySubscribableChannel; + private final Collection myDeliveryHandlerSet = new HashSet<>(); + + public SubscriptionChannelWithHandlers(String theChannelName, ISubscribableChannel theSubscribableChannel) { + myChannelName = theChannelName; + mySubscribableChannel = theSubscribableChannel; + } + + public void addHandler(MessageHandler theHandler) { + mySubscribableChannel.subscribe(theHandler); + myDeliveryHandlerSet.add(theHandler); + } + + public void removeHandler(MessageHandler theMessageHandler) { + if (mySubscribableChannel != null) { + mySubscribableChannel.unsubscribe(theMessageHandler); + } + } + + @VisibleForTesting + public MessageHandler getDeliveryHandlerForUnitTest() { + return myDeliveryHandlerSet.iterator().next(); + } + + @Override + public void close() { + for (MessageHandler messageHandler : myDeliveryHandlerSet) { + removeHandler(messageHandler); + } + if (mySubscribableChannel instanceof DisposableBean) { + int subscriberCount = mySubscribableChannel.getSubscriberCount(); + if (subscriberCount > 0) { + ourLog.info("Channel {} still has {} subscribers. Not destroying.", myChannelName, subscriberCount); + } else { + ourLog.info("Channel for subscription {} has no subscribers. Destroying channel.", myChannelName); + tryDestroyChannel((DisposableBean) mySubscribableChannel); + } + } + } + + private void tryDestroyChannel(DisposableBean theSubscribableChannel) { + try { + theSubscribableChannel.destroy(); + } catch (Exception e) { + ourLog.error("Failed to destroy channel bean", e); + } + } + + public MessageChannel getChannel() { + return mySubscribableChannel; + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionDeliveryChannelNamer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryChannelNamer.java similarity index 91% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionDeliveryChannelNamer.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryChannelNamer.java index dffa7fae943..b404909013d 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionDeliveryChannelNamer.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryChannelNamer.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.subscription.module.cache; +package ca.uhn.fhir.jpa.subscription.module.channel; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import org.springframework.stereotype.Service; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionDeliveryHandlerFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryHandlerFactory.java similarity index 88% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionDeliveryHandlerFactory.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryHandlerFactory.java index eb350809984..7a70c1e74bc 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionDeliveryHandlerFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionDeliveryHandlerFactory.java @@ -1,4 +1,4 @@ -package ca.uhn.fhir.jpa.subscription.module.cache; +package ca.uhn.fhir.jpa.subscription.module.channel; /*- * #%L @@ -42,10 +42,10 @@ public abstract class SubscriptionDeliveryHandlerFactory { @Lookup protected abstract SubscriptionDeliveringRestHookSubscriber getSubscriptionDeliveringRestHookSubscriber(); - public Optional createDeliveryHandler(CanonicalSubscription theSubscription) { - if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) { + public Optional createDeliveryHandler(CanonicalSubscriptionChannelType theChannelType) { + if (theChannelType == CanonicalSubscriptionChannelType.EMAIL) { return Optional.of(getSubscriptionDeliveringEmailSubscriber(myEmailSender)); - } else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) { + } else if (theChannelType == CanonicalSubscriptionChannelType.RESTHOOK) { return Optional.of(getSubscriptionDeliveringRestHookSubscriber()); } else { return Optional.empty(); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/config/BaseSubscriptionConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/config/BaseSubscriptionConfig.java index fbcbd5a6194..cb9375bbd03 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/config/BaseSubscriptionConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/config/BaseSubscriptionConfig.java @@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.subscription.module.config; */ import ca.uhn.fhir.interceptor.executor.InterceptorService; -import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java index 28936db1428..f1928fcb659 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java @@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.EncodingEnum; @@ -61,6 +62,8 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { private SubscriptionRegistry mySubscriptionRegistry; @Autowired private IInterceptorBroadcaster myInterceptorBroadcaster; + @Autowired + private SubscriptionChannelRegistry mySubscriptionChannelRegistry; @Override public void handleMessage(Message theMessage) throws MessagingException { @@ -177,7 +180,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) { boolean retval = false; ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg); - MessageChannel deliveryChannel = nextActiveSubscription.getSubscribableChannel(); + MessageChannel deliveryChannel = mySubscriptionChannelRegistry.get(nextActiveSubscription.getChannelName()).getChannel(); if (deliveryChannel != null) { retval = true; trySendToDeliveryChannel(wrappedMsg, deliveryChannel); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java index d9dc2e8a2e7..ab94522e5c3 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/websocket/SubscriptionWebsocketHandler.java @@ -22,6 +22,8 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber.websocket; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelWithHandlers; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.IdType; @@ -45,6 +47,8 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement private static Logger ourLog = LoggerFactory.getLogger(SubscriptionWebsocketHandler.class); @Autowired protected WebsocketConnectionValidator myWebsocketConnectionValidator; + @Autowired + SubscriptionChannelRegistry mySubscriptionChannelRegistry; @Autowired private FhirContext myCtx; @@ -102,21 +106,23 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement } - private class BoundStaticSubscipriptionState implements IState, MessageHandler { + private class BoundStaticSubscriptionState implements IState, MessageHandler { private final WebSocketSession mySession; private final ActiveSubscription myActiveSubscription; - public BoundStaticSubscipriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) { + public BoundStaticSubscriptionState(WebSocketSession theSession, ActiveSubscription theActiveSubscription) { mySession = theSession; myActiveSubscription = theActiveSubscription; - theActiveSubscription.register(this); + SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(theActiveSubscription.getChannelName()); + subscriptionChannelWithHandlers.addHandler(this); } @Override public void closing() { - myActiveSubscription.unregister(this); + SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(myActiveSubscription.getChannelName()); + subscriptionChannelWithHandlers.removeHandler(this); } private void deliver() { @@ -172,7 +178,7 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement return null; } - myState = new BoundStaticSubscipriptionState(theSession, response.getActiveSubscription()); + myState = new BoundStaticSubscriptionState(theSession, response.getActiveSubscription()); return id; } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index 82f7e23eb33..6cbe3ae751b 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -10,8 +10,9 @@ import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; -import ca.uhn.fhir.jpa.subscription.module.cache.ISubscribableChannel; -import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannel; +import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer; +import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider; @@ -60,10 +61,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base IInterceptorService myInterceptorRegistry; @Autowired protected SubscriptionRegistry mySubscriptionRegistry; - @Autowired + @Autowired private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider; - @Autowired + @Autowired private SubscriptionLoader mySubscriptionLoader; + @Autowired + private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; protected String myCode = "1000000050"; @@ -87,8 +90,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base canonicalSubscription.setIdElement(new IdDt("test")); canonicalSubscription.setChannelType(CanonicalSubscriptionChannelType.RESTHOOK); mySubscriptionRegistry.unregisterAllSubscriptions(); - ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel(canonicalSubscription); - ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); + ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel(mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalSubscription)); + ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost); } @@ -110,11 +113,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base return theResource; } - protected void initSubscriptionLoader(List subscriptions, String uuid) throws InterruptedException { + protected void initSubscriptionLoader(List subscriptions, String uuid) throws InterruptedException { myMockFhirClientSubscriptionProvider.setBundleProvider(new SimpleBundleProvider(new ArrayList<>(subscriptions), uuid)); mySubscriptionLoader.doSyncSubscriptionsForUnitTest(); } - + protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { Subscription subscription = makeActiveSubscription(theCriteria, thePayload, theEndpoint); mySubscriptionActivatedPost.setExpectedCount(1); @@ -157,11 +160,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base ourListenerServer.setHandler(proxyHandler); JettyUtil.startServer(ourListenerServer); - ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); - ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; - FhirContext context = ourListenerRestServer.getFhirContext(); - //Preload structure definitions so the load doesn't happen during the test (first load can be a little slow) - context.getValidationSupport().fetchAllStructureDefinitions(context); + ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); + ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; + FhirContext context = ourListenerRestServer.getFhirContext(); + //Preload structure definitions so the load doesn't happen during the test (first load can be a little slow) + context.getValidationSupport().fetchAllStructureDefinitions(context); } @AfterClass @@ -206,6 +209,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base } @Override - public void clear() { updateLatch.clear();} + public void clear() { + updateLatch.clear(); + } } }