Merge pull request #1194 from jamesagnew/ja_more_subscription_logger

More subscription logger work
This commit is contained in:
James Agnew 2019-02-09 11:45:25 -05:00 committed by GitHub
commit a5e10740c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 61 deletions

View File

@ -54,7 +54,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
protected static List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); protected static List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList());
private static String ourListenerServerBase; private static String ourListenerServerBase;
private List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); protected List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
@After @After

View File

@ -13,6 +13,7 @@ import ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInter
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.util.PortUtil;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Observation;
@ -173,6 +174,33 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
assertEquals(0, ourUpdatedObservations.size()); assertEquals(0, ourUpdatedObservations.size());
} }
@Test
public void testDeliveryFailed() throws Exception {
ourNextBeforeRestHookDeliveryReturn = false;
// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
Subscription subscription = newSubscription("Observation?status=final", "application/fhir+json");
subscription.getChannel().setEndpoint("http://localhost:" + PortUtil.findFreePort()); // this better not succeed!
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
subscription.setId(methodOutcome.getId().getIdPart());
mySubscriptionIds.add(methodOutcome.getId());
registerLatch.await(10, TimeUnit.SECONDS);
CountDownLatch latch = new CountDownLatch(1);
myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, params -> {
latch.countDown();
});
sendObservation();
latch.await(10, TimeUnit.SECONDS);
}
protected Observation sendObservation() { protected Observation sendObservation() {
Observation observation = new Observation(); Observation observation = new Observation();
observation.setStatus(Observation.ObservationStatus.FINAL); observation.setStatus(Observation.ObservationStatus.FINAL);
@ -195,7 +223,7 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
SubscriptionDebugLogInterceptor interceptor = new SubscriptionDebugLogInterceptor(); SubscriptionDebugLogInterceptor interceptor = new SubscriptionDebugLogInterceptor();
myInterceptorRegistry.registerInterceptor(interceptor); myInterceptorRegistry.registerInterceptor(interceptor);
SubscriptionDebugLogInterceptor interceptor2 = new SubscriptionDebugLogInterceptor(loggerMock, Level.DEBUG); SubscriptionDebugLogInterceptor interceptor2 = new SubscriptionDebugLogInterceptor(t -> loggerMock, Level.DEBUG);
myInterceptorRegistry.registerInterceptor(interceptor2); myInterceptorRegistry.registerInterceptor(interceptor2);
try { try {
@ -235,7 +263,7 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
ourLog.info("Messages:\n " + messages.stream().collect(Collectors.joining("\n "))); ourLog.info("Messages:\n " + messages.stream().collect(Collectors.joining("\n ")));
assertThat(messages.get(messages.size() - 1), matchesPattern("\\[SUBS50\\] Finished delivery of resource Observation.*")); assertThat(messages.get(messages.size() - 1), matchesPattern("Finished delivery of resource Observation.*"));
} finally { } finally {
myInterceptorRegistry.unregisterInterceptor(interceptor); myInterceptorRegistry.unregisterInterceptor(interceptor);

View File

@ -53,7 +53,6 @@ public interface IInterceptorRegistry {
@Deprecated @Deprecated
void unregisterGlobalInterceptor(Object theInterceptor); void unregisterGlobalInterceptor(Object theInterceptor);
@VisibleForTesting @VisibleForTesting
void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook); void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook);

View File

@ -124,6 +124,28 @@ public enum Pointcut {
*/ */
SUBSCRIPTION_AFTER_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"), SUBSCRIPTION_AFTER_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"),
/**
* Invoked immediately after the attempted delivery of a subscription, if the delivery
* failed.
* <p>
* Hooks may accept the following parameters:
* </p>
* <ul>
* <li>java.lang.Exception - The exception that caused the failure</li>
* <li>ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription</li>
* <li>ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage</li>
* </ul>
* <p>
* Hooks may return <code>void</code> or may return a <code>boolean</code>. If the method returns
* <code>void</code> or <code>true</code>, processing will continue normally, meaning that
* an exception will be thrown by the delivery mechanism. This typically means that the
* message will be returned to the processing queue. If the method
* returns <code>false</code>, processing will be aborted and no further action will be
* taken for the delivery.
* </p>
*/
SUBSCRIPTION_AFTER_DELIVERY_FAILED("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"),
/** /**
* Invoked immediately after the delivery of a REST HOOK subscription. * Invoked immediately after the delivery of a REST HOOK subscription.
* <p> * <p>
@ -179,6 +201,7 @@ public enum Pointcut {
*/ */
SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"), SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"),
/** /**
* Invoked whenever a persisted resource (a resource that has just been stored in the * Invoked whenever a persisted resource (a resource that has just been stored in the
* database via a create/update/patch/etc.) has been checked for whether any subscriptions * database via a create/update/patch/etc.) has been checked for whether any subscriptions
@ -195,7 +218,6 @@ public enum Pointcut {
*/ */
SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"), SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"),
/** /**
* Invoked immediately after an active subscription is "registered". In HAPI FHIR, when * Invoked immediately after an active subscription is "registered". In HAPI FHIR, when
* a subscription * a subscription
@ -289,6 +311,7 @@ public enum Pointcut {
*/ */
OP_PRECOMMIT_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"), OP_PRECOMMIT_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"),
/** /**
* Invoked before a resource will be updated, immediately before the resource * Invoked before a resource will be updated, immediately before the resource
* is persisted to the database. * is persisted to the database.
@ -307,9 +330,7 @@ public enum Pointcut {
* Hooks should return <code>void</code>. * Hooks should return <code>void</code>.
* </p> * </p>
*/ */
OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"), OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource");
;
private final List<String> myParameterTypes; private final List<String> myParameterTypes;

