Prototype.

Not working yet.
This commit is contained in:
Ken Stevens 2019-01-16 23:35:14 -05:00
parent 584179ba39
commit 7ffbe9505b
7 changed files with 172 additions and 20 deletions

View File

@ -0,0 +1,34 @@
package ca.uhn.fhir.jpa.searchparam.interceptor;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.function.Predicate;
import static javolution.testing.TestContext.assertTrue;
@Component
public class InterceptorRegistry {
private final Map<String, List<Predicate<Object>>> interceptorMap = new HashMap<>();
public void addInterceptor(String key, Predicate<Object> interceptor) {
interceptorMap.computeIfAbsent(key, entry -> new ArrayList<>()).add(interceptor);
}
public void removeInterceptor(String key, Predicate<Object> interceptor) {
assertTrue(interceptorMap.get(key).remove(interceptor));
}
// TODO KHS this feels like it should be a one-line lambda
public Boolean trigger(String key, Object object) {
List<Predicate<Object>> predicates = interceptorMap.get(key);
if (predicates != null) {
for (Predicate<Object> predicate : predicates) {
if (!predicate.test(object)) {
return false;
}
}
}
return true;
}
}

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.searchparam.interceptor.InterceptorRegistry;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -47,6 +48,7 @@ import java.util.Optional;
@Component
public class SubscriptionRegistry {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionRegistry.class);
public static final String INTERCEPTOR_POST_ACTIVATED = "SubscriptionRegistry.postActivated";
@Autowired
SubscriptionCanonicalizer<IBaseResource> mySubscriptionCanonicalizer;
@ -56,6 +58,8 @@ public class SubscriptionRegistry {
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
@Autowired
ModelConfig myModelConfig;
@Autowired
InterceptorRegistry myInterceptorRegistry;
private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
@ -97,6 +101,7 @@ public class SubscriptionRegistry {
deliveryHandler.ifPresent(activeSubscription::register);
myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
myInterceptorRegistry.trigger(INTERCEPTOR_POST_ACTIVATED, theSubscription);
return canonicalized;
}

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.subscription.module.subscriber;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.searchparam.interceptor.InterceptorRegistry;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
@ -44,6 +45,8 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Service
public class SubscriptionMatchingSubscriber implements MessageHandler {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriber.class);
public static final String INTERCEPTOR_PRE_PROCESSED = "SubscriptionMatchingSubscriber.preProcessed";
public static final String INTERCEPTOR_POST_PROCESSED = "SubscriptionMatchingSubscriber.postProcessed";
@Autowired
private ISubscriptionMatcher mySubscriptionMatcher;
@ -51,6 +54,8 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
private FhirContext myFhirContext;
@Autowired
private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
private InterceptorRegistry myInterceptorRegistry;
@Override
public void handleMessage(Message<?> theMessage) throws MessagingException {
@ -66,6 +71,9 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
}
public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
if (!myInterceptorRegistry.trigger(INTERCEPTOR_PRE_PROCESSED, theMsg)) {
return;
}
switch (theMsg.getOperationType()) {
case CREATE:
case UPDATE:
@ -135,5 +143,6 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getIdElement(myFhirContext));
}
}
myInterceptorRegistry.trigger(INTERCEPTOR_POST_PROCESSED, theMsg);
}
}

View File

@ -0,0 +1,69 @@
package ca.uhn.fhir.jpa.subscription.module;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class LatchedService implements Predicate<Object> {
private static final Logger ourLog = LoggerFactory.getLogger(LatchedService.class);
private final String name;
private CountDownLatch myCountdownLatch;
private AtomicReference<String> myFailure;
private AtomicReference<List<Object>> myCalledWith;
public LatchedService(String name) {
this.name = name;
}
public void countdown() {
if (myCountdownLatch == null) {
myFailure.set(name + " latch countdown() called before expectedCount set.");
} else if (myCountdownLatch.getCount() <= 0) {
myFailure.set(name + " latch countdown() called "+ (1 - myCountdownLatch.getCount()) + " more times than expected.");
}
ourLog.info("{} counting down {}", name, myCountdownLatch);
myCountdownLatch.countDown();
}
public void setExpectedCount(int count) {
myFailure = new AtomicReference<>();
myCalledWith = new AtomicReference<>(new ArrayList<>());
myCountdownLatch = new CountDownLatch(count);
}
public void awaitExpected() throws InterruptedException {
awaitExpectedWithTimeout(10);
}
public void awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException {
assertTrue(name+" latch timed out waiting "+timeoutSecond+" seconds for latch to be triggered.", myCountdownLatch.await(timeoutSecond, TimeUnit.SECONDS));
if (myFailure.get() != null) {
String error = myFailure.get();
error += "\nLatch called with values: "+myCalledWith.get().stream().map(Object::toString).collect(Collectors.joining(", "));
throw new AssertionError(error);
}
}
@Override
public boolean test(Object object) {
this.countdown();
if (myCalledWith.get() != null) {
myCalledWith.get().add(object);
}
return true;
}
}

