Work on subscription cleanup

This commit is contained in:
jamesagnew 2020-04-05 18:43:27 -04:00
parent d0884663d2
commit 40d0c27ae3
15 changed files with 211 additions and 84 deletions

View File

@ -51,7 +51,15 @@ public class DaoRegistry implements ApplicationContextAware, IDaoRegistry {
* Constructor * Constructor
*/ */
public DaoRegistry() { public DaoRegistry() {
this(null);
}
/**
* Constructor
*/
public DaoRegistry(FhirContext theFhirContext) {
super(); super();
myContext = theFhirContext;
} }
public void setSupportedResourceTypes(Collection<String> theSupportedResourceTypes) { public void setSupportedResourceTypes(Collection<String> theSupportedResourceTypes) {

View File

@ -163,7 +163,6 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class); mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class);
ourSearchParamRegistry = wac.getBean(SearchParamRegistryImpl.class); ourSearchParamRegistry = wac.getBean(SearchParamRegistryImpl.class);
ourSubscriptionMatcherInterceptor = wac.getBean(SubscriptionMatcherInterceptor.class); ourSubscriptionMatcherInterceptor = wac.getBean(SubscriptionMatcherInterceptor.class);
ourSubscriptionMatcherInterceptor.start();
confProvider.setSearchParamRegistry(ourSearchParamRegistry); confProvider.setSearchParamRegistry(ourSearchParamRegistry);

View File

@ -166,7 +166,6 @@ public abstract class BaseResourceProviderR5Test extends BaseJpaR5Test {
mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class); mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class);
ourSearchParamRegistry = wac.getBean(SearchParamRegistryImpl.class); ourSearchParamRegistry = wac.getBean(SearchParamRegistryImpl.class);
ourSubscriptionMatcherInterceptor = wac.getBean(SubscriptionMatcherInterceptor.class); ourSubscriptionMatcherInterceptor = wac.getBean(SubscriptionMatcherInterceptor.class);
ourSubscriptionMatcherInterceptor.start();
myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000); myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000);
confProvider.setSearchParamRegistry(ourSearchParamRegistry); confProvider.setSearchParamRegistry(ourSearchParamRegistry);

View File

@ -33,13 +33,13 @@ public class SubscriptionChannelConfig {
* Create a @Primary @Bean if you need a different implementation * Create a @Primary @Bean if you need a different implementation
*/ */
@Bean @Bean
public IQueueChannelFactory subscribableChannelFactory() { public IQueueChannelFactory queueChannelFactory() {
return new LinkedBlockingQueueChannelFactory(); return new LinkedBlockingQueueChannelFactory();
} }
@Bean @Bean
public SubscriptionChannelFactory subscriptionChannelFactory() { public SubscriptionChannelFactory subscriptionChannelFactory(IQueueChannelFactory theQueueChannelFactory) {
return new SubscriptionChannelFactory(); return new SubscriptionChannelFactory(theQueueChannelFactory);
} }
} }

View File

