Merge branch 'master' of github.com:jamesagnew/hapi-fhir
This commit is contained in:
commit
b3fd1b91cf
|
@ -1341,7 +1341,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
|||
outcome.setId(theResource.getIdElement());
|
||||
}
|
||||
|
||||
String msg = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "successfulCreate", outcome.getId(), w.getMillisAndRestart());
|
||||
String msg = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "successfulUpdate", outcome.getId(), w.getMillisAndRestart());
|
||||
outcome.setOperationOutcome(createInfoOperationOutcome(msg));
|
||||
|
||||
ourLog.debug(msg);
|
||||
|
|
|
@ -54,7 +54,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
|
|||
protected static List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList());
|
||||
private static String ourListenerServerBase;
|
||||
|
||||
private List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
|
||||
protected List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
|
||||
@After
|
||||
|
|
|
@ -7,6 +7,7 @@ import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
|
|||
import ca.uhn.fhir.jpa.subscription.NotificationServlet;
|
||||
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants;
|
||||
import ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor;
|
||||
import ca.uhn.fhir.jpa.subscription.module.matcher.SubscriptionMatchingStrategy;
|
||||
import ca.uhn.fhir.rest.annotation.Create;
|
||||
import ca.uhn.fhir.rest.annotation.ResourceParam;
|
||||
|
@ -31,9 +32,10 @@ import javax.servlet.http.HttpServletRequest;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Test the rest-hook subscriptions
|
||||
|
@ -54,12 +56,14 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
private SubscriptionTestUtil mySubscriptionTestUtil;
|
||||
private static NotificationServlet ourNotificationServlet;
|
||||
private static String ourNotificationListenerServer;
|
||||
private static CountDownLatch communicationRequestListenerLatch;
|
||||
private static SubscriptionDebugLogInterceptor ourSubscriptionDebugLogInterceptor = new SubscriptionDebugLogInterceptor();
|
||||
|
||||
@After
|
||||
public void afterUnregisterRestHookListener() {
|
||||
ourLog.info("**** Starting @After *****");
|
||||
|
||||
for (IIdType next : mySubscriptionIds){
|
||||
for (IIdType next : mySubscriptionIds) {
|
||||
ourClient.delete().resourceById(next).execute();
|
||||
}
|
||||
mySubscriptionIds.clear();
|
||||
|
@ -72,11 +76,13 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
|
||||
|
||||
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
|
||||
myInterceptorRegistry.unregisterInterceptor(ourSubscriptionDebugLogInterceptor);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void beforeRegisterRestHookListener() {
|
||||
mySubscriptionTestUtil.registerRestHookInterceptor();
|
||||
myInterceptorRegistry.registerInterceptor(ourSubscriptionDebugLogInterceptor);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -112,7 +118,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
|
||||
waitForQueueToDrain();
|
||||
|
||||
return (Subscription)methodOutcome.getResource();
|
||||
return (Subscription) methodOutcome.getResource();
|
||||
}
|
||||
|
||||
private Observation sendObservation(String code, String system) {
|
||||
|
@ -201,7 +207,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
waitForSize(1, ourUpdatedObservations);
|
||||
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRestHookSubscriptionApplicationJson() throws Exception {
|
||||
String payload = "application/json";
|
||||
|
@ -435,7 +441,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
|
||||
Subscription subscriptionActivated = ourClient.read().resource(Subscription.class).withId(subscriptionId.toUnqualifiedVersionless()).execute();
|
||||
assertEquals(Subscription.SubscriptionStatus.ACTIVE, subscriptionActivated.getStatus());
|
||||
tags = subscriptionActivated.getMeta().getTag();
|
||||
tags = subscriptionActivated.getMeta().getTag();
|
||||
assertEquals(1, tags.size());
|
||||
tag = tags.get(0);
|
||||
assertEquals(SubscriptionConstants.EXT_SUBSCRIPTION_MATCHING_STRATEGY, tag.getSystem());
|
||||
|
@ -459,14 +465,32 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
|
||||
Subscription subscription = ourClient.read().resource(Subscription.class).withId(subscriptionId.toUnqualifiedVersionless()).execute();
|
||||
assertEquals(Subscription.SubscriptionStatus.ACTIVE, subscription.getStatus());
|
||||
tags = subscription.getMeta().getTag();
|
||||
tags = subscription.getMeta().getTag();
|
||||
assertEquals(1, tags.size());
|
||||
tag = tags.get(0);
|
||||
tag = tags.get(0);
|
||||
assertEquals(SubscriptionConstants.EXT_SUBSCRIPTION_MATCHING_STRATEGY, tag.getSystem());
|
||||
assertEquals(SubscriptionMatchingStrategy.DATABASE.toString(), tag.getCode());
|
||||
assertEquals("Database", tag.getDisplay());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommunicationRequestWithRef() throws InterruptedException {
|
||||
Organization org = new Organization();
|
||||
MethodOutcome methodOutcome = ourClient.create().resource(org).execute();
|
||||
String orgId = methodOutcome.getId().getIdPart();
|
||||
|
||||
String criteria = "CommunicationRequest?requester=1276," + orgId + "&occurrence=ge2019-02-08T00:00:00-05:00&occurrence=le2019-02-09T00:00:00-05:00";
|
||||
String payload = "application/fhir+xml";
|
||||
createSubscription(criteria, payload, ourListenerServerBase);
|
||||
|
||||
CommunicationRequest cr = new CommunicationRequest();
|
||||
cr.getRequester().getAgent().setReference("Organization/" + orgId);
|
||||
cr.setOccurrence(new DateTimeType("2019-02-08T00:01:00-05:00"));
|
||||
communicationRequestListenerLatch = new CountDownLatch(1);
|
||||
ourClient.create().resource(cr).execute();
|
||||
assertTrue("Timed out waiting for subscription to match", communicationRequestListenerLatch.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void startListenerServer() throws Exception {
|
||||
ourListenerPort = PortUtil.findFreePort();
|
||||
|
@ -475,7 +499,8 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
ourNotificationListenerServer = "http://localhost:" + ourListenerPort + "/fhir/subscription";
|
||||
|
||||
ObservationListener obsListener = new ObservationListener();
|
||||
ourListenerRestServer.setResourceProviders(obsListener);
|
||||
CommunicationRequestListener crListener = new CommunicationRequestListener();
|
||||
ourListenerRestServer.setResourceProviders(obsListener, crListener);
|
||||
|
||||
ourListenerServer = new Server(ourListenerPort);
|
||||
ourNotificationServlet = new NotificationServlet();
|
||||
|
@ -505,7 +530,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
|
||||
ourLog.info("Received Listener Create");
|
||||
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
|
||||
ourCreatedObservations.add(theObservation);
|
||||
ourCreatedObservations.add((Observation) theObservation);
|
||||
return new MethodOutcome(new IdType("Observation/1"), true);
|
||||
}
|
||||
|
||||
|
@ -521,7 +546,27 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedObservations.size());
|
||||
return new MethodOutcome(new IdType("Observation/1"), false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class CommunicationRequestListener implements IResourceProvider {
|
||||
|
||||
@Create
|
||||
public MethodOutcome create(@ResourceParam CommunicationRequest theResource, HttpServletRequest theRequest) {
|
||||
ourLog.info("Received CommunicationRequestListener Create");
|
||||
communicationRequestListenerLatch.countDown();
|
||||
return new MethodOutcome(new IdType("CommunicationRequest/1"), true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends IBaseResource> getResourceType() {
|
||||
return CommunicationRequest.class;
|
||||
}
|
||||
|
||||
@Update
|
||||
public MethodOutcome update(@ResourceParam CommunicationRequest theResource, HttpServletRequest theRequest) {
|
||||
ourLog.info("Received CommunicationRequestListener Update");
|
||||
communicationRequestListenerLatch.countDown();
|
||||
return new MethodOutcome(new IdType("CommunicationRequest/1"), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,37 +3,26 @@ package ca.uhn.fhir.jpa.subscription.resthook;
|
|||
import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber;
|
||||
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
|
||||
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants;
|
||||
import ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor;
|
||||
import ca.uhn.fhir.rest.api.CacheControlDirective;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.rest.api.MethodOutcome;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||
import org.hl7.fhir.instance.model.api.IBaseBundle;
|
||||
import org.hl7.fhir.r4.model.*;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.event.Level;
|
||||
import org.slf4j.helpers.MessageFormatter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.matchesPattern;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* Test the rest-hook subscriptions
|
||||
|
@ -773,9 +762,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
|
|||
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
|
||||
}
|
||||
|
||||
// TODO: reenable
|
||||
@Test
|
||||
@Ignore
|
||||
public void testRestHookSubscriptionInvalidCriteria() throws Exception {
|
||||
String payload = "application/xml";
|
||||
|
||||
|
@ -784,8 +771,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
|
|||
try {
|
||||
createSubscription(criteria1, payload);
|
||||
fail();
|
||||
} catch (InvalidRequestException e) {
|
||||
assertEquals("HTTP 400 Bad Request: Invalid criteria: Failed to parse match URL[Observation?codeeeee=SNOMED-CT] - Resource type Observation does not have a parameter with name: codeeeee", e.getMessage());
|
||||
} catch (UnprocessableEntityException e) {
|
||||
assertEquals("HTTP 422 Unprocessable Entity: Invalid subscription criteria submitted: Observation?codeeeee=SNOMED-CT Failed to parse match URL[Observation?codeeeee=SNOMED-CT] - Resource type Observation does not have a parameter with name: codeeeee", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInter
|
|||
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.rest.api.MethodOutcome;
|
||||
import ca.uhn.fhir.util.PortUtil;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.hl7.fhir.r4.model.IdType;
|
||||
import org.hl7.fhir.r4.model.Observation;
|
||||
|
@ -173,6 +174,33 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
|
|||
assertEquals(0, ourUpdatedObservations.size());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testDeliveryFailed() throws Exception {
|
||||
ourNextBeforeRestHookDeliveryReturn = false;
|
||||
|
||||
// Create a subscription
|
||||
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
|
||||
Subscription subscription = newSubscription("Observation?status=final", "application/fhir+json");
|
||||
subscription.getChannel().setEndpoint("http://localhost:" + PortUtil.findFreePort()); // this better not succeed!
|
||||
|
||||
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
|
||||
subscription.setId(methodOutcome.getId().getIdPart());
|
||||
mySubscriptionIds.add(methodOutcome.getId());
|
||||
|
||||
registerLatch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, params -> {
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
sendObservation();
|
||||
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
protected Observation sendObservation() {
|
||||
Observation observation = new Observation();
|
||||
observation.setStatus(Observation.ObservationStatus.FINAL);
|
||||
|
@ -195,7 +223,7 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
|
|||
|
||||
SubscriptionDebugLogInterceptor interceptor = new SubscriptionDebugLogInterceptor();
|
||||
myInterceptorRegistry.registerInterceptor(interceptor);
|
||||
SubscriptionDebugLogInterceptor interceptor2 = new SubscriptionDebugLogInterceptor(loggerMock, Level.DEBUG);
|
||||
SubscriptionDebugLogInterceptor interceptor2 = new SubscriptionDebugLogInterceptor(t -> loggerMock, Level.DEBUG);
|
||||
myInterceptorRegistry.registerInterceptor(interceptor2);
|
||||
try {
|
||||
|
||||
|
@ -235,7 +263,7 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
|
|||
|
||||
ourLog.info("Messages:\n " + messages.stream().collect(Collectors.joining("\n ")));
|
||||
|
||||
assertThat(messages.get(messages.size() - 1), matchesPattern("\\[SUBS50\\] Finished delivery of resource Observation.*"));
|
||||
assertThat(messages.get(messages.size() - 1), matchesPattern("Finished delivery of resource Observation.*"));
|
||||
|
||||
} finally {
|
||||
myInterceptorRegistry.unregisterInterceptor(interceptor);
|
||||
|
|
|
@ -11,6 +11,7 @@ import ca.uhn.fhir.jpa.util.DerbyTenSevenHapiFhirDialect;
|
|||
import org.apache.commons.dbcp2.BasicDataSource;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.hibernate.jpa.HibernatePersistenceProvider;
|
||||
import org.hl7.fhir.instance.model.Subscription;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowire;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
@ -43,6 +44,10 @@ public class FhirServerConfig extends BaseJavaConfigDstu3 {
|
|||
public DaoConfig daoConfig() {
|
||||
DaoConfig retVal = new DaoConfig();
|
||||
retVal.setAllowMultipleDelete(true);
|
||||
retVal.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.WEBSOCKET);
|
||||
retVal.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.RESTHOOK);
|
||||
retVal.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.EMAIL);
|
||||
retVal.setSubscriptionMatchingEnabled(true);
|
||||
return retVal;
|
||||
}
|
||||
|
||||
|
@ -54,7 +59,7 @@ public class FhirServerConfig extends BaseJavaConfigDstu3 {
|
|||
/**
|
||||
* The following bean configures the database connection. The 'url' property value of "jdbc:derby:directory:jpaserver_derby_files;create=true" indicates that the server should save resources in a
|
||||
* directory called "jpaserver_derby_files".
|
||||
*
|
||||
*
|
||||
* A URL to a remote database could also be placed here, along with login credentials and other properties supported by BasicDataSource.
|
||||
*/
|
||||
@Bean(destroyMethod = "close")
|
||||
|
|
|
@ -53,7 +53,6 @@ public interface IInterceptorRegistry {
|
|||
@Deprecated
|
||||
void unregisterGlobalInterceptor(Object theInterceptor);
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook);
|
||||
|
||||
|
|
|
@ -124,6 +124,28 @@ public enum Pointcut {
|
|||
*/
|
||||
SUBSCRIPTION_AFTER_DELIVERY("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"),
|
||||
|
||||
/**
|
||||
* Invoked immediately after the attempted delivery of a subscription, if the delivery
|
||||
* failed.
|
||||
* <p>
|
||||
* Hooks may accept the following parameters:
|
||||
* </p>
|
||||
* <ul>
|
||||
* <li>java.lang.Exception - The exception that caused the failure</li>
|
||||
* <li>ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription</li>
|
||||
* <li>ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* Hooks may return <code>void</code> or may return a <code>boolean</code>. If the method returns
|
||||
* <code>void</code> or <code>true</code>, processing will continue normally, meaning that
|
||||
* an exception will be thrown by the delivery mechanism. This typically means that the
|
||||
* message will be returned to the processing queue. If the method
|
||||
* returns <code>false</code>, processing will be aborted and no further action will be
|
||||
* taken for the delivery.
|
||||
* </p>
|
||||
*/
|
||||
SUBSCRIPTION_AFTER_DELIVERY_FAILED("ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription", "ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage"),
|
||||
|
||||
/**
|
||||
* Invoked immediately after the delivery of a REST HOOK subscription.
|
||||
* <p>
|
||||
|
@ -179,6 +201,7 @@ public enum Pointcut {
|
|||
*/
|
||||
SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"),
|
||||
|
||||
|
||||
/**
|
||||
* Invoked whenever a persisted resource (a resource that has just been stored in the
|
||||
* database via a create/update/patch/etc.) has been checked for whether any subscriptions
|
||||
|
@ -195,7 +218,6 @@ public enum Pointcut {
|
|||
*/
|
||||
SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage"),
|
||||
|
||||
|
||||
/**
|
||||
* Invoked immediately after an active subscription is "registered". In HAPI FHIR, when
|
||||
* a subscription
|
||||
|
@ -289,6 +311,7 @@ public enum Pointcut {
|
|||
*/
|
||||
OP_PRECOMMIT_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"),
|
||||
|
||||
|
||||
/**
|
||||
* Invoked before a resource will be updated, immediately before the resource
|
||||
* is persisted to the database.
|
||||
|
@ -307,9 +330,7 @@ public enum Pointcut {
|
|||
* Hooks should return <code>void</code>.
|
||||
* </p>
|
||||
*/
|
||||
OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource"),
|
||||
|
||||
;
|
||||
OP_PRESTORAGE_RESOURCE_UPDATED("org.hl7.fhir.instance.model.api.IBaseResource", "org.hl7.fhir.instance.model.api.IBaseResource");
|
||||
|
||||
private final List<String> myParameterTypes;
|
||||
|
||||
|
|
|
@ -218,12 +218,12 @@ public final class ResourceIndexedSearchParams {
|
|||
return populatedResourceLinkParameters;
|
||||
}
|
||||
|
||||
public boolean matchParam(String theResourceName, String theParamName, RuntimeSearchParam paramDef, IQueryParameterType theParam) {
|
||||
if (paramDef == null) {
|
||||
public boolean matchParam(String theResourceName, String theParamName, RuntimeSearchParam theParamDef, IQueryParameterType theParam) {
|
||||
if (theParamDef == null) {
|
||||
return false;
|
||||
}
|
||||
Collection<? extends BaseResourceIndexedSearchParam> resourceParams;
|
||||
switch (paramDef.getParamType()) {
|
||||
switch (theParamDef.getParamType()) {
|
||||
case TOKEN:
|
||||
resourceParams = tokenParams;
|
||||
break;
|
||||
|
@ -243,7 +243,7 @@ public final class ResourceIndexedSearchParams {
|
|||
resourceParams = dateParams;
|
||||
break;
|
||||
case REFERENCE:
|
||||
return matchResourceLinks(theResourceName, theParamName, theParam);
|
||||
return matchResourceLinks(theResourceName, theParamName, theParam, theParamDef.getPath());
|
||||
case COMPOSITE:
|
||||
case HAS:
|
||||
case SPECIAL:
|
||||
|
@ -260,11 +260,11 @@ public final class ResourceIndexedSearchParams {
|
|||
return resourceParams.stream().anyMatch(namedParamPredicate);
|
||||
}
|
||||
|
||||
private boolean matchResourceLinks(String theResourceName, String theParamName, IQueryParameterType theParam) {
|
||||
private boolean matchResourceLinks(String theResourceName, String theParamName, IQueryParameterType theParam, String theParamPath) {
|
||||
ReferenceParam reference = (ReferenceParam)theParam;
|
||||
|
||||
Predicate<ResourceLink> namedParamPredicate = resourceLink ->
|
||||
resourceLinkMatches(theResourceName, resourceLink, theParamName)
|
||||
resourceLinkMatches(theResourceName, resourceLink, theParamName, theParamPath)
|
||||
&& resourceIdMatches(resourceLink, reference);
|
||||
|
||||
return links.stream().anyMatch(namedParamPredicate);
|
||||
|
@ -274,7 +274,7 @@ public final class ResourceIndexedSearchParams {
|
|||
ResourceTable target = theResourceLink.getTargetResource();
|
||||
IdDt idDt = target.getIdDt();
|
||||
if (idDt.isIdPartValidLong()) {
|
||||
return theReference.getIdPartAsLong() == idDt.getIdPartAsLong();
|
||||
return theReference.getIdPartAsLong().equals(idDt.getIdPartAsLong());
|
||||
} else {
|
||||
ForcedId forcedId = target.getForcedId();
|
||||
if (forcedId != null) {
|
||||
|
@ -285,9 +285,9 @@ public final class ResourceIndexedSearchParams {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean resourceLinkMatches(String theResourceName, ResourceLink theResourceLink, String theParamName) {
|
||||
private boolean resourceLinkMatches(String theResourceName, ResourceLink theResourceLink, String theParamName, String theParamPath) {
|
||||
return theResourceLink.getTargetResource().getResourceType().equalsIgnoreCase(theParamName) ||
|
||||
theResourceLink.getSourcePath().equalsIgnoreCase(theResourceName+"."+theParamName);
|
||||
theResourceLink.getSourcePath().equalsIgnoreCase(theParamPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -32,34 +32,58 @@ import org.slf4j.LoggerFactory;
|
|||
import org.slf4j.event.Level;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.EnumMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* This interceptor can be used for troubleshooting subscription processing. It provides very
|
||||
* detailed logging about the subscription processing pipeline.
|
||||
* <p>
|
||||
* This interceptor loges each step in the processing pipeline with a
|
||||
* different event code, using the event codes itemized in
|
||||
* {@link EventCodeEnum}. By default these are each placed in a logger with
|
||||
* a different name (e.g. <code>ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor.SUBS20</code>
|
||||
* in order to facilitate fine-grained logging controls where some codes are omitted and
|
||||
* some are not.
|
||||
* </p>
|
||||
* <p>
|
||||
* A custom log factory can also be passed in, in which case the logging
|
||||
* creation may use another strategy.
|
||||
* </p>
|
||||
*
|
||||
* @see EventCodeEnum
|
||||
* @since 3.7.0
|
||||
*/
|
||||
@Interceptor
|
||||
public class SubscriptionDebugLogInterceptor {
|
||||
|
||||
private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(SubscriptionDebugLogInterceptor.class);
|
||||
private static final String SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK = "SubscriptionDebugLogInterceptor_precheck";
|
||||
private final Logger myLogger;
|
||||
private final Level myLevel;
|
||||
private final EnumMap<EventCodeEnum, Logger> myLoggers;
|
||||
|
||||
/**
|
||||
* Constructor that logs at INFO level to the logger <code>ca.uhn.fhir.jpa.subscription.module.interceptor.SubscriptionDebugLogInterceptor</code>
|
||||
*/
|
||||
public SubscriptionDebugLogInterceptor() {
|
||||
this(DEFAULT_LOGGER, Level.INFO);
|
||||
this(defaultLogFactory(), Level.INFO);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor using a specific logger
|
||||
*/
|
||||
public SubscriptionDebugLogInterceptor(Logger theLog, Level theLevel) {
|
||||
myLogger = theLog;
|
||||
public SubscriptionDebugLogInterceptor(Function<EventCodeEnum, Logger> theLogFactory, Level theLevel) {
|
||||
myLevel = theLevel;
|
||||
myLoggers = new EnumMap<>(EventCodeEnum.class);
|
||||
for (EventCodeEnum next : EventCodeEnum.values()) {
|
||||
myLoggers.put(next, theLogFactory.apply(next));
|
||||
}
|
||||
}
|
||||
|
||||
@Hook(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED)
|
||||
public void step10_resourceModified(ResourceModifiedMessage theMessage) {
|
||||
String value = Long.toString(System.currentTimeMillis());
|
||||
theMessage.setAttribute(SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK, value);
|
||||
log(EventCodeEnum.SUBS1, "Resource {} was submitted to the processing pipeline (op={})", theMessage.getPayloadId(), theMessage.getOperationType());
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -72,31 +96,29 @@ public class SubscriptionDebugLogInterceptor {
|
|||
* gaps to add things if we ever need them.
|
||||
*/
|
||||
|
||||
@Hook(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED)
|
||||
public void step10_resourceModified(ResourceModifiedMessage theMessage) {
|
||||
String value = Long.toString(System.currentTimeMillis());
|
||||
theMessage.setAttribute(SUBSCRIPTION_DEBUG_LOG_INTERCEPTOR_PRECHECK, value);
|
||||
log("SUBS10","Resource {} was submitted to the processing pipeline", theMessage.getPayloadId(), theMessage.getOperationType());
|
||||
}
|
||||
|
||||
@Hook(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED)
|
||||
public void step20_beforeChecked(ResourceModifiedMessage theMessage) {
|
||||
log("SUBS20","Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType());
|
||||
log(EventCodeEnum.SUBS2, "Checking resource {} (op={}) for matching subscriptions", theMessage.getPayloadId(), theMessage.getOperationType());
|
||||
}
|
||||
|
||||
@Hook(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED)
|
||||
public void step30_subscriptionMatched(ResourceDeliveryMessage theMessage, SubscriptionMatchResult theResult) {
|
||||
log("SUBS30","Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory());
|
||||
log(EventCodeEnum.SUBS3, "Resource {} matched by subscription {} (memory match={})", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theResult.isInMemory());
|
||||
}
|
||||
|
||||
@Hook(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS)
|
||||
public void step35_subscriptionMatched(ResourceModifiedMessage theMessage) {
|
||||
log("SUBS35","Resource {} did not match any subscriptions", theMessage.getPayloadId());
|
||||
public void step35_subscriptionNotMatched(ResourceModifiedMessage theMessage) {
|
||||
log(EventCodeEnum.SUBS4, "Resource {} did not match any subscriptions", theMessage.getPayloadId());
|
||||
}
|
||||
|
||||
@Hook(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY)
|
||||
public void step40_beforeDelivery(ResourceDeliveryMessage theMessage) {
|
||||
log("SUBS40","Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl());
|
||||
log(EventCodeEnum.SUBS5, "Delivering resource {} for subscription {} to channel of type {} to endpoint {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theMessage.getSubscription().getEndpointUrl());
|
||||
}
|
||||
|
||||
@Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED)
|
||||
public void step45_deliveryFailed(ResourceDeliveryMessage theMessage, Exception theFailure) {
|
||||
log(EventCodeEnum.SUBS6, "Delivery of resource {} for subscription {} to channel of type {} - Failure: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), theFailure.toString());
|
||||
}
|
||||
|
||||
@Hook(Pointcut.SUBSCRIPTION_AFTER_DELIVERY)
|
||||
|
@ -108,28 +130,70 @@ public class SubscriptionDebugLogInterceptor {
|
|||
.map(start -> new StopWatch(start).toString())
|
||||
.orElse("(unknown)");
|
||||
|
||||
log("SUBS50","Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime);
|
||||
log(EventCodeEnum.SUBS7, "Finished delivery of resource {} for subscription {} to channel of type {} - Total processing time: {}", theMessage.getPayloadId(), theMessage.getSubscription().getIdElementString(), theMessage.getSubscription().getChannelType(), processingTime);
|
||||
}
|
||||
|
||||
private void log(String theCode, String theMessage, Object... theArguments) {
|
||||
String msg = "[" + theCode + "] " + theMessage;
|
||||
switch (myLevel) {
|
||||
case ERROR:
|
||||
myLogger.error(msg, theArguments);
|
||||
break;
|
||||
case WARN:
|
||||
myLogger.warn(msg, theArguments);
|
||||
break;
|
||||
case INFO:
|
||||
myLogger.info(msg, theArguments);
|
||||
break;
|
||||
case DEBUG:
|
||||
myLogger.debug(msg, theArguments);
|
||||
break;
|
||||
case TRACE:
|
||||
myLogger.trace(msg, theArguments);
|
||||
break;
|
||||
protected void log(EventCodeEnum theEventCode, String theMessage, Object... theArguments) {
|
||||
Logger logger = myLoggers.get(theEventCode);
|
||||
if (logger != null) {
|
||||
switch (myLevel) {
|
||||
case ERROR:
|
||||
logger.error(theMessage, theArguments);
|
||||
break;
|
||||
case WARN:
|
||||
logger.warn(theMessage, theArguments);
|
||||
break;
|
||||
case INFO:
|
||||
logger.info(theMessage, theArguments);
|
||||
break;
|
||||
case DEBUG:
|
||||
logger.debug(theMessage, theArguments);
|
||||
break;
|
||||
case TRACE:
|
||||
logger.trace(theMessage, theArguments);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public enum EventCodeEnum {
|
||||
/**
|
||||
* A new/updated resource has been submitted to the processing pipeline and is about
|
||||
* to be placed on the matchign queue.
|
||||
*/
|
||||
SUBS1,
|
||||
/**
|
||||
* A resources has been dequeued from the matching queue and is about to be checked
|
||||
* for any matching subscriptions.
|
||||
*/
|
||||
SUBS2,
|
||||
/**
|
||||
* The resource has matched a subscription (logged once for each matching subscription)
|
||||
* and is about to be queued for delivery.
|
||||
*/
|
||||
SUBS3,
|
||||
/**
|
||||
* The resource did not match any subscriptions and processing is complete.
|
||||
*/
|
||||
SUBS4,
|
||||
/**
|
||||
* The resource has been dequeued from the delivery queue and is about to be
|
||||
* delivered.
|
||||
*/
|
||||
SUBS5,
|
||||
/**
|
||||
* Delivery failed
|
||||
*/
|
||||
SUBS6,
|
||||
/**
|
||||
* Delivery is now complete and processing is finished.
|
||||
*/
|
||||
SUBS7
|
||||
}
|
||||
|
||||
|
||||
private static Function<EventCodeEnum, Logger> defaultLogFactory() {
|
||||
return code -> LoggerFactory.getLogger(SubscriptionDebugLogInterceptor.class.getName() + "." + code.name());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -49,16 +49,15 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
|
|||
return;
|
||||
}
|
||||
|
||||
String subscriptionId = "(unknown?)";
|
||||
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
|
||||
String subscriptionId = msg.getSubscriptionId(myFhirContext);
|
||||
|
||||
ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart());
|
||||
if (updatedSubscription != null) {
|
||||
msg.setSubscription(updatedSubscription.getSubscription());
|
||||
}
|
||||
|
||||
try {
|
||||
ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
|
||||
subscriptionId = msg.getSubscription().getIdElement(myFhirContext).getValue();
|
||||
|
||||
ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart());
|
||||
if (updatedSubscription != null) {
|
||||
msg.setSubscription(updatedSubscription.getSubscription());
|
||||
}
|
||||
|
||||
// Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY
|
||||
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, msg, msg.getSubscription())) {
|
||||
|
@ -71,9 +70,16 @@ public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandl
|
|||
myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, msg, msg.getSubscription());
|
||||
|
||||
} catch (Exception e) {
|
||||
String msg = "Failure handling subscription payload for subscription: " + subscriptionId;
|
||||
ourLog.error(msg, e);
|
||||
throw new MessagingException(theMessage, msg, e);
|
||||
|
||||
String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId;
|
||||
ourLog.error(errorMsg, e);
|
||||
|
||||
// Interceptor call: SUBSCRIPTION_AFTER_DELIVERY
|
||||
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, msg, msg.getSubscription(), e)) {
|
||||
return;
|
||||
}
|
||||
|
||||
throw new MessagingException(theMessage, errorMsg, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,16 +28,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
|||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
|
@ -133,4 +127,11 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
|
|||
.append("myOperationType", myOperationType)
|
||||
.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to fetch the subscription ID
|
||||
*/
|
||||
public String getSubscriptionId(FhirContext theFhirContext) {
|
||||
return getSubscription().getIdElement(theFhirContext).getValue();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -541,7 +541,21 @@ public class InMemorySubscriptionMatcherTestR3 extends BaseSubscriptionDstu3Test
|
|||
}
|
||||
}
|
||||
|
||||
// These last two are covered by other tests above
|
||||
// String criteria = "ProcedureRequest?intent=original-order&category=Laboratory,Ancillary%20Orders,Hemodialysis&status=suspended,entered-in-error,cancelled";
|
||||
// String criteria = "Observation?code=70965-9&context.type=IHD";
|
||||
@Test
|
||||
public void testCommunicationRequestWithRefAndDate() {
|
||||
String criteria = "CommunicationRequest?requester=O1271,O1276&occurrence=ge2019-02-08T00:00:00-05:00&occurrence=le2019-02-09T00:00:00-05:00";
|
||||
CommunicationRequest cr = new CommunicationRequest();
|
||||
cr.getRequester().getAgent().setReference("Organization/O1276");
|
||||
cr.setOccurrence(new DateTimeType("2019-02-08T00:01:00-05:00"));
|
||||
assertUnsupported(cr, criteria);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCommunicationRequestWithRef() {
|
||||
String criteria = "CommunicationRequest?requester=O1271,O1276";
|
||||
CommunicationRequest cr = new CommunicationRequest();
|
||||
cr.getRequester().getAgent().setReference("Organization/O1276");
|
||||
assertMatched(cr, criteria);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue