diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java index a57bb0e3cf4..22ba22da4f6 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.module.standalone; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -30,6 +30,7 @@ import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessag import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -61,15 +62,37 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler { } public void updateSubscriptionRegistryAndPerformMatching(ResourceModifiedMessage theResourceModifiedMessage) { - IBaseResource resource = theResourceModifiedMessage.getNewPayload(myFhirContext); - RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(resource); - - if (resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode())) { - String status = mySubscriptionCanonicalizer.getSubscriptionStatus(resource); - if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) { - mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource); - } + if (isSubscription(theResourceModifiedMessage)) { + handleSubscriptionActivation(theResourceModifiedMessage); } + mySubscriptionMatchingSubscriber.matchActiveSubscriptionsAndDeliver(theResourceModifiedMessage); } + + private void handleSubscriptionActivation(ResourceModifiedMessage theResourceModifiedMessage) { + switch (theResourceModifiedMessage.getOperationType()) { + case DELETE: + mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext)); + break; + case CREATE: + case UPDATE: + registerActiveSubscription(theResourceModifiedMessage.getNewPayload(myFhirContext)); + break; + default: + break; + } + } + + private boolean isSubscription(ResourceModifiedMessage theResourceModifiedMessage) { + IIdType id = theResourceModifiedMessage.getId(myFhirContext); + RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(id.getResourceType()); + return resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode()); + } + + private void registerActiveSubscription(IBaseResource theSubscription) { + String status = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription); + if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) { + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription); + } + } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index efbdbad6e0c..6d41aaafa65 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -85,6 +85,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base @After public void cleanup() { myInterceptorRegistry.clearAnonymousHookForUnitTest(); + mySubscriptionMatchingPost.clear(); + mySubscriptionActivatedPost.clear(); + ourObservationListener.clear(); } public T sendResource(T theResource) throws InterruptedException { @@ -184,5 +187,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base public void expectNothing() { updateLatch.expectNothing(); } + + public void clear() { updateLatch.clear();} } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java index ff120b6207f..6377ed0b52c 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java @@ -32,6 +32,7 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); + Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any()); Mockito.verify(mySubscriptionRegistry).registerSubscriptionUnlessAlreadyRegistered(any()); Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any()); } @@ -42,6 +43,7 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); + Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any()); Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any()); Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any()); } @@ -52,6 +54,7 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.DELETE); ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); + Mockito.verify(mySubscriptionRegistry).unregisterSubscription(any()); Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any()); Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any()); } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java index 578e2da944f..a4eede4e89e 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java @@ -1,6 +1,5 @@ package ca.uhn.fhir.jpa.subscription.module.standalone; -import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.server.SimpleBundleProvider; @@ -9,17 +8,12 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test { @Test public void testSubscriptionLoaderFhirClient() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown()); - String payload = "application/fhir+json"; String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml"; @@ -33,7 +27,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba initSubscriptionLoader(bundle); sendObservation(myCode, "SNOMED-CT"); - latch.await(10, TimeUnit.SECONDS); waitForSize(0, ourCreatedObservations); waitForSize(1, ourUpdatedObservations); @@ -42,9 +35,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba @Test public void testSubscriptionLoaderFhirClientSubscriptionNotActive() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown()); - String payload = "application/fhir+json"; String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml"; @@ -58,7 +48,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba initSubscriptionLoader(bundle); sendObservation(myCode, "SNOMED-CT"); - latch.await(10, TimeUnit.SECONDS); waitForSize(0, ourCreatedObservations); waitForSize(0, ourUpdatedObservations);