From f0329167768269ff623694fb556ae8a9982fa8bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20D=C3=B8ssing?= Date: Mon, 10 Jun 2019 17:05:05 +0200 Subject: [PATCH] Issue-1337: Fix unstable concurrent code in PointcutLatch, fix subscription tests getting latch exceptions due to missing expectations, make hapi-fhir-jpaserver-subscription tests load StructureDefinitions outside latch timers, as this can be slow on busy machines (#1338) --- .../jpa/model/concurrency/PointcutLatch.java | 42 +++++++++---------- .../module/BaseSubscriptionDstu3Test.java | 2 +- .../module/BaseSubscriptionTest.java | 8 ---- .../module/SubscriptionTestHelper.java | 2 +- ...kingQueueSubscribableChannelDstu3Test.java | 32 ++++++++++---- .../SubscriptionLoaderFhirClientTest.java | 13 +++--- .../standalone/SubscriptionLoaderTest.java | 9 ++-- 7 files changed, 57 insertions(+), 51 deletions(-) diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/concurrency/PointcutLatch.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/concurrency/PointcutLatch.java index 933597dfe24..e4369458a50 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/concurrency/PointcutLatch.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/concurrency/PointcutLatch.java @@ -44,11 +44,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { private final String name; - private CountDownLatch myCountdownLatch; - private AtomicReference> myFailures; - private AtomicReference> myCalledWith; + private final AtomicReference myCountdownLatch = new AtomicReference<>(); + private final AtomicReference> myFailures = new AtomicReference<>(); + private final AtomicReference> myCalledWith = new AtomicReference<>(); + private final Pointcut myPointcut; private int myInitialCount; - private Pointcut myPointcut; public PointcutLatch(Pointcut thePointcut) { this.name = thePointcut.name(); @@ -57,11 +57,12 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { public PointcutLatch(String theName) { this.name = theName; + myPointcut = null; } @Override public void setExpectedCount(int count) { - if (myCountdownLatch != null) { + if (myCountdownLatch.get() != null) { throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed."); } createLatch(count); @@ -69,14 +70,14 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { } private void createLatch(int count) { - myFailures = new AtomicReference<>(new ArrayList<>()); - myCalledWith = new AtomicReference<>(new ArrayList<>()); - myCountdownLatch = new CountDownLatch(count); + myFailures.set(new ArrayList<>()); + myCalledWith.set(new ArrayList<>()); + myCountdownLatch.set(new CountDownLatch(count)); myInitialCount = count; } private void addFailure(String failure) { - if (myFailures != null) { + if (myFailures.get() != null) { myFailures.get().add(failure); } else { throw new PointcutLatchException("trying to set failure on latch that hasn't been created: " + failure); @@ -95,9 +96,10 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { public List awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException { List retval = myCalledWith.get(); try { - Validate.notNull(myCountdownLatch, getName() + " awaitExpected() called before setExpected() called."); - if (!myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS)) { - throw new AssertionError(getName() + " timed out waiting " + timeoutSecond + " seconds for latch to countdown from " + myInitialCount + " to 0. Is " + myCountdownLatch.getCount() + "."); + CountDownLatch latch = myCountdownLatch.get(); + Validate.notNull(latch, getName() + " awaitExpected() called before setExpected() called."); + if (!latch.await(timeoutSecond, TimeUnit.SECONDS)) { + throw new AssertionError(getName() + " timed out waiting " + timeoutSecond + " seconds for latch to countdown from " + myInitialCount + " to 0. Is " + latch.getCount() + "."); } List failures = myFailures.get(); @@ -121,11 +123,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { @Override public void clear() { - myCountdownLatch = null; + myCountdownLatch.set(null); } private String myCalledWithString() { - if (myCalledWith == null) { + if (myCalledWith.get() == null) { return "[]"; } List calledWith = myCalledWith.get(); @@ -140,21 +142,19 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { @Override public void invoke(Pointcut thePointcut, HookParams theArgs) { - if (myCountdownLatch == null) { + CountDownLatch latch = myCountdownLatch.get(); + if (latch == null) { throw new PointcutLatchException("invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived.", theArgs); - } else if (myCountdownLatch.getCount() <= 0) { + } else if (latch.getCount() <= 0) { addFailure("invoke() called when countdown was zero."); } if (myCalledWith.get() != null) { myCalledWith.get().add(theArgs); } - ourLog.info("Called {} {} with {}", name, myCountdownLatch, hookParamsToString(theArgs)); + ourLog.info("Called {} {} with {}", name, latch, hookParamsToString(theArgs)); - if (myCountdownLatch == null) { - throw new PointcutLatchException("invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived.", theArgs); - } - myCountdownLatch.countDown(); + latch.countDown(); } public void call(Object arg) { diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionDstu3Test.java index 522e6062f8e..45bb72b0049 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionDstu3Test.java @@ -14,7 +14,7 @@ import static org.junit.Assert.fail; @ContextConfiguration(classes = {TestSubscriptionDstu3Config.class}) public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest { - private SubscriptionTestHelper mySubscriptionTestHelper = new SubscriptionTestHelper(); + private final SubscriptionTestHelper mySubscriptionTestHelper = new SubscriptionTestHelper(); public static void waitForSize(int theTarget, List theList) { StopWatch sw = new StopWatch(); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java index 1bd5e7cc7c5..b0730ecff71 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/BaseSubscriptionTest.java @@ -23,9 +23,6 @@ public abstract class BaseSubscriptionTest { @Autowired MockFhirClientSearchParamProvider myMockFhirClientSearchParamProvider; - @Autowired - SubscriptionLoader mySubscriptionLoader; - @Autowired protected IInterceptorService myInterceptorRegistry; @@ -39,9 +36,4 @@ public abstract class BaseSubscriptionTest { myMockFhirClientSearchParamProvider.setBundleProvider(theBundleProvider); mySearchParamRegistry.forceRefresh(); } - - public void initSubscriptionLoader(IBundleProvider theBundleProvider) { - myMockFhirClientSubscriptionProvider.setBundleProvider(theBundleProvider); - mySubscriptionLoader.doSyncSubscriptionsForUnitTest(); - } } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestHelper.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestHelper.java index f9819c16ff6..e53dff20ded 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestHelper.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestHelper.java @@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicLong; public class SubscriptionTestHelper { - protected static AtomicLong idCounter = new AtomicLong(); + protected static final AtomicLong idCounter = new AtomicLong(); public Subscription makeActiveSubscription(String theCriteria, String thePayload, String theEndpoint) { diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java index ea35c2cbc95..70ac121d01b 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/BaseBlockingQueueSubscribableChannelDstu3Test.java @@ -9,7 +9,10 @@ import ca.uhn.fhir.jpa.model.concurrency.PointcutLatch; import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; +import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSearchParamProvider; +import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest; import ca.uhn.fhir.rest.annotation.Create; @@ -17,8 +20,10 @@ import ca.uhn.fhir.rest.annotation.ResourceParam; import ca.uhn.fhir.rest.annotation.Update; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.MethodOutcome; +import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.RestfulServer; +import ca.uhn.fhir.rest.server.SimpleBundleProvider; import ca.uhn.fhir.util.PortUtil; import com.google.common.collect.Lists; import org.eclipse.jetty.server.Server; @@ -54,7 +59,10 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base IInterceptorService myInterceptorRegistry; @Autowired protected SubscriptionRegistry mySubscriptionRegistry; - + @Autowired + private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider; + @Autowired + private SubscriptionLoader mySubscriptionLoader; protected String myCode = "1000000050"; @@ -62,12 +70,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base private static RestfulServer ourListenerRestServer; private static Server ourListenerServer; protected static String ourListenerServerBase; - protected static List ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList()); - protected static List ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); - protected static List ourContentTypes = Collections.synchronizedList(new ArrayList<>()); + protected static final List ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList()); + protected static final List ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); + protected static final List ourContentTypes = Collections.synchronizedList(new ArrayList<>()); private static SubscribableChannel ourSubscribableChannel; - protected PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); - protected PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); + protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); + protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); @Before public void beforeReset() { @@ -75,10 +83,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base ourUpdatedObservations.clear(); ourContentTypes.clear(); mySubscriptionRegistry.unregisterAllSubscriptions(); - if (ourSubscribableChannel == null) { ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase()); ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); - } myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost); } @@ -100,6 +106,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base return theResource; } + protected void initSubscriptionLoader(List subscriptions, String uuid) throws InterruptedException { + myMockFhirClientSubscriptionProvider.setBundleProvider(new SimpleBundleProvider(new ArrayList<>(subscriptions), uuid)); + mySubscriptionLoader.doSyncSubscriptionsForUnitTest(); + } + protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { Subscription subscription = makeActiveSubscription(theCriteria, thePayload, theEndpoint); mySubscriptionActivatedPost.setExpectedCount(1); @@ -144,6 +155,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base ourListenerServer.setHandler(proxyHandler); ourListenerServer.start(); + FhirContext context = ourListenerRestServer.getFhirContext(); + //Preload structure definitions so the load doesn't happen during the test (first load can be a little slow) + context.getValidationSupport().fetchAllStructureDefinitions(context); } @AfterClass @@ -153,7 +167,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base public static class ObservationListener implements IResourceProvider, IPointcutLatch { - private PointcutLatch updateLatch = new PointcutLatch("Observation Update"); + private final PointcutLatch updateLatch = new PointcutLatch("Observation Update"); @Create public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java index a4eede4e89e..31d0e46dec3 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderFhirClientTest.java @@ -1,8 +1,6 @@ package ca.uhn.fhir.jpa.subscription.module.standalone; import ca.uhn.fhir.rest.api.Constants; -import ca.uhn.fhir.rest.api.server.IBundleProvider; -import ca.uhn.fhir.rest.server.SimpleBundleProvider; import org.hl7.fhir.dstu3.model.Subscription; import org.junit.Test; @@ -12,6 +10,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test { + @Test public void testSubscriptionLoaderFhirClient() throws InterruptedException { String payload = "application/fhir+json"; @@ -23,10 +22,13 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase)); subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase)); - IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid"); - initSubscriptionLoader(bundle); + mySubscriptionActivatedPost.setExpectedCount(2); + initSubscriptionLoader(subs, "uuid"); + mySubscriptionActivatedPost.awaitExpected(); + ourObservationListener.setExpectedCount(1); sendObservation(myCode, "SNOMED-CT"); + ourObservationListener.awaitExpected(); waitForSize(0, ourCreatedObservations); waitForSize(1, ourUpdatedObservations); @@ -44,8 +46,7 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED)); subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED)); - IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid"); - initSubscriptionLoader(bundle); + initSubscriptionLoader(subs, "uuid"); sendObservation(myCode, "SNOMED-CT"); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java index 0def5f74047..2848a55656e 100755 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/standalone/SubscriptionLoaderTest.java @@ -19,8 +19,6 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel private static final int MOCK_FHIR_CLIENT_FAILURES = 5; @Autowired private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider; - @Autowired - private SubscriptionLoader mySubscriptionLoader; @Before public void setFailCount() { @@ -33,7 +31,7 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel } @Test - public void testSubscriptionLoaderFhirClientDown() { + public void testSubscriptionLoaderFhirClientDown() throws Exception { String payload = "application/fhir+json"; String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml"; @@ -43,8 +41,9 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase)); subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase)); - IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid"); - initSubscriptionLoader(bundle); + mySubscriptionActivatedPost.setExpectedCount(2); + initSubscriptionLoader(subs, "uuid"); + mySubscriptionActivatedPost.awaitExpected(); assertEquals(0, myMockFhirClientSubscriptionProvider.getFailCount()); } }