Merge pull request #1172 from jamesagnew/ja-subscription-interceptors

Ja subscription interceptors
This commit is contained in:
James Agnew 2019-01-19 07:13:14 -06:00 committed by GitHub
commit 29c3cee287
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 819 additions and 111 deletions

View File

@ -65,7 +65,11 @@ public class VersionUtil {
ourBuildNumber = p.getProperty("hapifhir.buildnumber");
ourBuildTime = p.getProperty("hapifhir.timestamp");
ourLog.info("HAPI FHIR version {} - Rev {}", ourVersion, StringUtils.right(ourBuildNumber, 10));
if (System.getProperty("suppress_hapi_fhir_version_log") == null) {
ourLog.info("HAPI FHIR version {} - Rev {}", ourVersion, StringUtils.right(ourBuildNumber, 10));
}
} catch (Exception e) {
ourLog.warn("Unable to determine HAPI version information", e);
}

View File

@ -1 +0,0 @@
version=${project.version}

View File

@ -945,6 +945,19 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
}
}
@Test
@Ignore
public void testQuery() throws IOException {
ourLog.info("** Performing Search");
HttpGet read = new HttpGet(ourServerBase + "/MedicationRequest?category=community&identifier=urn:oid:2.16.840.1.113883.3.7418.12.3%7C&intent=order&medication.code:text=calcitriol,hectorol,Zemplar,rocaltrol,vectical,vitamin%20D,doxercalciferol,paricalcitol&status=active,completed");
try (CloseableHttpResponse response = ourHttpClient.execute(read)) {
ourLog.info(response.toString());
}
ourLog.info("** DONE Performing Search");
}
@Test
public void testDeleteResourceConditional1() throws IOException {
String methodName = "testDeleteResourceConditional1";

View File

@ -89,16 +89,22 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
ourHeaders.clear();
// Delete all Subscriptions
Bundle allSubscriptions = ourClient.search().forResource(Subscription.class).returnBundle(Bundle.class).execute();
for (IBaseResource next : BundleUtil.toListOfResources(myFhirCtx, allSubscriptions)) {
ourClient.delete().resource(next).execute();
if (ourClient != null) {
Bundle allSubscriptions = ourClient.search().forResource(Subscription.class).returnBundle(Bundle.class).execute();
for (IBaseResource next : BundleUtil.toListOfResources(myFhirCtx, allSubscriptions)) {
ourClient.delete().resource(next).execute();
}
waitForActivatedSubscriptionCount(0);
}
waitForActivatedSubscriptionCount(0);
LinkedBlockingQueueSubscribableChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
processingChannel.clearInterceptorsForUnitTest();
if (processingChannel != null) {
processingChannel.clearInterceptorsForUnitTest();
}
myCountingInterceptor = new CountingInterceptor();
processingChannel.addInterceptorForUnitTest(myCountingInterceptor);
if (processingChannel != null) {
processingChannel.addInterceptorForUnitTest(myCountingInterceptor);
}
}

View File

@ -0,0 +1,165 @@
package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.SubscriptionHook;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.SubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Observation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.*;
/**
* Test the rest-hook subscriptions
*/
@ContextConfiguration(classes = {RestHookWithInterceptorR4Test.MyCtxConfig.class})
public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(RestHookWithInterceptorR4Test.class);
private static boolean ourNextModifyResourceId;
private static boolean ourNextBeforeRestHookDeliveryReturn;
private static boolean ourHitBeforeRestHookDelivery;
private static boolean ourNextAfterRestHookDeliveryReturn;
private static boolean ourHitAfterRestHookDelivery;
private static boolean ourNextAddHeader;
private static FhirContext ourCtx = FhirContext.forR4();
@Autowired
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
@After
public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null);
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
}
@Override
@Before
public void before() throws Exception {
super.before();
ourNextModifyResourceId = false;
ourNextAddHeader = false;
ourNextBeforeRestHookDeliveryReturn = true;
ourNextAfterRestHookDeliveryReturn = true;
ourHitBeforeRestHookDelivery = false;
ourHitAfterRestHookDelivery = false;
}
@Test
public void testBeforeRestHookDelivery_ModifyResourceId() throws Exception {
ourNextModifyResourceId = true;
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
sendObservation();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals("Observation/A", ourUpdatedObservations.get(0).getId());
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
}
@Test
public void testBeforeRestHookDelivery_AddHeader() throws Exception {
ourNextAddHeader = true;
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
sendObservation();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
assertThat(ourHeaders, hasItem("X-Foo: Bar"));
}
@Test
public void testBeforeRestHookDelivery_AbortDelivery() throws Exception {
ourNextBeforeRestHookDeliveryReturn = false;
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
sendObservation();
Thread.sleep(1000);
assertEquals(0, ourUpdatedObservations.size());
}
protected Observation sendObservation() {
Observation observation = new Observation();
observation.setStatus(Observation.ObservationStatus.FINAL);
MethodOutcome methodOutcome = ourClient.create().resource(observation).execute();
observation.setId(methodOutcome.getId());
return observation;
}
@Configuration
public static class MyCtxConfig {
@Bean
public MyInterceptor interceptor() {
return new MyInterceptor();
}
}
/**
* Interceptor class
*/
@SubscriptionInterceptor
public static class MyInterceptor {
/**
* Constructor
*/
public MyInterceptor() {
ourLog.info("Creating interceptor");
}
@SubscriptionHook(Pointcut.BEFORE_REST_HOOK_DELIVERY)
public boolean beforeRestHookDelivery(ResourceDeliveryMessage theDeliveryMessage, CanonicalSubscription theSubscription) {
if (ourNextModifyResourceId) {
theDeliveryMessage.getPayload(ourCtx).setId(new IdType("Observation/A"));
}
if (ourNextAddHeader) {
theSubscription.addHeader("X-Foo: Bar");
}
ourHitBeforeRestHookDelivery = true;
return ourNextBeforeRestHookDeliveryReturn;
}
@SubscriptionHook(Pointcut.AFTER_REST_HOOK_DELIVERY)
public boolean afterRestHookDelivery(ResourceDeliveryMessage theDeliveryMessage, CanonicalSubscription theSubscription) {
ourHitAfterRestHookDelivery = true;
return ourNextAfterRestHookDeliveryReturn;
}
}
}

