Work on subscription cleanup

This commit is contained in:
jamesagnew 2020-04-04 16:07:01 -04:00
parent 68db40d057
commit fef447afee
23 changed files with 293 additions and 233 deletions

View File

@ -36,7 +36,7 @@ import ca.uhn.fhir.jpa.provider.dstu3.JpaSystemProviderDstu3;
import ca.uhn.fhir.jpa.provider.r4.JpaConformanceProviderR4;
import ca.uhn.fhir.jpa.provider.r4.JpaSystemProviderR4;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.util.ResourceProviderFactory;
import ca.uhn.fhir.model.dstu2.composite.MetaDt;
import ca.uhn.fhir.model.dstu2.resource.Bundle;
@ -173,9 +173,6 @@ public class JpaServerDemo extends RestfulServer {
daoConfig.setEnforceReferentialIntegrityOnWrite(!ContextHolder.isDisableReferentialIntegrity());
daoConfig.setReuseCachedSearchResultsForMillis(ContextHolder.getReuseCachedSearchResultsForMillis());
SubmitInterceptorLoader submitInterceptorLoader = myAppCtx.getBean(SubmitInterceptorLoader.class);
submitInterceptorLoader.registerInterceptors();
DaoRegistry daoRegistry = myAppCtx.getBean(DaoRegistry.class);
IInterceptorBroadcaster interceptorBroadcaster = myAppCtx.getBean(IInterceptorBroadcaster.class);
CascadingDeleteInterceptor cascadingDeleteInterceptor = new CascadingDeleteInterceptor(daoRegistry, interceptorBroadcaster);

View File

@ -1559,28 +1559,6 @@ public class DaoConfig {
myEnableInMemorySubscriptionMatching = theEnableInMemorySubscriptionMatching;
}
/**
* If set to <code>true</code> (default is true) the server will match incoming resources against active subscriptions
* and send them to the subscription channel. If set to <code>false</code> no matching or sending occurs.
*
* @since 3.7.0
*/
public boolean isSubscriptionMatchingEnabled() {
return myModelConfig.isSubscriptionMatchingEnabled();
}
/**
* If set to <code>true</code> (default is true) the server will match incoming resources against active subscriptions
* and send them to the subscription channel. If set to <code>false</code> no matching or sending occurs.
*
* @since 3.7.0
*/
public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) {
myModelConfig.setSubscriptionMatchingEnabled(theSubscriptionMatchingEnabled);
}
public ModelConfig getModelConfig() {
return myModelConfig;
}
@ -1703,6 +1681,8 @@ public class DaoConfig {
/**
* This setting indicates which subscription channel types are supported by the server. Any subscriptions submitted
* to the server matching these types will be activated.
*
* @see #addSupportedSubscriptionType(Subscription.SubscriptionChannelType)
*/
public Set<Subscription.SubscriptionChannelType> getSupportedSubscriptionTypes() {
return myModelConfig.getSupportedSubscriptionTypes();

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
@ -22,7 +22,7 @@ public class SubscriptionTestUtil {
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private SubmitInterceptorLoader mySubmitInterceptorLoader;
private SubscriptionSubmitInterceptorLoader mySubscriptionSubmitInterceptorLoader;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@Autowired
@ -50,22 +50,22 @@ public class SubscriptionTestUtil {
public void registerEmailInterceptor() {
myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.EMAIL);
mySubmitInterceptorLoader.registerInterceptors();
mySubscriptionSubmitInterceptorLoader.start();
}
public void registerRestHookInterceptor() {
myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.RESTHOOK);
mySubmitInterceptorLoader.registerInterceptors();
mySubscriptionSubmitInterceptorLoader.start();
}
public void registerWebSocketInterceptor() {
myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.WEBSOCKET);
mySubmitInterceptorLoader.registerInterceptors();
mySubscriptionSubmitInterceptorLoader.start();
}
public void unregisterSubscriptionInterceptor() {
myDaoConfig.clearSupportedSubscriptionTypesForUnitTest();
mySubmitInterceptorLoader.unregisterInterceptorsForUnitTest();
mySubscriptionSubmitInterceptorLoader.unregisterInterceptorsForUnitTest();
}
public int getExecutorQueueSizeForUnitTests() {

View File

@ -58,7 +58,6 @@ public class ModelConfig {
private boolean myDefaultSearchParamsCanBeOverridden = false;
private Set<Subscription.SubscriptionChannelType> mySupportedSubscriptionTypes = new HashSet<>();
private String myEmailFromAddress = "noreply@unknown.com";
private boolean mySubscriptionMatchingEnabled = true;
private String myWebsocketContextPath = DEFAULT_WEBSOCKET_CONTEXT_PATH;
/**
@ -330,27 +329,6 @@ public class ModelConfig {
return Collections.unmodifiableSet(mySupportedSubscriptionTypes);
}
/**
* If set to <code>true</code> (default is true) the server will match incoming resources against active subscriptions
* and send them to the subscription channel. If set to <code>false</code> no matching or sending occurs.
* @since 3.7.0
*/
public boolean isSubscriptionMatchingEnabled() {
return mySubscriptionMatchingEnabled;
}
/**
* If set to <code>true</code> (default is true) the server will match incoming resources against active subscriptions
* and send them to the subscription channel. If set to <code>false</code> no matching or sending occurs.
* @since 3.7.0
*/
public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) {
mySubscriptionMatchingEnabled = theSubscriptionMatchingEnabled;
}
@VisibleForTesting
public void clearSupportedSubscriptionTypesForUnitTest() {
mySupportedSubscriptionTypes.clear();

View File

@ -78,6 +78,11 @@
<!-- test dependencies -->
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-test-utilities</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>

View File

@ -50,9 +50,4 @@ public interface IQueueChannelFactory {
*/
MessageChannel getOrCreateSender(String theChannelName, Class<?> theMessageType, int theConcurrentConsumers);
// FIXME: can these be removed?
int getDeliveryChannelConcurrentConsumers();
// FIXME: can these be removed?
int getMatchingChannelConcurrentConsumers();
}

View File

@ -33,6 +33,13 @@ public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
private Map<String, SubscribableChannel> myChannels = Collections.synchronizedMap(new HashMap<>());
/**
* Constructor
*/
public LinkedBlockingQueueChannelFactory() {
super();
}
@Override
public SubscribableChannel getOrCreateReceiver(String theChannelName, Class<?> theMessageType, int theConcurrentConsumers) {
return getOrCreateChannel(theChannelName, theConcurrentConsumers);
@ -44,17 +51,11 @@ public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
}
private SubscribableChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) {
return myChannels.computeIfAbsent(theChannelName, t ->
new LinkedBlockingQueueChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d", theConcurrentConsumers));
return myChannels.computeIfAbsent(theChannelName, t -> {
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE);
String threadNamingPattern = theChannelName + "-%d";
return new LinkedBlockingQueueChannel(queue, threadNamingPattern, theConcurrentConsumers);
});
}
@Override
public int getDeliveryChannelConcurrentConsumers() {
return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS;
}
@Override
public int getMatchingChannelConcurrentConsumers() {
return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
}
}

View File

@ -20,12 +20,17 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.AbstractSubscribableChannel;
public class SubscriptionChannelFactory {
@ -33,15 +38,48 @@ public class SubscriptionChannelFactory {
private IQueueChannelFactory mySubscribableChannelFactory;
public SubscribableChannel newDeliveryChannel(String theChannelName) {
return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers());
return new BroadcastingSubscribableChannelWrapper(channel);
}
public MessageChannel newMatchingSendingChannel(String theChannelName) {
return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers());
return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers());
}
public SubscribableChannel newMatchingReceivingChannel(String theChannelName) {
return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers());
SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers());
return new BroadcastingSubscribableChannelWrapper(channel);
}
public int getDeliveryChannelConcurrentConsumers() {
return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS;
}
public int getMatchingChannelConcurrentConsumers() {
return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
}
private static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler {
public BroadcastingSubscribableChannelWrapper(SubscribableChannel theChannel) {
theChannel.subscribe(this);
}
@Override
protected boolean sendInternal(Message<?> theMessage, long timeout) {
for (MessageHandler next : getSubscribers()) {
next.handleMessage(theMessage);
}
return true;
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
send(message);
}
}
}

