diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/CanonicalSubscription.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/CanonicalSubscription.java index b055a20d5c4..239b99306e2 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/CanonicalSubscription.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/CanonicalSubscription.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.r4.model.Subscription; @@ -357,4 +358,20 @@ public class CanonicalSubscription implements Serializable, Cloneable { } + @Override + public String toString() { + return new ToStringBuilder(this) + .append("myIdElement", myIdElement) + .append("myStatus", myStatus) + .append("myCriteriaString", myCriteriaString) + .append("myEndpointUrl", myEndpointUrl) + .append("myPayloadString", myPayloadString) +// .append("myHeaders", myHeaders) + .append("myChannelType", myChannelType) +// .append("myTrigger", myTrigger) +// .append("myEmailDetails", myEmailDetails) +// .append("myRestHookDetails", myRestHookDetails) +// .append("myChannelExtensions", myChannelExtensions) + .toString(); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java index cbc1029c1f9..ad98b9eaf48 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java @@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.subscription.module.cache; * #L% */ -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.Validate; import java.util.ArrayList; @@ -70,9 +69,4 @@ class ActiveSubscriptionCache { } } } - - @VisibleForTesting - void clearForUnitTests() { - myCache.clear(); - } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java index 65eda9b430a..7b45726e9cc 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java @@ -24,7 +24,6 @@ import ca.uhn.fhir.jpa.model.entity.ModelConfig; import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; @@ -113,7 +112,7 @@ public class SubscriptionRegistry { } @PreDestroy - public void preDestroy() { + public void unregisterAllSubscriptions() { unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); } @@ -156,9 +155,4 @@ public class SubscriptionRegistry { public int size() { return myActiveSubscriptionCache.size(); } - - @VisibleForTesting - public void clearForUnitTests() { - myActiveSubscriptionCache.clearForUnitTests(); - } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java index a57bb0e3cf4..f355a8fc34d 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.module.standalone; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -30,6 +30,7 @@ import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessag import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -61,15 +62,35 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler { } public void updateSubscriptionRegistryAndPerformMatching(ResourceModifiedMessage theResourceModifiedMessage) { - IBaseResource resource = theResourceModifiedMessage.getNewPayload(myFhirContext); - RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(resource); - - if (resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode())) { - String status = mySubscriptionCanonicalizer.getSubscriptionStatus(resource); - if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) { - mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource); - } + switch (theResourceModifiedMessage.getOperationType()) { + case DELETE: + if (isSubscription(theResourceModifiedMessage)) { + mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext)); + } + return; + case CREATE: + case UPDATE: + if (isSubscription(theResourceModifiedMessage)) { + registerActiveSubscription(theResourceModifiedMessage.getNewPayload(myFhirContext)); + } + break; + default: + break; } + mySubscriptionMatchingSubscriber.matchActiveSubscriptionsAndDeliver(theResourceModifiedMessage); } + + private boolean isSubscription(ResourceModifiedMessage theResourceModifiedMessage) { + IIdType id = theResourceModifiedMessage.getId(myFhirContext); + RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(id.getResourceType()); + return resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode()); + } + + private void registerActiveSubscription(IBaseResource theSubscription) { + String status = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription); + if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) { + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription); + } + } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index efbdbad6e0c..6ec7ee0895e 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -73,7 +73,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base ourCreatedObservations.clear(); ourUpdatedObservations.clear(); ourContentTypes.clear(); - mySubscriptionRegistry.clearForUnitTests(); + mySubscriptionRegistry.unregisterAllSubscriptions(); if (ourSubscribableChannel == null) { ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase()); ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); @@ -85,6 +85,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base @After public void cleanup() { myInterceptorRegistry.clearAnonymousHookForUnitTest(); + mySubscriptionMatchingPost.clear(); + mySubscriptionActivatedPost.clear(); + ourObservationListener.clear(); } public T sendResource(T theResource) throws InterruptedException { @@ -184,5 +187,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base public void expectNothing() { updateLatch.expectNothing(); } + + public void clear() { updateLatch.clear();} } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java index 27a3571d3be..31c859d1cb9 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandlerTest.java @@ -32,6 +32,7 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); + Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any()); Mockito.verify(mySubscriptionRegistry).registerSubscriptionUnlessAlreadyRegistered(any()); Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any()); } @@ -42,7 +43,19 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); + Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any()); Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any()); Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any()); } + + @Test + public void deleteSubscription() { + Subscription subscription = makeSubscriptionWithStatus("testCriteria", "testPayload", "testEndpoint", Subscription.SubscriptionStatus.REQUESTED); + ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.DELETE); + ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); + myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); + Mockito.verify(mySubscriptionRegistry).unregisterSubscription(any()); + Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any()); + Mockito.verify(mySubscriptionMatchingSubscriber, never()).matchActiveSubscriptionsAndDeliver(any()); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java index 578e2da944f..a4eede4e89e 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java @@ -1,6 +1,5 @@ package ca.uhn.fhir.jpa.subscription.module.standalone; -import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.server.SimpleBundleProvider; @@ -9,17 +8,12 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test { @Test public void testSubscriptionLoaderFhirClient() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown()); - String payload = "application/fhir+json"; String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml"; @@ -33,7 +27,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba initSubscriptionLoader(bundle); sendObservation(myCode, "SNOMED-CT"); - latch.await(10, TimeUnit.SECONDS); waitForSize(0, ourCreatedObservations); waitForSize(1, ourUpdatedObservations); @@ -42,9 +35,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba @Test public void testSubscriptionLoaderFhirClientSubscriptionNotActive() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown()); - String payload = "application/fhir+json"; String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml"; @@ -58,7 +48,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba initSubscriptionLoader(bundle); sendObservation(myCode, "SNOMED-CT"); - latch.await(10, TimeUnit.SECONDS); waitForSize(0, ourCreatedObservations); waitForSize(0, ourUpdatedObservations);