View File

@ -72,12 +72,48 @@
<groupId>org.hibernate</groupId>
<artifactId>hibernate-search-orm</artifactId>
</dependency>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<exclusions>
<exclusion>
<artifactId>xml-apis</artifactId>
<groupId>xml-apis</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.jscience</groupId>
<artifactId>jscience</artifactId>
</dependency>
<!-- Java -->
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,28 @@
package ca.uhn.fhir.jpa.model.subscription.interceptor.api;
/**
* Value for {@link SubscriptionHook#value()}
*/
public enum Pointcut {
/**
* Invoked immediately after the delivery of a REST HOOK subscription.
* <p>
* When this hook is called, all processing is complete so this hook should not
* make any changes to the parameters.
* </p>
*/
AFTER_REST_HOOK_DELIVERY,
/**
* Invoked immediately before the delivery of a REST HOOK subscription.
* <p>
* Hooks may make changes to the delivery payload, or make changes to the
* canonical subscription such as adding headers, modifying the channel
* endpoint, etc.
* </p>
*/
BEFORE_REST_HOOK_DELIVERY;
}

View File

@ -0,0 +1,28 @@
package ca.uhn.fhir.jpa.model.subscription.interceptor.api;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* This annotation should be placed on
* {@link SubscriptionInterceptor Subscription Interceptor}
* bean methods.
* <p>
* Methods with this annotation are invoked immediately before a REST HOOK
* subscription delivery
* </p>
*
* @see SubscriptionInterceptor
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SubscriptionHook {
/**
* Provides the specific point where this method should be invoked
*/
Pointcut[] value();
}

View File

@ -0,0 +1,14 @@
package ca.uhn.fhir.jpa.model.subscription.interceptor.api;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* This annotation declares a bean as a subscription interceptor
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SubscriptionInterceptor {
}

View File

@ -0,0 +1,48 @@
package ca.uhn.fhir.jpa.model.subscription.interceptor.executor;
import org.apache.commons.lang3.Validate;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class HookParams {
private Map<Class<?>, Object> myParams = new HashMap<>();
/**
* Constructor
*/
public HookParams() {
}
/**
* Constructor
*/
public HookParams(Object... theParams) {
for (Object next : theParams) {
add(next);
}
}
@SuppressWarnings("unchecked")
private <T> void add(T theNext) {
Class<T> nextClass = (Class<T>) theNext.getClass();
add(nextClass, theNext);
}
public <T> HookParams add(Class<T> theType, T theParam) {
Validate.isTrue(myParams.containsKey(theType) == false, "Already have param of type %s", theType);
myParams.put(theType, theParam);
return this;
}
@SuppressWarnings("unchecked")
public <T> T get(Class<T> theParamType) {
return (T) myParams.get(theParamType);
}
Set<Class<?>> getTypes() {
return myParams.keySet();
}
}