@ -21,11 +21,13 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
*/ */
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
@ -35,24 +37,32 @@ import org.springframework.messaging.support.AbstractSubscribableChannel;
public class SubscriptionChannelFactory { public class SubscriptionChannelFactory {
@Autowired private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionChannelFactory.class);
private IQueueChannelFactory mySubscribableChannelFactory; private final IQueueChannelFactory myQueueChannelFactory;
/**
* Constructor
*/
public SubscriptionChannelFactory(IQueueChannelFactory theQueueChannelFactory) {
Validate.notNull(theQueueChannelFactory);
myQueueChannelFactory = theQueueChannelFactory;
}
public MessageChannel newDeliverySendingChannel(String theChannelName) { public MessageChannel newDeliverySendingChannel(String theChannelName) {
return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers()); return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryJsonMessage.class, getDeliveryChannelConcurrentConsumers());
} }
public SubscribableChannel newDeliveryChannel(String theChannelName) { public SubscribableChannel newDeliveryChannel(String theChannelName) {
SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers()); SubscribableChannel channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, getDeliveryChannelConcurrentConsumers());
return new BroadcastingSubscribableChannelWrapper(channel); return new BroadcastingSubscribableChannelWrapper(channel);
} }
public MessageChannel newMatchingSendingChannel(String theChannelName) { public MessageChannel newMatchingSendingChannel(String theChannelName) {
return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers()); return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedJsonMessage.class, getMatchingChannelConcurrentConsumers());
} }
public SubscribableChannel newMatchingReceivingChannel(String theChannelName) { public SubscribableChannel newMatchingReceivingChannel(String theChannelName) {
SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers()); SubscribableChannel channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, getMatchingChannelConcurrentConsumers());
return new BroadcastingSubscribableChannelWrapper(channel); return new BroadcastingSubscribableChannelWrapper(channel);
} }
@ -64,8 +74,7 @@ public class SubscriptionChannelFactory {
return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS; return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
} }
public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler, DisposableBean {
private static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler, DisposableBean {
private final SubscribableChannel myWrappedChannel; private final SubscribableChannel myWrappedChannel;
@ -74,13 +83,21 @@ public class SubscriptionChannelFactory {
myWrappedChannel = theChannel; myWrappedChannel = theChannel;
} }
public SubscribableChannel getWrappedChannel() {
return myWrappedChannel;
}
@Override @Override
protected boolean sendInternal(Message<?> theMessage, long timeout) { protected boolean sendInternal(Message<?> theMessage, long timeout) {
for (MessageHandler next : getSubscribers()) { // try {
next.handleMessage(theMessage); for (MessageHandler next : getSubscribers()) {
} next.handleMessage(theMessage);
return true; }
return true;
// } catch (Exception e) {
// ourLog.error("Failiure handling message", e);
// return false;
// }
} }
@Override @Override
@ -94,7 +111,6 @@ public class SubscriptionChannelFactory {
((DisposableBean) myWrappedChannel).destroy(); ((DisposableBean) myWrappedChannel).destroy();
} }
} }
} }
} }

View File

