From f0b2bb8309f56a96eb2f28dd8010f7d6057c86ee Mon Sep 17 00:00:00 2001 From: Ken Stevens Date: Tue, 1 Oct 2019 14:57:25 -0400 Subject: [PATCH] fixed triggering subscriptions --- .../subscription/SubscriptionTriggeringSvcImpl.java | 4 ++-- .../matcher/InMemorySubscriptionMatcherR4Test.java | 2 +- .../module/channel/SubscriptionChannelCache.java | 11 +++++++++++ .../module/channel/SubscriptionChannelRegistry.java | 5 +++-- .../channel/SubscriptionChannelWithHandlers.java | 13 ++++++------- .../subscriber/SubscriptionMatchingSubscriber.java | 1 + 6 files changed, 24 insertions(+), 12 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java index ea2a83daa98..0bcc69116bd 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringSvcImpl.java @@ -142,7 +142,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList())); jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList())); if (theSubscriptionId != null) { - jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue()); + jobDetails.setSubscriptionId(theSubscriptionId.getIdPart()); } // Submit job for processing @@ -314,7 +314,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId); ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE); - msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue()); + msg.setSubscriptionId(theSubscriptionId); return myExecutorService.submit(() -> { for (int i = 0; ; i++) { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/module/matcher/InMemorySubscriptionMatcherR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/module/matcher/InMemorySubscriptionMatcherR4Test.java index b3f0d8c839c..b5b014aace8 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/module/matcher/InMemorySubscriptionMatcherR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/module/matcher/InMemorySubscriptionMatcherR4Test.java @@ -399,7 +399,7 @@ public class InMemorySubscriptionMatcherR4Test { subscription.setCriteriaString(criteria); subscription.setIdElement(new IdType("Subscription", 123L)); ResourceModifiedMessage msg = new ResourceModifiedMessage(myContext, patient, ResourceModifiedMessage.OperationTypeEnum.CREATE); - msg.setSubscriptionId("Subscription/123"); + msg.setSubscriptionId("123"); msg.setId(new IdType("Patient/ABC")); InMemoryMatchResult result = myInMemorySubscriptionMatcher.match(subscription, msg); fail(); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java index b1d446e8577..0596f339385 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelCache.java @@ -1,9 +1,12 @@ package ca.uhn.fhir.jpa.subscription.module.channel; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import com.google.common.collect.Multimap; import com.google.common.collect.MultimapBuilder; import org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; @@ -11,6 +14,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; class SubscriptionChannelCache { + private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); + private final Map myCache = new ConcurrentHashMap<>(); public SubscriptionChannelWithHandlers get(String theChannelName) { @@ -44,4 +49,10 @@ class SubscriptionChannelCache { public boolean containsKey(String theChannelName) { return myCache.containsKey(theChannelName); } + + void logForUnitTest() { + for (String key : myCache.keySet()) { + ourLog.info("SubscriptionChannelCache: {}", key); + } + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java index 3e6689c568e..5044a7238f2 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java @@ -60,7 +60,7 @@ public class SubscriptionChannelRegistry { return; } String channelName = theActiveSubscription.getChannelName(); - ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName); + ourLog.info("Removing subscription {} from channel {}: {}", theActiveSubscription.getId() ,channelName, myActiveSubscriptionByChannelName); boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId()); if (!removed) { ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId() ,channelName); @@ -87,10 +87,11 @@ public class SubscriptionChannelRegistry { @VisibleForTesting public void logForUnitTest() { ourLog.info("{} Channels: {}", this, size()); + mySubscriptionChannelCache.logForUnitTest(); for (String key : myActiveSubscriptionByChannelName.keySet()) { Collection list = myActiveSubscriptionByChannelName.get(key); for (String value : list) { - ourLog.info("{}: {}", key, value); + ourLog.info("ActiveSubscriptionByChannelName {}: {}", key, value); } } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java index 2ed6c40e9f6..518259d34de 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelWithHandlers.java @@ -46,18 +46,13 @@ public class SubscriptionChannelWithHandlers implements Closeable { removeHandler(messageHandler); } if (mySubscribableChannel instanceof DisposableBean) { - int subscriberCount = mySubscribableChannel.getSubscriberCount(); - if (subscriberCount > 0) { - ourLog.info("Channel {} still has {} subscribers. Not destroying.", myChannelName, subscriberCount); - } else { - ourLog.info("Channel for subscription {} has no subscribers. Destroying channel.", myChannelName); - tryDestroyChannel((DisposableBean) mySubscribableChannel); - } + tryDestroyChannel((DisposableBean) mySubscribableChannel); } } private void tryDestroyChannel(DisposableBean theSubscribableChannel) { try { + ourLog.info("Destroying channel {}", myChannelName); theSubscribableChannel.destroy(); } catch (Exception e) { ourLog.error("Failed to destroy channel bean", e); @@ -67,4 +62,8 @@ public class SubscriptionChannelWithHandlers implements Closeable { public MessageChannel getChannel() { return mySubscribableChannel; } + + public String getChannelName() { + return myChannelName; + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java index 00e57b28b2b..e522120f7ac 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriber.java @@ -122,6 +122,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { if (isNotBlank(theMsg.getSubscriptionId())) { if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) { + // TODO KHS we should use a hash to look it up instead of this full table scan ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId()); continue; }