View File

@ -0,0 +1,17 @@
package ca.uhn.fhir.jpa.model.subscription.interceptor.executor;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.Pointcut;
public interface ISubscriptionInterceptorRegistry {
/**
* Invoke the interceptor methods
*/
boolean callHooks(Pointcut thePointcut, HookParams theParams);
/**
* Invoke the interceptor methods
*/
boolean callHooks(Pointcut thePointcut, Object... theParams);
}

View File

@ -0,0 +1,159 @@
package ca.uhn.fhir.jpa.model.subscription.interceptor.executor;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.SubscriptionHook;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.SubscriptionInterceptor;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
@Component
public class SubscriptionInterceptorRegistry implements ISubscriptionInterceptorRegistry, ApplicationContextAware {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionInterceptorRegistry.class);
private ApplicationContext myAppCtx;
private List<Object> myGlobalInterceptors = new ArrayList<>();
private ListMultimap<Pointcut, Invoker> myInvokers = ArrayListMultimap.create();
List<Object> getGlobalInterceptors() {
return myGlobalInterceptors;
}
@PostConstruct
public void start() {
// Grab the global interceptors
String[] globalInterceptorNames = myAppCtx.getBeanNamesForAnnotation(SubscriptionInterceptor.class);
for (String nextName : globalInterceptorNames) {
Object nextGlobalInterceptor = myAppCtx.getBean(nextName);
myGlobalInterceptors.add(nextGlobalInterceptor);
}
// Sort them
sortByOrderAnnotation(myGlobalInterceptors);
// Pull out the hook methods
for (Object nextInterceptor : myGlobalInterceptors) {
for (Method nextMethod : nextInterceptor.getClass().getDeclaredMethods()) {
SubscriptionHook hook = AnnotationUtils.findAnnotation(nextMethod, SubscriptionHook.class);
if (hook != null) {
Invoker invoker = new Invoker(nextInterceptor, nextMethod);
for (Pointcut nextPointcut : hook.value()) {
myInvokers.put(nextPointcut, invoker);
}
}
}
}
}
private void sortByOrderAnnotation(List<Object> theObjects) {
IdentityHashMap<Object, Integer> interceptorToOrder = new IdentityHashMap<>();
for (Object next : theObjects) {
Order orderAnnotation = next.getClass().getAnnotation(Order.class);
int order = orderAnnotation != null ? orderAnnotation.value() : 0;
interceptorToOrder.put(next, order);
}
theObjects.sort((a, b) -> {
Integer orderA = interceptorToOrder.get(a);
Integer orderB = interceptorToOrder.get(b);
return orderA - orderB;
});
}
@Override
public void setApplicationContext(@Nonnull ApplicationContext theApplicationContext) throws BeansException {
myAppCtx = theApplicationContext;
}
@Override
public boolean callHooks(Pointcut thePointcut, HookParams theParams) {
/*
* Call each hook in order
*/
List<Invoker> invokers = myInvokers.get(thePointcut);
for (Invoker nextInvoker : invokers) {
boolean shouldContinue = nextInvoker.invoke(theParams);
if (!shouldContinue) {
return false;
}
}
return true;
}
@Override
public boolean callHooks(Pointcut thePointcut, Object... theParams) {
return callHooks(thePointcut, new HookParams(theParams));
}
private class Invoker {
private final Object myInterceptor;
private final boolean myReturnsBoolean;
private final Method myMethod;
private final Class<?>[] myParameterTypes;
/**
* Constructor
*/
private Invoker(@Nonnull Object theInterceptor, @Nonnull Method theHookMethod) {
myInterceptor = theInterceptor;
myParameterTypes = theHookMethod.getParameterTypes();
myMethod = theHookMethod;
Class<?> returnType = theHookMethod.getReturnType();
if (returnType.equals(boolean.class) || returnType.equals(Boolean.class)) {
myReturnsBoolean = true;
} else {
Validate.isTrue(Void.class.equals(returnType), "Method does not return boolean or void: %s", theHookMethod);
myReturnsBoolean = false;
}
}
boolean invoke(HookParams theParams) {
Object[] args = new Object[myParameterTypes.length];
for (int i = 0; i < myParameterTypes.length; i++) {
Class<?> nextParamType = myParameterTypes[i];
Object nextParamValue = theParams.get(nextParamType);
args[i] = nextParamValue;
}
// Invoke the method
try {
Object returnValue = myMethod.invoke(myInterceptor, args);
if (myReturnsBoolean) {
return (boolean) returnValue;
} else {
return true;
}
} catch (Exception e) {
ourLog.error("Failure executing interceptor method[{}]: {}", myMethod, e.toString(), e);
return true;
}
}
}
private static <T> boolean equals(Collection<T> theLhs, Collection<T> theRhs) {
return theLhs.size() == theRhs.size() && theLhs.containsAll(theRhs) && theRhs.containsAll(theLhs);
}
}