@ -87,9 +87,9 @@ public class SubscriptionChannelRegistry {
channel.close(); channel.close();
} }
myDeliveryReceiverChannels.closeAndRemove(channelName); myDeliveryReceiverChannels.closeAndRemove(channelName);
myChannelNameToSender.remove(channelName);
} }
myChannelNameToSender.remove(channelName);
} }
public synchronized SubscriptionChannelWithHandlers getDeliveryReceiverChannel(String theChannelName) { public synchronized SubscriptionChannelWithHandlers getDeliveryReceiverChannel(String theChannelName) {

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.util.FhirTerser;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -35,10 +36,25 @@ import org.springframework.beans.factory.annotation.Autowired;
public class DaoResourceRetriever implements IResourceRetriever { public class DaoResourceRetriever implements IResourceRetriever {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class); private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
/**
* Constructor
*/
public DaoResourceRetriever() {
super();
}
/**
* Constructor
*/
public DaoResourceRetriever(FhirContext theFhirContext, DaoRegistry theDaoRegistry) {
myFhirContext = theFhirContext;
myDaoRegistry = theDaoRegistry;
}
@Autowired @Autowired
FhirContext myFhirContext; private FhirContext myFhirContext;
@Autowired @Autowired
DaoRegistry myDaoRegistry; private DaoRegistry myDaoRegistry;
@Override @Override
public IBaseResource getResource(IIdType payloadId) throws ResourceGoneException { public IBaseResource getResource(IIdType payloadId) throws ResourceGoneException {

View File

@ -23,18 +23,21 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import static org.apache.commons.lang3.StringUtils.isBlank;
public abstract class BaseSubscriberForSubscriptionResources implements MessageHandler { public abstract class BaseSubscriberForSubscriptionResources implements MessageHandler {
@Autowired @Autowired
protected FhirContext myFhirContext; protected FhirContext myFhirContext;
protected boolean isSubscription(ResourceModifiedMessage theNewResource) { protected boolean isSubscription(ResourceModifiedMessage theNewResource) {
IIdType payloadId = theNewResource.getId(myFhirContext); IBaseResource payload = theNewResource.getNewPayload(myFhirContext);
String payloadIdType = payloadId.getResourceType(); String payloadIdType = myFhirContext.getResourceDefinition(payload).getName();
return payloadIdType.equals(ResourceTypeEnum.SUBSCRIPTION.getCode()); return payloadIdType.equals(ResourceTypeEnum.SUBSCRIPTION.getCode());
} }

View File

@ -4,6 +4,9 @@ import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFact
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
@ -46,8 +49,8 @@ public class MatchingQueueSubscriberLoader {
protected SubscribableChannel myMatchingChannel; protected SubscribableChannel myMatchingChannel;
@PostConstruct @EventListener(classes = {ContextRefreshedEvent.class})
public void start() { public void handleContextRefreshEvent() {
if (myMatchingChannel == null) { if (myMatchingChannel == null) {
myMatchingChannel = mySubscriptionChannelFactory.newMatchingReceivingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME); myMatchingChannel = mySubscriptionChannelFactory.newMatchingReceivingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME);
} }

View File

@ -20,6 +20,8 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionSynchronizationManager;
@ -56,7 +58,7 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
@Autowired @Autowired
private SubscriptionChannelFactory mySubscriptionChannelFactory; private SubscriptionChannelFactory mySubscriptionChannelFactory;
private MessageChannel myMatchingChannel; private volatile MessageChannel myMatchingChannel;
/** /**
* Constructor * Constructor
@ -65,23 +67,28 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
super(); super();
} }
@PostConstruct @EventListener(classes = {ContextRefreshedEvent.class})
public void start() { public void startIfNeeded() {
myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME); if (myMatchingChannel == null) {
myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME);
}
} }
@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED) @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED)
public void resourceCreated(IBaseResource theResource, RequestDetails theRequest) { public void resourceCreated(IBaseResource theResource, RequestDetails theRequest) {
startIfNeeded();
submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest); submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest);
} }
@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED) @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED)
public void resourceDeleted(IBaseResource theResource, RequestDetails theRequest) { public void resourceDeleted(IBaseResource theResource, RequestDetails theRequest) {
startIfNeeded();
submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest); submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest);
} }
@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED) @Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED)
public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource, RequestDetails theRequest) { public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource, RequestDetails theRequest) {
startIfNeeded();
submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest); submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
} }

View File

@ -53,7 +53,6 @@ public class SubscriptionSubmitInterceptorLoader {
if (supportedSubscriptionTypes.isEmpty()) { if (supportedSubscriptionTypes.isEmpty()) {
ourLog.info("Subscriptions are disabled on this server. Subscriptions will not be activated and incoming resources will not be matched against subscriptions."); ourLog.info("Subscriptions are disabled on this server. Subscriptions will not be activated and incoming resources will not be matched against subscriptions.");
} else { } else {
mySubscriptionMatcherInterceptor.start();
ourLog.info("Registering subscription matcher interceptor"); ourLog.info("Registering subscription matcher interceptor");
myInterceptorRegistry.registerInterceptor(mySubscriptionMatcherInterceptor); myInterceptorRegistry.registerInterceptor(mySubscriptionMatcherInterceptor);
} }

View File

@ -37,7 +37,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@Configuration @Configuration
@Import({SearchParamConfig.class}) @Import({SearchParamConfig.class})
@EnableScheduling @EnableScheduling
public class SubscriptionConfig { public class SubscriptionTestConfig {
@Autowired @Autowired
private FhirContext myFhirContext; private FhirContext myFhirContext;
@ -54,8 +54,8 @@ public class SubscriptionConfig {
} }
@Bean @Bean
public SubscriptionChannelFactory subscriptionChannelFactory() { public SubscriptionChannelFactory subscriptionChannelFactory(IQueueChannelFactory theQueueChannelFactory) {
return new SubscriptionChannelFactory(); return new SubscriptionChannelFactory(theQueueChannelFactory);
} }

View File

@ -31,7 +31,11 @@ import ca.uhn.fhir.model.api.IResource;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum; import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.rest.annotation.*; import ca.uhn.fhir.rest.annotation.*;
import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.server.*; import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.IPreResourceShowDetails;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SimplePreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.SimplePreResourceShowDetails;
import ca.uhn.fhir.rest.param.TokenAndListParam; import ca.uhn.fhir.rest.param.TokenAndListParam;
import ca.uhn.fhir.rest.param.TokenOrListParam; import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam; import ca.uhn.fhir.rest.param.TokenParam;
@ -48,7 +52,13 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isBlank;
@ -119,8 +129,8 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
} }
@Create @Create
public MethodOutcome create(@ResourceParam T theResource) { public MethodOutcome create(@ResourceParam T theResource, RequestDetails theRequestDetails) {
createInternal(theResource); createInternal(theResource, theRequestDetails);
myCreateCount.incrementAndGet(); myCreateCount.incrementAndGet();
@ -130,17 +140,17 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
.setId(theResource.getIdElement()); .setId(theResource.getIdElement());
} }
private void createInternal(@ResourceParam T theResource) { private void createInternal(@ResourceParam T theResource, RequestDetails theRequestDetails) {
long idPart = myNextId++; long idPart = myNextId++;
String idPartAsString = Long.toString(idPart); String idPartAsString = Long.toString(idPart);
Long versionIdPart = 1L; Long versionIdPart = 1L;
IIdType id = store(theResource, idPartAsString, versionIdPart); IIdType id = store(theResource, idPartAsString, versionIdPart, theRequestDetails);
theResource.setId(id); theResource.setId(id);
} }
@Delete @Delete
public MethodOutcome delete(@IdParam IIdType theId) { public MethodOutcome delete(@IdParam IIdType theId, RequestDetails theRequestDetails) {
TreeMap<Long, T> versions = myIdToVersionToResourceMap.get(theId.getIdPart()); TreeMap<Long, T> versions = myIdToVersionToResourceMap.get(theId.getIdPart());
if (versions == null || versions.isEmpty()) { if (versions == null || versions.isEmpty()) {
throw new ResourceNotFoundException(theId); throw new ResourceNotFoundException(theId);
@ -148,7 +158,7 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
long nextVersion = versions.lastEntry().getKey() + 1L; long nextVersion = versions.lastEntry().getKey() + 1L;
IIdType id = store(null, theId.getIdPart(), nextVersion); IIdType id = store(null, theId.getIdPart(), nextVersion, theRequestDetails);
myDeleteCount.incrementAndGet(); myDeleteCount.incrementAndGet();
@ -310,7 +320,7 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
return fireInterceptorsAndFilterAsNeeded(retVal, theRequestDetails); return fireInterceptorsAndFilterAsNeeded(retVal, theRequestDetails);
} }
private IIdType store(@ResourceParam T theResource, String theIdPart, Long theVersionIdPart) { private IIdType store(@ResourceParam T theResource, String theIdPart, Long theVersionIdPart, RequestDetails theRequestDetails) {
IIdType id = myFhirContext.getVersion().newIdType(); IIdType id = myFhirContext.getVersion().newIdType();
String versionIdPart = Long.toString(theVersionIdPart); String versionIdPart = Long.toString(theVersionIdPart);
id.setParts(null, myResourceName, theIdPart, versionIdPart); id.setParts(null, myResourceName, theIdPart, versionIdPart);
@ -348,6 +358,35 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
TreeMap<Long, T> versionToResource = getVersionToResource(theIdPart); TreeMap<Long, T> versionToResource = getVersionToResource(theIdPart);
versionToResource.put(theVersionIdPart, theResource); versionToResource.put(theVersionIdPart, theResource);
if (theRequestDetails != null) {
IInterceptorBroadcaster interceptorBroadcaster = theRequestDetails.getInterceptorBroadcaster();
if (theResource != null) {
if (!myIdToHistory.containsKey(theIdPart)) {
// Interceptor call: STORAGE_PRESTORAGE_RESOURCE_CREATED
HookParams params = new HookParams()
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails)
.add(IBaseResource.class, theResource);
interceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED, params);
interceptorBroadcaster.callHooks(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED, params);
} else {
// Interceptor call: STORAGE_PRESTORAGE_RESOURCE_UPDATED
HookParams params = new HookParams()
.add(RequestDetails.class, theRequestDetails)
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails)
.add(IBaseResource.class, myIdToHistory.get(theIdPart).getFirst())
.add(IBaseResource.class, theResource);
interceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED, params);
interceptorBroadcaster.callHooks(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED, params);
}
}
}
// Store to type history map // Store to type history map
myTypeHistory.addFirst(theResource); myTypeHistory.addFirst(theResource);
@ -365,11 +404,12 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
@Update @Update
public MethodOutcome update( public MethodOutcome update(
@ResourceParam T theResource, @ResourceParam T theResource,
@ConditionalUrlParam String theConditional) { @ConditionalUrlParam String theConditional,
RequestDetails theRequestDetails) {
ValidateUtil.isTrueOrThrowInvalidRequest(isBlank(theConditional), "This server doesn't support conditional update"); ValidateUtil.isTrueOrThrowInvalidRequest(isBlank(theConditional), "This server doesn't support conditional update");
boolean created = updateInternal(theResource); boolean created = updateInternal(theResource, theRequestDetails);
myUpdateCount.incrementAndGet(); myUpdateCount.incrementAndGet();
return new MethodOutcome() return new MethodOutcome()
@ -378,7 +418,7 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
.setId(theResource.getIdElement()); .setId(theResource.getIdElement());
} }
private boolean updateInternal(@ResourceParam T theResource) { private boolean updateInternal(@ResourceParam T theResource, RequestDetails theRequestDetails) {
String idPartAsString = theResource.getIdElement().getIdPart(); String idPartAsString = theResource.getIdElement().getIdPart();
TreeMap<Long, T> versionToResource = getVersionToResource(idPartAsString); TreeMap<Long, T> versionToResource = getVersionToResource(idPartAsString);
@ -392,7 +432,7 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
created = false; created = false;
} }
IIdType id = store(theResource, idPartAsString, versionIdPart); IIdType id = store(theResource, idPartAsString, versionIdPart, theRequestDetails);
theResource.setId(id); theResource.setId(id);
return created; return created;
} }
@ -411,9 +451,9 @@ public class HashMapResourceProvider<T extends IBaseResource> implements IResour
*/ */
public IIdType store(T theResource) { public IIdType store(T theResource) {
if (theResource.getIdElement().hasIdPart()) { if (theResource.getIdElement().hasIdPart()) {
updateInternal(theResource); updateInternal(theResource, null);
} else { } else {
createInternal(theResource); createInternal(theResource, null);
} }
return theResource.getIdElement(); return theResource.getIdElement();
} }

View File

@ -1,6 +1,8 @@
package ca.uhn.fhir.rest.server.provider; package ca.uhn.fhir.rest.server.provider;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor; import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
import ca.uhn.fhir.rest.gclient.IDeleteTyped; import ca.uhn.fhir.rest.gclient.IDeleteTyped;
@ -8,6 +10,7 @@ import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.test.utilities.JettyUtil;
import ca.uhn.fhir.util.TestUtil; import ca.uhn.fhir.util.TestUtil;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
@ -21,6 +24,10 @@ import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -28,11 +35,17 @@ import javax.servlet.ServletException;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.*; import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.matchesPattern;
import ca.uhn.fhir.test.utilities.JettyUtil; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class HashMapResourceProviderTest { public class HashMapResourceProviderTest {
private static final Logger ourLog = LoggerFactory.getLogger(HashMapResourceProviderTest.class); private static final Logger ourLog = LoggerFactory.getLogger(HashMapResourceProviderTest.class);
@ -43,6 +56,9 @@ public class HashMapResourceProviderTest {
private static HashMapResourceProvider<Patient> myPatientResourceProvider; private static HashMapResourceProvider<Patient> myPatientResourceProvider;
private static HashMapResourceProvider<Observation> myObservationResourceProvider; private static HashMapResourceProvider<Observation> myObservationResourceProvider;
@Mock
private IAnonymousInterceptor myAnonymousInterceptor;
@Before @Before
public void before() { public void before() {
ourRestServer.clearData(); ourRestServer.clearData();
@ -52,6 +68,9 @@ public class HashMapResourceProviderTest {
@Test @Test
public void testCreateAndRead() { public void testCreateAndRead() {
ourRestServer.getInterceptorService().registerAnonymousInterceptor(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED, myAnonymousInterceptor);
ourRestServer.getInterceptorService().registerAnonymousInterceptor(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED, myAnonymousInterceptor);
// Create // Create
Patient p = new Patient(); Patient p = new Patient();
p.setActive(true); p.setActive(true);
@ -59,6 +78,9 @@ public class HashMapResourceProviderTest {
assertThat(id.getIdPart(), matchesPattern("[0-9]+")); assertThat(id.getIdPart(), matchesPattern("[0-9]+"));
assertEquals("1", id.getVersionIdPart()); assertEquals("1", id.getVersionIdPart());
verify(myAnonymousInterceptor, Mockito.times(1)).invoke(eq(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED), any());
verify(myAnonymousInterceptor, Mockito.times(1)).invoke(eq(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED), any());
// Read // Read
p = (Patient) ourClient.read().resource("Patient").withId(id).execute(); p = (Patient) ourClient.read().resource("Patient").withId(id).execute();
assertEquals(true, p.getActive()); assertEquals(true, p.getActive());
@ -282,6 +304,9 @@ public class HashMapResourceProviderTest {
assertEquals("1", id.getVersionIdPart()); assertEquals("1", id.getVersionIdPart());
// Update // Update
ourRestServer.getInterceptorService().registerAnonymousInterceptor(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED, myAnonymousInterceptor);
ourRestServer.getInterceptorService().registerAnonymousInterceptor(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED, myAnonymousInterceptor);
p = new Patient(); p = new Patient();
p.setId(id); p.setId(id);
p.setActive(false); p.setActive(false);
@ -289,6 +314,9 @@ public class HashMapResourceProviderTest {
assertThat(id.getIdPart(), matchesPattern("[0-9]+")); assertThat(id.getIdPart(), matchesPattern("[0-9]+"));
assertEquals("2", id.getVersionIdPart()); assertEquals("2", id.getVersionIdPart());
verify(myAnonymousInterceptor, Mockito.times(1)).invoke(eq(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED), any());
verify(myAnonymousInterceptor, Mockito.times(1)).invoke(eq(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED), any());
assertEquals(1, myPatientResourceProvider.getCountCreate()); assertEquals(1, myPatientResourceProvider.getCountCreate());
assertEquals(1, myPatientResourceProvider.getCountUpdate()); assertEquals(1, myPatientResourceProvider.getCountUpdate());
@ -305,33 +333,6 @@ public class HashMapResourceProviderTest {
} }
} }
@AfterClass
public static void afterClassClearContext() throws Exception {
JettyUtil.closeServer(ourListenerServer);
TestUtil.clearAllStaticFieldsForUnitTest();
}
@BeforeClass
public static void startListenerServer() throws Exception {
ourRestServer = new MyRestfulServer();
ourListenerServer = new Server(0);
ServletContextHandler proxyHandler = new ServletContextHandler();
proxyHandler.setContextPath("/");
ServletHolder servletHolder = new ServletHolder();
servletHolder.setServlet(ourRestServer);
proxyHandler.addServlet(servletHolder, "/*");
ourListenerServer.setHandler(proxyHandler);
JettyUtil.startServer(ourListenerServer);
int ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer);
String ourBase = "http://localhost:" + ourListenerPort + "/";
ourCtx.getRestfulClientFactory().setSocketTimeout(120000);
ourClient = ourCtx.newRestfulGenericClient(ourBase);
}
private static class MyRestfulServer extends RestfulServer { private static class MyRestfulServer extends RestfulServer {
MyRestfulServer() { MyRestfulServer() {
@ -359,5 +360,32 @@ public class HashMapResourceProviderTest {
} }
@AfterClass
public static void afterClassClearContext() throws Exception {
JettyUtil.closeServer(ourListenerServer);
TestUtil.clearAllStaticFieldsForUnitTest();
}
@BeforeClass
public static void startListenerServer() throws Exception {
ourRestServer = new MyRestfulServer();
ourListenerServer = new Server(0);
ServletContextHandler proxyHandler = new ServletContextHandler();
proxyHandler.setContextPath("/");
ServletHolder servletHolder = new ServletHolder();
servletHolder.setServlet(ourRestServer);
proxyHandler.addServlet(servletHolder, "/*");
ourListenerServer.setHandler(proxyHandler);
JettyUtil.startServer(ourListenerServer);
int ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer);
String ourBase = "http://localhost:" + ourListenerPort + "/";
ourCtx.getRestfulClientFactory().setSocketTimeout(120000);
ourClient = ourCtx.newRestfulGenericClient(ourBase);
}
} }

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -47,6 +48,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
private final String myName; private final String myName;
private final AtomicLong myLastInvoke = new AtomicLong(); private final AtomicLong myLastInvoke = new AtomicLong();
private final AtomicReference<CountDownLatch> myCountdownLatch = new AtomicReference<>(); private final AtomicReference<CountDownLatch> myCountdownLatch = new AtomicReference<>();
private final AtomicReference<String> myCountdownLatchSetStacktrace = new AtomicReference<>();
private final AtomicReference<List<String>> myFailures = new AtomicReference<>(); private final AtomicReference<List<String>> myFailures = new AtomicReference<>();
private final AtomicReference<List<HookParams>> myCalledWith = new AtomicReference<>(); private final AtomicReference<List<HookParams>> myCalledWith = new AtomicReference<>();
private final Pointcut myPointcut; private final Pointcut myPointcut;
@ -80,7 +82,8 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
public void setExpectedCount(int theCount, boolean theExactMatch) { public void setExpectedCount(int theCount, boolean theExactMatch) {
if (myCountdownLatch.get() != null) { if (myCountdownLatch.get() != null) {
throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed."); String previousStack = myCountdownLatchSetStacktrace.get();
throw new PointcutLatchException("setExpectedCount() called before previous awaitExpected() completed. Previous set stack:\n" + previousStack);
} }
myExactMatch = theExactMatch; myExactMatch = theExactMatch;
createLatch(theCount); createLatch(theCount);
@ -99,6 +102,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
myFailures.set(Collections.synchronizedList(new ArrayList<>())); myFailures.set(Collections.synchronizedList(new ArrayList<>()));
myCalledWith.set(Collections.synchronizedList(new ArrayList<>())); myCalledWith.set(Collections.synchronizedList(new ArrayList<>()));
myCountdownLatch.set(new CountDownLatch(theCount)); myCountdownLatch.set(new CountDownLatch(theCount));
try {
throw new Exception();
} catch (Exception e) {
myCountdownLatchSetStacktrace.set(ExceptionUtils.getStackTrace(e));
}
myInitialCount = theCount; myInitialCount = theCount;
} }
@ -151,6 +159,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
@Override @Override
public void clear() { public void clear() {
myCountdownLatch.set(null); myCountdownLatch.set(null);
myCountdownLatchSetStacktrace.set(null);
} }
private String toCalledWithString() { private String toCalledWithString() {