Issue-1337: Fix unstable concurrent code in PointcutLatch, fix subscription tests getting latch exceptions due to missing expectations, make hapi-fhir-jpaserver-subscription tests load StructureDefinitions outside latch timers, as this can be slow on busy machines (#1338)

This commit is contained in:
Stig Døssing 2019-06-10 17:05:05 +02:00 committed by Ken Stevens
parent a952663dc3
commit f032916776
7 changed files with 57 additions and 51 deletions

View File

@ -44,11 +44,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
private final String name; private final String name;
private CountDownLatch myCountdownLatch; private final AtomicReference<CountDownLatch> myCountdownLatch = new AtomicReference<>();
private AtomicReference<List<String>> myFailures; private final AtomicReference<List<String>> myFailures = new AtomicReference<>();
private AtomicReference<List<HookParams>> myCalledWith; private final AtomicReference<List<HookParams>> myCalledWith = new AtomicReference<>();
private final Pointcut myPointcut;
private int myInitialCount; private int myInitialCount;
private Pointcut myPointcut;
public PointcutLatch(Pointcut thePointcut) { public PointcutLatch(Pointcut thePointcut) {
this.name = thePointcut.name(); this.name = thePointcut.name();
@ -57,11 +57,12 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
public PointcutLatch(String theName) { public PointcutLatch(String theName) {
this.name = theName; this.name = theName;
myPointcut = null;
} }
@Override @Override
public void setExpectedCount(int count) { public void setExpectedCount(int count) {
if (myCountdownLatch != null) { if (myCountdownLatch.get() != null) {
throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed."); throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed.");
} }
createLatch(count); createLatch(count);
@ -69,14 +70,14 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
} }
private void createLatch(int count) { private void createLatch(int count) {
myFailures = new AtomicReference<>(new ArrayList<>()); myFailures.set(new ArrayList<>());
myCalledWith = new AtomicReference<>(new ArrayList<>()); myCalledWith.set(new ArrayList<>());
myCountdownLatch = new CountDownLatch(count); myCountdownLatch.set(new CountDownLatch(count));
myInitialCount = count; myInitialCount = count;
} }
private void addFailure(String failure) { private void addFailure(String failure) {
if (myFailures != null) { if (myFailures.get() != null) {
myFailures.get().add(failure); myFailures.get().add(failure);
} else { } else {
throw new PointcutLatchException("trying to set failure on latch that hasn't been created: " + failure); throw new PointcutLatchException("trying to set failure on latch that hasn't been created: " + failure);
@ -95,9 +96,10 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
public List<HookParams> awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException { public List<HookParams> awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException {
List<HookParams> retval = myCalledWith.get(); List<HookParams> retval = myCalledWith.get();
try { try {
Validate.notNull(myCountdownLatch, getName() + " awaitExpected() called before setExpected() called."); CountDownLatch latch = myCountdownLatch.get();
if (!myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS)) { Validate.notNull(latch, getName() + " awaitExpected() called before setExpected() called.");
throw new AssertionError(getName() + " timed out waiting " + timeoutSecond + " seconds for latch to countdown from " + myInitialCount + " to 0. Is " + myCountdownLatch.getCount() + "."); if (!latch.await(timeoutSecond, TimeUnit.SECONDS)) {
throw new AssertionError(getName() + " timed out waiting " + timeoutSecond + " seconds for latch to countdown from " + myInitialCount + " to 0. Is " + latch.getCount() + ".");
} }
List<String> failures = myFailures.get(); List<String> failures = myFailures.get();
@ -121,11 +123,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
@Override @Override
public void clear() { public void clear() {
myCountdownLatch = null; myCountdownLatch.set(null);
} }
private String myCalledWithString() { private String myCalledWithString() {
if (myCalledWith == null) { if (myCalledWith.get() == null) {
return "[]"; return "[]";
} }
List<HookParams> calledWith = myCalledWith.get(); List<HookParams> calledWith = myCalledWith.get();
@ -140,21 +142,19 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
@Override @Override
public void invoke(Pointcut thePointcut, HookParams theArgs) { public void invoke(Pointcut thePointcut, HookParams theArgs) {
if (myCountdownLatch == null) { CountDownLatch latch = myCountdownLatch.get();
if (latch == null) {
throw new PointcutLatchException("invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived.", theArgs); throw new PointcutLatchException("invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived.", theArgs);
} else if (myCountdownLatch.getCount() <= 0) { } else if (latch.getCount() <= 0) {
addFailure("invoke() called when countdown was zero."); addFailure("invoke() called when countdown was zero.");
} }
if (myCalledWith.get() != null) { if (myCalledWith.get() != null) {
myCalledWith.get().add(theArgs); myCalledWith.get().add(theArgs);
} }
ourLog.info("Called {} {} with {}", name, myCountdownLatch, hookParamsToString(theArgs)); ourLog.info("Called {} {} with {}", name, latch, hookParamsToString(theArgs));
if (myCountdownLatch == null) { latch.countDown();
throw new PointcutLatchException("invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived.", theArgs);
}
myCountdownLatch.countDown();
} }
public void call(Object arg) { public void call(Object arg) {

View File

@ -14,7 +14,7 @@ import static org.junit.Assert.fail;
@ContextConfiguration(classes = {TestSubscriptionDstu3Config.class}) @ContextConfiguration(classes = {TestSubscriptionDstu3Config.class})
public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest { public abstract class BaseSubscriptionDstu3Test extends BaseSubscriptionTest {
private SubscriptionTestHelper mySubscriptionTestHelper = new SubscriptionTestHelper(); private final SubscriptionTestHelper mySubscriptionTestHelper = new SubscriptionTestHelper();
public static void waitForSize(int theTarget, List<?> theList) { public static void waitForSize(int theTarget, List<?> theList) {
StopWatch sw = new StopWatch(); StopWatch sw = new StopWatch();

View File

@ -23,9 +23,6 @@ public abstract class BaseSubscriptionTest {
@Autowired @Autowired
MockFhirClientSearchParamProvider myMockFhirClientSearchParamProvider; MockFhirClientSearchParamProvider myMockFhirClientSearchParamProvider;
@Autowired
SubscriptionLoader mySubscriptionLoader;
@Autowired @Autowired
protected protected
IInterceptorService myInterceptorRegistry; IInterceptorService myInterceptorRegistry;
@ -39,9 +36,4 @@ public abstract class BaseSubscriptionTest {
myMockFhirClientSearchParamProvider.setBundleProvider(theBundleProvider); myMockFhirClientSearchParamProvider.setBundleProvider(theBundleProvider);
mySearchParamRegistry.forceRefresh(); mySearchParamRegistry.forceRefresh();
} }
public void initSubscriptionLoader(IBundleProvider theBundleProvider) {
myMockFhirClientSubscriptionProvider.setBundleProvider(theBundleProvider);
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
}
} }

View File

@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class SubscriptionTestHelper { public class SubscriptionTestHelper {
protected static AtomicLong idCounter = new AtomicLong(); protected static final AtomicLong idCounter = new AtomicLong();
public Subscription makeActiveSubscription(String theCriteria, String thePayload, String theEndpoint) { public Subscription makeActiveSubscription(String theCriteria, String thePayload, String theEndpoint) {

View File

@ -9,7 +9,10 @@ import ca.uhn.fhir.jpa.model.concurrency.PointcutLatch;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test; import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSearchParamProvider;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
import ca.uhn.fhir.rest.annotation.Create; import ca.uhn.fhir.rest.annotation.Create;
@ -17,8 +20,10 @@ import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update; import ca.uhn.fhir.rest.annotation.Update;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.util.PortUtil; import ca.uhn.fhir.util.PortUtil;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -54,7 +59,10 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
IInterceptorService myInterceptorRegistry; IInterceptorService myInterceptorRegistry;
@Autowired @Autowired
protected SubscriptionRegistry mySubscriptionRegistry; protected SubscriptionRegistry mySubscriptionRegistry;
@Autowired
private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider;
@Autowired
private SubscriptionLoader mySubscriptionLoader;
protected String myCode = "1000000050"; protected String myCode = "1000000050";
@ -62,12 +70,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
private static RestfulServer ourListenerRestServer; private static RestfulServer ourListenerRestServer;
private static Server ourListenerServer; private static Server ourListenerServer;
protected static String ourListenerServerBase; protected static String ourListenerServerBase;
protected static List<Observation> ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList()); protected static final List<Observation> ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList());
protected static List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); protected static final List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList());
protected static List<String> ourContentTypes = Collections.synchronizedList(new ArrayList<>()); protected static final List<String> ourContentTypes = Collections.synchronizedList(new ArrayList<>());
private static SubscribableChannel ourSubscribableChannel; private static SubscribableChannel ourSubscribableChannel;
protected PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED);
protected PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
@Before @Before
public void beforeReset() { public void beforeReset() {
@ -75,10 +83,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourUpdatedObservations.clear(); ourUpdatedObservations.clear();
ourContentTypes.clear(); ourContentTypes.clear();
mySubscriptionRegistry.unregisterAllSubscriptions(); mySubscriptionRegistry.unregisterAllSubscriptions();
if (ourSubscribableChannel == null) {
ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase()); ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase());
ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler);
}
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost);
} }
@ -100,6 +106,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
return theResource; return theResource;
} }
protected void initSubscriptionLoader(List<Subscription> subscriptions, String uuid) throws InterruptedException {
myMockFhirClientSubscriptionProvider.setBundleProvider(new SimpleBundleProvider(new ArrayList<>(subscriptions), uuid));
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
}
protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
Subscription subscription = makeActiveSubscription(theCriteria, thePayload, theEndpoint); Subscription subscription = makeActiveSubscription(theCriteria, thePayload, theEndpoint);
mySubscriptionActivatedPost.setExpectedCount(1); mySubscriptionActivatedPost.setExpectedCount(1);
@ -144,6 +155,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourListenerServer.setHandler(proxyHandler); ourListenerServer.setHandler(proxyHandler);
ourListenerServer.start(); ourListenerServer.start();
FhirContext context = ourListenerRestServer.getFhirContext();
//Preload structure definitions so the load doesn't happen during the test (first load can be a little slow)
context.getValidationSupport().fetchAllStructureDefinitions(context);
} }
@AfterClass @AfterClass
@ -153,7 +167,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
public static class ObservationListener implements IResourceProvider, IPointcutLatch { public static class ObservationListener implements IResourceProvider, IPointcutLatch {
private PointcutLatch updateLatch = new PointcutLatch("Observation Update"); private final PointcutLatch updateLatch = new PointcutLatch("Observation Update");
@Create @Create
public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {

View File

@ -1,8 +1,6 @@
package ca.uhn.fhir.jpa.subscription.module.standalone; package ca.uhn.fhir.jpa.subscription.module.standalone;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import org.hl7.fhir.dstu3.model.Subscription; import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.Test; import org.junit.Test;
@ -12,6 +10,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test { public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
@Test @Test
public void testSubscriptionLoaderFhirClient() throws InterruptedException { public void testSubscriptionLoaderFhirClient() throws InterruptedException {
String payload = "application/fhir+json"; String payload = "application/fhir+json";
@ -23,10 +22,13 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba
subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase)); subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase));
subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase)); subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase));
IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid"); mySubscriptionActivatedPost.setExpectedCount(2);
initSubscriptionLoader(bundle); initSubscriptionLoader(subs, "uuid");
mySubscriptionActivatedPost.awaitExpected();
ourObservationListener.setExpectedCount(1);
sendObservation(myCode, "SNOMED-CT"); sendObservation(myCode, "SNOMED-CT");
ourObservationListener.awaitExpected();
waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations); waitForSize(1, ourUpdatedObservations);
@ -44,8 +46,7 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba
subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED)); subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED));
subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED)); subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase).setStatus(Subscription.SubscriptionStatus.REQUESTED));
IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid"); initSubscriptionLoader(subs, "uuid");
initSubscriptionLoader(bundle);
sendObservation(myCode, "SNOMED-CT"); sendObservation(myCode, "SNOMED-CT");