View File

@ -0,0 +1,157 @@
package ca.uhn.fhir.jpa.model.subscription.interceptor.executor;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.SubscriptionHook;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.SubscriptionInterceptor;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.*;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {SubscriptionInterceptorRegistryTest.MyCtxConfig.class})
public class SubscriptionInterceptorRegistryTest {
private static boolean ourNext_beforeRestHookDelivery_Return2;
private static boolean ourNext_beforeRestHookDelivery_Return1;
private static List<String> ourInvocations = new ArrayList<>();
private static CanonicalSubscription ourLastCanonicalSubscription;
private static ResourceDeliveryMessage ourLastResourceDeliveryMessage;
@Autowired
private SubscriptionInterceptorRegistry mySubscriptionInterceptorRegistry;
@Test
public void testGlobalInterceptorsAreFound() {
List<Object> globalInterceptors = mySubscriptionInterceptorRegistry.getGlobalInterceptors();
assertEquals(2, globalInterceptors.size());
assertTrue(globalInterceptors.get(0).getClass().toString(), globalInterceptors.get(0) instanceof MyInterceptorOne);
assertTrue(globalInterceptors.get(1).getClass().toString(), globalInterceptors.get(1) instanceof MyInterceptorTwo);
}
@Test
public void testInvokeGlobalInterceptorMethods() {
ResourceDeliveryMessage msg = new ResourceDeliveryMessage();
CanonicalSubscription subs = new CanonicalSubscription();
HookParams params = new HookParams(msg, subs);
boolean outcome = mySubscriptionInterceptorRegistry.callHooks(Pointcut.BEFORE_REST_HOOK_DELIVERY, params);
assertTrue(outcome);
assertThat(ourInvocations, contains("MyInterceptorOne.beforeRestHookDelivery", "MyInterceptorTwo.beforeRestHookDelivery"));
assertSame(msg, ourLastResourceDeliveryMessage);
assertSame(subs, ourLastCanonicalSubscription);
}
@Test
public void testInvokeGlobalInterceptorMethods_MethodAbortsProcessing() {
ourNext_beforeRestHookDelivery_Return1 = false;
ResourceDeliveryMessage msg = new ResourceDeliveryMessage();
CanonicalSubscription subs = new CanonicalSubscription();
HookParams params = new HookParams(msg, subs);
boolean outcome = mySubscriptionInterceptorRegistry.callHooks(Pointcut.BEFORE_REST_HOOK_DELIVERY, params);
assertFalse(outcome);
assertThat(ourInvocations, contains("MyInterceptorOne.beforeRestHookDelivery"));
}
@Test
public void testCallHooksInvokedWithWrongParameters() {
Integer msg = 123;
CanonicalSubscription subs = new CanonicalSubscription();
HookParams params = new HookParams(msg, subs);
try {
mySubscriptionInterceptorRegistry.callHooks(Pointcut.BEFORE_REST_HOOK_DELIVERY, params);
fail();
} catch (AssertionError e) {
// good
}
}
@Before
public void before() {
ourNext_beforeRestHookDelivery_Return1 = true;
ourNext_beforeRestHookDelivery_Return2 = true;
ourLastCanonicalSubscription = null;
ourLastResourceDeliveryMessage = null;
ourInvocations.clear();
}
@Configuration
@ComponentScan(basePackages = "ca.uhn.fhir.jpa.model")
public static class MyCtxConfig {
@Bean
public SubscriptionInterceptorRegistry subscriptionInterceptorRegistry() {
return new SubscriptionInterceptorRegistry();
}
/**
* Note: Orders are deliberately reversed to make sure we get the orders right
* using the @Order annotation
*/
@Bean
public MyInterceptorTwo interceptor1() {
return new MyInterceptorTwo();
}
/**
* Note: Orders are deliberately reversed to make sure we get the orders right
* using the @Order annotation
*/
@Bean
public MyInterceptorOne interceptor2() {
return new MyInterceptorOne();
}
}
@SubscriptionInterceptor
@Order(100)
public static class MyInterceptorOne {
@SubscriptionHook(Pointcut.BEFORE_REST_HOOK_DELIVERY)
public boolean beforeRestHookDelivery(CanonicalSubscription theCanonicalSubscription) {
ourLastCanonicalSubscription = theCanonicalSubscription;
ourInvocations.add("MyInterceptorOne.beforeRestHookDelivery");
return ourNext_beforeRestHookDelivery_Return1;
}
}
@SubscriptionInterceptor
@Order(200)
public static class MyInterceptorTwo {
@SubscriptionHook(Pointcut.BEFORE_REST_HOOK_DELIVERY)
public boolean beforeRestHookDelivery(ResourceDeliveryMessage theResourceDeliveryMessage) {
ourLastResourceDeliveryMessage = theResourceDeliveryMessage;
ourInvocations.add("MyInterceptorTwo.beforeRestHookDelivery");
return ourNext_beforeRestHookDelivery_Return2;
}
}
/**
* Just a make-believe version of this class for the unit test
*/
private static class CanonicalSubscription {
}
/**
* Just a make-believe version of this class for the unit test
*/
private static class ResourceDeliveryMessage {
}
}

