diff --git a/hapi-fhir-cli/hapi-fhir-cli-jpaserver/src/main/java/ca/uhn/fhir/jpa/demo/JpaServerDemo.java b/hapi-fhir-cli/hapi-fhir-cli-jpaserver/src/main/java/ca/uhn/fhir/jpa/demo/JpaServerDemo.java
index 98956b3ba1e..fd471bd9141 100644
--- a/hapi-fhir-cli/hapi-fhir-cli-jpaserver/src/main/java/ca/uhn/fhir/jpa/demo/JpaServerDemo.java
+++ b/hapi-fhir-cli/hapi-fhir-cli-jpaserver/src/main/java/ca/uhn/fhir/jpa/demo/JpaServerDemo.java
@@ -36,7 +36,7 @@ import ca.uhn.fhir.jpa.provider.dstu3.JpaSystemProviderDstu3;
import ca.uhn.fhir.jpa.provider.r4.JpaConformanceProviderR4;
import ca.uhn.fhir.jpa.provider.r4.JpaSystemProviderR4;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
-import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader;
+import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.util.ResourceProviderFactory;
import ca.uhn.fhir.model.dstu2.composite.MetaDt;
import ca.uhn.fhir.model.dstu2.resource.Bundle;
@@ -173,9 +173,6 @@ public class JpaServerDemo extends RestfulServer {
daoConfig.setEnforceReferentialIntegrityOnWrite(!ContextHolder.isDisableReferentialIntegrity());
daoConfig.setReuseCachedSearchResultsForMillis(ContextHolder.getReuseCachedSearchResultsForMillis());
- SubmitInterceptorLoader submitInterceptorLoader = myAppCtx.getBean(SubmitInterceptorLoader.class);
- submitInterceptorLoader.registerInterceptors();
-
DaoRegistry daoRegistry = myAppCtx.getBean(DaoRegistry.class);
IInterceptorBroadcaster interceptorBroadcaster = myAppCtx.getBean(IInterceptorBroadcaster.class);
CascadingDeleteInterceptor cascadingDeleteInterceptor = new CascadingDeleteInterceptor(daoRegistry, interceptorBroadcaster);
diff --git a/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/config/DaoConfig.java b/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/config/DaoConfig.java
index ae9f47c6c3e..d5c58153126 100644
--- a/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/config/DaoConfig.java
+++ b/hapi-fhir-jpaserver-api/src/main/java/ca/uhn/fhir/jpa/api/config/DaoConfig.java
@@ -1559,28 +1559,6 @@ public class DaoConfig {
myEnableInMemorySubscriptionMatching = theEnableInMemorySubscriptionMatching;
}
- /**
- * If set to true
(default is true) the server will match incoming resources against active subscriptions
- * and send them to the subscription channel. If set to false
no matching or sending occurs.
- *
- * @since 3.7.0
- */
-
- public boolean isSubscriptionMatchingEnabled() {
- return myModelConfig.isSubscriptionMatchingEnabled();
- }
-
- /**
- * If set to true
(default is true) the server will match incoming resources against active subscriptions
- * and send them to the subscription channel. If set to false
no matching or sending occurs.
- *
- * @since 3.7.0
- */
-
- public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) {
- myModelConfig.setSubscriptionMatchingEnabled(theSubscriptionMatchingEnabled);
- }
-
public ModelConfig getModelConfig() {
return myModelConfig;
}
@@ -1703,6 +1681,8 @@ public class DaoConfig {
/**
* This setting indicates which subscription channel types are supported by the server. Any subscriptions submitted
* to the server matching these types will be activated.
+ *
+ * @see #addSupportedSubscriptionType(Subscription.SubscriptionChannelType)
*/
public Set getSupportedSubscriptionTypes() {
return myModelConfig.getSupportedSubscriptionTypes();
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java
index 21deca15563..9dbf6fa3492 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTestUtil.java
@@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel;
-import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader;
+import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
@@ -22,7 +22,7 @@ public class SubscriptionTestUtil {
@Autowired
private DaoConfig myDaoConfig;
@Autowired
- private SubmitInterceptorLoader mySubmitInterceptorLoader;
+ private SubscriptionSubmitInterceptorLoader mySubscriptionSubmitInterceptorLoader;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@Autowired
@@ -50,22 +50,22 @@ public class SubscriptionTestUtil {
public void registerEmailInterceptor() {
myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.EMAIL);
- mySubmitInterceptorLoader.registerInterceptors();
+ mySubscriptionSubmitInterceptorLoader.start();
}
public void registerRestHookInterceptor() {
myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.RESTHOOK);
- mySubmitInterceptorLoader.registerInterceptors();
+ mySubscriptionSubmitInterceptorLoader.start();
}
public void registerWebSocketInterceptor() {
myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.WEBSOCKET);
- mySubmitInterceptorLoader.registerInterceptors();
+ mySubscriptionSubmitInterceptorLoader.start();
}
public void unregisterSubscriptionInterceptor() {
myDaoConfig.clearSupportedSubscriptionTypesForUnitTest();
- mySubmitInterceptorLoader.unregisterInterceptorsForUnitTest();
+ mySubscriptionSubmitInterceptorLoader.unregisterInterceptorsForUnitTest();
}
public int getExecutorQueueSizeForUnitTests() {
diff --git a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ModelConfig.java b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ModelConfig.java
index 171c6d31866..4448fc36955 100644
--- a/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ModelConfig.java
+++ b/hapi-fhir-jpaserver-model/src/main/java/ca/uhn/fhir/jpa/model/entity/ModelConfig.java
@@ -58,7 +58,6 @@ public class ModelConfig {
private boolean myDefaultSearchParamsCanBeOverridden = false;
private Set mySupportedSubscriptionTypes = new HashSet<>();
private String myEmailFromAddress = "noreply@unknown.com";
- private boolean mySubscriptionMatchingEnabled = true;
private String myWebsocketContextPath = DEFAULT_WEBSOCKET_CONTEXT_PATH;
/**
@@ -330,27 +329,6 @@ public class ModelConfig {
return Collections.unmodifiableSet(mySupportedSubscriptionTypes);
}
- /**
- * If set to true
(default is true) the server will match incoming resources against active subscriptions
- * and send them to the subscription channel. If set to false
no matching or sending occurs.
- * @since 3.7.0
- */
-
- public boolean isSubscriptionMatchingEnabled() {
- return mySubscriptionMatchingEnabled;
- }
-
- /**
- * If set to true
(default is true) the server will match incoming resources against active subscriptions
- * and send them to the subscription channel. If set to false
no matching or sending occurs.
- * @since 3.7.0
- */
-
-
- public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) {
- mySubscriptionMatchingEnabled = theSubscriptionMatchingEnabled;
- }
-
@VisibleForTesting
public void clearSupportedSubscriptionTypesForUnitTest() {
mySupportedSubscriptionTypes.clear();
diff --git a/hapi-fhir-jpaserver-subscription/pom.xml b/hapi-fhir-jpaserver-subscription/pom.xml
index 43cd8b96d32..d99b38a38c4 100644
--- a/hapi-fhir-jpaserver-subscription/pom.xml
+++ b/hapi-fhir-jpaserver-subscription/pom.xml
@@ -78,6 +78,11 @@
+
+ ca.uhn.hapi.fhir
+ hapi-fhir-test-utilities
+ ${project.version}
+
org.springframework
spring-test
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java
index e764be57fe4..92321b7f44b 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/IQueueChannelFactory.java
@@ -50,9 +50,4 @@ public interface IQueueChannelFactory {
*/
MessageChannel getOrCreateSender(String theChannelName, Class> theMessageType, int theConcurrentConsumers);
- // FIXME: can these be removed?
- int getDeliveryChannelConcurrentConsumers();
-
- // FIXME: can these be removed?
- int getMatchingChannelConcurrentConsumers();
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java
index d75809d765a..07c2f0f710a 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/queue/LinkedBlockingQueueChannelFactory.java
@@ -33,6 +33,13 @@ public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
private Map myChannels = Collections.synchronizedMap(new HashMap<>());
+ /**
+ * Constructor
+ */
+ public LinkedBlockingQueueChannelFactory() {
+ super();
+ }
+
@Override
public SubscribableChannel getOrCreateReceiver(String theChannelName, Class> theMessageType, int theConcurrentConsumers) {
return getOrCreateChannel(theChannelName, theConcurrentConsumers);
@@ -44,17 +51,11 @@ public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
}
private SubscribableChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) {
- return myChannels.computeIfAbsent(theChannelName, t ->
- new LinkedBlockingQueueChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d", theConcurrentConsumers));
+ return myChannels.computeIfAbsent(theChannelName, t -> {
+ LinkedBlockingQueue queue = new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE);
+ String threadNamingPattern = theChannelName + "-%d";
+ return new LinkedBlockingQueueChannel(queue, threadNamingPattern, theConcurrentConsumers);
+ });
}
- @Override
- public int getDeliveryChannelConcurrentConsumers() {
- return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS;
- }
-
- @Override
- public int getMatchingChannelConcurrentConsumers() {
- return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
- }
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java
index 03b5704d650..70da786f8a4 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java
@@ -20,12 +20,17 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
* #L%
*/
-import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
+import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
+import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
+import org.springframework.messaging.support.AbstractSubscribableChannel;
public class SubscriptionChannelFactory {
@@ -33,15 +38,48 @@ public class SubscriptionChannelFactory {
private IQueueChannelFactory mySubscribableChannelFactory;
public SubscribableChannel newDeliveryChannel(String theChannelName) {
- return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
+ SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, getDeliveryChannelConcurrentConsumers());
+ return new BroadcastingSubscribableChannelWrapper(channel);
}
public MessageChannel newMatchingSendingChannel(String theChannelName) {
- return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers());
+ return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers());
}
public SubscribableChannel newMatchingReceivingChannel(String theChannelName) {
- return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers());
+ SubscribableChannel channel = mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, getMatchingChannelConcurrentConsumers());
+ return new BroadcastingSubscribableChannelWrapper(channel);
}
+ public int getDeliveryChannelConcurrentConsumers() {
+ return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS;
+ }
+
+ public int getMatchingChannelConcurrentConsumers() {
+ return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
+ }
+
+
+ private static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements MessageHandler {
+
+ public BroadcastingSubscribableChannelWrapper(SubscribableChannel theChannel) {
+ theChannel.subscribe(this);
+ }
+
+
+ @Override
+ protected boolean sendInternal(Message> theMessage, long timeout) {
+ for (MessageHandler next : getSubscribers()) {
+ next.handleMessage(theMessage);
+ }
+ return true;
+ }
+
+ @Override
+ public void handleMessage(Message> message) throws MessagingException {
+ send(message);
+ }
+ }
+
+
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java
index 4a1e782cdee..1661d132573 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelRegistry.java
@@ -51,9 +51,6 @@ public class SubscriptionChannelRegistry {
private ModelConfig myModelConfig;
public synchronized void add(ActiveSubscription theActiveSubscription) {
- if (!myModelConfig.isSubscriptionMatchingEnabled()) {
- return;
- }
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName);
myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());
@@ -75,9 +72,6 @@ public class SubscriptionChannelRegistry {
}
public synchronized void remove(ActiveSubscription theActiveSubscription) {
- if (!myModelConfig.isSubscriptionMatchingEnabled()) {
- return;
- }
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/config/SubscriptionModelConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/config/SubscriptionModelConfig.java
new file mode 100644
index 00000000000..2812747cf41
--- /dev/null
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/model/config/SubscriptionModelConfig.java
@@ -0,0 +1,44 @@
+package ca.uhn.fhir.jpa.subscription.model.config;
+
+/*-
+ * #%L
+ * HAPI FHIR Subscription Server
+ * %%
+ * Copyright (C) 2014 - 2020 University Health Network
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
+import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class SubscriptionModelConfig {
+
+ @Bean
+ public SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
+ return new SubscriptionCanonicalizer(theFhirContext);
+ }
+
+
+ @Bean
+ public SubscriptionStrategyEvaluator subscriptionStrategyEvaluator() {
+ return new SubscriptionStrategyEvaluator();
+ }
+
+
+}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java
index 88151a43de1..1f7eed79122 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/config/SubscriptionProcessorConfig.java
@@ -20,10 +20,10 @@ package ca.uhn.fhir.jpa.subscription.process.config;
* #L%
*/
-import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryHandlerFactory;
+import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
import ca.uhn.fhir.jpa.subscription.process.deliver.DaoResourceRetriever;
import ca.uhn.fhir.jpa.subscription.process.deliver.IResourceRetriever;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.IEmailSender;
@@ -34,19 +34,18 @@ import ca.uhn.fhir.jpa.subscription.process.matcher.matching.CompositeInMemoryDa
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.DaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.InMemorySubscriptionMatcher;
-import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.MatchingQueueSubscriberLoader;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionRegisteringSubscriber;
import ca.uhn.fhir.jpa.subscription.process.registry.DaoSubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.process.registry.ISubscriptionProvider;
-import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
@@ -55,6 +54,7 @@ import org.springframework.context.annotation.Scope;
* This Spring config should be imported by a system that pulls messages off of the
* matching queue for processing, and handles delivery
*/
+@Import(SubscriptionModelConfig.class)
public class SubscriptionProcessorConfig {
@Bean
@@ -102,16 +102,6 @@ public class SubscriptionProcessorConfig {
return new WebsocketConnectionValidator();
}
- @Bean
- public SubscriptionStrategyEvaluator subscriptionStrategyEvaluator() {
- return new SubscriptionStrategyEvaluator();
- }
-
- @Bean
- public SubscriptionCanonicalizer subscriptionCanonicalizer(FhirContext theFhirContext) {
- return new SubscriptionCanonicalizer(theFhirContext);
- }
-
@Bean
public SubscriptionLoader subscriptionLoader() {
return new SubscriptionLoader();
@@ -127,12 +117,6 @@ public class SubscriptionProcessorConfig {
return new SubscriptionDeliveryHandlerFactory();
}
- @Bean
- @Lazy
- public ISubscriptionTriggeringSvc subscriptionTriggeringSvc() {
- return new SubscriptionTriggeringSvcImpl();
- }
-
@Bean
@Scope("prototype")
public SubscriptionDeliveringRestHookSubscriber subscriptionDeliveringRestHookSubscriber() {
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/IResourceModifiedConsumer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/IResourceModifiedConsumer.java
index 577dbc4e2fc..d03b0054ad7 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/IResourceModifiedConsumer.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/IResourceModifiedConsumer.java
@@ -21,7 +21,19 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.matching;
*/
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
+import ca.uhn.fhir.rest.api.server.RequestDetails;
+import org.hl7.fhir.instance.model.api.IBaseResource;
public interface IResourceModifiedConsumer {
+
+ /**
+ * This is an internal API - Use with caution!
+ */
+ void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest);
+
+ /**
+ * This is an internal API - Use with caution!
+ */
void submitResourceModified(ResourceModifiedMessage theMsg);
+
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/SubscriptionStrategyEvaluator.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/SubscriptionStrategyEvaluator.java
index 04239f38839..e0ac8dbcdbc 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/SubscriptionStrategyEvaluator.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/matching/SubscriptionStrategyEvaluator.java
@@ -29,6 +29,13 @@ public class SubscriptionStrategyEvaluator {
@Autowired
private InMemoryResourceMatcher myInMemoryResourceMatcher;
+ /**
+ * Constructor
+ */
+ public SubscriptionStrategyEvaluator() {
+ super();
+ }
+
public SubscriptionMatchingStrategy determineStrategy(String theCriteria) {
InMemoryMatchResult result = myInMemoryResourceMatcher.match(theCriteria, null, null);
if (result.supported()) {
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java
index ddc167ee6cd..8f35ce3b196 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java
@@ -24,43 +24,29 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
-import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
+import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
+import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
-import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionMatchingStrategy;
-import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
-import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
-import ca.uhn.fhir.parser.DataFormatException;
-import ca.uhn.fhir.rest.api.EncodingEnum;
-import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.SubscriptionUtil;
-import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
-import org.springframework.transaction.support.TransactionSynchronizationAdapter;
-import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
/**
* Responsible for transitioning subscription resources from REQUESTED to ACTIVE
@@ -69,14 +55,9 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
* Also validates criteria. If invalid, rejects the subscription without persisting the subscription.
*/
public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler {
- private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest;
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
@Autowired
private PlatformTransactionManager myTransactionManager;
- // FIXME: use constant if this is still needed
- @Autowired
- @Qualifier("hapiJpaTaskExecutor")
- private AsyncTaskExecutor myTaskExecutor;
@Autowired
private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
@@ -133,43 +114,7 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription);
if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) {
- if (TransactionSynchronizationManager.isSynchronizationActive()) {
- /*
- * If we're in a transaction, we don't want to try and change the status from
- * requested to active within the same transaction because it's too late by
- * the time we get here to make modifications to the payload.
- *
- * So, we register a synchronization, meaning that when the transaction is
- * finished, we'll schedule a task to do this in a separate worker thread
- * to avoid any possibility of conflict.
- */
- TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
- @Override
- public void afterCommit() {
- Future> activationFuture = myTaskExecutor.submit(new Runnable() {
- @Override
- public void run() {
- activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS);
- }
- });
-
- /*
- * If we're running in a unit test, it's nice to be predictable in
- * terms of order... In the real world it's a recipe for deadlocks
- */
- if (ourWaitForSubscriptionActivationSynchronouslyForUnitTest) {
- try {
- activationFuture.get(5, TimeUnit.SECONDS);
- } catch (Exception e) {
- ourLog.error("Failed to activate subscription", e);
- }
- }
- }
- });
- return true;
- } else {
- return activateSubscription(SubscriptionConstants.ACTIVE_STATUS, theSubscription, SubscriptionConstants.REQUESTED_STATUS);
- }
+ return activateSubscription(theSubscription);
} else if (SubscriptionConstants.ACTIVE_STATUS.equals(statusString)) {
return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription);
} else {
@@ -179,14 +124,14 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
}
@SuppressWarnings("unchecked")
- private boolean activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) {
+ private boolean activateSubscription(final IBaseResource theSubscription) {
IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
IBaseResource subscription = subscriptionDao.read(theSubscription.getIdElement());
subscription.setId(subscription.getIdElement().toVersionless());
- ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus);
+ ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS);
try {
- SubscriptionUtil.setStatus(myFhirContext, subscription, theActiveStatus);
+ SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS);
subscriptionDao.update(subscription);
return true;
} catch (final UnprocessableEntityException e) {
@@ -214,9 +159,4 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
});
}
- @VisibleForTesting
- public static void setWaitForSubscriptionActivationSynchronouslyForUnitTest(boolean theWaitForSubscriptionActivationSynchronouslyForUnitTest) {
- ourWaitForSubscriptionActivationSynchronouslyForUnitTest = theWaitForSubscriptionActivationSynchronouslyForUnitTest;
- }
-
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionMatchingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionMatchingSubscriber.java
index 2f8ff1632a8..441ff31b8d2 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionMatchingSubscriber.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionMatchingSubscriber.java
@@ -67,6 +67,13 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
@Autowired
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
+ /**
+ * Constructor
+ */
+ public SubscriptionMatchingSubscriber() {
+ super();
+ }
+
@Override
public void handleMessage(@Nonnull Message> theMessage) throws MessagingException {
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionLoader.java
index 59e66b77696..90757e100f0 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionLoader.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionLoader.java
@@ -59,6 +59,13 @@ public class SubscriptionLoader {
@Autowired
private ISchedulerService mySchedulerService;
+ /**
+ * Constructor
+ */
+ public SubscriptionLoader() {
+ super();
+ }
+
/**
* Read the existing subscriptions from the database
*/
@@ -76,18 +83,14 @@ public class SubscriptionLoader {
}
}
- @VisibleForTesting
- void acquireSemaphoreForUnitTest() throws InterruptedException {
- mySyncSubscriptionsSemaphore.acquire();
- }
-
-
@PostConstruct
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(Job.class);
mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDetail);
+
+ syncSubscriptions();
}
public static class Job implements HapiJob {
@@ -158,9 +161,5 @@ public class SubscriptionLoader {
}
}
- @VisibleForTesting
- public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) {
- mySubscriptionProvider = theSubscriptionProvider;
- }
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/config/SubscriptionSubmitterConfig.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/config/SubscriptionSubmitterConfig.java
index edbb96c5296..b0eb39b0213 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/config/SubscriptionSubmitterConfig.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/config/SubscriptionSubmitterConfig.java
@@ -20,17 +20,23 @@ package ca.uhn.fhir.jpa.subscription.submit.config;
* #L%
*/
+import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
-import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader;
+import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionValidatingInterceptor;
+import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
+import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Lazy;
/**
* This Spring config should be imported by a system that submits resources to the
* matching queue for processing
*/
@Configuration
+@Import(SubscriptionModelConfig.class)
public class SubscriptionSubmitterConfig {
@Bean
@@ -44,8 +50,15 @@ public class SubscriptionSubmitterConfig {
}
@Bean
- public SubmitInterceptorLoader subscriptionMatcherInterceptorLoader() {
- return new SubmitInterceptorLoader();
+ public SubscriptionSubmitInterceptorLoader subscriptionMatcherInterceptorLoader() {
+ return new SubscriptionSubmitInterceptorLoader();
}
+ @Bean
+ @Lazy
+ public ISubscriptionTriggeringSvc subscriptionTriggeringSvc() {
+ return new SubscriptionTriggeringSvcImpl();
+ }
+
+
}
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java
index d5c2371063b..0877bc1f4c5 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java
@@ -52,8 +52,6 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
@Autowired
private FhirContext myFhirContext;
@Autowired
- private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;
- @Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private SubscriptionChannelFactory mySubscriptionChannelFactory;
@@ -87,7 +85,11 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
}
- private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) {
+ /**
+ * This is an internal API - Use with caution!
+ */
+ @Override
+ public void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) {
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType);
// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
@@ -101,16 +103,6 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
submitResourceModified(msg);
}
- protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
- ourLog.trace("Sending resource modified message to processing channel");
- Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
- myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
- }
-
- public void setFhirContext(FhirContext theCtx) {
- myFhirContext = theCtx;
- }
-
/**
* This is an internal API - Use with caution!
*/
@@ -138,6 +130,16 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
}
}
+ protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
+ ourLog.trace("Sending resource modified message to processing channel");
+ Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
+ myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
+ }
+
+ public void setFhirContext(FhirContext theCtx) {
+ myFhirContext = theCtx;
+ }
+
@VisibleForTesting
public LinkedBlockingQueueChannel getProcessingChannelForUnitTest() {
return (LinkedBlockingQueueChannel) myMatchingChannel;
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubmitInterceptorLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java
similarity index 61%
rename from hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubmitInterceptorLoader.java
rename to hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java
index c6715a95232..a827abc7462 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubmitInterceptorLoader.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoader.java
@@ -22,10 +22,6 @@ package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
-import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber;
-import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader;
-import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
-import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.dstu2.model.Subscription;
import org.slf4j.Logger;
@@ -33,52 +29,38 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
+import javax.annotation.PostConstruct;
import java.util.Set;
-public class SubmitInterceptorLoader {
- private static final Logger ourLog = LoggerFactory.getLogger(SubmitInterceptorLoader.class);
+public class SubscriptionSubmitInterceptorLoader {
+ private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionSubmitInterceptorLoader.class);
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@Autowired
private SubscriptionValidatingInterceptor mySubscriptionValidatingInterceptor;
@Autowired
- DaoConfig myDaoConfig;
- @Autowired
- private SubscriptionRegistry mySubscriptionRegistry;
- @Autowired
- private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
+ private DaoConfig myDaoConfig;
@Autowired
private ApplicationContext myApplicationContext;
@Autowired
private IInterceptorService myInterceptorRegistry;
- public void registerInterceptors() {
+ @PostConstruct
+ public void start() {
Set supportedSubscriptionTypes = myDaoConfig.getSupportedSubscriptionTypes();
if (supportedSubscriptionTypes.isEmpty()) {
ourLog.info("Subscriptions are disabled on this server. Subscriptions will not be activated and incoming resources will not be matched against subscriptions.");
} else {
- loadSubscriptions();
- if (myDaoConfig.isSubscriptionMatchingEnabled()) {
- mySubscriptionMatcherInterceptor.start();
- ourLog.info("Registering subscription matcher interceptor");
- myInterceptorRegistry.registerInterceptor(mySubscriptionMatcherInterceptor);
- }
+ mySubscriptionMatcherInterceptor.start();
+ ourLog.info("Registering subscription matcher interceptor");
+ myInterceptorRegistry.registerInterceptor(mySubscriptionMatcherInterceptor);
}
myInterceptorRegistry.registerInterceptor(mySubscriptionValidatingInterceptor);
}
- private void loadSubscriptions() {
- ourLog.info("Loading subscriptions into the SubscriptionRegistry...");
- // Load active subscriptions into the SubscriptionRegistry and activate their channels
- SubscriptionLoader loader = myApplicationContext.getBean(SubscriptionLoader.class);
- loader.syncSubscriptions();
- ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size());
- ourLog.info("...{} subscription channels started", mySubscriptionChannelRegistry.size());
- }
-
@VisibleForTesting
public void unregisterInterceptorsForUnitTest() {
myInterceptorRegistry.unregisterInterceptor(mySubscriptionMatcherInterceptor);
diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java
index 237d22a38e6..4cf26006cbe 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionValidatingInterceptor.java
@@ -108,12 +108,7 @@ public class SubscriptionValidatingInterceptor {
throw new UnprocessableEntityException("Subscription.criteria must be in the form \"{Resource Type}?[params]\"");
}
- if (subscription.getChannelType() == null) {
- throw new UnprocessableEntityException("Subscription.channel.type must be populated");
- } else if (subscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) {
- validateChannelPayload(subscription);
- validateChannelEndpoint(subscription);
- }
+ validateChannelType(subscription);
if (!myDaoRegistry.isResourceTypeSupported(resType)) {
throw new UnprocessableEntityException("Subscription.criteria contains invalid/unsupported resource type: " + resType);
@@ -133,6 +128,16 @@ public class SubscriptionValidatingInterceptor {
}
}
+ @SuppressWarnings("WeakerAccess")
+ protected void validateChannelType(CanonicalSubscription theSubscription) {
+ if (theSubscription.getChannelType() == null) {
+ throw new UnprocessableEntityException("Subscription.channel.type must be populated");
+ } else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) {
+ validateChannelPayload(theSubscription);
+ validateChannelEndpoint(theSubscription);
+ }
+ }
+
@SuppressWarnings("WeakerAccess")
protected void validateChannelEndpoint(CanonicalSubscription theResource) {
if (isBlank(theResource.getEndpointUrl())) {
diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java
index 0fb9edac08d..d16ad8f0a88 100644
--- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java
+++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistryTest.java
@@ -42,8 +42,6 @@ public class SubscriptionChannelRegistryTest {
@Test
public void testAddAddRemoveRemove() {
- when(myModelConfig.isSubscriptionMatchingEnabled()).thenReturn(true);
-
CanonicalSubscription cansubA = new CanonicalSubscription();
cansubA.setIdElement(new IdDt("A"));
ActiveSubscription activeSubscriptionA = new ActiveSubscription(cansubA, TEST_CHANNEL_NAME);
diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoaderTest.java
new file mode 100644
index 00000000000..de8f1910397
--- /dev/null
+++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionSubmitInterceptorLoaderTest.java
@@ -0,0 +1,85 @@
+package ca.uhn.fhir.jpa.subscription.submit.interceptor;
+
+import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.context.support.IValidationSupport;
+import ca.uhn.fhir.interceptor.api.IInterceptorService;
+import ca.uhn.fhir.jpa.api.config.DaoConfig;
+import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
+import ca.uhn.fhir.jpa.model.entity.ModelConfig;
+import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
+import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
+import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
+import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
+import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
+import org.hl7.fhir.dstu2.model.Subscription;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(SpringRunner.class)
+@ContextConfiguration(classes = {
+ SubscriptionSubmitterConfig.class,
+ SearchParamConfig.class,
+ SubscriptionSubmitInterceptorLoaderTest.MyConfig.class
+})
+public class SubscriptionSubmitInterceptorLoaderTest {
+
+ @MockBean
+ private ISearchParamProvider mySearchParamProvider;
+ @MockBean
+ private ISchedulerService mySchedulerService;
+ @MockBean
+ private IInterceptorService myInterceptorService;
+ @MockBean
+ private IValidationSupport myValidationSupport;
+ @MockBean
+ private SubscriptionChannelFactory mySubscriptionChannelFactory;
+ @MockBean
+ private DaoRegistry myDaoRegistry;
+ @Autowired
+ private SubscriptionSubmitInterceptorLoader mySubscriptionSubmitInterceptorLoader;
+ @Autowired
+ private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
+
+ /**
+ * It should be possible to run only the {@link SubscriptionSubmitterConfig} without the
+ * {@link ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig}
+ */
+ @Test
+ public void testLoaderCanRunWithoutProcessorConfigLoaded() {
+ verify(myInterceptorService, times(1)).registerInterceptor(eq(mySubscriptionMatcherInterceptor));
+ }
+
+ @Configuration
+ public static class MyConfig {
+
+ @Bean
+ public FhirContext fhirContext() {
+ return FhirContext.forR4();
+ }
+
+ @Bean
+ public ModelConfig modelConfig() {
+ return new ModelConfig();
+ }
+
+ @Bean
+ public DaoConfig daoConfig() {
+ DaoConfig daoConfig = new DaoConfig();
+ daoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.RESTHOOK);
+ return daoConfig;
+ }
+
+ }
+
+
+}
diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java
index 68ae52c05cf..52d8ffaa687 100644
--- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java
+++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/TestRestfulServer.java
@@ -20,7 +20,7 @@ import ca.uhn.fhir.jpa.provider.r5.JpaConformanceProviderR5;
import ca.uhn.fhir.jpa.provider.r5.JpaSystemProviderR5;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
-import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubmitInterceptorLoader;
+import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.util.ResourceProviderFactory;
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
import ca.uhn.fhir.rest.api.EncodingEnum;
@@ -252,12 +252,6 @@ public class TestRestfulServer extends RestfulServer {
*/
setPagingProvider(myAppCtx.getBean(DatabaseBackedPagingProvider.class));
- /*
- * Register subscription interceptors
- */
- SubmitInterceptorLoader submitInterceptorLoader = myAppCtx.getBean(SubmitInterceptorLoader.class);
- submitInterceptorLoader.registerInterceptors();
-
/*
* Cascading deletes
*/