View File

@ -19,8 +19,6 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel
private static final int MOCK_FHIR_CLIENT_FAILURES = 5; private static final int MOCK_FHIR_CLIENT_FAILURES = 5;
@Autowired @Autowired
private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider; private MockFhirClientSubscriptionProvider myMockFhirClientSubscriptionProvider;
@Autowired
private SubscriptionLoader mySubscriptionLoader;
@Before @Before
public void setFailCount() { public void setFailCount() {
@ -33,7 +31,7 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel
} }
@Test @Test
public void testSubscriptionLoaderFhirClientDown() { public void testSubscriptionLoaderFhirClientDown() throws Exception {
String payload = "application/fhir+json"; String payload = "application/fhir+json";
String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml"; String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
@ -43,8 +41,9 @@ public class SubscriptionLoaderTest extends BaseBlockingQueueSubscribableChannel
subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase)); subs.add(makeActiveSubscription(criteria1, payload, ourListenerServerBase));
subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase)); subs.add(makeActiveSubscription(criteria2, payload, ourListenerServerBase));
IBundleProvider bundle = new SimpleBundleProvider(new ArrayList<>(subs), "uuid"); mySubscriptionActivatedPost.setExpectedCount(2);
initSubscriptionLoader(bundle); initSubscriptionLoader(subs, "uuid");
mySubscriptionActivatedPost.awaitExpected();
assertEquals(0, myMockFhirClientSubscriptionProvider.getFailCount()); assertEquals(0, myMockFhirClientSubscriptionProvider.getFailCount());
} }
} }