From fef447afeebd6a82eb39f5a88939fc616f757f31 Mon Sep 17 00:00:00 2001 From: jamesagnew Date: Sat, 4 Apr 2020 16:07:01 -0400 Subject: [PATCH] Work on subscription cleanup --- .../ca/uhn/fhir/jpa/demo/JpaServerDemo.java | 5 +- .../ca/uhn/fhir/jpa/api/config/DaoConfig.java | 24 +----- .../subscription/SubscriptionTestUtil.java | 12 +-- .../fhir/jpa/model/entity/ModelConfig.java | 22 ----- hapi-fhir-jpaserver-subscription/pom.xml | 5 ++ .../channel/queue/IQueueChannelFactory.java | 5 -- .../LinkedBlockingQueueChannelFactory.java | 23 ++--- .../SubscriptionChannelFactory.java | 46 +++++++++- .../SubscriptionChannelRegistry.java | 6 -- .../model/config/SubscriptionModelConfig.java | 44 ++++++++++ .../config/SubscriptionProcessorConfig.java | 22 +---- .../matching/IResourceModifiedConsumer.java | 12 +++ .../SubscriptionStrategyEvaluator.java | 7 ++ .../SubscriptionActivatingSubscriber.java | 72 ++-------------- .../SubscriptionMatchingSubscriber.java | 7 ++ .../process/registry/SubscriptionLoader.java | 19 ++--- .../config/SubscriptionSubmitterConfig.java | 19 ++++- .../SubscriptionMatcherInterceptor.java | 28 +++--- ... SubscriptionSubmitInterceptorLoader.java} | 36 ++------ .../SubscriptionValidatingInterceptor.java | 17 ++-- .../SubscriptionChannelRegistryTest.java | 2 - ...bscriptionSubmitInterceptorLoaderTest.java | 85 +++++++++++++++++++ .../ca/uhn/fhirtest/TestRestfulServer.java | 8 +- 23 files changed, 293 insertions(+), 233 deletions(-) create mode 100644 hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/config/SubscriptionModelConfig.java rename hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/{SubmitInterceptorLoader.java => SubscriptionSubmitInterceptorLoader.java} (61%) create mode 100644 hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoaderTest.java diff --git a/hapi-fhir-cli/hapi-fhir-cli-jpaserver/src/main/java/ca/uhn/fhir/jpa/demo/JpaServerDemo.java b/hapi-fhir-cli/hapi-fhir-cli-jpaserver/src/main/java/ca/uhn/fhir/jpa/demo/JpaServerDemo.java index 98956b3ba1e..fd471bd9141 100644 --- a/hapi-fhir-cli/hapi-fhir-cli-jpaserver/src/main/java/ca/uhn/fhir/jpa/demo/JpaServerDemo.java +++ b/hapi-fhir-cli/hapi-fhir-cli-jpaserver/src/main/java/ca/uhn/fhir/jpa/demo/JpaServerDemo.java @@ -36,7 +36,7 @@ import ca.uhn.fhir.jpa.provider.dstu3.JpaSystemProviderDstu3; import ca.uhn.fhir.jpa.provider.r4.JpaConformanceProviderR4; import ca.uhn.fhir.jpa.provider.r4.JpaSystemProviderR4; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry; -import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader; +import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader; import ca.uhn.fhir.jpa.util.ResourceProviderFactory; import ca.uhn.fhir.model.dstu2.composite.MetaDt; import ca.uhn.fhir.model.dstu2.resource.Bundle; @@ -173,9 +173,6 @@ public class JpaServerDemo extends RestfulServer { daoConfig.setEnforceReferentialIntegrityOnWrite(!ContextHolder.isDisableReferentialIntegrity()); daoConfig.setReuseCachedSearchResultsForMillis(ContextHolder.getReuseCachedSearchResultsForMillis()); - SubmitInterceptorLoader submitInterceptorLoader = myAppCtx.getBean(SubmitInterceptorLoader.class); - submitInterceptorLoader.registerInterceptors(); - DaoRegistry daoRegistry = myAppCtx.getBean(DaoRegistry.class); IInterceptorBroadcaster interceptorBroadcaster = myAppCtx.getBean(IInterceptorBroadcaster.class); CascadingDeleteInterceptor cascadingDeleteInterceptor = new CascadingDeleteInterceptor(daoRegistry, interceptorBroadcaster); diff --git a/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/config/DaoConfig.java b/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/config/DaoConfig.java index ae9f47c6c3e..d5c58153126 100644 --- a/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/config/DaoConfig.java +++ b/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/config/DaoConfig.java @@ -1559,28 +1559,6 @@ public class DaoConfig { myEnableInMemorySubscriptionMatching = theEnableInMemorySubscriptionMatching; } - /** - * If set to true (default is true) the server will match incoming resources against active subscriptions - * and send them to the subscription channel. If set to false no matching or sending occurs. - * - * @since 3.7.0 - */ - - public boolean isSubscriptionMatchingEnabled() { - return myModelConfig.isSubscriptionMatchingEnabled(); - } - - /** - * If set to true (default is true) the server will match incoming resources against active subscriptions - * and send them to the subscription channel. If set to false no matching or sending occurs. - * - * @since 3.7.0 - */ - - public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) { - myModelConfig.setSubscriptionMatchingEnabled(theSubscriptionMatchingEnabled); - } - public ModelConfig getModelConfig() { return myModelConfig; } @@ -1703,6 +1681,8 @@ public class DaoConfig { /** * This setting indicates which subscription channel types are supported by the server. Any subscriptions submitted * to the server matching these types will be activated. + * + * @see #addSupportedSubscriptionType(Subscription.SubscriptionChannelType) */ public Set getSupportedSubscriptionTypes() { return myModelConfig.getSupportedSubscriptionTypes(); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java index 21deca15563..9dbf6fa3492 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java @@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.subscription; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel; -import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader; +import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader; import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; @@ -22,7 +22,7 @@ public class SubscriptionTestUtil { @Autowired private DaoConfig myDaoConfig; @Autowired - private SubmitInterceptorLoader mySubmitInterceptorLoader; + private SubscriptionSubmitInterceptorLoader mySubscriptionSubmitInterceptorLoader; @Autowired private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; @Autowired @@ -50,22 +50,22 @@ public class SubscriptionTestUtil { public void registerEmailInterceptor() { myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.EMAIL); - mySubmitInterceptorLoader.registerInterceptors(); + mySubscriptionSubmitInterceptorLoader.start(); } public void registerRestHookInterceptor() { myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.RESTHOOK); - mySubmitInterceptorLoader.registerInterceptors(); + mySubscriptionSubmitInterceptorLoader.start(); } public void registerWebSocketInterceptor() { myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.WEBSOCKET); - mySubmitInterceptorLoader.registerInterceptors(); + mySubscriptionSubmitInterceptorLoader.start(); } public void unregisterSubscriptionInterceptor() { myDaoConfig.clearSupportedSubscriptionTypesForUnitTest(); - mySubmitInterceptorLoader.unregisterInterceptorsForUnitTest(); + mySubscriptionSubmitInterceptorLoader.unregisterInterceptorsForUnitTest(); } public int getExecutorQueueSizeForUnitTests() { diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ModelConfig.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ModelConfig.java index 171c6d31866..4448fc36955 100644 --- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ModelConfig.java +++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ModelConfig.java @@ -58,7 +58,6 @@ public class ModelConfig { private boolean myDefaultSearchParamsCanBeOverridden = false; private Set mySupportedSubscriptionTypes = new HashSet<>(); private String myEmailFromAddress = "noreply@unknown.com"; - private boolean mySubscriptionMatchingEnabled = true; private String myWebsocketContextPath = DEFAULT_WEBSOCKET_CONTEXT_PATH; /** @@ -330,27 +329,6 @@ public class ModelConfig { return Collections.unmodifiableSet(mySupportedSubscriptionTypes); } - /** - * If set to true (default is true) the server will match incoming resources against active subscriptions - * and send them to the subscription channel. If set to false no matching or sending occurs. - * @since 3.7.0 - */ - - public boolean isSubscriptionMatchingEnabled() { - return mySubscriptionMatchingEnabled; - } - - /** - * If set to true (default is true) the server will match incoming resources against active subscriptions - * and send them to the subscription channel. If set to false no matching or sending occurs. - * @since 3.7.0 - */ - - - public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) { - mySubscriptionMatchingEnabled = theSubscriptionMatchingEnabled; - } - @VisibleForTesting public void clearSupportedSubscriptionTypesForUnitTest() { mySupportedSubscriptionTypes.clear(); diff --git a/hapi-fhir-jpaserver-subscription/pom.xml b/hapi-fhir-jpaserver-subscription/pom.xml index 43cd8b96d32..d99b38a38c4 100644 --- a/hapi-fhir-jpaserver-subscription/pom.xml +++ b/hapi-fhir-jpaserver-subscription/pom.xml @@ -78,6 +78,11 @@ + + ca.uhn.hapi.fhir + hapi-fhir-test-utilities + ${project.version} + org.springframework spring-test diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java index e764be57fe4..92321b7f44b 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java @@ -50,9 +50,4 @@ public interface IQueueChannelFactory { */ MessageChannel getOrCreateSender(String theChannelName, Class theMessageType, int theConcurrentConsumers); - // FIXME: can these be removed? - int getDeliveryChannelConcurrentConsumers(); - - // FIXME: can these be removed? - int getMatchingChannelConcurrentConsumers(); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java index d75809d765a..07c2f0f710a 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java @@ -33,6 +33,13 @@ public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory { private Map myChannels = Collections.synchronizedMap(new HashMap<>()); + /** + * Constructor + */ + public LinkedBlockingQueueChannelFactory() { + super(); + } + @Override public SubscribableChannel getOrCreateReceiver(String theChannelName, Class theMessageType, int theConcurrentConsumers) { return getOrCreateChannel(theChannelName, theConcurrentConsumers); @@ -44,17 +51,11 @@ public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory { } private SubscribableChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) { - return myChannels.computeIfAbsent(theChannelName, t -> - new LinkedBlockingQueueChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d", theConcurrentConsumers)); + return myChannels.computeIfAbsent(theChannelName, t -> { + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE); + String threadNamingPattern = theChannelName + "-%d"; + return new LinkedBlockingQueueChannel(queue, threadNamingPattern, theConcurrentConsumers); + }); } - @Override - public int getDeliveryChannelConcurrentConsumers() { - return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS; - } - - @Override - public int getMatchingChannelConcurrentConsumers() { - return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS; - } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java index 03b5704d650..70da786f8a4 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java @@ -20,12 +20,17 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription; * #L% */ -import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.AbstractSubscribableChannel; public class SubscriptionChannelFactory { @@ -33,15 +38,48 @@ public class SubscriptionChannelFactory { private IQueueChannelFactory mySubscribableChannelFactory; public SubscribableChannel newDeliveryChannel(String theChannelName) { - return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers()); + SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers()); + return new BroadcastingSubscribableChannelWrapper(channel); } public MessageChannel newMatchingSendingChannel(String theChannelName) { - return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers()); + return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers()); } public SubscribableChannel newMatchingReceivingChannel(String theChannelName) { - return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers()); + SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers()); + return new BroadcastingSubscribableChannelWrapper(channel); } + public int getDeliveryChannelConcurrentConsumers() { + return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS; + } + + public int getMatchingChannelConcurrentConsumers() { + return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS; + } + + + private static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler { + + public BroadcastingSubscribableChannelWrapper(SubscribableChannel theChannel) { + theChannel.subscribe(this); + } + + + @Override + protected boolean sendInternal(Message theMessage, long timeout) { + for (MessageHandler next : getSubscribers()) { + next.handleMessage(theMessage); + } + return true; + } + + @Override + public void handleMessage(Message message) throws MessagingException { + send(message); + } + } + + } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java index 4a1e782cdee..1661d132573 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java @@ -51,9 +51,6 @@ public class SubscriptionChannelRegistry { private ModelConfig myModelConfig; public synchronized void add(ActiveSubscription theActiveSubscription) { - if (!myModelConfig.isSubscriptionMatchingEnabled()) { - return; - } String channelName = theActiveSubscription.getChannelName(); ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName); myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId()); @@ -75,9 +72,6 @@ public class SubscriptionChannelRegistry { } public synchronized void remove(ActiveSubscription theActiveSubscription) { - if (!myModelConfig.isSubscriptionMatchingEnabled()) { - return; - } String channelName = theActiveSubscription.getChannelName(); ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName); boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId()); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/config/SubscriptionModelConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/config/SubscriptionModelConfig.java new file mode 100644 index 00000000000..2812747cf41 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/config/SubscriptionModelConfig.java @@ -0,0 +1,44 @@ +package ca.uhn.fhir.jpa.subscription.model.config; + +/*- + * #%L + * HAPI FHIR Subscription Server + * %% + * Copyright (C) 2014 - 2020 University Health Network + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator; +import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class SubscriptionModelConfig { + + @Bean + public SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) { + return new SubscriptionCanonicalizer(theFhirContext); + } + + + @Bean + public SubscriptionStrategyEvaluator subscriptionStrategyEvaluator() { + return new SubscriptionStrategyEvaluator(); + } + + +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java index 88151a43de1..1f7eed79122 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java @@ -20,10 +20,10 @@ package ca.uhn.fhir.jpa.subscription.process.config; * #L% */ -import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryHandlerFactory; +import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig; import ca.uhn.fhir.jpa.subscription.process.deliver.DaoResourceRetriever; import ca.uhn.fhir.jpa.subscription.process.deliver.IResourceRetriever; import ca.uhn.fhir.jpa.subscription.process.deliver.email.IEmailSender; @@ -34,19 +34,18 @@ import ca.uhn.fhir.jpa.subscription.process.matcher.matching.CompositeInMemoryDa import ca.uhn.fhir.jpa.subscription.process.matcher.matching.DaoSubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.process.matcher.matching.ISubscriptionMatcher; import ca.uhn.fhir.jpa.subscription.process.matcher.matching.InMemorySubscriptionMatcher; -import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator; import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.MatchingQueueSubscriberLoader; import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber; import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionRegisteringSubscriber; import ca.uhn.fhir.jpa.subscription.process.registry.DaoSubscriptionProvider; import ca.uhn.fhir.jpa.subscription.process.registry.ISubscriptionProvider; -import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc; import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Scope; @@ -55,6 +54,7 @@ import org.springframework.context.annotation.Scope; * This Spring config should be imported by a system that pulls messages off of the * matching queue for processing, and handles delivery */ +@Import(SubscriptionModelConfig.class) public class SubscriptionProcessorConfig { @Bean @@ -102,16 +102,6 @@ public class SubscriptionProcessorConfig { return new WebsocketConnectionValidator(); } - @Bean - public SubscriptionStrategyEvaluator subscriptionStrategyEvaluator() { - return new SubscriptionStrategyEvaluator(); - } - - @Bean - public SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) { - return new SubscriptionCanonicalizer(theFhirContext); - } - @Bean public SubscriptionLoader subscriptionLoader() { return new SubscriptionLoader(); @@ -127,12 +117,6 @@ public class SubscriptionProcessorConfig { return new SubscriptionDeliveryHandlerFactory(); } - @Bean - @Lazy - public ISubscriptionTriggeringSvc subscriptionTriggeringSvc() { - return new SubscriptionTriggeringSvcImpl(); - } - @Bean @Scope("prototype") public SubscriptionDeliveringRestHookSubscriber subscriptionDeliveringRestHookSubscriber() { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/IResourceModifiedConsumer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/IResourceModifiedConsumer.java index 577dbc4e2fc..d03b0054ad7 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/IResourceModifiedConsumer.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/IResourceModifiedConsumer.java @@ -21,7 +21,19 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.matching; */ import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import org.hl7.fhir.instance.model.api.IBaseResource; public interface IResourceModifiedConsumer { + + /** + * This is an internal API - Use with caution! + */ + void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest); + + /** + * This is an internal API - Use with caution! + */ void submitResourceModified(ResourceModifiedMessage theMsg); + } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/SubscriptionStrategyEvaluator.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/SubscriptionStrategyEvaluator.java index 04239f38839..e0ac8dbcdbc 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/SubscriptionStrategyEvaluator.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/SubscriptionStrategyEvaluator.java @@ -29,6 +29,13 @@ public class SubscriptionStrategyEvaluator { @Autowired private InMemoryResourceMatcher myInMemoryResourceMatcher; + /** + * Constructor + */ + public SubscriptionStrategyEvaluator() { + super(); + } + public SubscriptionMatchingStrategy determineStrategy(String theCriteria) { InMemoryMatchResult result = myInMemoryResourceMatcher.match(theCriteria, null, null); if (result.supported()) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java index ddc167ee6cd..8f35ce3b196 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java @@ -24,43 +24,29 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry; -import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionMatchingStrategy; -import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator; -import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; -import ca.uhn.fhir.parser.DataFormatException; -import ca.uhn.fhir.rest.api.EncodingEnum; -import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.util.SubscriptionUtil; -import com.google.common.annotations.VisibleForTesting; import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; import javax.annotation.Nonnull; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import static org.apache.commons.lang3.StringUtils.isBlank; /** * Responsible for transitioning subscription resources from REQUESTED to ACTIVE @@ -69,14 +55,9 @@ import static org.apache.commons.lang3.StringUtils.isBlank; * Also validates criteria. If invalid, rejects the subscription without persisting the subscription. */ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler { - private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest; private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); @Autowired private PlatformTransactionManager myTransactionManager; - // FIXME: use constant if this is still needed - @Autowired - @Qualifier("hapiJpaTaskExecutor") - private AsyncTaskExecutor myTaskExecutor; @Autowired private SubscriptionRegistry mySubscriptionRegistry; @Autowired @@ -133,43 +114,7 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription); if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) { - if (TransactionSynchronizationManager.isSynchronizationActive()) { - /* - * If we're in a transaction, we don't want to try and change the status from - * requested to active within the same transaction because it's too late by - * the time we get here to make modifications to the payload. - * - * So, we register a synchronization, meaning that when the transaction is - * finished, we'll schedule a task to do this in a separate worker thread - * to avoid any possibility of conflict. - */ - TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { - @Override - public void afterCommit() { - Future activationFuture = myTaskExecutor.submit(new Runnable() { - @Override - public void run() { - activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS); - } - }); - - /* - * If we're running in a unit test, it's nice to be predictable in - * terms of order... In the real world it's a recipe for deadlocks - */ - if (ourWaitForSubscriptionActivationSynchronouslyForUnitTest) { - try { - activationFuture.get(5, TimeUnit.SECONDS); - } catch (Exception e) { - ourLog.error("Failed to activate subscription", e); - } - } - } - }); - return true; - } else { - return activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS); - } + return activateSubscription(theSubscription); } else if (SubscriptionConstants.ACTIVE_STATUS.equals(statusString)) { return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription); } else { @@ -179,14 +124,14 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript } @SuppressWarnings("unchecked") - private boolean activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) { + private boolean activateSubscription(final IBaseResource theSubscription) { IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao(); IBaseResource subscription = subscriptionDao.read(theSubscription.getIdElement()); subscription.setId(subscription.getIdElement().toVersionless()); - ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus); + ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS); try { - SubscriptionUtil.setStatus(myFhirContext, subscription, theActiveStatus); + SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS); subscriptionDao.update(subscription); return true; } catch (final UnprocessableEntityException e) { @@ -214,9 +159,4 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript }); } - @VisibleForTesting - public static void setWaitForSubscriptionActivationSynchronouslyForUnitTest(boolean theWaitForSubscriptionActivationSynchronouslyForUnitTest) { - ourWaitForSubscriptionActivationSynchronouslyForUnitTest = theWaitForSubscriptionActivationSynchronouslyForUnitTest; - } - } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionMatchingSubscriber.java index 2f8ff1632a8..441ff31b8d2 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionMatchingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionMatchingSubscriber.java @@ -67,6 +67,13 @@ public class SubscriptionMatchingSubscriber implements MessageHandler { @Autowired private SubscriptionChannelRegistry mySubscriptionChannelRegistry; + /** + * Constructor + */ + public SubscriptionMatchingSubscriber() { + super(); + } + @Override public void handleMessage(@Nonnull Message theMessage) throws MessagingException { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionLoader.java index 59e66b77696..90757e100f0 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionLoader.java @@ -59,6 +59,13 @@ public class SubscriptionLoader { @Autowired private ISchedulerService mySchedulerService; + /** + * Constructor + */ + public SubscriptionLoader() { + super(); + } + /** * Read the existing subscriptions from the database */ @@ -76,18 +83,14 @@ public class SubscriptionLoader { } } - @VisibleForTesting - void acquireSemaphoreForUnitTest() throws InterruptedException { - mySyncSubscriptionsSemaphore.acquire(); - } - - @PostConstruct public void scheduleJob() { ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); jobDetail.setId(getClass().getName()); jobDetail.setJobClass(Job.class); mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDetail); + + syncSubscriptions(); } public static class Job implements HapiJob { @@ -158,9 +161,5 @@ public class SubscriptionLoader { } } - @VisibleForTesting - public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) { - mySubscriptionProvider = theSubscriptionProvider; - } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/config/SubscriptionSubmitterConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/config/SubscriptionSubmitterConfig.java index edbb96c5296..b0eb39b0213 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/config/SubscriptionSubmitterConfig.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/config/SubscriptionSubmitterConfig.java @@ -20,17 +20,23 @@ package ca.uhn.fhir.jpa.subscription.submit.config; * #L% */ +import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig; import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor; -import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader; +import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader; import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionValidatingInterceptor; +import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc; +import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Lazy; /** * This Spring config should be imported by a system that submits resources to the * matching queue for processing */ @Configuration +@Import(SubscriptionModelConfig.class) public class SubscriptionSubmitterConfig { @Bean @@ -44,8 +50,15 @@ public class SubscriptionSubmitterConfig { } @Bean - public SubmitInterceptorLoader subscriptionMatcherInterceptorLoader() { - return new SubmitInterceptorLoader(); + public SubscriptionSubmitInterceptorLoader subscriptionMatcherInterceptorLoader() { + return new SubscriptionSubmitInterceptorLoader(); } + @Bean + @Lazy + public ISubscriptionTriggeringSvc subscriptionTriggeringSvc() { + return new SubscriptionTriggeringSvcImpl(); + } + + } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java index d5c2371063b..0877bc1f4c5 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java @@ -52,8 +52,6 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer @Autowired private FhirContext myFhirContext; @Autowired - private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber; - @Autowired private IInterceptorBroadcaster myInterceptorBroadcaster; @Autowired private SubscriptionChannelFactory mySubscriptionChannelFactory; @@ -87,7 +85,11 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest); } - private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) { + /** + * This is an internal API - Use with caution! + */ + @Override + public void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) { ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType); // Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED @@ -101,16 +103,6 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer submitResourceModified(msg); } - protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) { - ourLog.trace("Sending resource modified message to processing channel"); - Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it."); - myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage)); - } - - public void setFhirContext(FhirContext theCtx) { - myFhirContext = theCtx; - } - /** * This is an internal API - Use with caution! */ @@ -138,6 +130,16 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer } } + protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) { + ourLog.trace("Sending resource modified message to processing channel"); + Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it."); + myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage)); + } + + public void setFhirContext(FhirContext theCtx) { + myFhirContext = theCtx; + } + @VisibleForTesting public LinkedBlockingQueueChannel getProcessingChannelForUnitTest() { return (LinkedBlockingQueueChannel) myMatchingChannel; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubmitInterceptorLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java similarity index 61% rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubmitInterceptorLoader.java rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java index c6715a95232..a827abc7462 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubmitInterceptorLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java @@ -22,10 +22,6 @@ package ca.uhn.fhir.jpa.subscription.submit.interceptor; import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.jpa.api.config.DaoConfig; -import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber; -import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader; -import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry; -import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; import com.google.common.annotations.VisibleForTesting; import org.hl7.fhir.dstu2.model.Subscription; import org.slf4j.Logger; @@ -33,52 +29,38 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; +import javax.annotation.PostConstruct; import java.util.Set; -public class SubmitInterceptorLoader { - private static final Logger ourLog = LoggerFactory.getLogger(SubmitInterceptorLoader.class); +public class SubscriptionSubmitInterceptorLoader { + private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionSubmitInterceptorLoader.class); @Autowired private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; @Autowired private SubscriptionValidatingInterceptor mySubscriptionValidatingInterceptor; @Autowired - DaoConfig myDaoConfig; - @Autowired - private SubscriptionRegistry mySubscriptionRegistry; - @Autowired - private SubscriptionChannelRegistry mySubscriptionChannelRegistry; + private DaoConfig myDaoConfig; @Autowired private ApplicationContext myApplicationContext; @Autowired private IInterceptorService myInterceptorRegistry; - public void registerInterceptors() { + @PostConstruct + public void start() { Set supportedSubscriptionTypes = myDaoConfig.getSupportedSubscriptionTypes(); if (supportedSubscriptionTypes.isEmpty()) { ourLog.info("Subscriptions are disabled on this server. Subscriptions will not be activated and incoming resources will not be matched against subscriptions."); } else { - loadSubscriptions(); - if (myDaoConfig.isSubscriptionMatchingEnabled()) { - mySubscriptionMatcherInterceptor.start(); - ourLog.info("Registering subscription matcher interceptor"); - myInterceptorRegistry.registerInterceptor(mySubscriptionMatcherInterceptor); - } + mySubscriptionMatcherInterceptor.start(); + ourLog.info("Registering subscription matcher interceptor"); + myInterceptorRegistry.registerInterceptor(mySubscriptionMatcherInterceptor); } myInterceptorRegistry.registerInterceptor(mySubscriptionValidatingInterceptor); } - private void loadSubscriptions() { - ourLog.info("Loading subscriptions into the SubscriptionRegistry..."); - // Load active subscriptions into the SubscriptionRegistry and activate their channels - SubscriptionLoader loader = myApplicationContext.getBean(SubscriptionLoader.class); - loader.syncSubscriptions(); - ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size()); - ourLog.info("...{} subscription channels started", mySubscriptionChannelRegistry.size()); - } - @VisibleForTesting public void unregisterInterceptorsForUnitTest() { myInterceptorRegistry.unregisterInterceptor(mySubscriptionMatcherInterceptor); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java index 237d22a38e6..4cf26006cbe 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java @@ -108,12 +108,7 @@ public class SubscriptionValidatingInterceptor { throw new UnprocessableEntityException("Subscription.criteria must be in the form \"{Resource Type}?[params]\""); } - if (subscription.getChannelType() == null) { - throw new UnprocessableEntityException("Subscription.channel.type must be populated"); - } else if (subscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) { - validateChannelPayload(subscription); - validateChannelEndpoint(subscription); - } + validateChannelType(subscription); if (!myDaoRegistry.isResourceTypeSupported(resType)) { throw new UnprocessableEntityException("Subscription.criteria contains invalid/unsupported resource type: " + resType); @@ -133,6 +128,16 @@ public class SubscriptionValidatingInterceptor { } } + @SuppressWarnings("WeakerAccess") + protected void validateChannelType(CanonicalSubscription theSubscription) { + if (theSubscription.getChannelType() == null) { + throw new UnprocessableEntityException("Subscription.channel.type must be populated"); + } else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) { + validateChannelPayload(theSubscription); + validateChannelEndpoint(theSubscription); + } + } + @SuppressWarnings("WeakerAccess") protected void validateChannelEndpoint(CanonicalSubscription theResource) { if (isBlank(theResource.getEndpointUrl())) { diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java index 0fb9edac08d..d16ad8f0a88 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java @@ -42,8 +42,6 @@ public class SubscriptionChannelRegistryTest { @Test public void testAddAddRemoveRemove() { - when(myModelConfig.isSubscriptionMatchingEnabled()).thenReturn(true); - CanonicalSubscription cansubA = new CanonicalSubscription(); cansubA.setIdElement(new IdDt("A")); ActiveSubscription activeSubscriptionA = new ActiveSubscription(cansubA, TEST_CHANNEL_NAME); diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoaderTest.java new file mode 100644 index 00000000000..de8f1910397 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoaderTest.java @@ -0,0 +1,85 @@ +package ca.uhn.fhir.jpa.subscription.submit.interceptor; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.context.support.IValidationSupport; +import ca.uhn.fhir.interceptor.api.IInterceptorService; +import ca.uhn.fhir.jpa.api.config.DaoConfig; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.model.entity.ModelConfig; +import ca.uhn.fhir.jpa.model.sched.ISchedulerService; +import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig; +import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider; +import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; +import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig; +import org.hl7.fhir.dstu2.model.Subscription; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(SpringRunner.class) +@ContextConfiguration(classes = { + SubscriptionSubmitterConfig.class, + SearchParamConfig.class, + SubscriptionSubmitInterceptorLoaderTest.MyConfig.class +}) +public class SubscriptionSubmitInterceptorLoaderTest { + + @MockBean + private ISearchParamProvider mySearchParamProvider; + @MockBean + private ISchedulerService mySchedulerService; + @MockBean + private IInterceptorService myInterceptorService; + @MockBean + private IValidationSupport myValidationSupport; + @MockBean + private SubscriptionChannelFactory mySubscriptionChannelFactory; + @MockBean + private DaoRegistry myDaoRegistry; + @Autowired + private SubscriptionSubmitInterceptorLoader mySubscriptionSubmitInterceptorLoader; + @Autowired + private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; + + /** + * It should be possible to run only the {@link SubscriptionSubmitterConfig} without the + * {@link ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig} + */ + @Test + public void testLoaderCanRunWithoutProcessorConfigLoaded() { + verify(myInterceptorService, times(1)).registerInterceptor(eq(mySubscriptionMatcherInterceptor)); + } + + @Configuration + public static class MyConfig { + + @Bean + public FhirContext fhirContext() { + return FhirContext.forR4(); + } + + @Bean + public ModelConfig modelConfig() { + return new ModelConfig(); + } + + @Bean + public DaoConfig daoConfig() { + DaoConfig daoConfig = new DaoConfig(); + daoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.RESTHOOK); + return daoConfig; + } + + } + + +} diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java index 68ae52c05cf..52d8ffaa687 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java @@ -20,7 +20,7 @@ import ca.uhn.fhir.jpa.provider.r5.JpaConformanceProviderR5; import ca.uhn.fhir.jpa.provider.r5.JpaSystemProviderR5; import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry; -import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader; +import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader; import ca.uhn.fhir.jpa.util.ResourceProviderFactory; import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator; import ca.uhn.fhir.rest.api.EncodingEnum; @@ -252,12 +252,6 @@ public class TestRestfulServer extends RestfulServer { */ setPagingProvider(myAppCtx.getBean(DatabaseBackedPagingProvider.class)); - /* - * Register subscription interceptors - */ - SubmitInterceptorLoader submitInterceptorLoader = myAppCtx.getBean(SubmitInterceptorLoader.class); - submitInterceptorLoader.registerInterceptors(); - /* * Cascading deletes */