Merge branch 'master' into windows-fixes

# Conflicts:
#	hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java
#	hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java
This commit is contained in:
Ken Stevens 2019-01-20 17:04:16 -05:00
commit 10aa6c9f07
6 changed files with 154 additions and 57 deletions

View File

@ -2,6 +2,8 @@ package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
@ -51,6 +53,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -85,6 +88,8 @@ public abstract class BaseJpaTest {
protected IRequestOperationCallback myRequestOperationCallback = mock(IRequestOperationCallback.class);
@Autowired
protected DatabaseBackedPagingProvider myDatabaseBackedPagingProvider;
@Autowired
protected IInterceptorRegistry myInterceptorRegistry;
@After
public void afterPerformCleanup() {
@ -129,6 +134,12 @@ public abstract class BaseJpaTest {
when(mySrd.getHeaders(eq(JpaConstants.HEADER_META_SNAPSHOT_MODE))).thenReturn(new ArrayList<>());
}
protected CountDownLatch registerLatchHookInterceptor(int theCount, Pointcut theLatchPointcut) {
CountDownLatch deliveryLatch = new CountDownLatch(theCount);
myInterceptorRegistry.registerAnonymousHookForUnitTest(theLatchPointcut, Integer.MAX_VALUE, t -> deliveryLatch.countDown());
return deliveryLatch;
}
protected abstract FhirContext getContext();
protected abstract PlatformTransactionManager getTxManager();

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.model.interceptor.api.Hook;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Interceptor;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
@ -22,9 +23,11 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.*;
/**
* Test the rest-hook subscriptions
@ -66,35 +69,43 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
public void testBeforeRestHookDelivery_ModifyResourceId() throws Exception {
ourNextModifyResourceId = true;
// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
registerLatch.await(10, TimeUnit.SECONDS);
// Creating a matching resource
CountDownLatch deliveryLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY);
sendObservation();
deliveryLatch.await(10, TimeUnit.SECONDS);
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourCreatedObservations.size());
assertEquals(1, ourUpdatedObservations.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals("Observation/A", ourUpdatedObservations.get(0).getId());
// TODO: JA a latch would be even better but we'd need to allow customizable orders since the ad-hoc ones run first
waitForTrue(() -> ourHitBeforeRestHookDelivery);
waitForTrue(() -> ourHitAfterRestHookDelivery);
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
}
@Test
public void testBeforeRestHookDelivery_AddHeader() throws Exception {
ourNextAddHeader = true;
// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
registerLatch.await(10, TimeUnit.SECONDS);
// Creating a matching resource
CountDownLatch deliveryLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY);
sendObservation();
deliveryLatch.await(10, TimeUnit.SECONDS);
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourCreatedObservations.size());
assertEquals(1, ourUpdatedObservations.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
// TODO: JA a latch would be even better but we'd need to allow customizable orders since the ad-hoc ones run first
waitForTrue(() -> ourHitBeforeRestHookDelivery);
waitForTrue(() -> ourHitAfterRestHookDelivery);
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
assertThat(ourHeaders, hasItem("X-Foo: Bar"));
}
@ -103,8 +114,10 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
public void testBeforeRestHookDelivery_AbortDelivery() throws Exception {
ourNextBeforeRestHookDeliveryReturn = false;
// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
registerLatch.await(10, TimeUnit.SECONDS);
sendObservation();

View File

@ -24,12 +24,25 @@ import com.google.common.annotations.VisibleForTesting;
public interface IInterceptorRegistry {
int DEFAULT_ORDER = 0;
@VisibleForTesting
void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook);
@VisibleForTesting
void registerAnonymousHookForUnitTest(Pointcut thePointcut, int theOrder, IAnonymousLambdaHook theHook);
@VisibleForTesting
void clearAnonymousHookForUnitTest();
/**
* Register an interceptor
*
* @param theInterceptor The interceptor to register
* @return Returns <code>true</code> if at least one valid hook method was found on this interceptor
*/
boolean registerGlobalInterceptor(Object theInterceptor);
/**
* Invoke the interceptor methods
*/

View File

@ -69,10 +69,21 @@ public enum Pointcut {
*/
SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ResourceModifiedMessage"),
// FIXME KHS
SUBSCRIPTION_AFTER_SUBSCRIPTION_MATCHING("ResourceModifiedMessage"),
// FIXME KHS
SUBSCRIPTION_AFTER_SUBSCRIPTION_ACTIVATED("CanonicalSubscription");
/**
* Invoked immediately after an active subscription is "registered". In HAPI FHIR, when
* a subscription
* <p>
* Hooks may make changes to the canonicalized subscription and this will have an effect
* on processing across this server. Note however that timing issues may occur, since the
* subscription is already technically live by the time this hook is called.
* </p>
* Hooks may accept the following parameters:
* <ul>
* <li>ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription</li>
* </ul>
*/
SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED("CanonicalSubscription");
private final List<String> myParameterTypes;

View File

@ -46,9 +46,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class InterceptorRegistry implements IInterceptorRegistry, ApplicationContextAware {
private static final Logger ourLog = LoggerFactory.getLogger(InterceptorRegistry.class);
private ApplicationContext myAppCtx;
private List<Object> myGlobalInterceptors = new ArrayList<>();
private ListMultimap<Pointcut, BaseInvoker> myInvokers = ArrayListMultimap.create();
private ListMultimap<Pointcut, BaseInvoker> myAnonymousInvokers = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
private final List<Object> myGlobalInterceptors = new ArrayList<>();
private final ListMultimap<Pointcut, BaseInvoker> myInvokers = ArrayListMultimap.create();
private final ListMultimap<Pointcut, BaseInvoker> myAnonymousInvokers = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
/**
* Constructor
@ -66,10 +66,15 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
@Override
@VisibleForTesting
public void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook) {
registerAnonymousHookForUnitTest(thePointcut, DEFAULT_ORDER, theHook);
}
@Override
public void registerAnonymousHookForUnitTest(Pointcut thePointcut, int theOrder, IAnonymousLambdaHook theHook) {
Validate.notNull(thePointcut);
Validate.notNull(theHook);
myAnonymousInvokers.put(thePointcut, new AnonymousLambdaInvoker(theHook));
myAnonymousInvokers.put(thePointcut, new AnonymousLambdaInvoker(theHook, theOrder));
}
@Override
@ -84,26 +89,53 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
// Grab the global interceptors
String[] globalInterceptorNames = myAppCtx.getBeanNamesForAnnotation(Interceptor.class);
for (String nextName : globalInterceptorNames) {
Object nextGlobalInterceptor = myAppCtx.getBean(nextName);
myGlobalInterceptors.add(nextGlobalInterceptor);
Object nextInterceptor = myAppCtx.getBean(nextName);
registerGlobalInterceptor(nextInterceptor);
}
// Sort them
sortByOrderAnnotation(myGlobalInterceptors);
}
// Pull out the hook methods
for (Object nextInterceptor : myGlobalInterceptors) {
for (Method nextMethod : nextInterceptor.getClass().getDeclaredMethods()) {
@Override
public boolean registerGlobalInterceptor(Object theInterceptor) {
boolean retVal = false;
int typeOrder = DEFAULT_ORDER;
Order typeOrderAnnotation = AnnotationUtils.findAnnotation(theInterceptor.getClass(), Order.class);
if (typeOrderAnnotation != null) {
typeOrder = typeOrderAnnotation.value();
}
for (Method nextMethod : theInterceptor.getClass().getDeclaredMethods()) {
Hook hook = AnnotationUtils.findAnnotation(nextMethod, Hook.class);
if (hook != null) {
HookInvoker invoker = new HookInvoker(hook, nextInterceptor, nextMethod);
int methodOrder = typeOrder;
Order methodOrderAnnotation = AnnotationUtils.findAnnotation(nextMethod, Order.class);
if (methodOrderAnnotation != null) {
methodOrder = methodOrderAnnotation.value();
}
HookInvoker invoker = new HookInvoker(hook, theInterceptor, nextMethod, methodOrder);
for (Pointcut nextPointcut : hook.value()) {
myInvokers.put(nextPointcut, invoker);
}
}
retVal = true;
}
}
myGlobalInterceptors.add(theInterceptor);
// Make sure we're always sorted according to the order declared in
// @Order
sortByOrderAnnotation(myGlobalInterceptors);
for (Pointcut nextPointcut : myInvokers.keys()) {
List<BaseInvoker> nextInvokerList = myInvokers.get(nextPointcut);
nextInvokerList.sort(Comparator.naturalOrder());
}
return retVal;
}
private void sortByOrderAnnotation(List<Object> theObjects) {
@ -130,10 +162,16 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
public boolean callHooks(Pointcut thePointcut, HookParams theParams) {
assert haveAppropriateParams(thePointcut, theParams);
// Anonymous hooks first
List<BaseInvoker> invokers = ListUtils.union(
myAnonymousInvokers.get(thePointcut),
myInvokers.get(thePointcut));
List<BaseInvoker> globalInvokers = myInvokers.get(thePointcut);
List<BaseInvoker> anonymousInvokers = myAnonymousInvokers.get(thePointcut);
List<BaseInvoker> invokers = globalInvokers;
if (anonymousInvokers.isEmpty() == false) {
invokers = ListUtils.union(
anonymousInvokers,
globalInvokers);
invokers.sort(Comparator.naturalOrder());
}
/*
* Call each hook in order
@ -167,14 +205,27 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
return callHooks(thePointcut, new HookParams(theParams));
}
private abstract class BaseInvoker {
private abstract class BaseInvoker implements Comparable<BaseInvoker> {
private final int myOrder;
protected BaseInvoker(int theOrder) {
myOrder = theOrder;
}
abstract boolean invoke(HookParams theParams);
@Override
public int compareTo(BaseInvoker o) {
return myOrder - o.myOrder;
}
}
private class AnonymousLambdaInvoker extends BaseInvoker {
private final IAnonymousLambdaHook myHook;
public AnonymousLambdaInvoker(IAnonymousLambdaHook theHook) {
public AnonymousLambdaInvoker(IAnonymousLambdaHook theHook, int theOrder) {
super(theOrder);
myHook = theHook;
}
@ -196,7 +247,8 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
/**
* Constructor
*/
private HookInvoker(Hook theHook, @Nonnull Object theInterceptor, @Nonnull Method theHookMethod) {
private HookInvoker(Hook theHook, @Nonnull Object theInterceptor, @Nonnull Method theHookMethod, int theOrder) {
super(theOrder);
myInterceptor = theInterceptor;
myParameterTypes = theHookMethod.getParameterTypes();
myMethod = theHookMethod;

View File

@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.model.interceptor.executor.InterceptorRegistry;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -39,7 +39,6 @@ import java.util.Collections;
import java.util.Optional;
/**
*
* Cache of active subscriptions. When a new subscription is added to the cache, a new Spring Channel is created
* and a new MessageHandler for that subscription is subscribed to that channel. These subscriptions, channels, and
* handlers are all caches in this registry so they can be removed it the subscription is deleted.
@ -49,8 +48,7 @@ import java.util.Optional;
@Component
public class SubscriptionRegistry {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionRegistry.class);
public static final String INTERCEPTOR_POST_ACTIVATED = "SubscriptionRegistry.postActivated";
private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
@Autowired
SubscriptionCanonicalizer<IBaseResource> mySubscriptionCanonicalizer;
@Autowired
@ -60,9 +58,7 @@ public class SubscriptionRegistry {
@Autowired
ModelConfig myModelConfig;
@Autowired
InterceptorRegistry myInterceptorRegistry;
private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
private IInterceptorRegistry myInterceptorRegistry;
public ActiveSubscription get(String theIdPart) {
return myActiveSubscriptionCache.get(theIdPart);
@ -103,7 +99,8 @@ public class SubscriptionRegistry {
myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
myInterceptorRegistry.callHooks(Pointcut.SUBSCRIPTION_AFTER_SUBSCRIPTION_ACTIVATED, canonicalized);
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
myInterceptorRegistry.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, canonicalized);
return canonicalized;
}