optimistic the windows tests will now finally pass!

This commit is contained in:
Ken Stevens 2019-01-23 11:57:59 -05:00
parent 2c7eb39b29
commit 707bf07099
7 changed files with 156 additions and 64 deletions

View File

@ -23,6 +23,8 @@ package ca.uhn.fhir.jpa.model.interceptor.api;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@ -73,4 +75,7 @@ public class HookParams {
return myParams.values().stream().map(t -> t.getClass().getSimpleName()).collect(Collectors.toList());
}
public Collection<Object> values() {
return Collections.unmodifiableCollection(myParams.values());
}
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* #L%
*/
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import java.util.ArrayList;
@ -69,4 +70,9 @@ public class ActiveSubscriptionCache {
}
}
}
@VisibleForTesting
public void clearForUnitTests() {
myCache.clear();
}
}

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
@ -155,4 +156,9 @@ public class SubscriptionRegistry {
public int size() {
return myActiveSubscriptionCache.size();
}
@VisibleForTesting
public void clearForUnitTests() {
myActiveSubscriptionCache.clearForUnitTests();
}
}

View File

@ -1,17 +1,19 @@
package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.interceptor.api.HookParams;
import ca.uhn.fhir.jpa.model.interceptor.api.IAnonymousLambdaHook;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
@ -22,7 +24,6 @@ public class PointcutLatch implements IAnonymousLambdaHook {
private static final int DEFAULT_TIMEOUT_SECONDS = 10;
private final String name;
private Semaphore mySemaphore = new Semaphore(1);
private CountDownLatch myCountdownLatch;
private AtomicReference<String> myFailure;
private AtomicReference<List<HookParams>> myCalledWith;
@ -35,61 +36,116 @@ public class PointcutLatch implements IAnonymousLambdaHook {
this.name = theName;
}
private void countdown() {
if (myCountdownLatch == null) {
myFailure.set(name + " latch countdown() called before expectedCount set.");
} else if (myCountdownLatch.getCount() <= 0) {
myFailure.set(name + " latch countdown() called "+ (1 - myCountdownLatch.getCount()) + " more times than expected.");
public void setExpectedCount(int count) throws InterruptedException {
if (myCountdownLatch != null) {
throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed.");
}
ourLog.info("{} counting down {}", name, myCountdownLatch);
myCountdownLatch.countDown();
createLatch(count);
}
public void setExpectedCount(int count) throws InterruptedException {
mySemaphore.acquire();
if (myCountdownLatch != null) {
myFailure.set(name + " latch setExpectedCount() called before previous awaitExpected() completed.");
}
private void createLatch(int count) {
myFailure = new AtomicReference<>();
myCalledWith = new AtomicReference<>(new ArrayList<>());
myCountdownLatch = new CountDownLatch(count);
}
public void awaitExpected() throws InterruptedException {
awaitExpected(true);
}
public void awaitExpected(boolean release) throws InterruptedException {
awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS, release);
}
public void awaitExpectedWithTimeout(int timeoutSecond, boolean release) throws InterruptedException {
try {
assertNotNull(name + " latch awaitExpected() called before previous setExpected() called.", myCountdownLatch);
assertTrue(name + " latch timed out waiting " + timeoutSecond + " seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS));
if (myFailure.get() != null) {
String error = myFailure.get();
error += "\nLatch called with values: " + myCalledWith.get().stream().map(Object::toString).collect(Collectors.joining(", "));
throw new AssertionError(error);
}
} finally {
if (release) {
release();
}
private void setFailure(String failure) {
if (myFailure != null) {
myFailure.set(failure);
} else {
throw new PointcutLatchException("trying to set failure on latch that hasn't been created: " + failure);
}
}
public void release() {
private String getName() {
return name + " " + this.getClass().getSimpleName();
}
public void awaitExpected() throws InterruptedException {
awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS);
}
public void awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException {
try {
assertNotNull(getName() + " awaitExpected() called before setExpected() called.", myCountdownLatch);
assertTrue(getName() + " timed out waiting " + timeoutSecond + " seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS));
if (myFailure.get() != null) {
String error = getName() + ": " + myFailure.get();
error += "\nLatch called with values: " + myCalledWithString();
throw new AssertionError(error);
}
} finally {
destroyLatch();
}
}
public void expectNothing() {
destroyLatch();
}
private void destroyLatch() {
myCountdownLatch = null;
mySemaphore.release();
}
private String myCalledWithString() {
if (myCalledWith == null) {
return "[]";
}
List<HookParams> calledWith = myCalledWith.get();
if (calledWith.isEmpty()) {
return "[]";
}
String retVal = "[ ";
retVal += calledWith.stream().flatMap(hookParams -> hookParams.values().stream()).map(itemToString()).collect(Collectors.joining(", "));
return retVal + " ]";
}
private static Function<Object, String> itemToString() {
return object -> {
if (object instanceof IBaseResource) {
IBaseResource resource = (IBaseResource) object;
return "Resource " + resource.getIdElement().getValue();
} else if (object instanceof ResourceModifiedMessage) {
ResourceModifiedMessage resourceModifiedMessage = (ResourceModifiedMessage)object;
// FIXME KHS can we get the context from the payload?
return "ResourceModified Message { " + resourceModifiedMessage.getOperationType() + ", " + resourceModifiedMessage.getNewPayload(FhirContext.forDstu3()).getIdElement().getValue() + "}";
} else {
return object.toString();
}
};
}
@Override
public void invoke(HookParams theArgs) {
if (myCountdownLatch == null) {
throw new PointcutLatchException("countdown() called before setExpectedCount() called.", theArgs);
} else if (myCountdownLatch.getCount() <= 0) {
setFailure("countdown() called " + (1 - myCountdownLatch.getCount()) + " more times than expected.");
}
this.countdown();
if (myCalledWith.get() != null) {
myCalledWith.get().add(theArgs);
}
}
private void countdown() {
ourLog.info("{} counting down {}", name, myCountdownLatch);
myCountdownLatch.countDown();
}
private class PointcutLatchException extends IllegalStateException {
public PointcutLatchException(String message, HookParams theArgs) {
super(getName() + ": " + message + " called with values: " + hookParamsToString(theArgs));
}
public PointcutLatchException(String message) {
super(getName() + ": " + message);
}
}
private static String hookParamsToString(HookParams hookParams) {
return hookParams.values().stream().map(itemToString()).collect(Collectors.joining(", "));
}
}

View File

@ -8,6 +8,7 @@ import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.PointcutLatch;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
import ca.uhn.fhir.rest.annotation.Create;
@ -38,6 +39,7 @@ import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends BaseSubscriptionDstu3Test {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class);
@ -51,6 +53,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
SubscriptionChannelFactory mySubscriptionChannelFactory;
@Autowired
InterceptorRegistry myInterceptorRegistry;
@Autowired
protected SubscriptionRegistry mySubscriptionRegistry;
protected String myCode = "1000000050";
@ -63,7 +68,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
protected static List<String> ourContentTypes = Collections.synchronizedList(new ArrayList<>());
private static SubscribableChannel ourSubscribableChannel;
private List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
private long idCounter = 0;
private static AtomicLong idCounter = new AtomicLong();
protected PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED);
protected PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
@ -72,6 +77,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourCreatedObservations.clear();
ourUpdatedObservations.clear();
ourContentTypes.clear();
mySubscriptionRegistry.clearForUnitTests();
if (ourSubscribableChannel == null) {
ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase());
ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler);
@ -85,10 +91,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
myInterceptorRegistry.clearAnonymousHookForUnitTest();
}
public <T extends IBaseResource> T sendResource(T theResource) {
public <T extends IBaseResource> T sendResource(T theResource) throws InterruptedException {
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE);
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(msg);
mySubscriptionMatchingPost.setExpectedCount(1);
ourSubscribableChannel.send(message);
mySubscriptionMatchingPost.awaitExpected();
return theResource;
}
@ -105,8 +113,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
subscription.setCriteria(theCriteria);
++idCounter;
IdType id = new IdType("Subscription", idCounter);
IdType id = new IdType("Subscription", idCounter.incrementAndGet());
subscription.setId(id);
Subscription.SubscriptionChannelComponent channel = new Subscription.SubscriptionChannelComponent();
@ -117,10 +124,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
return subscription;
}
protected Observation sendObservation(String code, String system) {
protected Observation sendObservation(String code, String system) throws InterruptedException {
Observation observation = new Observation();
++idCounter;
IdType id = new IdType("Observation", idCounter);
IdType id = new IdType("Observation", idCounter.incrementAndGet());
observation.setId(id);
CodeableConcept codeableConcept = new CodeableConcept();
@ -134,7 +140,6 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
return sendResource(observation);
}
@BeforeClass
public static void startListenerServer() throws Exception {
ourListenerPort = PortUtil.findFreePort();
@ -192,12 +197,12 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
updateLatch.setExpectedCount(count);
}
public void awaitExpected(boolean release) throws InterruptedException {
updateLatch.awaitExpected(release);
public void awaitExpected() throws InterruptedException {
updateLatch.awaitExpected();
}
public void release() {
updateLatch.release();
public void expectNothing() {
updateLatch.expectNothing();
}
}
}

View File

@ -1,10 +1,12 @@
package ca.uhn.fhir.jpa.subscription.module.subscriber;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
import ca.uhn.fhir.rest.api.Constants;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import static org.junit.Assert.assertEquals;
@ -25,12 +27,14 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected(false);
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
ourObservationListener.release();
}
@Test
@ -44,12 +48,14 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected(false);
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
ourObservationListener.release();
}
@Test
@ -63,10 +69,12 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(0);
mySubscriptionMatchingPost.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
mySubscriptionMatchingPost.awaitExpected();
ourObservationListener.awaitExpected(true);
ourObservationListener.expectNothing();
assertEquals(0, ourContentTypes.size());
}
}

View File

@ -25,12 +25,14 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected(false);
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
ourObservationListener.release();
}
@Test
@ -44,12 +46,14 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected(false);
ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size());
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
ourObservationListener.release();
}
@Test
@ -63,10 +67,12 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
assertEquals(2, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(0);
mySubscriptionMatchingPost.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
mySubscriptionMatchingPost.awaitExpected();
ourObservationListener.awaitExpected(true);
ourObservationListener.expectNothing();
assertEquals(0, ourContentTypes.size());
}
}