SubscriptionRegistry re-uses channel and handlers if channel type didn't change when subscription is updated

This commit is contained in:
Ken Stevens 2019-02-14 19:38:31 -05:00
parent 508e15956e
commit badbde7e4a
4 changed files with 107 additions and 1 deletions

View File

@ -36,7 +36,7 @@ import java.util.HashSet;
public class ActiveSubscription { public class ActiveSubscription {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
private final CanonicalSubscription mySubscription; private CanonicalSubscription mySubscription;
private final SubscribableChannel mySubscribableChannel; private final SubscribableChannel mySubscribableChannel;
private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>(); private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
@ -90,4 +90,8 @@ public class ActiveSubscription {
public MessageHandler getDeliveryHandlerForUnitTest() { public MessageHandler getDeliveryHandlerForUnitTest() {
return myDeliveryHandlerSet.iterator().next(); return myDeliveryHandlerSet.iterator().next();
} }
public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) {
mySubscription = theCanonicalizedSubscription;
}
} }

View File

@ -130,6 +130,11 @@ public class SubscriptionRegistry {
return false; return false;
} }
ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
if (channelTypeSame(existingSubscription.get(), newSubscription)) {
ourLog.info("Channel type is same. Updating active subscription and re-using existing channel and handlers.");
updateSubscription(theSubscription);
return true;
}
unregisterSubscription(theSubscription.getIdElement()); unregisterSubscription(theSubscription.getIdElement());
} else { } else {
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
@ -140,7 +145,23 @@ public class SubscriptionRegistry {
} else { } else {
return false; return false;
} }
}
private void updateSubscription(IBaseResource theSubscription) {
IIdType theId = theSubscription.getIdElement();
Validate.notNull(theId);
Validate.notBlank(theId.getIdPart());
ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart());
Validate.notNull(activeSubscription);
CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
activeSubscription.setSubscription(canonicalized);
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, canonicalized);
}
private boolean channelTypeSame(CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) {
return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType());
} }
public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) { public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {

View File

@ -0,0 +1,78 @@
package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.After;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import static org.junit.Assert.*;
public class SubscriptionRegistryTest extends BaseSubscriptionDstu3Test {
public static final String SUBSCRIPTION_ID = "1";
public static final String ORIG_CRITERIA = "Patient?";
public static final String NEW_CRITERIA = "Observation?";
@Autowired
SubscriptionRegistry mySubscriptionRegistry;
@After
public void clearRegistry() {
mySubscriptionRegistry.unregisterAllSubscriptions();
}
@Test
public void updateSubscriptionReusesActiveSubscription() {
Subscription subscription = createSubscription();
assertEquals(0, mySubscriptionRegistry.size());
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertEquals(1, mySubscriptionRegistry.size());
ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
subscription.setCriteria(NEW_CRITERIA);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertEquals(1, mySubscriptionRegistry.size());
ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
assertEquals(NEW_CRITERIA, newActiveSubscription.getCriteriaString());
// The same object
assertTrue(newActiveSubscription == origActiveSubscription);
}
@Test
public void updateSubscriptionDoesntReusesActiveSubscriptionWhenChannelChanges() {
Subscription subscription = createSubscription();
assertEquals(0, mySubscriptionRegistry.size());
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertEquals(1, mySubscriptionRegistry.size());
ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
setChannel(subscription, Subscription.SubscriptionChannelType.EMAIL);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertEquals(1, mySubscriptionRegistry.size());
ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
// A new object
assertFalse(newActiveSubscription == origActiveSubscription);
}
private Subscription createSubscription() {
Subscription subscription = new Subscription();
subscription.setId(SUBSCRIPTION_ID);
subscription.setCriteria(ORIG_CRITERIA);
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
setChannel(subscription, Subscription.SubscriptionChannelType.RESTHOOK);
return subscription;
}
private void setChannel(Subscription theSubscription, Subscription.SubscriptionChannelType theResthook) {
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();
channel.setType(theResthook);
channel.setPayload("application/json");
channel.setEndpoint("http://unused.test.endpoint/");
theSubscription.setChannel(channel);
}
}

View File

@ -26,6 +26,9 @@
enabled after the refactoring of how Subscriptions are enabled that enabled after the refactoring of how Subscriptions are enabled that
occurred in HAPI FHIR 3.7.0. Thanks to Volker Schmidt for the pull request! occurred in HAPI FHIR 3.7.0. Thanks to Volker Schmidt for the pull request!
</action> </action>
<action type="change">
Re-use subscription channel and handlers when a subscription is updated (unless the channel type changed).
</action>
</release> </release>
<release version="3.7.0" date="2019-02-06" description="Gale"> <release version="3.7.0" date="2019-02-06" description="Gale">
<action type="add"> <action type="add">