View File

@ -32,34 +32,58 @@ import org.slf4j.LoggerFactory;
import org.slf4j.event.Level; import org.slf4j.event.Level;
import java.util.Date; import java.util.Date;
import java.util.EnumMap;
import java.util.function.Function;
/** /**
* This interceptor can be used for troubleshooting subscription processing. It provides very * This interceptor can be used for troubleshooting subscription processing. It provides very
* detailed logging about the subscription processing pipeline. * detailed logging about the subscription processing pipeline.
* <p>
* This interceptor loges each step in the processing pipeline with a
* different event code, using the event codes itemized in
* {@link EventCodeEnum}. By default these are each placed in a logger with
* a different name (e.g. <code>ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor.SUBS20</code>
* in order to facilitate fine-grained logging controls where some codes are omitted and
* some are not.
* </p>
* <p>
* A custom log factory can also be passed in, in which case the logging
* creation may use another strategy.
* </p>
* *
* @see EventCodeEnum
* @since 3.7.0 * @since 3.7.0
*/ */
@Interceptor @Interceptor
public class SubscriptionDebugLogInterceptor { public class SubscriptionDebugLogInterceptor {
private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(SubscriptionDebugLogInterceptor.class);
private static final String SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK = "SubscriptionDebugLogInterceptor_precheck"; private static final String SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK = "SubscriptionDebugLogInterceptor_precheck";
private final Logger myLogger;
private final Level myLevel; private final Level myLevel;
private final EnumMap<EventCodeEnum, Logger> myLoggers;
/** /**
* Constructor that logs at INFO level to the logger <code>ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor</code> * Constructor that logs at INFO level to the logger <code>ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor</code>
*/ */
public SubscriptionDebugLogInterceptor() { public SubscriptionDebugLogInterceptor() {
this(DEFAULT_LOGGER, Level.INFO); this(defaultLogFactory(), Level.INFO);
} }
/** /**
* Constructor using a specific logger * Constructor using a specific logger
*/ */
public SubscriptionDebugLogInterceptor(Logger theLog, Level theLevel) { public SubscriptionDebugLogInterceptor(Function<EventCodeEnum, Logger> theLogFactory, Level theLevel) {
myLogger = theLog;
myLevel = theLevel; myLevel = theLevel;
myLoggers = new EnumMap<>(EventCodeEnum.class);
for (EventCodeEnum next : EventCodeEnum.values()) {
myLoggers.put(next, theLogFactory.apply(next));
}
}
@Hook(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED)
public void step10_resourceModified(ResourceModifiedMessage theMessage) {
String value = Long.toString(System.currentTimeMillis());
theMessage.setAttribute(SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK, value);
log(EventCodeEnum.SUBS1, "Resource {} was submitted to the processing pipeline (op={})", theMessage.getPayloadId(), theMessage.getOperationType());
} }
/* /*
@ -72,31 +96,29 @@ public class SubscriptionDebugLogInterceptor {
* gaps to add things if we ever need them. * gaps to add things if we ever need them.
*/ */
@Hook(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED)
public void step10_resourceModified(ResourceModifiedMessage theMessage) {
String value = Long.toString(System.currentTimeMillis());
theMessage.setAttribute(SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK, value);
log("SUBS10","Resource {} was submitted to the processing pipeline", theMessage.getPayloadId(), theMessage.getOperationType());
}
@Hook(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED) @Hook(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED)
public void step20_beforeChecked(ResourceModifiedMessage theMessage) { public void step20_beforeChecked(ResourceModifiedMessage theMessage) {
log("SUBS20","Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType()); log(EventCodeEnum.SUBS2, "Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType());
} }
@Hook(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED) @Hook(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED)
public void step30_subscriptionMatched(ResourceDeliveryMessage theMessage, SubscriptionMatchResult theResult) { public void step30_subscriptionMatched(ResourceDeliveryMessage theMessage, SubscriptionMatchResult theResult) {
log("SUBS30","Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory()); log(EventCodeEnum.SUBS3, "Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory());
} }
@Hook(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS) @Hook(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS)
public void step35_subscriptionMatched(ResourceModifiedMessage theMessage) { public void step35_subscriptionNotMatched(ResourceModifiedMessage theMessage) {
log("SUBS35","Resource {} did not match any subscriptions", theMessage.getPayloadId()); log(EventCodeEnum.SUBS4, "Resource {} did not match any subscriptions", theMessage.getPayloadId());
} }
@Hook(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY) @Hook(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY)
public void step40_beforeDelivery(ResourceDeliveryMessage theMessage) { public void step40_beforeDelivery(ResourceDeliveryMessage theMessage) {
log("SUBS40","Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl()); log(EventCodeEnum.SUBS5, "Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl());
}
@Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED)
public void step45_deliveryFailed(ResourceDeliveryMessage theMessage, Exception theFailure) {
log(EventCodeEnum.SUBS6, "Delivery of resource {} for subscription {} to channel of type {} - Failure: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theFailure.toString());
} }
@Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY) @Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY)
@ -108,28 +130,70 @@ public class SubscriptionDebugLogInterceptor {
.map(start -> new StopWatch(start).toString()) .map(start -> new StopWatch(start).toString())
.orElse("(unknown)"); .orElse("(unknown)");
log("SUBS50","Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime); log(EventCodeEnum.SUBS7, "Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime);
} }
private void log(String theCode, String theMessage, Object... theArguments) { protected void log(EventCodeEnum theEventCode, String theMessage, Object... theArguments) {
String msg = "[" + theCode + "] " + theMessage; Logger logger = myLoggers.get(theEventCode);
switch (myLevel) { if (logger != null) {
case ERROR: switch (myLevel) {
myLogger.error(msg, theArguments); case ERROR:
break; logger.error(theMessage, theArguments);
case WARN: break;
myLogger.warn(msg, theArguments); case WARN:
break; logger.warn(theMessage, theArguments);
case INFO: break;
myLogger.info(msg, theArguments); case INFO:
break; logger.info(theMessage, theArguments);
case DEBUG: break;
myLogger.debug(msg, theArguments); case DEBUG:
break; logger.debug(theMessage, theArguments);
case TRACE: break;
myLogger.trace(msg, theArguments); case TRACE:
break; logger.trace(theMessage, theArguments);
break;
}
} }
} }
public enum EventCodeEnum {
/**
* A new/updated resource has been submitted to the processing pipeline and is about
* to be placed on the matchign queue.
*/
SUBS1,
/**
* A resources has been dequeued from the matching queue and is about to be checked
* for any matching subscriptions.
*/
SUBS2,
/**
* The resource has matched a subscription (logged once for each matching subscription)
* and is about to be queued for delivery.
*/
SUBS3,
/**
* The resource did not match any subscriptions and processing is complete.
*/
SUBS4,
/**
* The resource has been dequeued from the delivery queue and is about to be
* delivered.
*/
SUBS5,
/**
* Delivery failed
*/
SUBS6,
/**
* Delivery is now complete and processing is finished.
*/
SUBS7
}
private static Function<EventCodeEnum, Logger> defaultLogFactory() {
return code -> LoggerFactory.getLogger(SubscriptionDebugLogInterceptor.class.getName() + "." + code.name());
}
} }

