diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java index 46f47e283a0..98d545032c7 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR4Test.java @@ -54,7 +54,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test protected static List ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); private static String ourListenerServerBase; - private List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); + protected List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); @After diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java index 5554aa28ef3..d61ae544bad 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookWithInterceptorR4Test.java @@ -13,6 +13,7 @@ import ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInter import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.MethodOutcome; +import ca.uhn.fhir.util.PortUtil; import org.apache.commons.lang3.Validate; import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.Observation; @@ -173,6 +174,33 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test { assertEquals(0, ourUpdatedObservations.size()); } + + @Test + public void testDeliveryFailed() throws Exception { + ourNextBeforeRestHookDeliveryReturn = false; + + // Create a subscription + CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); + Subscription subscription = newSubscription("Observation?status=final", "application/fhir+json"); + subscription.getChannel().setEndpoint("http://localhost:" + PortUtil.findFreePort()); // this better not succeed! + + MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); + subscription.setId(methodOutcome.getId().getIdPart()); + mySubscriptionIds.add(methodOutcome.getId()); + + registerLatch.await(10, TimeUnit.SECONDS); + + CountDownLatch latch = new CountDownLatch(1); + myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, params -> { + latch.countDown(); + }); + + sendObservation(); + + latch.await(10, TimeUnit.SECONDS); + } + + protected Observation sendObservation() { Observation observation = new Observation(); observation.setStatus(Observation.ObservationStatus.FINAL); @@ -195,7 +223,7 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test { SubscriptionDebugLogInterceptor interceptor = new SubscriptionDebugLogInterceptor(); myInterceptorRegistry.registerInterceptor(interceptor); - SubscriptionDebugLogInterceptor interceptor2 = new SubscriptionDebugLogInterceptor(loggerMock, Level.DEBUG); + SubscriptionDebugLogInterceptor interceptor2 = new SubscriptionDebugLogInterceptor(t -> loggerMock, Level.DEBUG); myInterceptorRegistry.registerInterceptor(interceptor2); try { @@ -235,7 +263,7 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test { ourLog.info("Messages:\n " + messages.stream().collect(Collectors.joining("\n "))); - assertThat(messages.get(messages.size() - 1), matchesPattern("\\[SUBS50\\] Finished delivery of resource Observation.*")); + assertThat(messages.get(messages.size() - 1), matchesPattern("Finished delivery of resource Observation.*")); } finally { myInterceptorRegistry.unregisterInterceptor(interceptor); diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java index f302c316d39..af5a9258911 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/IInterceptorRegistry.java @@ -53,7 +53,6 @@ public interface IInterceptorRegistry { @Deprecated void unregisterGlobalInterceptor(Object theInterceptor); - @VisibleForTesting void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook); diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java index dc87004bccd..eab60633a37 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/interceptor/api/Pointcut.java @@ -124,6 +124,28 @@ public enum Pointcut { */ SUBSCRIPTION_AFTER_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), + /** + * Invoked immediately after the attempted delivery of a subscription, if the delivery + * failed. + *

+ * Hooks may accept the following parameters: + *

+ *
    + *
  • java.lang.Exception - The exception that caused the failure
  • + *
  • ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription
  • + *
  • ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage
  • + *
+ *

+ * Hooks may return void or may return a boolean. If the method returns + * void or true, processing will continue normally, meaning that + * an exception will be thrown by the delivery mechanism. This typically means that the + * message will be returned to the processing queue. If the method + * returns false, processing will be aborted and no further action will be + * taken for the delivery. + *

+ */ + SUBSCRIPTION_AFTER_DELIVERY_FAILED("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), + /** * Invoked immediately after the delivery of a REST HOOK subscription. *

@@ -179,6 +201,7 @@ public enum Pointcut { */ SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"), + /** * Invoked whenever a persisted resource (a resource that has just been stored in the * database via a create/update/patch/etc.) has been checked for whether any subscriptions @@ -195,7 +218,6 @@ public enum Pointcut { */ SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"), - /** * Invoked immediately after an active subscription is "registered". In HAPI FHIR, when * a subscription @@ -289,6 +311,7 @@ public enum Pointcut { */ OP_PRECOMMIT_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"), + /** * Invoked before a resource will be updated, immediately before the resource * is persisted to the database. @@ -307,9 +330,7 @@ public enum Pointcut { * Hooks should return void. *

*/ - OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"), - - ; + OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"); private final List myParameterTypes; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java index 0cca62ab699..ef86819fb14 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/interceptor/SubscriptionDebugLogInterceptor.java @@ -32,34 +32,58 @@ import org.slf4j.LoggerFactory; import org.slf4j.event.Level; import java.util.Date; +import java.util.EnumMap; +import java.util.function.Function; /** * This interceptor can be used for troubleshooting subscription processing. It provides very * detailed logging about the subscription processing pipeline. + *

+ * This interceptor loges each step in the processing pipeline with a + * different event code, using the event codes itemized in + * {@link EventCodeEnum}. By default these are each placed in a logger with + * a different name (e.g. ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor.SUBS20 + * in order to facilitate fine-grained logging controls where some codes are omitted and + * some are not. + *

+ *

+ * A custom log factory can also be passed in, in which case the logging + * creation may use another strategy. + *

* + * @see EventCodeEnum * @since 3.7.0 */ @Interceptor public class SubscriptionDebugLogInterceptor { - private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(SubscriptionDebugLogInterceptor.class); private static final String SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK = "SubscriptionDebugLogInterceptor_precheck"; - private final Logger myLogger; private final Level myLevel; + private final EnumMap myLoggers; /** * Constructor that logs at INFO level to the logger ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor */ public SubscriptionDebugLogInterceptor() { - this(DEFAULT_LOGGER, Level.INFO); + this(defaultLogFactory(), Level.INFO); } /** * Constructor using a specific logger */ - public SubscriptionDebugLogInterceptor(Logger theLog, Level theLevel) { - myLogger = theLog; + public SubscriptionDebugLogInterceptor(Function theLogFactory, Level theLevel) { myLevel = theLevel; + myLoggers = new EnumMap<>(EventCodeEnum.class); + for (EventCodeEnum next : EventCodeEnum.values()) { + myLoggers.put(next, theLogFactory.apply(next)); + } + } + + @Hook(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED) + public void step10_resourceModified(ResourceModifiedMessage theMessage) { + String value = Long.toString(System.currentTimeMillis()); + theMessage.setAttribute(SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK, value); + log(EventCodeEnum.SUBS1, "Resource {} was submitted to the processing pipeline (op={})", theMessage.getPayloadId(), theMessage.getOperationType()); } /* @@ -72,31 +96,29 @@ public class SubscriptionDebugLogInterceptor { * gaps to add things if we ever need them. */ - @Hook(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED) - public void step10_resourceModified(ResourceModifiedMessage theMessage) { - String value = Long.toString(System.currentTimeMillis()); - theMessage.setAttribute(SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK, value); - log("SUBS10","Resource {} was submitted to the processing pipeline", theMessage.getPayloadId(), theMessage.getOperationType()); - } - @Hook(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED) public void step20_beforeChecked(ResourceModifiedMessage theMessage) { - log("SUBS20","Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType()); + log(EventCodeEnum.SUBS2, "Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType()); } @Hook(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED) public void step30_subscriptionMatched(ResourceDeliveryMessage theMessage, SubscriptionMatchResult theResult) { - log("SUBS30","Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory()); + log(EventCodeEnum.SUBS3, "Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory()); } @Hook(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS) - public void step35_subscriptionMatched(ResourceModifiedMessage theMessage) { - log("SUBS35","Resource {} did not match any subscriptions", theMessage.getPayloadId()); + public void step35_subscriptionNotMatched(ResourceModifiedMessage theMessage) { + log(EventCodeEnum.SUBS4, "Resource {} did not match any subscriptions", theMessage.getPayloadId()); } @Hook(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY) public void step40_beforeDelivery(ResourceDeliveryMessage theMessage) { - log("SUBS40","Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl()); + log(EventCodeEnum.SUBS5, "Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl()); + } + + @Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED) + public void step45_deliveryFailed(ResourceDeliveryMessage theMessage, Exception theFailure) { + log(EventCodeEnum.SUBS6, "Delivery of resource {} for subscription {} to channel of type {} - Failure: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theFailure.toString()); } @Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY) @@ -108,28 +130,70 @@ public class SubscriptionDebugLogInterceptor { .map(start -> new StopWatch(start).toString()) .orElse("(unknown)"); - log("SUBS50","Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime); + log(EventCodeEnum.SUBS7, "Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime); } - private void log(String theCode, String theMessage, Object... theArguments) { - String msg = "[" + theCode + "] " + theMessage; - switch (myLevel) { - case ERROR: - myLogger.error(msg, theArguments); - break; - case WARN: - myLogger.warn(msg, theArguments); - break; - case INFO: - myLogger.info(msg, theArguments); - break; - case DEBUG: - myLogger.debug(msg, theArguments); - break; - case TRACE: - myLogger.trace(msg, theArguments); - break; + protected void log(EventCodeEnum theEventCode, String theMessage, Object... theArguments) { + Logger logger = myLoggers.get(theEventCode); + if (logger != null) { + switch (myLevel) { + case ERROR: + logger.error(theMessage, theArguments); + break; + case WARN: + logger.warn(theMessage, theArguments); + break; + case INFO: + logger.info(theMessage, theArguments); + break; + case DEBUG: + logger.debug(theMessage, theArguments); + break; + case TRACE: + logger.trace(theMessage, theArguments); + break; + } } } + public enum EventCodeEnum { + /** + * A new/updated resource has been submitted to the processing pipeline and is about + * to be placed on the matchign queue. + */ + SUBS1, + /** + * A resources has been dequeued from the matching queue and is about to be checked + * for any matching subscriptions. + */ + SUBS2, + /** + * The resource has matched a subscription (logged once for each matching subscription) + * and is about to be queued for delivery. + */ + SUBS3, + /** + * The resource did not match any subscriptions and processing is complete. + */ + SUBS4, + /** + * The resource has been dequeued from the delivery queue and is about to be + * delivered. + */ + SUBS5, + /** + * Delivery failed + */ + SUBS6, + /** + * Delivery is now complete and processing is finished. + */ + SUBS7 + } + + + private static Function defaultLogFactory() { + return code -> LoggerFactory.getLogger(SubscriptionDebugLogInterceptor.class.getName() + "." + code.name()); + } + } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java index 7bca03781de..5b7f3ca675f 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/BaseSubscriptionDeliverySubscriber.java @@ -49,16 +49,15 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl return; } - String subscriptionId = "(unknown?)"; + ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); + String subscriptionId = msg.getSubscriptionId(myFhirContext); + + ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart()); + if (updatedSubscription != null) { + msg.setSubscription(updatedSubscription.getSubscription()); + } try { - ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); - subscriptionId = msg.getSubscription().getIdElement(myFhirContext).getValue(); - - ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart()); - if (updatedSubscription != null) { - msg.setSubscription(updatedSubscription.getSubscription()); - } // Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, msg, msg.getSubscription())) { @@ -71,9 +70,16 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, msg, msg.getSubscription()); } catch (Exception e) { - String msg = "Failure handling subscription payload for subscription: " + subscriptionId; - ourLog.error(msg, e); - throw new MessagingException(theMessage, msg, e); + + String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId; + ourLog.error(errorMsg, e); + + // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY + if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, msg, msg.getSubscription(), e)) { + return; + } + + throw new MessagingException(theMessage, errorMsg, e); } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryMessage.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryMessage.java index df76dd070a0..05a5d293f37 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryMessage.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/ResourceDeliveryMessage.java @@ -28,16 +28,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; -import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.builder.ToStringBuilder; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - import static org.apache.commons.lang3.StringUtils.isNotBlank; @SuppressWarnings("WeakerAccess") @@ -133,4 +127,11 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes .append("myOperationType", myOperationType) .toString(); } + + /** + * Helper method to fetch the subscription ID + */ + public String getSubscriptionId(FhirContext theFhirContext) { + return getSubscription().getIdElement(theFhirContext).getValue(); + } }