View File

@ -87,36 +87,6 @@
<artifactId>javax.annotation-api</artifactId>
</dependency>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<exclusions>
<exclusion>
<artifactId>xml-apis</artifactId>
<groupId>xml-apis</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.jscience</groupId>
<artifactId>jscience</artifactId>
</dependency>
<!-- Java -->
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<!-- Testing -->
<dependency>

View File

@ -28,9 +28,9 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.EventDefinition;
import org.hl7.fhir.r4.model.Subscription;
import javax.annotation.Nonnull;
import java.io.Serializable;
import java.util.*;
@ -38,7 +38,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class CanonicalSubscription implements Serializable {
public class CanonicalSubscription implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
@ -65,6 +65,13 @@ public class CanonicalSubscription implements Serializable {
@JsonProperty("extensions")
private Map<String, String> myChannelExtensions;
/**
* Constructor
*/
public CanonicalSubscription() {
super();
}
/**
* For now we're using the R4 TriggerDefinition, but this
* may change in the future when things stabilize
@ -105,8 +112,9 @@ public class CanonicalSubscription implements Serializable {
myEndpointUrl = theEndpointUrl;
}
@Nonnull
public List<String> getHeaders() {
return Collections.unmodifiableList(myHeaders);
return myHeaders != null ? Collections.unmodifiableList(myHeaders) : Collections.emptyList();
}
public void setHeaders(List<? extends IPrimitiveType<String>> theHeader) {
@ -131,7 +139,7 @@ public class CanonicalSubscription implements Serializable {
public void setChannelExtensions(Map<String, String> theChannelExtensions) {
myChannelExtensions = new HashMap<>();
for (String url: theChannelExtensions.keySet()) {
for (String url : theChannelExtensions.keySet()) {
if (isNotBlank(url) && isNotBlank(theChannelExtensions.get(url))) {
myChannelExtensions.put(url, theChannelExtensions.get(url));
}
@ -228,14 +236,40 @@ public class CanonicalSubscription implements Serializable {
}
}
/**
* Adds a header
*
* @param theHeader The header, e.g. "Authorization: Bearer AAAAA"
*/
public void addHeader(String theHeader) {
if (isNotBlank(theHeader)) {
initHeaders();
myHeaders.add(theHeader);
}
}
private void initHeaders() {
if (myHeaders == null) {
myHeaders = new ArrayList<>();
}
}
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public static class EmailDetails {
@JsonProperty("from")
private String myFrom;
@JsonProperty("subjectTemplate")
private String mySubjectTemplate;
/**
* Construcor
*/
public EmailDetails() {
super();
}
public String getFrom() {
return myFrom;
}
@ -256,11 +290,19 @@ public class CanonicalSubscription implements Serializable {
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public static class RestHookDetails {
@JsonProperty("stripVersionId")
private boolean myStripVersionId;
@JsonProperty("deliverLatestVersion")
private boolean myDeliverLatestVersion;
/**
* Constructor
*/
public RestHookDetails() {
super();
}
public boolean isDeliverLatestVersion() {
return myDeliverLatestVersion;
}
@ -305,9 +347,13 @@ public class CanonicalSubscription implements Serializable {
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public static class CanonicalEventDefinition {
public CanonicalEventDefinition(EventDefinition theDef) {
/**
* Constructor
*/
public CanonicalEventDefinition() {
// nothing yet
}
}
}

View File

@ -33,12 +33,11 @@ import org.hl7.fhir.instance.model.api.IIdType;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@SuppressWarnings("WeakerAccess")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class ResourceDeliveryMessage implements IResourceMessage {
private static final long serialVersionUID = 1L;
@JsonIgnore
private transient CanonicalSubscription mySubscription;
@JsonProperty("subscription")
@ -52,10 +51,6 @@ public class ResourceDeliveryMessage implements IResourceMessage {
@JsonProperty("operationType")
private ResourceModifiedMessage.OperationTypeEnum myOperationType;
public ResourceModifiedMessage.OperationTypeEnum getOperationType() {
return myOperationType;
}
/**
* Constructor
*/
@ -63,6 +58,10 @@ public class ResourceDeliveryMessage implements IResourceMessage {
super();
}
public ResourceModifiedMessage.OperationTypeEnum getOperationType() {
return myOperationType;
}
public void setOperationType(ResourceModifiedMessage.OperationTypeEnum theOperationType) {
myOperationType = theOperationType;
}
@ -104,14 +103,15 @@ public class ResourceDeliveryMessage implements IResourceMessage {
myPayloadId = thePayload.getIdElement().toUnqualified().getValue();
}
@Override
public String getPayloadId() {
return myPayloadId;
}
public void setPayloadId(IIdType thePayloadId) {
myPayloadId = null;
if (thePayloadId != null) {
myPayloadId = thePayloadId.getValue();
}
}
@Override
public String getPayloadId() {
return myPayloadId;
}
}

View File

@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber;
*/
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.model.subscription.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.subscription.interceptor.executor.ISubscriptionInterceptorRegistry;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.RequestTypeEnum;
import ca.uhn.fhir.rest.client.api.*;
@ -46,14 +48,17 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Component
@Scope("prototype")
public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDeliverySubscriber {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringRestHookSubscriber.class);
@Autowired
IResourceRetriever myResourceRetriever;
private Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringRestHookSubscriber.class);
@Autowired
private ISubscriptionInterceptorRegistry mySubscriptionInterceptorRegistry;
protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient) {
IBaseResource payloadResource = getAndMassagePayload(theMsg, theSubscription);
if (payloadResource == null) return;
if (payloadResource == null) {
return;
}
doDelivery(theMsg, theSubscription, thePayloadType, theClient, payloadResource);
}
@ -62,32 +67,16 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
IClientExecutable<?, ?> operation;
switch (theMsg.getOperationType()) {
case CREATE:
if (thePayloadResource == null || thePayloadResource.isEmpty()) {
if (thePayloadType != null ) {
operation = theClient.create().resource(thePayloadResource);
} else {
sendNotification(theMsg);
return;
}
} else {
if (thePayloadType != null ) {
operation = theClient.update().resource(thePayloadResource);
} else {
sendNotification(theMsg);
return;
}
}
break;
case UPDATE:
if (thePayloadResource == null || thePayloadResource.isEmpty()) {
if (thePayloadType != null ) {
if (thePayloadType != null) {
operation = theClient.create().resource(thePayloadResource);
} else {
sendNotification(theMsg);
return;
}
} else {
if (thePayloadType != null ) {
if (thePayloadType != null) {
operation = theClient.update().resource(thePayloadResource);
} else {
sendNotification(theMsg);
@ -112,7 +101,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
try {
operation.execute();
} catch (ResourceNotFoundException e) {
ourLog.error("Cannot reach "+ theMsg.getSubscription().getEndpointUrl());
ourLog.error("Cannot reach " + theMsg.getSubscription().getEndpointUrl());
e.printStackTrace();
throw e;
}
@ -141,47 +130,57 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
@Override
public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException {
CanonicalSubscription subscription = theMessage.getSubscription();
CanonicalSubscription subscription = theMessage.getSubscription();
// Grab the endpoint from the subscription
String endpointUrl = subscription.getEndpointUrl();
// Interceptor call: BEFORE_REST_HOOK_DELIVERY
if (!mySubscriptionInterceptorRegistry.callHooks(Pointcut.BEFORE_REST_HOOK_DELIVERY, theMessage, subscription)) {
return;
}
// Grab the payload type (encoding mimetype) from the subscription
String payloadString = subscription.getPayloadString();
EncodingEnum payloadType = null;
if(payloadString != null) {
if (payloadString.contains(";")) {
payloadString = payloadString.substring(0, payloadString.indexOf(';'));
}
payloadString = payloadString.trim();
payloadType = EncodingEnum.forContentType(payloadString);
// Grab the endpoint from the subscription
String endpointUrl = subscription.getEndpointUrl();
// Grab the payload type (encoding mimetype) from the subscription
String payloadString = subscription.getPayloadString();
EncodingEnum payloadType = null;
if (payloadString != null) {
if (payloadString.contains(";")) {
payloadString = payloadString.substring(0, payloadString.indexOf(';'));
}
payloadString = payloadString.trim();
payloadType = EncodingEnum.forContentType(payloadString);
}
// Create the client request
myFhirContext.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
IGenericClient client = null;
if (isNotBlank(endpointUrl)) {
client = myFhirContext.newRestfulGenericClient(endpointUrl);
// Create the client request
myFhirContext.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
IGenericClient client = null;
if (isNotBlank(endpointUrl)) {
client = myFhirContext.newRestfulGenericClient(endpointUrl);
// Additional headers specified in the subscription
List<String> headers = subscription.getHeaders();
for (String next : headers) {
if (isNotBlank(next)) {
client.registerInterceptor(new SimpleRequestHeaderInterceptor(next));
}
// Additional headers specified in the subscription
List<String> headers = subscription.getHeaders();
for (String next : headers) {
if (isNotBlank(next)) {
client.registerInterceptor(new SimpleRequestHeaderInterceptor(next));
}
}
}
deliverPayload(theMessage, subscription, payloadType, client);
// Interceptor call: AFTER_REST_HOOK_DELIVERY
if (!mySubscriptionInterceptorRegistry.callHooks(Pointcut.AFTER_REST_HOOK_DELIVERY, theMessage, subscription)) {
//noinspection UnnecessaryReturnStatement
return;
}
deliverPayload(theMessage, subscription, payloadType, client);
}
/**
* Sends a POST notification without a payload
* @param theMsg
*/
protected void sendNotification(ResourceDeliveryMessage theMsg) {
Map<String, List<String>> params = new HashMap();
Map<String, List<String>> params = new HashMap<>();
List<Header> headers = new ArrayList<>();
if (theMsg.getSubscription().getHeaders() != null) {
theMsg.getSubscription().getHeaders().stream().filter(Objects::nonNull).forEach(h -> {
@ -204,7 +203,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
// close connection in order to return a possible cached connection to the connection pool
response.close();
} catch (IOException e) {
ourLog.error("Error trying to reach "+ theMsg.getSubscription().getEndpointUrl());
ourLog.error("Error trying to reach " + theMsg.getSubscription().getEndpointUrl());
e.printStackTrace();
throw new ResourceNotFoundException(e.getMessage());
}

View File

@ -7,6 +7,7 @@ import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.module.matcher.SubscriptionMatchResult;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -106,11 +107,15 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
ourLog.info("Subscription {} was matched by resource {} using matcher {}", nextActiveSubscription.getSubscription().getIdElement(myFhirContext).getValue(), resourceId.toUnqualifiedVersionless().getValue(), matchResult.matcherShortName());
IBaseResource payload = theMsg.getNewPayload(myFhirContext);
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPayload(myFhirContext, theMsg.getNewPayload(myFhirContext));
deliveryMsg.setPayload(myFhirContext, payload);
deliveryMsg.setSubscription(nextActiveSubscription.getSubscription());
deliveryMsg.setOperationType(theMsg.getOperationType());
deliveryMsg.setPayloadId(theMsg.getId(myFhirContext));
if (payload == null) {
deliveryMsg.setPayloadId(theMsg.getId(myFhirContext));
}
ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
MessageChannel deliveryChannel = nextActiveSubscription.getSubscribableChannel();

View File

@ -0,0 +1,14 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} [%file:%line] - %msg%n
</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>