View File

@ -51,9 +51,6 @@ public class SubscriptionChannelRegistry {
private ModelConfig myModelConfig;
public synchronized void add(ActiveSubscription theActiveSubscription) {
if (!myModelConfig.isSubscriptionMatchingEnabled()) {
return;
}
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName);
myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());
@ -75,9 +72,6 @@ public class SubscriptionChannelRegistry {
}
public synchronized void remove(ActiveSubscription theActiveSubscription) {
if (!myModelConfig.isSubscriptionMatchingEnabled()) {
return;
}
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());

View File

@ -0,0 +1,44 @@
package ca.uhn.fhir.jpa.subscription.model.config;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2020 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SubscriptionModelConfig {
@Bean
public SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
return new SubscriptionCanonicalizer(theFhirContext);
}
@Bean
public SubscriptionStrategyEvaluator subscriptionStrategyEvaluator() {
return new SubscriptionStrategyEvaluator();
}
}

View File

@ -20,10 +20,10 @@ package ca.uhn.fhir.jpa.subscription.process.config;
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryHandlerFactory;
import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
import ca.uhn.fhir.jpa.subscription.process.deliver.DaoResourceRetriever;
import ca.uhn.fhir.jpa.subscription.process.deliver.IResourceRetriever;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.IEmailSender;
@ -34,19 +34,18 @@ import ca.uhn.fhir.jpa.subscription.process.matcher.matching.CompositeInMemoryDa
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.DaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.InMemorySubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.MatchingQueueSubscriberLoader;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionRegisteringSubscriber;
import ca.uhn.fhir.jpa.subscription.process.registry.DaoSubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.process.registry.ISubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
@ -55,6 +54,7 @@ import org.springframework.context.annotation.Scope;
* This Spring config should be imported by a system that pulls messages off of the
* matching queue for processing, and handles delivery
*/
@Import(SubscriptionModelConfig.class)
public class SubscriptionProcessorConfig {
@Bean
@ -102,16 +102,6 @@ public class SubscriptionProcessorConfig {
return new WebsocketConnectionValidator();
}
@Bean
public SubscriptionStrategyEvaluator subscriptionStrategyEvaluator() {
return new SubscriptionStrategyEvaluator();
}
@Bean
public SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
return new SubscriptionCanonicalizer(theFhirContext);
}
@Bean
public SubscriptionLoader subscriptionLoader() {
return new SubscriptionLoader();
@ -127,12 +117,6 @@ public class SubscriptionProcessorConfig {
return new SubscriptionDeliveryHandlerFactory();
}
@Bean
@Lazy
public ISubscriptionTriggeringSvc subscriptionTriggeringSvc() {
return new SubscriptionTriggeringSvcImpl();
}
@Bean
@Scope("prototype")
public SubscriptionDeliveringRestHookSubscriber subscriptionDeliveringRestHookSubscriber() {

View File

@ -21,7 +21,19 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.matching;
*/
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;
public interface IResourceModifiedConsumer {
/**
* This is an internal API - Use with caution!
*/
void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest);
/**
* This is an internal API - Use with caution!
*/
void submitResourceModified(ResourceModifiedMessage theMsg);
}

View File

@ -29,6 +29,13 @@ public class SubscriptionStrategyEvaluator {
@Autowired
private InMemoryResourceMatcher myInMemoryResourceMatcher;
/**
* Constructor
*/
public SubscriptionStrategyEvaluator() {
super();
}
public SubscriptionMatchingStrategy determineStrategy(String theCriteria) {
InMemoryMatchResult result = myInMemoryResourceMatcher.match(theCriteria, null, null);
if (result.supported()) {

View File

@ -24,43 +24,29 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.SubscriptionUtil;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
* Responsible for transitioning subscription resources from REQUESTED to ACTIVE
@ -69,14 +55,9 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
* Also validates criteria. If invalid, rejects the subscription without persisting the subscription.
*/
public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler {
private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest;
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
@Autowired
private PlatformTransactionManager myTransactionManager;
// FIXME: use constant if this is still needed
@Autowired
@Qualifier("hapiJpaTaskExecutor")
private AsyncTaskExecutor myTaskExecutor;
@Autowired
private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
@ -133,43 +114,7 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription);
if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
/*
* If we're in a transaction, we don't want to try and change the status from
* requested to active within the same transaction because it's too late by
* the time we get here to make modifications to the payload.
*
* So, we register a synchronization, meaning that when the transaction is
* finished, we'll schedule a task to do this in a separate worker thread
* to avoid any possibility of conflict.
*/
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
Future<?> activationFuture = myTaskExecutor.submit(new Runnable() {
@Override
public void run() {
activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS);
}
});
/*
* If we're running in a unit test, it's nice to be predictable in
* terms of order... In the real world it's a recipe for deadlocks
*/
if (ourWaitForSubscriptionActivationSynchronouslyForUnitTest) {
try {
activationFuture.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
ourLog.error("Failed to activate subscription", e);
}
}
}
});
return true;
} else {
return activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS);
}
return activateSubscription(theSubscription);
} else if (SubscriptionConstants.ACTIVE_STATUS.equals(statusString)) {
return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription);
} else {
@ -179,14 +124,14 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
}
@SuppressWarnings("unchecked")
private boolean activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) {
private boolean activateSubscription(final IBaseResource theSubscription) {
IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
IBaseResource subscription = subscriptionDao.read(theSubscription.getIdElement());
subscription.setId(subscription.getIdElement().toVersionless());
ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus);
ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS);
try {
SubscriptionUtil.setStatus(myFhirContext, subscription, theActiveStatus);
SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS);
subscriptionDao.update(subscription);
return true;
} catch (final UnprocessableEntityException e) {
@ -214,9 +159,4 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
});
}
@VisibleForTesting
public static void setWaitForSubscriptionActivationSynchronouslyForUnitTest(boolean theWaitForSubscriptionActivationSynchronouslyForUnitTest) {
ourWaitForSubscriptionActivationSynchronouslyForUnitTest = theWaitForSubscriptionActivationSynchronouslyForUnitTest;
}
}

