diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java index a1ad210b291..c9d783cf8a7 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscription.java @@ -36,7 +36,7 @@ import java.util.HashSet; public class ActiveSubscription { private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); - private final CanonicalSubscription mySubscription; + private CanonicalSubscription mySubscription; private final SubscribableChannel mySubscribableChannel; private final Collection myDeliveryHandlerSet = new HashSet<>(); @@ -90,4 +90,8 @@ public class ActiveSubscription { public MessageHandler getDeliveryHandlerForUnitTest() { return myDeliveryHandlerSet.iterator().next(); } + + public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) { + mySubscription = theCanonicalizedSubscription; + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java index 7b45726e9cc..436f807fb9f 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java @@ -130,6 +130,11 @@ public class SubscriptionRegistry { return false; } ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); + if (channelTypeSame(existingSubscription.get(), newSubscription)) { + ourLog.info("Channel type is same. Updating active subscription and re-using existing channel and handlers."); + updateSubscription(theSubscription); + return true; + } unregisterSubscription(theSubscription.getIdElement()); } else { ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); @@ -140,7 +145,23 @@ public class SubscriptionRegistry { } else { return false; } + } + private void updateSubscription(IBaseResource theSubscription) { + IIdType theId = theSubscription.getIdElement(); + Validate.notNull(theId); + Validate.notBlank(theId.getIdPart()); + ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart()); + Validate.notNull(activeSubscription); + CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); + activeSubscription.setSubscription(canonicalized); + + // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED + myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, canonicalized); + } + + private boolean channelTypeSame(CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) { + return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType()); } public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) { diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java new file mode 100644 index 00000000000..b5fcd9c8026 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistryTest.java @@ -0,0 +1,78 @@ +package ca.uhn.fhir.jpa.subscription.module.cache; + + +import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; +import org.hl7.fhir.dstu3.model.Subscription; +import org.junit.After; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.junit.Assert.*; + +public class SubscriptionRegistryTest extends BaseSubscriptionDstu3Test { + public static final String SUBSCRIPTION_ID = "1"; + public static final String ORIG_CRITERIA = "Patient?"; + public static final String NEW_CRITERIA = "Observation?"; + @Autowired + SubscriptionRegistry mySubscriptionRegistry; + + @After + public void clearRegistry() { + mySubscriptionRegistry.unregisterAllSubscriptions(); + } + + @Test + public void updateSubscriptionReusesActiveSubscription() { + Subscription subscription = createSubscription(); + assertEquals(0, mySubscriptionRegistry.size()); + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); + assertEquals(1, mySubscriptionRegistry.size()); + ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); + assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); + + subscription.setCriteria(NEW_CRITERIA); + assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); + assertEquals(1, mySubscriptionRegistry.size()); + ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); + assertEquals(NEW_CRITERIA, newActiveSubscription.getCriteriaString()); + // The same object + assertTrue(newActiveSubscription == origActiveSubscription); + } + + @Test + public void updateSubscriptionDoesntReusesActiveSubscriptionWhenChannelChanges() { + Subscription subscription = createSubscription(); + assertEquals(0, mySubscriptionRegistry.size()); + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); + assertEquals(1, mySubscriptionRegistry.size()); + ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); + assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); + + setChannel(subscription, Subscription.SubscriptionChannelType.EMAIL); + + assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString()); + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription); + assertEquals(1, mySubscriptionRegistry.size()); + ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID); + // A new object + assertFalse(newActiveSubscription == origActiveSubscription); + } + + private Subscription createSubscription() { + Subscription subscription = new Subscription(); + subscription.setId(SUBSCRIPTION_ID); + subscription.setCriteria(ORIG_CRITERIA); + subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE); + setChannel(subscription, Subscription.SubscriptionChannelType.RESTHOOK); + return subscription; + } + + private void setChannel(Subscription theSubscription, Subscription.SubscriptionChannelType theResthook) { + Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent(); + channel.setType(theResthook); + channel.setPayload("application/json"); + channel.setEndpoint("http://unused.test.endpoint/"); + theSubscription.setChannel(channel); + } +} diff --git a/src/changes/changes.xml b/src/changes/changes.xml index c0264ab7948..7800b25903f 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -26,6 +26,9 @@ enabled after the refactoring of how Subscriptions are enabled that occurred in HAPI FHIR 3.7.0. Thanks to Volker Schmidt for the pull request! + + Re-use subscription channel and handlers when a subscription is updated (unless the channel type changed). +