Merge pull request #1208 from jamesagnew/reuse-subscription-channels
Reuse subscription channels
This commit is contained in:
commit
a35f30b18e
|
@ -19,7 +19,7 @@ import ca.uhn.fhir.rest.annotation.Update;
|
||||||
import ca.uhn.fhir.rest.api.MethodOutcome;
|
import ca.uhn.fhir.rest.api.MethodOutcome;
|
||||||
import ca.uhn.fhir.rest.server.IResourceProvider;
|
import ca.uhn.fhir.rest.server.IResourceProvider;
|
||||||
import ca.uhn.fhir.rest.server.RestfulServer;
|
import ca.uhn.fhir.rest.server.RestfulServer;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||||
import ca.uhn.fhir.util.PortUtil;
|
import ca.uhn.fhir.util.PortUtil;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.eclipse.jetty.server.Server;
|
import org.eclipse.jetty.server.Server;
|
||||||
|
@ -119,8 +119,6 @@ public class RestHookTestDstu2Test extends BaseResourceProviderDstu2Test {
|
||||||
return observation;
|
return observation;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: re enable
|
|
||||||
@Ignore
|
|
||||||
@Test
|
@Test
|
||||||
public void testRestHookSubscriptionInvalidCriteria() throws Exception {
|
public void testRestHookSubscriptionInvalidCriteria() throws Exception {
|
||||||
String payload = "application/xml";
|
String payload = "application/xml";
|
||||||
|
@ -130,8 +128,8 @@ public class RestHookTestDstu2Test extends BaseResourceProviderDstu2Test {
|
||||||
try {
|
try {
|
||||||
createSubscription(criteria1, payload, ourListenerServerBase);
|
createSubscription(criteria1, payload, ourListenerServerBase);
|
||||||
fail();
|
fail();
|
||||||
} catch (InvalidRequestException e) {
|
} catch (UnprocessableEntityException e) {
|
||||||
assertEquals("HTTP 400 Bad Request: Invalid criteria: Failed to parse match URL[Observation?codeeeee=SNOMED-CT] - Resource type Observation does not have a parameter with name: codeeeee", e.getMessage());
|
assertEquals("TTP 422 Unprocessable Entity: Invalid subscription criteria submitted: Observation?codeeeee=SNOMED-CT Failed to parse match URL[Observation?codeeeee=SNOMED-CT] - Resource type Observation does not have a parameter with name: codeeeee", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -139,6 +139,24 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
||||||
return observation;
|
return observation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDatabaseStrategyMeta() throws InterruptedException {
|
||||||
|
String databaseCriteria = "Observation?code=17861-6&context.type=IHD";
|
||||||
|
Subscription subscription = createSubscription(databaseCriteria, null, ourNotificationListenerServer);
|
||||||
|
List<Coding> tag = subscription.getMeta().getTag();
|
||||||
|
assertEquals(SubscriptionConstants.EXT_SUBSCRIPTION_MATCHING_STRATEGY, tag.get(0).getSystem());
|
||||||
|
assertEquals(SubscriptionMatchingStrategy.DATABASE.toString(), tag.get(0).getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMemorytrategyMeta() throws InterruptedException {
|
||||||
|
String inMemoryCriteria = "Observation?code=17861-6";
|
||||||
|
Subscription subscription = createSubscription(inMemoryCriteria, null, ourNotificationListenerServer);
|
||||||
|
List<Coding> tag = subscription.getMeta().getTag();
|
||||||
|
assertEquals(SubscriptionConstants.EXT_SUBSCRIPTION_MATCHING_STRATEGY, tag.get(0).getSystem());
|
||||||
|
assertEquals(SubscriptionMatchingStrategy.IN_MEMORY.toString(), tag.get(0).getCode());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRestHookSubscription() throws Exception {
|
public void testRestHookSubscription() throws Exception {
|
||||||
String code = "1000000050";
|
String code = "1000000050";
|
||||||
|
|
|
@ -36,7 +36,7 @@ import java.util.HashSet;
|
||||||
public class ActiveSubscription {
|
public class ActiveSubscription {
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
|
||||||
|
|
||||||
private final CanonicalSubscription mySubscription;
|
private CanonicalSubscription mySubscription;
|
||||||
private final SubscribableChannel mySubscribableChannel;
|
private final SubscribableChannel mySubscribableChannel;
|
||||||
private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
|
private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
|
||||||
|
|
||||||
|
@ -90,4 +90,8 @@ public class ActiveSubscription {
|
||||||
public MessageHandler getDeliveryHandlerForUnitTest() {
|
public MessageHandler getDeliveryHandlerForUnitTest() {
|
||||||
return myDeliveryHandlerSet.iterator().next();
|
return myDeliveryHandlerSet.iterator().next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) {
|
||||||
|
mySubscription = theCanonicalizedSubscription;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,6 +130,11 @@ public class SubscriptionRegistry {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
|
ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
|
||||||
|
if (channelTypeSame(existingSubscription.get(), newSubscription)) {
|
||||||
|
ourLog.info("Channel type is same. Updating active subscription and re-using existing channel and handlers.");
|
||||||
|
updateSubscription(theSubscription);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
unregisterSubscription(theSubscription.getIdElement());
|
unregisterSubscription(theSubscription.getIdElement());
|
||||||
} else {
|
} else {
|
||||||
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
|
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
|
||||||
|
@ -140,7 +145,23 @@ public class SubscriptionRegistry {
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateSubscription(IBaseResource theSubscription) {
|
||||||
|
IIdType theId = theSubscription.getIdElement();
|
||||||
|
Validate.notNull(theId);
|
||||||
|
Validate.notBlank(theId.getIdPart());
|
||||||
|
ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart());
|
||||||
|
Validate.notNull(activeSubscription);
|
||||||
|
CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
|
||||||
|
activeSubscription.setSubscription(canonicalized);
|
||||||
|
|
||||||
|
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
|
||||||
|
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, canonicalized);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean channelTypeSame(CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) {
|
||||||
|
return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType());
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
|
public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
package ca.uhn.fhir.jpa.subscription.module.cache;
|
||||||
|
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
|
||||||
|
import org.hl7.fhir.dstu3.model.Subscription;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class SubscriptionRegistryTest extends BaseSubscriptionDstu3Test {
|
||||||
|
public static final String SUBSCRIPTION_ID = "1";
|
||||||
|
public static final String ORIG_CRITERIA = "Patient?";
|
||||||
|
public static final String NEW_CRITERIA = "Observation?";
|
||||||
|
@Autowired
|
||||||
|
SubscriptionRegistry mySubscriptionRegistry;
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void clearRegistry() {
|
||||||
|
mySubscriptionRegistry.unregisterAllSubscriptions();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void updateSubscriptionReusesActiveSubscription() {
|
||||||
|
Subscription subscription = createSubscription();
|
||||||
|
assertEquals(0, mySubscriptionRegistry.size());
|
||||||
|
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
|
||||||
|
assertEquals(1, mySubscriptionRegistry.size());
|
||||||
|
ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
|
||||||
|
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
|
||||||
|
|
||||||
|
subscription.setCriteria(NEW_CRITERIA);
|
||||||
|
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
|
||||||
|
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
|
||||||
|
assertEquals(1, mySubscriptionRegistry.size());
|
||||||
|
ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
|
||||||
|
assertEquals(NEW_CRITERIA, newActiveSubscription.getCriteriaString());
|
||||||
|
// The same object
|
||||||
|
assertTrue(newActiveSubscription == origActiveSubscription);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void updateSubscriptionDoesntReusesActiveSubscriptionWhenChannelChanges() {
|
||||||
|
Subscription subscription = createSubscription();
|
||||||
|
assertEquals(0, mySubscriptionRegistry.size());
|
||||||
|
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
|
||||||
|
assertEquals(1, mySubscriptionRegistry.size());
|
||||||
|
ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
|
||||||
|
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
|
||||||
|
|
||||||
|
setChannel(subscription, Subscription.SubscriptionChannelType.EMAIL);
|
||||||
|
|
||||||
|
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
|
||||||
|
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
|
||||||
|
assertEquals(1, mySubscriptionRegistry.size());
|
||||||
|
ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
|
||||||
|
// A new object
|
||||||
|
assertFalse(newActiveSubscription == origActiveSubscription);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Subscription createSubscription() {
|
||||||
|
Subscription subscription = new Subscription();
|
||||||
|
subscription.setId(SUBSCRIPTION_ID);
|
||||||
|
subscription.setCriteria(ORIG_CRITERIA);
|
||||||
|
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
|
||||||
|
setChannel(subscription, Subscription.SubscriptionChannelType.RESTHOOK);
|
||||||
|
return subscription;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setChannel(Subscription theSubscription, Subscription.SubscriptionChannelType theResthook) {
|
||||||
|
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();
|
||||||
|
channel.setType(theResthook);
|
||||||
|
channel.setPayload("application/json");
|
||||||
|
channel.setEndpoint("http://unused.test.endpoint/");
|
||||||
|
theSubscription.setChannel(channel);
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,6 +26,9 @@
|
||||||
enabled after the refactoring of how Subscriptions are enabled that
|
enabled after the refactoring of how Subscriptions are enabled that
|
||||||
occurred in HAPI FHIR 3.7.0. Thanks to Volker Schmidt for the pull request!
|
occurred in HAPI FHIR 3.7.0. Thanks to Volker Schmidt for the pull request!
|
||||||
</action>
|
</action>
|
||||||
|
<action type="change">
|
||||||
|
Re-use subscription channel and handlers when a subscription is updated (unless the channel type changed).
|
||||||
|
</action>
|
||||||
<action type="fix">
|
<action type="fix">
|
||||||
When using the <![CDATA[<code>_elements</code>]]> parameter on searches and reads,
|
When using the <![CDATA[<code>_elements</code>]]> parameter on searches and reads,
|
||||||
requesting extensions to be included caused the extensions to be included but
|
requesting extensions to be included caused the extensions to be included but
|
||||||
|
|
Loading…
Reference in New Issue