diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/HookParams.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/HookParams.java index 5c97f1ae081..5156e67c66a 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/HookParams.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/HookParams.java @@ -23,6 +23,8 @@ package ca.uhn.fhir.jpa.model.interceptor.api; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -73,4 +75,7 @@ public class HookParams { return myParams.values().stream().map(t -> t.getClass().getSimpleName()).collect(Collectors.toList()); } + public Collection values() { + return Collections.unmodifiableCollection(myParams.values()); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java index 0aa92298c04..6afcd485850 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java @@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache; * #L% */ +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.Validate; import java.util.ArrayList; @@ -69,4 +70,9 @@ public class ActiveSubscriptionCache { } } } + + @VisibleForTesting + public void clearForUnitTests() { + myCache.clear(); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java index 29c6e939d2c..7c9b9b5da50 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java @@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.model.entity.ModelConfig; import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry; import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; @@ -155,4 +156,9 @@ public class SubscriptionRegistry { public int size() { return myActiveSubscriptionCache.size(); } + + @VisibleForTesting + public void clearForUnitTests() { + myActiveSubscriptionCache.clearForUnitTests(); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java index a8d6f91468e..6c0a5108203 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/PointcutLatch.java @@ -1,17 +1,19 @@ package ca.uhn.fhir.jpa.subscription.module; +import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.model.interceptor.api.HookParams; import ca.uhn.fhir.jpa.model.interceptor.api.IAnonymousLambdaHook; import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; +import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import static org.junit.Assert.assertNotNull; @@ -22,7 +24,6 @@ public class PointcutLatch implements IAnonymousLambdaHook { private static final int DEFAULT_TIMEOUT_SECONDS = 10; private final String name; - private Semaphore mySemaphore = new Semaphore(1); private CountDownLatch myCountdownLatch; private AtomicReference myFailure; private AtomicReference> myCalledWith; @@ -35,61 +36,116 @@ public class PointcutLatch implements IAnonymousLambdaHook { this.name = theName; } - private void countdown() { - if (myCountdownLatch == null) { - myFailure.set(name + " latch countdown() called before expectedCount set."); - } else if (myCountdownLatch.getCount() <= 0) { - myFailure.set(name + " latch countdown() called "+ (1 - myCountdownLatch.getCount()) + " more times than expected."); + public void setExpectedCount(int count) throws InterruptedException { + if (myCountdownLatch != null) { + throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed."); } - ourLog.info("{} counting down {}", name, myCountdownLatch); - myCountdownLatch.countDown(); + createLatch(count); } - public void setExpectedCount(int count) throws InterruptedException { - mySemaphore.acquire(); - if (myCountdownLatch != null) { - myFailure.set(name + " latch setExpectedCount() called before previous awaitExpected() completed."); - } + private void createLatch(int count) { myFailure = new AtomicReference<>(); myCalledWith = new AtomicReference<>(new ArrayList<>()); myCountdownLatch = new CountDownLatch(count); } - public void awaitExpected() throws InterruptedException { - awaitExpected(true); - } - - public void awaitExpected(boolean release) throws InterruptedException { - awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS, release); - } - - public void awaitExpectedWithTimeout(int timeoutSecond, boolean release) throws InterruptedException { - try { - assertNotNull(name + " latch awaitExpected() called before previous setExpected() called.", myCountdownLatch); - assertTrue(name + " latch timed out waiting " + timeoutSecond + " seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS)); - - if (myFailure.get() != null) { - String error = myFailure.get(); - error += "\nLatch called with values: " + myCalledWith.get().stream().map(Object::toString).collect(Collectors.joining(", ")); - throw new AssertionError(error); - } - } finally { - if (release) { - release(); - } + private void setFailure(String failure) { + if (myFailure != null) { + myFailure.set(failure); + } else { + throw new PointcutLatchException("trying to set failure on latch that hasn't been created: " + failure); } } - public void release() { + private String getName() { + return name + " " + this.getClass().getSimpleName(); + } + + public void awaitExpected() throws InterruptedException { + awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS); + } + + public void awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException { + try { + assertNotNull(getName() + " awaitExpected() called before setExpected() called.", myCountdownLatch); + assertTrue(getName() + " timed out waiting " + timeoutSecond + " seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS)); + + if (myFailure.get() != null) { + String error = getName() + ": " + myFailure.get(); + error += "\nLatch called with values: " + myCalledWithString(); + throw new AssertionError(error); + } + } finally { + destroyLatch(); + } + } + + public void expectNothing() { + destroyLatch(); + } + + private void destroyLatch() { myCountdownLatch = null; - mySemaphore.release(); + } + + private String myCalledWithString() { + if (myCalledWith == null) { + return "[]"; + } + List calledWith = myCalledWith.get(); + if (calledWith.isEmpty()) { + return "[]"; + } + String retVal = "[ "; + retVal += calledWith.stream().flatMap(hookParams -> hookParams.values().stream()).map(itemToString()).collect(Collectors.joining(", ")); + return retVal + " ]"; + } + + private static Function itemToString() { + return object -> { + if (object instanceof IBaseResource) { + IBaseResource resource = (IBaseResource) object; + return "Resource " + resource.getIdElement().getValue(); + } else if (object instanceof ResourceModifiedMessage) { + ResourceModifiedMessage resourceModifiedMessage = (ResourceModifiedMessage)object; + // FIXME KHS can we get the context from the payload? + return "ResourceModified Message { " + resourceModifiedMessage.getOperationType() + ", " + resourceModifiedMessage.getNewPayload(FhirContext.forDstu3()).getIdElement().getValue() + "}"; + } else { + return object.toString(); + } + }; } @Override public void invoke(HookParams theArgs) { + if (myCountdownLatch == null) { + throw new PointcutLatchException("countdown() called before setExpectedCount() called.", theArgs); + } else if (myCountdownLatch.getCount() <= 0) { + setFailure("countdown() called " + (1 - myCountdownLatch.getCount()) + " more times than expected."); + } + this.countdown(); if (myCalledWith.get() != null) { myCalledWith.get().add(theArgs); } } + + private void countdown() { + ourLog.info("{} counting down {}", name, myCountdownLatch); + myCountdownLatch.countDown(); + } + + private class PointcutLatchException extends IllegalStateException { + public PointcutLatchException(String message, HookParams theArgs) { + super(getName() + ": " + message + " called with values: " + hookParamsToString(theArgs)); + } + + public PointcutLatchException(String message) { + super(getName() + ": " + message); + } + } + + private static String hookParamsToString(HookParams hookParams) { + return hookParams.values().stream().map(itemToString()).collect(Collectors.joining(", ")); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index 4269641541a..f2608b12f95 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -8,6 +8,7 @@ import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; import ca.uhn.fhir.jpa.subscription.module.PointcutLatch; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest; import ca.uhn.fhir.rest.annotation.Create; @@ -38,6 +39,7 @@ import javax.servlet.http.HttpServletRequest; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends BaseSubscriptionDstu3Test { private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class); @@ -51,6 +53,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base SubscriptionChannelFactory mySubscriptionChannelFactory; @Autowired InterceptorRegistry myInterceptorRegistry; + @Autowired + protected SubscriptionRegistry mySubscriptionRegistry; + protected String myCode = "1000000050"; @@ -63,7 +68,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base protected static List ourContentTypes = Collections.synchronizedList(new ArrayList<>()); private static SubscribableChannel ourSubscribableChannel; private List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); - private long idCounter = 0; + private static AtomicLong idCounter = new AtomicLong(); protected PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); protected PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); @@ -72,6 +77,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base ourCreatedObservations.clear(); ourUpdatedObservations.clear(); ourContentTypes.clear(); + mySubscriptionRegistry.clearForUnitTests(); if (ourSubscribableChannel == null) { ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase()); ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); @@ -85,10 +91,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base myInterceptorRegistry.clearAnonymousHookForUnitTest(); } - public T sendResource(T theResource) { + public T sendResource(T theResource) throws InterruptedException { ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(msg); + mySubscriptionMatchingPost.setExpectedCount(1); ourSubscribableChannel.send(message); + mySubscriptionMatchingPost.awaitExpected(); return theResource; } @@ -105,8 +113,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)"); subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE); subscription.setCriteria(theCriteria); - ++idCounter; - IdType id = new IdType("Subscription", idCounter); + IdType id = new IdType("Subscription", idCounter.incrementAndGet()); subscription.setId(id); Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent(); @@ -117,10 +124,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base return subscription; } - protected Observation sendObservation(String code, String system) { + protected Observation sendObservation(String code, String system) throws InterruptedException { Observation observation = new Observation(); - ++idCounter; - IdType id = new IdType("Observation", idCounter); + IdType id = new IdType("Observation", idCounter.incrementAndGet()); observation.setId(id); CodeableConcept codeableConcept = new CodeableConcept(); @@ -134,7 +140,6 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base return sendResource(observation); } - @BeforeClass public static void startListenerServer() throws Exception { ourListenerPort = PortUtil.findFreePort(); @@ -192,12 +197,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base updateLatch.setExpectedCount(count); } - public void awaitExpected(boolean release) throws InterruptedException { - updateLatch.awaitExpected(release); + public void awaitExpected() throws InterruptedException { + updateLatch.awaitExpected(); } - public void release() { - updateLatch.release(); + public void expectNothing() { + updateLatch.expectNothing(); } } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java index 5af2d728dff..d3fe0ba2a38 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionCheckingSubscriberTest.java @@ -1,10 +1,12 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test; import ca.uhn.fhir.rest.api.Constants; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import static org.junit.Assert.assertEquals; @@ -25,12 +27,14 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri sendSubscription(criteria1, payload, ourListenerServerBase); sendSubscription(criteria2, payload, ourListenerServerBase); + assertEquals(2, mySubscriptionRegistry.size()); + ourObservationListener.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - ourObservationListener.awaitExpected(false); + ourObservationListener.awaitExpected(); + assertEquals(1, ourContentTypes.size()); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); - ourObservationListener.release(); } @Test @@ -44,12 +48,14 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri sendSubscription(criteria1, payload, ourListenerServerBase); sendSubscription(criteria2, payload, ourListenerServerBase); + assertEquals(2, mySubscriptionRegistry.size()); + ourObservationListener.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - ourObservationListener.awaitExpected(false); + ourObservationListener.awaitExpected(); + assertEquals(1, ourContentTypes.size()); assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); - ourObservationListener.release(); } @Test @@ -63,10 +69,12 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri sendSubscription(criteria1, payload, ourListenerServerBase); sendSubscription(criteria2, payload, ourListenerServerBase); + assertEquals(2, mySubscriptionRegistry.size()); + ourObservationListener.setExpectedCount(0); - mySubscriptionMatchingPost.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - mySubscriptionMatchingPost.awaitExpected(); - ourObservationListener.awaitExpected(true); + ourObservationListener.expectNothing(); + + assertEquals(0, ourContentTypes.size()); } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java index 8fd2e436e4e..446b903e4a3 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionMatchingSubscriberTest.java @@ -25,12 +25,14 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri sendSubscription(criteria1, payload, ourListenerServerBase); sendSubscription(criteria2, payload, ourListenerServerBase); + assertEquals(2, mySubscriptionRegistry.size()); + ourObservationListener.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - ourObservationListener.awaitExpected(false); + ourObservationListener.awaitExpected(); + assertEquals(1, ourContentTypes.size()); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); - ourObservationListener.release(); } @Test @@ -44,12 +46,14 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri sendSubscription(criteria1, payload, ourListenerServerBase); sendSubscription(criteria2, payload, ourListenerServerBase); + assertEquals(2, mySubscriptionRegistry.size()); + ourObservationListener.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - ourObservationListener.awaitExpected(false); + ourObservationListener.awaitExpected(); + assertEquals(1, ourContentTypes.size()); assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); - ourObservationListener.release(); } @Test @@ -63,10 +67,12 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri sendSubscription(criteria1, payload, ourListenerServerBase); sendSubscription(criteria2, payload, ourListenerServerBase); + assertEquals(2, mySubscriptionRegistry.size()); + ourObservationListener.setExpectedCount(0); - mySubscriptionMatchingPost.setExpectedCount(1); sendObservation(code, "SNOMED-CT"); - mySubscriptionMatchingPost.awaitExpected(); - ourObservationListener.awaitExpected(true); + ourObservationListener.expectNothing(); + + assertEquals(0, ourContentTypes.size()); } }