View File

@ -1,10 +1,14 @@
package ca.uhn.fhir.jpa.subscription.module.standalone;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.searchparam.interceptor.InterceptorRegistry;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.LatchedService;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
@ -21,6 +25,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -36,6 +41,7 @@ import java.util.List;
public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends BaseSubscriptionDstu3Test {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class);
protected static ObservationListener ourObservationListener;
@Autowired
FhirContext myFhirContext;
@ -43,6 +49,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
StandaloneSubscriptionMessageHandler myStandaloneSubscriptionMessageHandler;
@Autowired
SubscriptionChannelFactory mySubscriptionChannelFactory;
@Autowired
InterceptorRegistry myInterceptorRegistry;
protected String myCode = "1000000050";
@ -56,6 +64,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
private static SubscribableChannel ourSubscribableChannel;
private List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
private long idCounter = 0;
protected LatchedService mySubscriptionMatchingPost = new LatchedService(SubscriptionMatchingSubscriber.INTERCEPTOR_POST_PROCESSED);
protected LatchedService mySubscriptionActivatedPost = new LatchedService(SubscriptionRegistry.INTERCEPTOR_POST_ACTIVATED);
@Before
public void beforeReset() {
@ -66,6 +76,14 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase());
ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler);
}
myInterceptorRegistry.addInterceptor(SubscriptionMatchingSubscriber.INTERCEPTOR_POST_PROCESSED, mySubscriptionMatchingPost);
myInterceptorRegistry.addInterceptor(SubscriptionRegistry.INTERCEPTOR_POST_ACTIVATED, mySubscriptionActivatedPost);
}
@After
public void cleanup() {
myInterceptorRegistry.removeInterceptor(SubscriptionRegistry.INTERCEPTOR_POST_ACTIVATED, mySubscriptionActivatedPost);
myInterceptorRegistry.removeInterceptor(SubscriptionMatchingSubscriber.INTERCEPTOR_POST_PROCESSED, mySubscriptionMatchingPost);
}
public <T extends IBaseResource> T sendResource(T theResource) {
@ -77,8 +95,10 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
Subscription subscription = returnedActiveSubscription(theCriteria, thePayload, theEndpoint);
return sendResource(subscription);
mySubscriptionActivatedPost.setExpectedCount(1);
Subscription retval = sendResource(subscription);
mySubscriptionActivatedPost.awaitExpected();
return retval;
}
protected Subscription returnedActiveSubscription(String theCriteria, String thePayload, String theEndpoint) {
@ -122,8 +142,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourListenerRestServer = new RestfulServer(FhirContext.forDstu3());
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
ObservationListener obsListener = new ObservationListener();
ourListenerRestServer.setResourceProviders(obsListener);
ourObservationListener = new ObservationListener();
ourListenerRestServer.setResourceProviders(ourObservationListener);
ourListenerServer = new Server(ourListenerPort);
@ -145,6 +165,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
public static class ObservationListener implements IResourceProvider {
private LatchedService updateLatch = new LatchedService("Observation Update");
@Create
public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
ourLog.info("Received Listener Create");
@ -162,8 +184,17 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
ourUpdatedObservations.add(theObservation);
updateLatch.countdown();
ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedObservations.size());
return new MethodOutcome(new IdType("Observation/1"), false);
}
public void setExpectedCount(int count) {
updateLatch.setExpectedCount(count);
}
public void awaitExpected() throws InterruptedException {
updateLatch.awaitExpected();
}
}
}

View File

@ -25,10 +25,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
}
@ -43,10 +43,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
}
@ -61,9 +61,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
ourObservationListener.setExpectedCount(0);
mySubscriptionMatchingPost.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourUpdatedObservations);
mySubscriptionMatchingPost.awaitExpected();
ourObservationListener.awaitExpected();
}
}

View File

@ -1,10 +1,12 @@
package ca.uhn.fhir.jpa.subscription.module.subscriber;
import ca.uhn.fhir.jpa.searchparam.interceptor.InterceptorRegistry;
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
import ca.uhn.fhir.rest.api.Constants;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import static org.junit.Assert.assertEquals;
@ -25,10 +27,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
}
@ -43,10 +45,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
ourObservationListener.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
}
@ -61,9 +63,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
ourObservationListener.setExpectedCount(0);
mySubscriptionMatchingPost.setExpectedCount(1);
sendObservation(code, "SNOMED-CT");
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourUpdatedObservations);
mySubscriptionMatchingPost.awaitExpected();
ourObservationListener.awaitExpected();
}
}