From 87a6c24bded14d7f9e161038d3dc97c4c581e3d5 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Mon, 4 Feb 2019 13:49:10 -0500 Subject: [PATCH 1/3] More subscription logger work --- .../subscription/BaseSubscriptionsR4Test.java | 2 +- .../RestHookWithInterceptorR4Test.java | 32 ++++- .../interceptor/api/IInterceptorRegistry.java | 1 - .../jpa/model/interceptor/api/Pointcut.java | 35 ++++- .../SubscriptionDebugLogInterceptor.java | 136 +++++++++++++----- .../BaseSubscriptionDeliverySubscriber.java | 27 ++-- 6 files changed, 175 insertions(+), 58 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java index 46f47e283a0..98d545032c7 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java @@ -54,7 +54,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test protected static List ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); private static String ourListenerServerBase; - private List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); + protected List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); @After diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java index d796eadd75b..dc06023c876 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java @@ -13,6 +13,7 @@ import ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInter import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.MethodOutcome; +import ca.uhn.fhir.util.PortUtil; import org.apache.commons.lang3.Validate; import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.Observation; @@ -170,6 +171,33 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test { assertEquals(0, ourUpdatedObservations.size()); } + + @Test + public void testDeliveryFailed() throws Exception { + ourNextBeforeRestHookDeliveryReturn = false; + + // Create a subscription + CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); + Subscription subscription = newSubscription("Observation?status=final", "application/fhir+json"); + subscription.getChannel().setEndpoint("http://localhost:" + PortUtil.findFreePort()); // this better not succeed! + + MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); + subscription.setId(methodOutcome.getId().getIdPart()); + mySubscriptionIds.add(methodOutcome.getId()); + + registerLatch.await(10, TimeUnit.SECONDS); + + CountDownLatch latch = new CountDownLatch(1); + myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, params -> { + latch.countDown(); + }); + + sendObservation(); + + latch.await(10, TimeUnit.SECONDS); + } + + protected Observation sendObservation() { Observation observation = new Observation(); observation.setStatus(Observation.ObservationStatus.FINAL); @@ -192,7 +220,7 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test { SubscriptionDebugLogInterceptor interceptor = new SubscriptionDebugLogInterceptor(); myInterceptorRegistry.registerInterceptor(interceptor); - SubscriptionDebugLogInterceptor interceptor2 = new SubscriptionDebugLogInterceptor(loggerMock, Level.DEBUG); + SubscriptionDebugLogInterceptor interceptor2 = new SubscriptionDebugLogInterceptor(t -> loggerMock, Level.DEBUG); myInterceptorRegistry.registerInterceptor(interceptor2); try { @@ -232,7 +260,7 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test { ourLog.info("Messages:\n " + messages.stream().collect(Collectors.joining("\n "))); - assertThat(messages.get(messages.size() - 1), matchesPattern("\\[SUBS50\\] Finished delivery of resource Observation.*")); + assertThat(messages.get(messages.size() - 1), matchesPattern("Finished delivery of resource Observation.*")); } finally { myInterceptorRegistry.unregisterInterceptor(interceptor); diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java index f302c316d39..af5a9258911 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java @@ -53,7 +53,6 @@ public interface IInterceptorRegistry { @Deprecated void unregisterGlobalInterceptor(Object theInterceptor); - @VisibleForTesting void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook); diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java index 70f0e263b61..745a8a29151 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java @@ -124,6 +124,28 @@ public enum Pointcut { */ SUBSCRIPTION_AFTER_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), + /** + * Invoked immediately after the attempted delivery of a subscription, if the delivery + * failed. + *

+ * Hooks may accept the following parameters: + *

+ *
    + *
  • java.lang.Exception - The exception that caused the failure
  • + *
  • ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription
  • + *
  • ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage
  • + *
+ *

+ * Hooks may return void or may return a boolean. If the method returns + * void or true, processing will continue normally, meaning that + * an exception will be thrown by the delivery mechanism. This typically means that the + * message will be returned to the processing queue. If the method + * returns false, processing will be aborted and no further action will be + * taken for the delivery. + *

+ */ + SUBSCRIPTION_AFTER_DELIVERY_FAILED("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), + /** * Invoked immediately after the delivery of a REST HOOK subscription. *

@@ -139,7 +161,7 @@ public enum Pointcut { * Hooks should return void. *

*/ - SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), + SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription","ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), /** * Invoked immediately before the delivery of a REST HOOK subscription. @@ -159,7 +181,7 @@ public enum Pointcut { * returns false, processing will be aborted. *

*/ - SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), + SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription","ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), /** * Invoked whenever a persisted resource (a resource that has just been stored in the @@ -179,6 +201,7 @@ public enum Pointcut { */ SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"), + /** * Invoked whenever a persisted resource (a resource that has just been stored in the * database via a create/update/patch/etc.) has been checked for whether any subscriptions @@ -195,7 +218,6 @@ public enum Pointcut { */ SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"), - /** * Invoked immediately after an active subscription is "registered". In HAPI FHIR, when * a subscription @@ -287,7 +309,8 @@ public enum Pointcut { * Hooks should return void. *

*/ - OP_PRECOMMIT_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"), + OP_PRECOMMIT_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource","org.hl7.fhir.instance.model.api.IBaseResource"), + /** * Invoked before a resource will be updated, immediately before the resource @@ -307,9 +330,7 @@ public enum Pointcut { * Hooks should return void. *

*/ - OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"), - - ; + OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource","org.hl7.fhir.instance.model.api.IBaseResource"); private final List myParameterTypes; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java index a6cc377fe68..aff3ddb019d 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java @@ -12,34 +12,58 @@ import org.slf4j.LoggerFactory; import org.slf4j.event.Level; import java.util.Date; +import java.util.EnumMap; +import java.util.function.Function; /** * This interceptor can be used for troubleshooting subscription processing. It provides very * detailed logging about the subscription processing pipeline. + *

+ * This interceptor loges each step in the processing pipeline with a + * different event code, using the event codes itemized in + * {@link EventCodeEnum}. By default these are each placed in a logger with + * a different name (e.g. ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor.SUBS20 + * in order to facilitate fine-grained logging controls where some codes are omitted and + * some are not. + *

+ *

+ * A custom log factory can also be passed in, in which case the logging + * creation may use another strategy. + *

* * @since 3.7.0 + * @see EventCodeEnum */ @Interceptor public class SubscriptionDebugLogInterceptor { - private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(SubscriptionDebugLogInterceptor.class); private static final String SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK = "SubscriptionDebugLogInterceptor_precheck"; - private final Logger myLogger; private final Level myLevel; + private final EnumMap myLoggers; /** * Constructor that logs at INFO level to the logger ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor */ public SubscriptionDebugLogInterceptor() { - this(DEFAULT_LOGGER, Level.INFO); + this(defaultLogFactory(), Level.INFO); } /** * Constructor using a specific logger */ - public SubscriptionDebugLogInterceptor(Logger theLog, Level theLevel) { - myLogger = theLog; + public SubscriptionDebugLogInterceptor(Function theLogFactory, Level theLevel) { myLevel = theLevel; + myLoggers = new EnumMap<>(EventCodeEnum.class); + for (EventCodeEnum next : EventCodeEnum.values()) { + myLoggers.put(next, theLogFactory.apply(next)); + } + } + + @Hook(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED) + public void step10_resourceModified(ResourceModifiedMessage theMessage) { + String value = Long.toString(System.currentTimeMillis()); + theMessage.setAttribute(SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK, value); + log(EventCodeEnum.SUBS10, "Resource {} was submitted to the processing pipeline (op={})", theMessage.getPayloadId(), theMessage.getOperationType()); } /* @@ -52,31 +76,29 @@ public class SubscriptionDebugLogInterceptor { * gaps to add things if we ever need them. */ - @Hook(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED) - public void step10_resourceModified(ResourceModifiedMessage theMessage) { - String value = Long.toString(System.currentTimeMillis()); - theMessage.setAttribute(SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK, value); - log("SUBS10","Resource {} was submitted to the processing pipeline", theMessage.getPayloadId(), theMessage.getOperationType()); - } - @Hook(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED) public void step20_beforeChecked(ResourceModifiedMessage theMessage) { - log("SUBS20","Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType()); + log(EventCodeEnum.SUBS20, "Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType()); } @Hook(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED) public void step30_subscriptionMatched(ResourceDeliveryMessage theMessage, SubscriptionMatchResult theResult) { - log("SUBS30","Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory()); + log(EventCodeEnum.SUBS30, "Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory()); } @Hook(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS) - public void step35_subscriptionMatched(ResourceModifiedMessage theMessage) { - log("SUBS35","Resource {} did not match any subscriptions", theMessage.getPayloadId()); + public void step35_subscriptionNotMatched(ResourceModifiedMessage theMessage) { + log(EventCodeEnum.SUBS35, "Resource {} did not match any subscriptions", theMessage.getPayloadId()); } @Hook(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY) public void step40_beforeDelivery(ResourceDeliveryMessage theMessage) { - log("SUBS40","Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl()); + log(EventCodeEnum.SUBS40, "Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl()); + } + + @Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED) + public void step45_deliveryFailed(ResourceDeliveryMessage theMessage, Exception theFailure) { + log(EventCodeEnum.SUBS45, "Delivery of resource {} for subscription {} to channel of type {} - Failure: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theFailure.toString()); } @Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY) @@ -88,28 +110,70 @@ public class SubscriptionDebugLogInterceptor { .map(start -> new StopWatch(start).toString()) .orElse("(unknown)"); - log("SUBS50","Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime); + log(EventCodeEnum.SUBS50, "Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime); } - private void log(String theCode, String theMessage, Object... theArguments) { - String msg = "[" + theCode + "] " + theMessage; - switch (myLevel) { - case ERROR: - myLogger.error(msg, theArguments); - break; - case WARN: - myLogger.warn(msg, theArguments); - break; - case INFO: - myLogger.info(msg, theArguments); - break; - case DEBUG: - myLogger.debug(msg, theArguments); - break; - case TRACE: - myLogger.trace(msg, theArguments); - break; + protected void log(EventCodeEnum theEventCode, String theMessage, Object... theArguments) { + Logger logger = myLoggers.get(theEventCode); + if (logger != null) { + switch (myLevel) { + case ERROR: + logger.error(theMessage, theArguments); + break; + case WARN: + logger.warn(theMessage, theArguments); + break; + case INFO: + logger.info(theMessage, theArguments); + break; + case DEBUG: + logger.debug(theMessage, theArguments); + break; + case TRACE: + logger.trace(theMessage, theArguments); + break; + } } } + public enum EventCodeEnum { + /** + * A new/updated resource has been submitted to the processing pipeline and is about + * to be placed on the matchign queue. + */ + SUBS10, + /** + * A resources has been dequeued from the matching queue and is about to be checked + * for any matching subscriptions. + */ + SUBS20, + /** + * The resource has matched a subscription (logged once for each matching subscription) + * and is about to be queued for delivery. + */ + SUBS30, + /** + * The resource did not match any subscriptions and processing is complete. + */ + SUBS35, + /** + * The resource has been dequeued from the delivery queue and is about to be + * delivered. + */ + SUBS40, + /** + * Delivery failed + */ + SUBS45, + /** + * Delivery is now complete and processing is finished. + */ + SUBS50 + } + + + private static Function defaultLogFactory() { + return code -> LoggerFactory.getLogger(SubscriptionDebugLogInterceptor.class.getName() + "." + code.name()); + } + } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java index 7bca03781de..2fc7e4227f1 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java @@ -49,16 +49,15 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl return; } - String subscriptionId = "(unknown?)"; + ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); + String subscriptionId = msg.getSubscription().getIdElement(myFhirContext).getValue(); + + ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart()); + if (updatedSubscription != null) { + msg.setSubscription(updatedSubscription.getSubscription()); + } try { - ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); - subscriptionId = msg.getSubscription().getIdElement(myFhirContext).getValue(); - - ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart()); - if (updatedSubscription != null) { - msg.setSubscription(updatedSubscription.getSubscription()); - } // Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, msg, msg.getSubscription())) { @@ -71,9 +70,15 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, msg, msg.getSubscription()); } catch (Exception e) { - String msg = "Failure handling subscription payload for subscription: " + subscriptionId; - ourLog.error(msg, e); - throw new MessagingException(theMessage, msg, e); + + // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY + if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, msg, msg.getSubscription(), e)) { + return; + } + + String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId; + ourLog.error(errorMsg, e); + throw new MessagingException(theMessage, errorMsg, e); } } From 88eb2c424e820a051c92c7c64e12526fe55a1124 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Mon, 4 Feb 2019 13:50:47 -0500 Subject: [PATCH 2/3] formatting --- .../ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java | 8 ++++---- .../interceptor/SubscriptionDebugLogInterceptor.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java index 745a8a29151..d2891307190 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java @@ -161,7 +161,7 @@ public enum Pointcut { * Hooks should return void. *

*/ - SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription","ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), + SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), /** * Invoked immediately before the delivery of a REST HOOK subscription. @@ -181,7 +181,7 @@ public enum Pointcut { * returns false, processing will be aborted. *

*/ - SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription","ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), + SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), /** * Invoked whenever a persisted resource (a resource that has just been stored in the @@ -309,7 +309,7 @@ public enum Pointcut { * Hooks should return void. *

*/ - OP_PRECOMMIT_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource","org.hl7.fhir.instance.model.api.IBaseResource"), + OP_PRECOMMIT_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"), /** @@ -330,7 +330,7 @@ public enum Pointcut { * Hooks should return void. *

*/ - OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource","org.hl7.fhir.instance.model.api.IBaseResource"); + OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"); private final List myParameterTypes; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java index aff3ddb019d..8d35fdb6ce6 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java @@ -31,8 +31,8 @@ import java.util.function.Function; * creation may use another strategy. *