View File

@ -49,16 +49,15 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
return; return;
} }
String subscriptionId = "(unknown?)"; ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
String subscriptionId = msg.getSubscriptionId(myFhirContext);
ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart());
if (updatedSubscription != null) {
msg.setSubscription(updatedSubscription.getSubscription());
}
try { try {
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
subscriptionId = msg.getSubscription().getIdElement(myFhirContext).getValue();
ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart());
if (updatedSubscription != null) {
msg.setSubscription(updatedSubscription.getSubscription());
}
// Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY // Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, msg, msg.getSubscription())) { if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, msg, msg.getSubscription())) {
@ -71,9 +70,16 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, msg, msg.getSubscription()); myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, msg, msg.getSubscription());
} catch (Exception e) { } catch (Exception e) {
String msg = "Failure handling subscription payload for subscription: " + subscriptionId;
ourLog.error(msg, e); String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId;
throw new MessagingException(theMessage, msg, e); ourLog.error(errorMsg, e);
// Interceptor call: SUBSCRIPTION_AFTER_DELIVERY
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, msg, msg.getSubscription(), e)) {
return;
}
throw new MessagingException(theMessage, errorMsg, e);
} }
} }

View File

@ -28,16 +28,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson; import com.google.gson.Gson;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank;
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
@ -133,4 +127,11 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
.append("myOperationType", myOperationType) .append("myOperationType", myOperationType)
.toString(); .toString();
} }
/**
* Helper method to fetch the subscription ID
*/
public String getSubscriptionId(FhirContext theFhirContext) {
return getSubscription().getIdElement(theFhirContext).getValue();
}
} }