Interceptor cleanup

This commit is contained in:
James Agnew 2019-01-20 10:16:18 -05:00
parent b878925884
commit c3c7d156e9
6 changed files with 127 additions and 37 deletions
hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa
hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor
hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache

View File

@ -2,6 +2,8 @@ package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
@ -51,6 +53,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -85,6 +88,8 @@ public abstract class BaseJpaTest {
protected IRequestOperationCallback myRequestOperationCallback = mock(IRequestOperationCallback.class);
@Autowired
protected DatabaseBackedPagingProvider myDatabaseBackedPagingProvider;
@Autowired
protected IInterceptorRegistry myInterceptorRegistry;
@After
public void afterPerformCleanup() {
@ -129,6 +134,12 @@ public abstract class BaseJpaTest {
when(mySrd.getHeaders(eq(JpaConstants.HEADER_META_SNAPSHOT_MODE))).thenReturn(new ArrayList<>());
}
protected CountDownLatch registerLatchHookInterceptor(int theCount, Pointcut theLatchPointcut) {
CountDownLatch deliveryLatch = new CountDownLatch(theCount);
myInterceptorRegistry.registerAnonymousHookForUnitTest(theLatchPointcut, Integer.MAX_VALUE, t -> deliveryLatch.countDown());
return deliveryLatch;
}
protected abstract FhirContext getContext();
protected abstract PlatformTransactionManager getTxManager();

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.model.interceptor.api.Hook;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Interceptor;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
@ -22,9 +23,11 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.*;
/**
* Test the rest-hook subscriptions
@ -66,35 +69,43 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
public void testBeforeRestHookDelivery_ModifyResourceId() throws Exception {
ourNextModifyResourceId = true;
// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
registerLatch.await(10, TimeUnit.SECONDS);
// Creating a matching resource
CountDownLatch deliveryLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY);
sendObservation();
deliveryLatch.await(10, TimeUnit.SECONDS);
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourCreatedObservations.size());
assertEquals(1, ourUpdatedObservations.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals("Observation/A", ourUpdatedObservations.get(0).getId());
// TODO: JA a latch would be even better but we'd need to allow customizable orders since the ad-hoc ones run first
waitForTrue(() -> ourHitBeforeRestHookDelivery);
waitForTrue(() -> ourHitAfterRestHookDelivery);
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
}
@Test
public void testBeforeRestHookDelivery_AddHeader() throws Exception {
ourNextAddHeader = true;
// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
registerLatch.await(10, TimeUnit.SECONDS);
// Creating a matching resource
CountDownLatch deliveryLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY);
sendObservation();
deliveryLatch.await(10, TimeUnit.SECONDS);
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourCreatedObservations.size());
assertEquals(1, ourUpdatedObservations.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
// TODO: JA a latch would be even better but we'd need to allow customizable orders since the ad-hoc ones run first
waitForTrue(() -> ourHitBeforeRestHookDelivery);
waitForTrue(() -> ourHitAfterRestHookDelivery);
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
assertThat(ourHeaders, hasItem("X-Foo: Bar"));
}
@ -103,8 +114,10 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
public void testBeforeRestHookDelivery_AbortDelivery() throws Exception {
ourNextBeforeRestHookDeliveryReturn = false;
// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
registerLatch.await(10, TimeUnit.SECONDS);
sendObservation();

View File

@ -24,9 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
public interface IInterceptorRegistry {
int DEFAULT_ORDER = 0;
@VisibleForTesting
void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook);
@VisibleForTesting
void registerAnonymousHookForUnitTest(Pointcut thePointcut, int theOrder, IAnonymousLambdaHook theHook);
@VisibleForTesting
void clearAnonymousHookForUnitTest();

View File

@ -67,9 +67,23 @@ public enum Pointcut {
* <li>ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage</li>
* </ul>
*/
SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ResourceModifiedMessage")
SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ResourceModifiedMessage"),
;
/**
* Invoked immediately after an active subscription is "registered". In HAPI FHIR, when
* a subscription
* <p>
* Hooks may make changes to the canonicalized subscription and this will have an effect
* on processing across this server. Note however that timing issues may occur, since the
* subscription is already technically live by the time this hook is called.
* </p>
* Hooks may accept the following parameters:
* <ul>
* <li>ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription</li>
* </ul>
*/
SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED("CanonicalSubscription");
private final List<String> myParameterTypes;

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.model.interceptor.executor;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -66,10 +66,15 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
@Override
@VisibleForTesting
public void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook) {
registerAnonymousHookForUnitTest(thePointcut, DEFAULT_ORDER, theHook);
}
@Override
public void registerAnonymousHookForUnitTest(Pointcut thePointcut, int theOrder, IAnonymousLambdaHook theHook) {
Validate.notNull(thePointcut);
Validate.notNull(theHook);
myAnonymousInvokers.put(thePointcut, new AnonymousLambdaInvoker(theHook));
myAnonymousInvokers.put(thePointcut, new AnonymousLambdaInvoker(theHook, theOrder));
}
@Override
@ -88,15 +93,27 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
myGlobalInterceptors.add(nextGlobalInterceptor);
}
// Sort them
sortByOrderAnnotation(myGlobalInterceptors);
// Pull out the hook methods
for (Object nextInterceptor : myGlobalInterceptors) {
int typeOrder = DEFAULT_ORDER;
Order typeOrderAnnotation = AnnotationUtils.findAnnotation(nextInterceptor.getClass(), Order.class);
if (typeOrderAnnotation != null) {
typeOrder = typeOrderAnnotation.value();
}
for (Method nextMethod : nextInterceptor.getClass().getDeclaredMethods()) {
Hook hook = AnnotationUtils.findAnnotation(nextMethod, Hook.class);
if (hook != null) {
HookInvoker invoker = new HookInvoker(hook, nextInterceptor, nextMethod);
int methodOrder = typeOrder;
Order methodOrderAnnotation = AnnotationUtils.findAnnotation(nextMethod, Order.class);
if (methodOrderAnnotation != null) {
methodOrder = methodOrderAnnotation.value();
}
HookInvoker invoker = new HookInvoker(hook, nextInterceptor, nextMethod, methodOrder);
for (Pointcut nextPointcut : hook.value()) {
myInvokers.put(nextPointcut, invoker);
}
@ -104,6 +121,12 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
}
}
// Sort everything by declared order
sortByOrderAnnotation(myGlobalInterceptors);
for (Pointcut nextPointcut : myInvokers.keys()) {
List<BaseInvoker> nextInvokerList = myInvokers.get(nextPointcut);
nextInvokerList.sort(Comparator.naturalOrder());
}
}
private void sortByOrderAnnotation(List<Object> theObjects) {
@ -130,10 +153,16 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
public boolean callHooks(Pointcut thePointcut, HookParams theParams) {
assert haveAppropriateParams(thePointcut, theParams);
// Anonymous hooks first
List<BaseInvoker> invokers = ListUtils.union(
myAnonymousInvokers.get(thePointcut),
myInvokers.get(thePointcut));
List<BaseInvoker> globalInvokers = myInvokers.get(thePointcut);
List<BaseInvoker> anonymousInvokers = myAnonymousInvokers.get(thePointcut);
List<BaseInvoker> invokers = globalInvokers;
if (anonymousInvokers.isEmpty() == false) {
invokers = ListUtils.union(
anonymousInvokers,
globalInvokers);
invokers.sort(Comparator.naturalOrder());
}
/*
* Call each hook in order
@ -167,14 +196,27 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
return callHooks(thePointcut, new HookParams(theParams));
}
private abstract class BaseInvoker {
private abstract class BaseInvoker implements Comparable<BaseInvoker> {
private final int myOrder;
protected BaseInvoker(int theOrder) {
myOrder = theOrder;
}
abstract boolean invoke(HookParams theParams);
@Override
public int compareTo(BaseInvoker o) {
return myOrder - o.myOrder;
}
}
private class AnonymousLambdaInvoker extends BaseInvoker {
private final IAnonymousLambdaHook myHook;
public AnonymousLambdaInvoker(IAnonymousLambdaHook theHook) {
public AnonymousLambdaInvoker(IAnonymousLambdaHook theHook, int theOrder) {
super(theOrder);
myHook = theHook;
}
@ -196,7 +238,8 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon
/**
* Constructor
*/
private HookInvoker(Hook theHook, @Nonnull Object theInterceptor, @Nonnull Method theHookMethod) {
private HookInvoker(Hook theHook, @Nonnull Object theInterceptor, @Nonnull Method theHookMethod, int theOrder) {
super(theOrder);
myInterceptor = theInterceptor;
myParameterTypes = theHookMethod.getParameterTypes();
myMethod = theHookMethod;

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -37,7 +39,6 @@ import java.util.Collections;
import java.util.Optional;
/**
*
* Cache of active subscriptions. When a new subscription is added to the cache, a new Spring Channel is created
* and a new MessageHandler for that subscription is subscribed to that channel. These subscriptions, channels, and
* handlers are all caches in this registry so they can be removed it the subscription is deleted.
@ -47,7 +48,7 @@ import java.util.Optional;
@Component
public class SubscriptionRegistry {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionRegistry.class);
private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
@Autowired
SubscriptionCanonicalizer<IBaseResource> mySubscriptionCanonicalizer;
@Autowired
@ -56,8 +57,8 @@ public class SubscriptionRegistry {
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
@Autowired
ModelConfig myModelConfig;
private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
@Autowired
private IInterceptorRegistry myInterceptorRegistry;
public ActiveSubscription get(String theIdPart) {
return myActiveSubscriptionCache.get(theIdPart);
@ -98,6 +99,9 @@ public class SubscriptionRegistry {
myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
myInterceptorRegistry.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, canonicalized);
return canonicalized;
}