* - * @since 3.7.0 * @see EventCodeEnum + * @since 3.7.0 */ @Interceptor public class SubscriptionDebugLogInterceptor { From 4699c5199701cb68ca37d10f90bc5179d762f75b Mon Sep 17 00:00:00 2001 From: James Agnew Date: Fri, 8 Feb 2019 17:26:34 -0500 Subject: [PATCH 3/3] Address review comments --- .../SubscriptionDebugLogInterceptor.java | 28 +++++++++---------- .../BaseSubscriptionDeliverySubscriber.java | 7 +++-- .../subscriber/ResourceDeliveryMessage.java | 13 +++++---- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java index 8d35fdb6ce6..f17b99e963e 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java @@ -63,7 +63,7 @@ public class SubscriptionDebugLogInterceptor { public void step10_resourceModified(ResourceModifiedMessage theMessage) { String value = Long.toString(System.currentTimeMillis()); theMessage.setAttribute(SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK, value); - log(EventCodeEnum.SUBS10, "Resource {} was submitted to the processing pipeline (op={})", theMessage.getPayloadId(), theMessage.getOperationType()); + log(EventCodeEnum.SUBS1, "Resource {} was submitted to the processing pipeline (op={})", theMessage.getPayloadId(), theMessage.getOperationType()); } /* @@ -78,27 +78,27 @@ public class SubscriptionDebugLogInterceptor { @Hook(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED) public void step20_beforeChecked(ResourceModifiedMessage theMessage) { - log(EventCodeEnum.SUBS20, "Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType()); + log(EventCodeEnum.SUBS2, "Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType()); } @Hook(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED) public void step30_subscriptionMatched(ResourceDeliveryMessage theMessage, SubscriptionMatchResult theResult) { - log(EventCodeEnum.SUBS30, "Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory()); + log(EventCodeEnum.SUBS3, "Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory()); } @Hook(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS) public void step35_subscriptionNotMatched(ResourceModifiedMessage theMessage) { - log(EventCodeEnum.SUBS35, "Resource {} did not match any subscriptions", theMessage.getPayloadId()); + log(EventCodeEnum.SUBS4, "Resource {} did not match any subscriptions", theMessage.getPayloadId()); } @Hook(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY) public void step40_beforeDelivery(ResourceDeliveryMessage theMessage) { - log(EventCodeEnum.SUBS40, "Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl()); + log(EventCodeEnum.SUBS5, "Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl()); } @Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED) public void step45_deliveryFailed(ResourceDeliveryMessage theMessage, Exception theFailure) { - log(EventCodeEnum.SUBS45, "Delivery of resource {} for subscription {} to channel of type {} - Failure: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theFailure.toString()); + log(EventCodeEnum.SUBS6, "Delivery of resource {} for subscription {} to channel of type {} - Failure: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theFailure.toString()); } @Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY) @@ -110,7 +110,7 @@ public class SubscriptionDebugLogInterceptor { .map(start -> new StopWatch(start).toString()) .orElse("(unknown)"); - log(EventCodeEnum.SUBS50, "Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime); + log(EventCodeEnum.SUBS7, "Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime); } protected void log(EventCodeEnum theEventCode, String theMessage, Object... theArguments) { @@ -141,34 +141,34 @@ public class SubscriptionDebugLogInterceptor { * A new/updated resource has been submitted to the processing pipeline and is about * to be placed on the matchign queue. */ - SUBS10, + SUBS1, /** * A resources has been dequeued from the matching queue and is about to be checked * for any matching subscriptions. */ - SUBS20, + SUBS2, /** * The resource has matched a subscription (logged once for each matching subscription) * and is about to be queued for delivery. */ - SUBS30, + SUBS3, /** * The resource did not match any subscriptions and processing is complete. */ - SUBS35, + SUBS4, /** * The resource has been dequeued from the delivery queue and is about to be * delivered. */ - SUBS40, + SUBS5, /** * Delivery failed */ - SUBS45, + SUBS6, /** * Delivery is now complete and processing is finished. */ - SUBS50 + SUBS7 } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java index 2fc7e4227f1..5b7f3ca675f 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java @@ -50,7 +50,7 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl } ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); - String subscriptionId = msg.getSubscription().getIdElement(myFhirContext).getValue(); + String subscriptionId = msg.getSubscriptionId(myFhirContext); ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart()); if (updatedSubscription != null) { @@ -71,13 +71,14 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl } catch (Exception e) { + String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId; + ourLog.error(errorMsg, e); + // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, msg, msg.getSubscription(), e)) { return; } - String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId; - ourLog.error(errorMsg, e); throw new MessagingException(theMessage, errorMsg, e); } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryMessage.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryMessage.java index ce58da1f14a..d4c65aaf132 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryMessage.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryMessage.java @@ -28,16 +28,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; -import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.builder.ToStringBuilder; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - import static org.apache.commons.lang3.StringUtils.isNotBlank; @SuppressWarnings("WeakerAccess") @@ -133,4 +127,11 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes .append("myOperationType", myOperationType) .toString(); } + + /** + * Helper method to fetch the subscription ID + */ + public String getSubscriptionId(FhirContext theFhirContext) { + return getSubscription().getIdElement(theFhirContext).getValue(); + } }