From 40d0c27ae3d743909595fab45a50c4f8143c8b29 Mon Sep 17 00:00:00 2001 From: jamesagnew Date: Sun, 5 Apr 2020 18:43:27 -0400 Subject: [PATCH] Work on subscription cleanup --- .../ca/uhn/fhir/jpa/api/dao/DaoRegistry.java | 8 ++ .../r4/BaseResourceProviderR4Test.java | 1 - .../r5/BaseResourceProviderR5Test.java | 1 - .../config/SubscriptionChannelConfig.java | 6 +- .../SubscriptionChannelFactory.java | 50 +++++++---- .../SubscriptionChannelRegistry.java | 2 +- .../process/deliver/DaoResourceRetriever.java | 20 ++++- ...aseSubscriberForSubscriptionResources.java | 7 +- .../MatchingQueueSubscriberLoader.java | 7 +- .../SubscriptionMatcherInterceptor.java | 15 +++- .../SubscriptionSubmitInterceptorLoader.java | 1 - ...onfig.java => SubscriptionTestConfig.java} | 6 +- .../provider/HashMapResourceProvider.java | 70 +++++++++++---- .../provider/HashMapResourceProviderTest.java | 90 ++++++++++++------- .../uhn/test/concurrency/PointcutLatch.java | 11 ++- 15 files changed, 211 insertions(+), 84 deletions(-) rename hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/{SubscriptionConfig.java => SubscriptionTestConfig.java} (90%) diff --git a/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/dao/DaoRegistry.java b/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/dao/DaoRegistry.java index 1a0e12bb6e8..d0c26ad60f3 100644 --- a/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/dao/DaoRegistry.java +++ b/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/dao/DaoRegistry.java @@ -51,7 +51,15 @@ public class DaoRegistry implements ApplicationContextAware, IDaoRegistry { * Constructor */ public DaoRegistry() { + this(null); + } + + /** + * Constructor + */ + public DaoRegistry(FhirContext theFhirContext) { super(); + myContext = theFhirContext; } public void setSupportedResourceTypes(Collection theSupportedResourceTypes) { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java index 3558f486ebd..dc45df8c348 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java @@ -163,7 +163,6 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class); ourSearchParamRegistry = wac.getBean(SearchParamRegistryImpl.class); ourSubscriptionMatcherInterceptor = wac.getBean(SubscriptionMatcherInterceptor.class); - ourSubscriptionMatcherInterceptor.start(); confProvider.setSearchParamRegistry(ourSearchParamRegistry); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r5/BaseResourceProviderR5Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r5/BaseResourceProviderR5Test.java index aef1545bb36..fb3c410147f 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r5/BaseResourceProviderR5Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r5/BaseResourceProviderR5Test.java @@ -166,7 +166,6 @@ public abstract class BaseResourceProviderR5Test extends BaseJpaR5Test { mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class); ourSearchParamRegistry = wac.getBean(SearchParamRegistryImpl.class); ourSubscriptionMatcherInterceptor = wac.getBean(SubscriptionMatcherInterceptor.class); - ourSubscriptionMatcherInterceptor.start(); myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000); confProvider.setSearchParamRegistry(ourSearchParamRegistry); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/config/SubscriptionChannelConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/config/SubscriptionChannelConfig.java index 64d9c62d8a9..cad03f33097 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/config/SubscriptionChannelConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/config/SubscriptionChannelConfig.java @@ -33,13 +33,13 @@ public class SubscriptionChannelConfig { * Create a @Primary @Bean if you need a different implementation */ @Bean - public IQueueChannelFactory subscribableChannelFactory() { + public IQueueChannelFactory queueChannelFactory() { return new LinkedBlockingQueueChannelFactory(); } @Bean - public SubscriptionChannelFactory subscriptionChannelFactory() { - return new SubscriptionChannelFactory(); + public SubscriptionChannelFactory subscriptionChannelFactory(IQueueChannelFactory theQueueChannelFactory) { + return new SubscriptionChannelFactory(theQueueChannelFactory); } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java index 873cca9be92..3603fbd8f23 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java @@ -21,11 +21,13 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription; */ import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory; -import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; -import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants; +import org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -35,24 +37,32 @@ import org.springframework.messaging.support.AbstractSubscribableChannel; public class SubscriptionChannelFactory { - @Autowired - private IQueueChannelFactory mySubscribableChannelFactory; + private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionChannelFactory.class); + private final IQueueChannelFactory myQueueChannelFactory; + + /** + * Constructor + */ + public SubscriptionChannelFactory(IQueueChannelFactory theQueueChannelFactory) { + Validate.notNull(theQueueChannelFactory); + myQueueChannelFactory = theQueueChannelFactory; + } public MessageChannel newDeliverySendingChannel(String theChannelName) { - return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers()); + return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryJsonMessage.class, getDeliveryChannelConcurrentConsumers()); } public SubscribableChannel newDeliveryChannel(String theChannelName) { - SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers()); + SubscribableChannel channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, getDeliveryChannelConcurrentConsumers()); return new BroadcastingSubscribableChannelWrapper(channel); } public MessageChannel newMatchingSendingChannel(String theChannelName) { - return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers()); + return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedJsonMessage.class, getMatchingChannelConcurrentConsumers()); } public SubscribableChannel newMatchingReceivingChannel(String theChannelName) { - SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers()); + SubscribableChannel channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, getMatchingChannelConcurrentConsumers()); return new BroadcastingSubscribableChannelWrapper(channel); } @@ -64,8 +74,7 @@ public class SubscriptionChannelFactory { return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS; } - - private static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler, DisposableBean { + public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler, DisposableBean { private final SubscribableChannel myWrappedChannel; @@ -74,13 +83,21 @@ public class SubscriptionChannelFactory { myWrappedChannel = theChannel; } + public SubscribableChannel getWrappedChannel() { + return myWrappedChannel; + } @Override protected boolean sendInternal(Message theMessage, long timeout) { - for (MessageHandler next : getSubscribers()) { - next.handleMessage(theMessage); - } - return true; +// try { + for (MessageHandler next : getSubscribers()) { + next.handleMessage(theMessage); + } + return true; +// } catch (Exception e) { +// ourLog.error("Failiure handling message", e); +// return false; +// } } @Override @@ -94,7 +111,6 @@ public class SubscriptionChannelFactory { ((DisposableBean) myWrappedChannel).destroy(); } } + } - - } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java index 62b4b2d00d6..d5bffd1cb5c 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java @@ -87,9 +87,9 @@ public class SubscriptionChannelRegistry { channel.close(); } myDeliveryReceiverChannels.closeAndRemove(channelName); + myChannelNameToSender.remove(channelName); } - myChannelNameToSender.remove(channelName); } public synchronized SubscriptionChannelWithHandlers getDeliveryReceiverChannel(String theChannelName) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/DaoResourceRetriever.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/DaoResourceRetriever.java index 0b7721be732..2b7d740841a 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/DaoResourceRetriever.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/deliver/DaoResourceRetriever.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; +import ca.uhn.fhir.util.FhirTerser; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; @@ -35,10 +36,25 @@ import org.springframework.beans.factory.annotation.Autowired; public class DaoResourceRetriever implements IResourceRetriever { private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); + /** + * Constructor + */ + public DaoResourceRetriever() { + super(); + } + + /** + * Constructor + */ + public DaoResourceRetriever(FhirContext theFhirContext, DaoRegistry theDaoRegistry) { + myFhirContext = theFhirContext; + myDaoRegistry = theDaoRegistry; + } + @Autowired - FhirContext myFhirContext; + private FhirContext myFhirContext; @Autowired - DaoRegistry myDaoRegistry; + private DaoRegistry myDaoRegistry; @Override public IBaseResource getResource(IIdType payloadId) throws ResourceGoneException { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/BaseSubscriberForSubscriptionResources.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/BaseSubscriberForSubscriptionResources.java index f1544848335..730296ab0f8 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/BaseSubscriberForSubscriptionResources.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/BaseSubscriberForSubscriptionResources.java @@ -23,18 +23,21 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; +import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageHandler; +import static org.apache.commons.lang3.StringUtils.isBlank; + public abstract class BaseSubscriberForSubscriptionResources implements MessageHandler { @Autowired protected FhirContext myFhirContext; protected boolean isSubscription(ResourceModifiedMessage theNewResource) { - IIdType payloadId = theNewResource.getId(myFhirContext); - String payloadIdType = payloadId.getResourceType(); + IBaseResource payload = theNewResource.getNewPayload(myFhirContext); + String payloadIdType = myFhirContext.getResourceDefinition(payload).getName(); return payloadIdType.equals(ResourceTypeEnum.SUBSCRIPTION.getCode()); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/MatchingQueueSubscriberLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/MatchingQueueSubscriberLoader.java index 82202cb777f..ddb73db0863 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/MatchingQueueSubscriberLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/MatchingQueueSubscriberLoader.java @@ -4,6 +4,9 @@ import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFact import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.ContextStartedEvent; +import org.springframework.context.event.EventListener; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.SubscribableChannel; @@ -46,8 +49,8 @@ public class MatchingQueueSubscriberLoader { protected SubscribableChannel myMatchingChannel; - @PostConstruct - public void start() { + @EventListener(classes = {ContextRefreshedEvent.class}) + public void handleContextRefreshEvent() { if (myMatchingChannel == null) { myMatchingChannel = mySubscriptionChannelFactory.newMatchingReceivingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java index 0877bc1f4c5..49b6c74380d 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java @@ -20,6 +20,8 @@ import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; import org.springframework.messaging.MessageChannel; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationManager; @@ -56,7 +58,7 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer @Autowired private SubscriptionChannelFactory mySubscriptionChannelFactory; - private MessageChannel myMatchingChannel; + private volatile MessageChannel myMatchingChannel; /** * Constructor @@ -65,23 +67,28 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer super(); } - @PostConstruct - public void start() { - myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME); + @EventListener(classes = {ContextRefreshedEvent.class}) + public void startIfNeeded() { + if (myMatchingChannel == null) { + myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME); + } } @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED) public void resourceCreated(IBaseResource theResource, RequestDetails theRequest) { + startIfNeeded(); submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest); } @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED) public void resourceDeleted(IBaseResource theResource, RequestDetails theRequest) { + startIfNeeded(); submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest); } @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED) public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource, RequestDetails theRequest) { + startIfNeeded(); submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java index a827abc7462..72814d0e088 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java @@ -53,7 +53,6 @@ public class SubscriptionSubmitInterceptorLoader { if (supportedSubscriptionTypes.isEmpty()) { ourLog.info("Subscriptions are disabled on this server. Subscriptions will not be activated and incoming resources will not be matched against subscriptions."); } else { - mySubscriptionMatcherInterceptor.start(); ourLog.info("Registering subscription matcher interceptor"); myInterceptorRegistry.registerInterceptor(mySubscriptionMatcherInterceptor); } diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionConfig.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestConfig.java similarity index 90% rename from hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionConfig.java rename to hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestConfig.java index 2e3589d9116..21b31aaeb13 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/SubscriptionTestConfig.java @@ -37,7 +37,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @Import({SearchParamConfig.class}) @EnableScheduling -public class SubscriptionConfig { +public class SubscriptionTestConfig { @Autowired private FhirContext myFhirContext; @@ -54,8 +54,8 @@ public class SubscriptionConfig { } @Bean - public SubscriptionChannelFactory subscriptionChannelFactory() { - return new SubscriptionChannelFactory(); + public SubscriptionChannelFactory subscriptionChannelFactory(IQueueChannelFactory theQueueChannelFactory) { + return new SubscriptionChannelFactory(theQueueChannelFactory); } diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/HashMapResourceProvider.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/HashMapResourceProvider.java index 2780e3c2da5..8b8d5097de7 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/HashMapResourceProvider.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/HashMapResourceProvider.java @@ -31,7 +31,11 @@ import ca.uhn.fhir.model.api.IResource; import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum; import ca.uhn.fhir.rest.annotation.*; import ca.uhn.fhir.rest.api.MethodOutcome; -import ca.uhn.fhir.rest.api.server.*; +import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; +import ca.uhn.fhir.rest.api.server.IPreResourceShowDetails; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.SimplePreResourceAccessDetails; +import ca.uhn.fhir.rest.api.server.SimplePreResourceShowDetails; import ca.uhn.fhir.rest.param.TokenAndListParam; import ca.uhn.fhir.rest.param.TokenOrListParam; import ca.uhn.fhir.rest.param.TokenParam; @@ -48,7 +52,13 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import static org.apache.commons.lang3.StringUtils.isBlank; @@ -119,8 +129,8 @@ public class HashMapResourceProvider implements IResour } @Create - public MethodOutcome create(@ResourceParam T theResource) { - createInternal(theResource); + public MethodOutcome create(@ResourceParam T theResource, RequestDetails theRequestDetails) { + createInternal(theResource, theRequestDetails); myCreateCount.incrementAndGet(); @@ -130,17 +140,17 @@ public class HashMapResourceProvider implements IResour .setId(theResource.getIdElement()); } - private void createInternal(@ResourceParam T theResource) { + private void createInternal(@ResourceParam T theResource, RequestDetails theRequestDetails) { long idPart = myNextId++; String idPartAsString = Long.toString(idPart); Long versionIdPart = 1L; - IIdType id = store(theResource, idPartAsString, versionIdPart); + IIdType id = store(theResource, idPartAsString, versionIdPart, theRequestDetails); theResource.setId(id); } @Delete - public MethodOutcome delete(@IdParam IIdType theId) { + public MethodOutcome delete(@IdParam IIdType theId, RequestDetails theRequestDetails) { TreeMap versions = myIdToVersionToResourceMap.get(theId.getIdPart()); if (versions == null || versions.isEmpty()) { throw new ResourceNotFoundException(theId); @@ -148,7 +158,7 @@ public class HashMapResourceProvider implements IResour long nextVersion = versions.lastEntry().getKey() + 1L; - IIdType id = store(null, theId.getIdPart(), nextVersion); + IIdType id = store(null, theId.getIdPart(), nextVersion, theRequestDetails); myDeleteCount.incrementAndGet(); @@ -310,7 +320,7 @@ public class HashMapResourceProvider implements IResour return fireInterceptorsAndFilterAsNeeded(retVal, theRequestDetails); } - private IIdType store(@ResourceParam T theResource, String theIdPart, Long theVersionIdPart) { + private IIdType store(@ResourceParam T theResource, String theIdPart, Long theVersionIdPart, RequestDetails theRequestDetails) { IIdType id = myFhirContext.getVersion().newIdType(); String versionIdPart = Long.toString(theVersionIdPart); id.setParts(null, myResourceName, theIdPart, versionIdPart); @@ -348,6 +358,35 @@ public class HashMapResourceProvider implements IResour TreeMap versionToResource = getVersionToResource(theIdPart); versionToResource.put(theVersionIdPart, theResource); + if (theRequestDetails != null) { + IInterceptorBroadcaster interceptorBroadcaster = theRequestDetails.getInterceptorBroadcaster(); + + if (theResource != null) { + if (!myIdToHistory.containsKey(theIdPart)) { + + // Interceptor call: STORAGE_PRESTORAGE_RESOURCE_CREATED + HookParams params = new HookParams() + .add(RequestDetails.class, theRequestDetails) + .addIfMatchesType(ServletRequestDetails.class, theRequestDetails) + .add(IBaseResource.class, theResource); + interceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED, params); + interceptorBroadcaster.callHooks(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED, params); + + } else { + + // Interceptor call: STORAGE_PRESTORAGE_RESOURCE_UPDATED + HookParams params = new HookParams() + .add(RequestDetails.class, theRequestDetails) + .addIfMatchesType(ServletRequestDetails.class, theRequestDetails) + .add(IBaseResource.class, myIdToHistory.get(theIdPart).getFirst()) + .add(IBaseResource.class, theResource); + interceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED, params); + interceptorBroadcaster.callHooks(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED, params); + + } + } + } + // Store to type history map myTypeHistory.addFirst(theResource); @@ -365,11 +404,12 @@ public class HashMapResourceProvider implements IResour @Update public MethodOutcome update( @ResourceParam T theResource, - @ConditionalUrlParam String theConditional) { + @ConditionalUrlParam String theConditional, + RequestDetails theRequestDetails) { ValidateUtil.isTrueOrThrowInvalidRequest(isBlank(theConditional), "This server doesn't support conditional update"); - boolean created = updateInternal(theResource); + boolean created = updateInternal(theResource, theRequestDetails); myUpdateCount.incrementAndGet(); return new MethodOutcome() @@ -378,7 +418,7 @@ public class HashMapResourceProvider implements IResour .setId(theResource.getIdElement()); } - private boolean updateInternal(@ResourceParam T theResource) { + private boolean updateInternal(@ResourceParam T theResource, RequestDetails theRequestDetails) { String idPartAsString = theResource.getIdElement().getIdPart(); TreeMap versionToResource = getVersionToResource(idPartAsString); @@ -392,7 +432,7 @@ public class HashMapResourceProvider implements IResour created = false; } - IIdType id = store(theResource, idPartAsString, versionIdPart); + IIdType id = store(theResource, idPartAsString, versionIdPart, theRequestDetails); theResource.setId(id); return created; } @@ -411,9 +451,9 @@ public class HashMapResourceProvider implements IResour */ public IIdType store(T theResource) { if (theResource.getIdElement().hasIdPart()) { - updateInternal(theResource); + updateInternal(theResource, null); } else { - createInternal(theResource); + createInternal(theResource, null); } return theResource.getIdElement(); } diff --git a/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/server/provider/HashMapResourceProviderTest.java b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/server/provider/HashMapResourceProviderTest.java index 220b0094db3..d63632d313c 100644 --- a/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/server/provider/HashMapResourceProviderTest.java +++ b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/server/provider/HashMapResourceProviderTest.java @@ -1,6 +1,8 @@ package ca.uhn.fhir.rest.server.provider; import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; +import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor; import ca.uhn.fhir.rest.gclient.IDeleteTyped; @@ -8,6 +10,7 @@ import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; +import ca.uhn.fhir.test.utilities.JettyUtil; import ca.uhn.fhir.util.TestUtil; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -21,6 +24,10 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,11 +35,17 @@ import javax.servlet.ServletException; import java.util.List; import java.util.stream.Collectors; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; - -import ca.uhn.fhir.test.utilities.JettyUtil; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.matchesPattern; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +@RunWith(MockitoJUnitRunner.class) public class HashMapResourceProviderTest { private static final Logger ourLog = LoggerFactory.getLogger(HashMapResourceProviderTest.class); @@ -43,6 +56,9 @@ public class HashMapResourceProviderTest { private static HashMapResourceProvider myPatientResourceProvider; private static HashMapResourceProvider myObservationResourceProvider; + @Mock + private IAnonymousInterceptor myAnonymousInterceptor; + @Before public void before() { ourRestServer.clearData(); @@ -52,6 +68,9 @@ public class HashMapResourceProviderTest { @Test public void testCreateAndRead() { + ourRestServer.getInterceptorService().registerAnonymousInterceptor(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED, myAnonymousInterceptor); + ourRestServer.getInterceptorService().registerAnonymousInterceptor(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED, myAnonymousInterceptor); + // Create Patient p = new Patient(); p.setActive(true); @@ -59,6 +78,9 @@ public class HashMapResourceProviderTest { assertThat(id.getIdPart(), matchesPattern("[0-9]+")); assertEquals("1", id.getVersionIdPart()); + verify(myAnonymousInterceptor, Mockito.times(1)).invoke(eq(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED), any()); + verify(myAnonymousInterceptor, Mockito.times(1)).invoke(eq(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED), any()); + // Read p = (Patient) ourClient.read().resource("Patient").withId(id).execute(); assertEquals(true, p.getActive()); @@ -282,6 +304,9 @@ public class HashMapResourceProviderTest { assertEquals("1", id.getVersionIdPart()); // Update + ourRestServer.getInterceptorService().registerAnonymousInterceptor(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED, myAnonymousInterceptor); + ourRestServer.getInterceptorService().registerAnonymousInterceptor(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED, myAnonymousInterceptor); + p = new Patient(); p.setId(id); p.setActive(false); @@ -289,6 +314,9 @@ public class HashMapResourceProviderTest { assertThat(id.getIdPart(), matchesPattern("[0-9]+")); assertEquals("2", id.getVersionIdPart()); + verify(myAnonymousInterceptor, Mockito.times(1)).invoke(eq(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED), any()); + verify(myAnonymousInterceptor, Mockito.times(1)).invoke(eq(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED), any()); + assertEquals(1, myPatientResourceProvider.getCountCreate()); assertEquals(1, myPatientResourceProvider.getCountUpdate()); @@ -305,33 +333,6 @@ public class HashMapResourceProviderTest { } } - @AfterClass - public static void afterClassClearContext() throws Exception { - JettyUtil.closeServer(ourListenerServer); - TestUtil.clearAllStaticFieldsForUnitTest(); - } - - @BeforeClass - public static void startListenerServer() throws Exception { - ourRestServer = new MyRestfulServer(); - - ourListenerServer = new Server(0); - - ServletContextHandler proxyHandler = new ServletContextHandler(); - proxyHandler.setContextPath("/"); - - ServletHolder servletHolder = new ServletHolder(); - servletHolder.setServlet(ourRestServer); - proxyHandler.addServlet(servletHolder, "/*"); - - ourListenerServer.setHandler(proxyHandler); - JettyUtil.startServer(ourListenerServer); - int ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); - String ourBase = "http://localhost:" + ourListenerPort + "/"; - ourCtx.getRestfulClientFactory().setSocketTimeout(120000); - ourClient = ourCtx.newRestfulGenericClient(ourBase); - } - private static class MyRestfulServer extends RestfulServer { MyRestfulServer() { @@ -359,5 +360,32 @@ public class HashMapResourceProviderTest { } + @AfterClass + public static void afterClassClearContext() throws Exception { + JettyUtil.closeServer(ourListenerServer); + TestUtil.clearAllStaticFieldsForUnitTest(); + } + + @BeforeClass + public static void startListenerServer() throws Exception { + ourRestServer = new MyRestfulServer(); + + ourListenerServer = new Server(0); + + ServletContextHandler proxyHandler = new ServletContextHandler(); + proxyHandler.setContextPath("/"); + + ServletHolder servletHolder = new ServletHolder(); + servletHolder.setServlet(ourRestServer); + proxyHandler.addServlet(servletHolder, "/*"); + + ourListenerServer.setHandler(proxyHandler); + JettyUtil.startServer(ourListenerServer); + int ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); + String ourBase = "http://localhost:" + ourListenerPort + "/"; + ourCtx.getRestfulClientFactory().setSocketTimeout(120000); + ourClient = ourCtx.newRestfulGenericClient(ourBase); + } + } diff --git a/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java b/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java index ed50d5f9469..e9128ae289a 100644 --- a/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java +++ b/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java @@ -26,6 +26,7 @@ import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; import ca.uhn.fhir.interceptor.api.Pointcut; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { private final String myName; private final AtomicLong myLastInvoke = new AtomicLong(); private final AtomicReference myCountdownLatch = new AtomicReference<>(); + private final AtomicReference myCountdownLatchSetStacktrace = new AtomicReference<>(); private final AtomicReference> myFailures = new AtomicReference<>(); private final AtomicReference> myCalledWith = new AtomicReference<>(); private final Pointcut myPointcut; @@ -80,7 +82,8 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { public void setExpectedCount(int theCount, boolean theExactMatch) { if (myCountdownLatch.get() != null) { - throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed."); + String previousStack = myCountdownLatchSetStacktrace.get(); + throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed. Previous set stack:\n" + previousStack); } myExactMatch = theExactMatch; createLatch(theCount); @@ -99,6 +102,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { myFailures.set(Collections.synchronizedList(new ArrayList<>())); myCalledWith.set(Collections.synchronizedList(new ArrayList<>())); myCountdownLatch.set(new CountDownLatch(theCount)); + try { + throw new Exception(); + } catch (Exception e) { + myCountdownLatchSetStacktrace.set(ExceptionUtils.getStackTrace(e)); + } myInitialCount = theCount; } @@ -151,6 +159,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { @Override public void clear() { myCountdownLatch.set(null); + myCountdownLatchSetStacktrace.set(null); } private String toCalledWithString() {