From c3c7d156e9017a1b9395dc50c404ba5d95ef95c9 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Sun, 20 Jan 2019 10:16:18 -0500 Subject: [PATCH] Interceptor cleanup --- .../java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java | 11 +++ .../RestHookWithInterceptorR4Test.java | 43 +++++++---- .../interceptor/api/IInterceptorRegistry.java | 5 ++ .../jpa/model/interceptor/api/Pointcut.java | 18 ++++- .../executor/InterceptorRegistry.java | 71 +++++++++++++++---- .../module/cache/SubscriptionRegistry.java | 16 +++-- 6 files changed, 127 insertions(+), 37 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java index 3ef8a954096..dacf263e0ac 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java @@ -2,6 +2,8 @@ package ca.uhn.fhir.jpa.dao; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.entity.TermConcept; +import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry; +import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test; import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider; import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc; @@ -51,6 +53,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -85,6 +88,8 @@ public abstract class BaseJpaTest { protected IRequestOperationCallback myRequestOperationCallback = mock(IRequestOperationCallback.class); @Autowired protected DatabaseBackedPagingProvider myDatabaseBackedPagingProvider; + @Autowired + protected IInterceptorRegistry myInterceptorRegistry; @After public void afterPerformCleanup() { @@ -129,6 +134,12 @@ public abstract class BaseJpaTest { when(mySrd.getHeaders(eq(JpaConstants.HEADER_META_SNAPSHOT_MODE))).thenReturn(new ArrayList<>()); } + protected CountDownLatch registerLatchHookInterceptor(int theCount, Pointcut theLatchPointcut) { + CountDownLatch deliveryLatch = new CountDownLatch(theCount); + myInterceptorRegistry.registerAnonymousHookForUnitTest(theLatchPointcut, Integer.MAX_VALUE, t -> deliveryLatch.countDown()); + return deliveryLatch; + } + protected abstract FhirContext getContext(); protected abstract PlatformTransactionManager getTxManager(); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java index 202c715e472..4382cf5c80f 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java @@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.subscription.resthook; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber; import ca.uhn.fhir.jpa.model.interceptor.api.Hook; +import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry; import ca.uhn.fhir.jpa.model.interceptor.api.Interceptor; import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test; @@ -22,9 +23,11 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.test.context.ContextConfiguration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.*; /** * Test the rest-hook subscriptions @@ -66,35 +69,43 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test { public void testBeforeRestHookDelivery_ModifyResourceId() throws Exception { ourNextModifyResourceId = true; + // Create a subscription + CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); createSubscription("Observation?status=final", "application/fhir+json"); - waitForActivatedSubscriptionCount(1); + registerLatch.await(10, TimeUnit.SECONDS); + // Creating a matching resource + CountDownLatch deliveryLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY); sendObservation(); + deliveryLatch.await(10, TimeUnit.SECONDS); - waitForSize(0, ourCreatedObservations); - waitForSize(1, ourUpdatedObservations); + assertEquals(0, ourCreatedObservations.size()); + assertEquals(1, ourUpdatedObservations.size()); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); assertEquals("Observation/A", ourUpdatedObservations.get(0).getId()); - // TODO: JA a latch would be even better but we'd need to allow customizable orders since the ad-hoc ones run first - waitForTrue(() -> ourHitBeforeRestHookDelivery); - waitForTrue(() -> ourHitAfterRestHookDelivery); + assertTrue(ourHitBeforeRestHookDelivery); + assertTrue(ourHitAfterRestHookDelivery); } @Test public void testBeforeRestHookDelivery_AddHeader() throws Exception { ourNextAddHeader = true; + // Create a subscription + CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); createSubscription("Observation?status=final", "application/fhir+json"); - waitForActivatedSubscriptionCount(1); + registerLatch.await(10, TimeUnit.SECONDS); + // Creating a matching resource + CountDownLatch deliveryLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY); sendObservation(); + deliveryLatch.await(10, TimeUnit.SECONDS); - waitForSize(0, ourCreatedObservations); - waitForSize(1, ourUpdatedObservations); + assertEquals(0, ourCreatedObservations.size()); + assertEquals(1, ourUpdatedObservations.size()); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); - // TODO: JA a latch would be even better but we'd need to allow customizable orders since the ad-hoc ones run first - waitForTrue(() -> ourHitBeforeRestHookDelivery); - waitForTrue(() -> ourHitAfterRestHookDelivery); + assertTrue(ourHitBeforeRestHookDelivery); + assertTrue(ourHitAfterRestHookDelivery); assertThat(ourHeaders, hasItem("X-Foo: Bar")); } @@ -103,8 +114,10 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test { public void testBeforeRestHookDelivery_AbortDelivery() throws Exception { ourNextBeforeRestHookDeliveryReturn = false; + // Create a subscription + CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); createSubscription("Observation?status=final", "application/fhir+json"); - waitForActivatedSubscriptionCount(1); + registerLatch.await(10, TimeUnit.SECONDS); sendObservation(); diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java index f23efbfc7bb..3c914295797 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java @@ -24,9 +24,14 @@ import com.google.common.annotations.VisibleForTesting; public interface IInterceptorRegistry { + int DEFAULT_ORDER = 0; + @VisibleForTesting void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook); + @VisibleForTesting + void registerAnonymousHookForUnitTest(Pointcut thePointcut, int theOrder, IAnonymousLambdaHook theHook); + @VisibleForTesting void clearAnonymousHookForUnitTest(); diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java index 462d864c454..979d4331c02 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java @@ -67,9 +67,23 @@ public enum Pointcut { *
  • ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage
  • * */ - SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ResourceModifiedMessage") + SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ResourceModifiedMessage"), - ; + + /** + * Invoked immediately after an active subscription is "registered". In HAPI FHIR, when + * a subscription + *

    + * Hooks may make changes to the canonicalized subscription and this will have an effect + * on processing across this server. Note however that timing issues may occur, since the + * subscription is already technically live by the time this hook is called. + *

    + * Hooks may accept the following parameters: + * + */ + SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED("CanonicalSubscription"); private final List myParameterTypes; diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/executor/InterceptorRegistry.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/executor/InterceptorRegistry.java index 818b44b3d6d..1e4d1f53c65 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/executor/InterceptorRegistry.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/executor/InterceptorRegistry.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.model.interceptor.executor; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -66,10 +66,15 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon @Override @VisibleForTesting public void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook) { + registerAnonymousHookForUnitTest(thePointcut, DEFAULT_ORDER, theHook); + } + + @Override + public void registerAnonymousHookForUnitTest(Pointcut thePointcut, int theOrder, IAnonymousLambdaHook theHook) { Validate.notNull(thePointcut); Validate.notNull(theHook); - myAnonymousInvokers.put(thePointcut, new AnonymousLambdaInvoker(theHook)); + myAnonymousInvokers.put(thePointcut, new AnonymousLambdaInvoker(theHook, theOrder)); } @Override @@ -88,15 +93,27 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon myGlobalInterceptors.add(nextGlobalInterceptor); } - // Sort them - sortByOrderAnnotation(myGlobalInterceptors); - // Pull out the hook methods for (Object nextInterceptor : myGlobalInterceptors) { + + int typeOrder = DEFAULT_ORDER; + Order typeOrderAnnotation = AnnotationUtils.findAnnotation(nextInterceptor.getClass(), Order.class); + if (typeOrderAnnotation != null) { + typeOrder = typeOrderAnnotation.value(); + } + for (Method nextMethod : nextInterceptor.getClass().getDeclaredMethods()) { Hook hook = AnnotationUtils.findAnnotation(nextMethod, Hook.class); + if (hook != null) { - HookInvoker invoker = new HookInvoker(hook, nextInterceptor, nextMethod); + + int methodOrder = typeOrder; + Order methodOrderAnnotation = AnnotationUtils.findAnnotation(nextMethod, Order.class); + if (methodOrderAnnotation != null) { + methodOrder = methodOrderAnnotation.value(); + } + + HookInvoker invoker = new HookInvoker(hook, nextInterceptor, nextMethod, methodOrder); for (Pointcut nextPointcut : hook.value()) { myInvokers.put(nextPointcut, invoker); } @@ -104,6 +121,12 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon } } + // Sort everything by declared order + sortByOrderAnnotation(myGlobalInterceptors); + for (Pointcut nextPointcut : myInvokers.keys()) { + List nextInvokerList = myInvokers.get(nextPointcut); + nextInvokerList.sort(Comparator.naturalOrder()); + } } private void sortByOrderAnnotation(List theObjects) { @@ -130,10 +153,16 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon public boolean callHooks(Pointcut thePointcut, HookParams theParams) { assert haveAppropriateParams(thePointcut, theParams); - // Anonymous hooks first - List invokers = ListUtils.union( - myAnonymousInvokers.get(thePointcut), - myInvokers.get(thePointcut)); + List globalInvokers = myInvokers.get(thePointcut); + List anonymousInvokers = myAnonymousInvokers.get(thePointcut); + + List invokers = globalInvokers; + if (anonymousInvokers.isEmpty() == false) { + invokers = ListUtils.union( + anonymousInvokers, + globalInvokers); + invokers.sort(Comparator.naturalOrder()); + } /* * Call each hook in order @@ -167,14 +196,27 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon return callHooks(thePointcut, new HookParams(theParams)); } - private abstract class BaseInvoker { + private abstract class BaseInvoker implements Comparable { + + private final int myOrder; + + protected BaseInvoker(int theOrder) { + myOrder = theOrder; + } + abstract boolean invoke(HookParams theParams); + + @Override + public int compareTo(BaseInvoker o) { + return myOrder - o.myOrder; + } } private class AnonymousLambdaInvoker extends BaseInvoker { private final IAnonymousLambdaHook myHook; - public AnonymousLambdaInvoker(IAnonymousLambdaHook theHook) { + public AnonymousLambdaInvoker(IAnonymousLambdaHook theHook, int theOrder) { + super(theOrder); myHook = theHook; } @@ -196,7 +238,8 @@ public class InterceptorRegistry implements IInterceptorRegistry, ApplicationCon /** * Constructor */ - private HookInvoker(Hook theHook, @Nonnull Object theInterceptor, @Nonnull Method theHookMethod) { + private HookInvoker(Hook theHook, @Nonnull Object theInterceptor, @Nonnull Method theHookMethod, int theOrder) { + super(theOrder); myInterceptor = theInterceptor; myParameterTypes = theHookMethod.getParameterTypes(); myMethod = theHookMethod; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java index 363a7bcea9b..b18b4a3627e 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.module.cache; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache; */ import ca.uhn.fhir.jpa.model.entity.ModelConfig; +import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry; +import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import org.apache.commons.lang3.Validate; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -37,7 +39,6 @@ import java.util.Collections; import java.util.Optional; /** - * * Cache of active subscriptions. When a new subscription is added to the cache, a new Spring Channel is created * and a new MessageHandler for that subscription is subscribed to that channel. These subscriptions, channels, and * handlers are all caches in this registry so they can be removed it the subscription is deleted. @@ -47,7 +48,7 @@ import java.util.Optional; @Component public class SubscriptionRegistry { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionRegistry.class); - + private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache(); @Autowired SubscriptionCanonicalizer mySubscriptionCanonicalizer; @Autowired @@ -56,8 +57,8 @@ public class SubscriptionRegistry { SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; @Autowired ModelConfig myModelConfig; - - private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache(); + @Autowired + private IInterceptorRegistry myInterceptorRegistry; public ActiveSubscription get(String theIdPart) { return myActiveSubscriptionCache.get(theIdPart); @@ -98,6 +99,9 @@ public class SubscriptionRegistry { myActiveSubscriptionCache.put(subscriptionId, activeSubscription); + // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED + myInterceptorRegistry.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, canonicalized); + return canonicalized; }