View File

@ -67,6 +67,13 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
@Autowired
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
/**
* Constructor
*/
public SubscriptionMatchingSubscriber() {
super();
}
@Override
public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {

View File

@ -59,6 +59,13 @@ public class SubscriptionLoader {
@Autowired
private ISchedulerService mySchedulerService;
/**
* Constructor
*/
public SubscriptionLoader() {
super();
}
/**
* Read the existing subscriptions from the database
*/
@ -76,18 +83,14 @@ public class SubscriptionLoader {
}
}
@VisibleForTesting
void acquireSemaphoreForUnitTest() throws InterruptedException {
mySyncSubscriptionsSemaphore.acquire();
}
@PostConstruct
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDetail);
syncSubscriptions();
}
public static class Job implements HapiJob {
@ -158,9 +161,5 @@ public class SubscriptionLoader {
}
}
@VisibleForTesting
public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) {
mySubscriptionProvider = theSubscriptionProvider;
}
}

View File

@ -20,17 +20,23 @@ package ca.uhn.fhir.jpa.subscription.submit.config;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionValidatingInterceptor;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
/**
* This Spring config should be imported by a system that submits resources to the
* matching queue for processing
*/
@Configuration
@Import(SubscriptionModelConfig.class)
public class SubscriptionSubmitterConfig {
@Bean
@ -44,8 +50,15 @@ public class SubscriptionSubmitterConfig {
}
@Bean
public SubmitInterceptorLoader subscriptionMatcherInterceptorLoader() {
return new SubmitInterceptorLoader();
public SubscriptionSubmitInterceptorLoader subscriptionMatcherInterceptorLoader() {
return new SubscriptionSubmitInterceptorLoader();
}
@Bean
@Lazy
public ISubscriptionTriggeringSvc subscriptionTriggeringSvc() {
return new SubscriptionTriggeringSvcImpl();
}
}

View File

@ -52,8 +52,6 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
@Autowired
private FhirContext myFhirContext;
@Autowired
private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;
@Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private SubscriptionChannelFactory mySubscriptionChannelFactory;
@ -87,7 +85,11 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
}
private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) {
/**
* This is an internal API - Use with caution!
*/
@Override
public void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) {
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType);
// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
@ -101,16 +103,6 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
submitResourceModified(msg);
}
protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
ourLog.trace("Sending resource modified message to processing channel");
Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
}
public void setFhirContext(FhirContext theCtx) {
myFhirContext = theCtx;
}
/**
* This is an internal API - Use with caution!
*/
@ -138,6 +130,16 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
}
}
protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
ourLog.trace("Sending resource modified message to processing channel");
Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
}
public void setFhirContext(FhirContext theCtx) {
myFhirContext = theCtx;
}
@VisibleForTesting
public LinkedBlockingQueueChannel getProcessingChannelForUnitTest() {
return (LinkedBlockingQueueChannel) myMatchingChannel;

View File

@ -22,10 +22,6 @@ package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.dstu2.model.Subscription;
import org.slf4j.Logger;
@ -33,52 +29,38 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import javax.annotation.PostConstruct;
import java.util.Set;
public class SubmitInterceptorLoader {
private static final Logger ourLog = LoggerFactory.getLogger(SubmitInterceptorLoader.class);
public class SubscriptionSubmitInterceptorLoader {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionSubmitInterceptorLoader.class);
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@Autowired
private SubscriptionValidatingInterceptor mySubscriptionValidatingInterceptor;
@Autowired
DaoConfig myDaoConfig;
@Autowired
private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
private DaoConfig myDaoConfig;
@Autowired
private ApplicationContext myApplicationContext;
@Autowired
private IInterceptorService myInterceptorRegistry;
public void registerInterceptors() {
@PostConstruct
public void start() {
Set<Subscription.SubscriptionChannelType> supportedSubscriptionTypes = myDaoConfig.getSupportedSubscriptionTypes();
if (supportedSubscriptionTypes.isEmpty()) {
ourLog.info("Subscriptions are disabled on this server. Subscriptions will not be activated and incoming resources will not be matched against subscriptions.");
} else {
loadSubscriptions();
if (myDaoConfig.isSubscriptionMatchingEnabled()) {
mySubscriptionMatcherInterceptor.start();
ourLog.info("Registering subscription matcher interceptor");
myInterceptorRegistry.registerInterceptor(mySubscriptionMatcherInterceptor);
}
mySubscriptionMatcherInterceptor.start();
ourLog.info("Registering subscription matcher interceptor");
myInterceptorRegistry.registerInterceptor(mySubscriptionMatcherInterceptor);
}
myInterceptorRegistry.registerInterceptor(mySubscriptionValidatingInterceptor);
}
private void loadSubscriptions() {
ourLog.info("Loading subscriptions into the SubscriptionRegistry...");
// Load active subscriptions into the SubscriptionRegistry and activate their channels
SubscriptionLoader loader = myApplicationContext.getBean(SubscriptionLoader.class);
loader.syncSubscriptions();
ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size());
ourLog.info("...{} subscription channels started", mySubscriptionChannelRegistry.size());
}
@VisibleForTesting
public void unregisterInterceptorsForUnitTest() {
myInterceptorRegistry.unregisterInterceptor(mySubscriptionMatcherInterceptor);

View File

@ -108,12 +108,7 @@ public class SubscriptionValidatingInterceptor {
throw new UnprocessableEntityException("Subscription.criteria must be in the form \"{Resource Type}?[params]\"");
}
if (subscription.getChannelType() == null) {
throw new UnprocessableEntityException("Subscription.channel.type must be populated");
} else if (subscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) {
validateChannelPayload(subscription);
validateChannelEndpoint(subscription);
}
validateChannelType(subscription);
if (!myDaoRegistry.isResourceTypeSupported(resType)) {
throw new UnprocessableEntityException("Subscription.criteria contains invalid/unsupported resource type: " + resType);
@ -133,6 +128,16 @@ public class SubscriptionValidatingInterceptor {
}
}
@SuppressWarnings("WeakerAccess")
protected void validateChannelType(CanonicalSubscription theSubscription) {
if (theSubscription.getChannelType() == null) {
throw new UnprocessableEntityException("Subscription.channel.type must be populated");
} else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) {
validateChannelPayload(theSubscription);
validateChannelEndpoint(theSubscription);
}
}
@SuppressWarnings("WeakerAccess")
protected void validateChannelEndpoint(CanonicalSubscription theResource) {
if (isBlank(theResource.getEndpointUrl())) {

View File

@ -42,8 +42,6 @@ public class SubscriptionChannelRegistryTest {
@Test
public void testAddAddRemoveRemove() {
when(myModelConfig.isSubscriptionMatchingEnabled()).thenReturn(true);
CanonicalSubscription cansubA = new CanonicalSubscription();
cansubA.setIdElement(new IdDt("A"));
ActiveSubscription activeSubscriptionA = new ActiveSubscription(cansubA, TEST_CHANNEL_NAME);

View File

@ -0,0 +1,85 @@
package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import org.hl7.fhir.dstu2.model.Subscription;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {
SubscriptionSubmitterConfig.class,
SearchParamConfig.class,
SubscriptionSubmitInterceptorLoaderTest.MyConfig.class
})
public class SubscriptionSubmitInterceptorLoaderTest {
@MockBean
private ISearchParamProvider mySearchParamProvider;
@MockBean
private ISchedulerService mySchedulerService;
@MockBean
private IInterceptorService myInterceptorService;
@MockBean
private IValidationSupport myValidationSupport;
@MockBean
private SubscriptionChannelFactory mySubscriptionChannelFactory;
@MockBean
private DaoRegistry myDaoRegistry;
@Autowired
private SubscriptionSubmitInterceptorLoader mySubscriptionSubmitInterceptorLoader;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
/**
* It should be possible to run only the {@link SubscriptionSubmitterConfig} without the
* {@link ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig}
*/
@Test
public void testLoaderCanRunWithoutProcessorConfigLoaded() {
verify(myInterceptorService, times(1)).registerInterceptor(eq(mySubscriptionMatcherInterceptor));
}
@Configuration
public static class MyConfig {
@Bean
public FhirContext fhirContext() {
return FhirContext.forR4();
}
@Bean
public ModelConfig modelConfig() {
return new ModelConfig();
}
@Bean
public DaoConfig daoConfig() {
DaoConfig daoConfig = new DaoConfig();
daoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.RESTHOOK);
return daoConfig;
}
}
}

View File

@ -20,7 +20,7 @@ import ca.uhn.fhir.jpa.provider.r5.JpaConformanceProviderR5;
import ca.uhn.fhir.jpa.provider.r5.JpaSystemProviderR5;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.util.ResourceProviderFactory;
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
import ca.uhn.fhir.rest.api.EncodingEnum;
@ -252,12 +252,6 @@ public class TestRestfulServer extends RestfulServer {
*/
setPagingProvider(myAppCtx.getBean(DatabaseBackedPagingProvider.class));
/*
* Register subscription interceptors
*/
SubmitInterceptorLoader submitInterceptorLoader = myAppCtx.getBean(SubmitInterceptorLoader.class);
submitInterceptorLoader.registerInterceptors();
/*
* Cascading deletes
*/