Subscription work

This commit is contained in:
jamesagnew 2020-04-06 20:15:41 -04:00
parent b78205c218
commit 826ba6458a
80 changed files with 258 additions and 246 deletions

View File

@ -1,6 +1,6 @@
package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.subscription.process.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

View File

@ -1,8 +1,8 @@
package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.jpa.search.LuceneSearchMappingFactory;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;

View File

@ -4,9 +4,9 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import ca.uhn.fhir.jpa.subscription.process.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

View File

@ -24,7 +24,7 @@ import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.util.ResourceCountCache;
import ca.uhn.fhir.jpa.util.ResourceProviderFactory;
import ca.uhn.fhir.model.dstu2.composite.CodeableConceptDt;

View File

@ -36,7 +36,7 @@ import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.warm.ICacheWarmingSvc;
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.term.BaseTermReadSvcImpl;
import ca.uhn.fhir.jpa.term.TermDeferredStorageSvcImpl;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;

View File

@ -32,7 +32,7 @@ import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.warm.ICacheWarmingSvc;
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.term.BaseTermReadSvcImpl;
import ca.uhn.fhir.jpa.term.TermDeferredStorageSvcImpl;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;

View File

@ -1,6 +1,6 @@
package ca.uhn.fhir.jpa.provider;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.dstu2.BaseJpaDstu2Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.model.dstu2.resource.Bundle;

View File

@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.provider.dstu3;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.dstu3.BaseJpaDstu3Test;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.provider.r4;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;
@ -10,7 +10,7 @@ import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.util.ResourceCountCache;
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
import ca.uhn.fhir.parser.StrictErrorHandler;

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.provider.r5;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.dao.r5.BaseJpaR5Test;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;
@ -10,7 +10,7 @@ import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.searchparam.registry.SearchParamRegistryImpl;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.util.ResourceCountCache;
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
import ca.uhn.fhir.parser.StrictErrorHandler;

View File

@ -3,7 +3,7 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
@ -99,13 +99,13 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
waitForActivatedSubscriptionCount(0);
}
LinkedBlockingQueueChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
LinkedBlockingChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
if (processingChannel != null) {
processingChannel.clearInterceptorsForUnitTest();
}
myCountingInterceptor = new CountingInterceptor();
if (processingChannel != null) {
processingChannel.addInterceptorForUnitTest(myCountingInterceptor);
processingChannel.addInterceptor(myCountingInterceptor);
}
}

View File

@ -3,8 +3,8 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.provider.r5.BaseResourceProviderR5Test;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
@ -111,13 +111,13 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
waitForActivatedSubscriptionCount(0);
}
LinkedBlockingQueueChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
LinkedBlockingChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
if (processingChannel != null) {
processingChannel.clearInterceptorsForUnitTest();
}
myCountingInterceptor = new CountingInterceptor();
if (processingChannel != null) {
processingChannel.addInterceptorForUnitTest(myCountingInterceptor);
processingChannel.addInterceptor(myCountingInterceptor);
}
}

View File

@ -1,15 +1,15 @@
package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelWithHandlers;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.SubscriptionDeliveringEmailSubscriber;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.SubscriptionDeliveringEmailSubscriber;
import org.hl7.fhir.dstu2.model.Subscription;
import org.hl7.fhir.instance.model.api.IIdType;
import org.springframework.beans.factory.annotation.Autowired;
@ -31,7 +31,7 @@ public class SubscriptionTestUtil {
private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
public int getExecutorQueueSize() {
LinkedBlockingQueueChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
LinkedBlockingChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
return channel.getQueueSizeForUnitTest();
}

View File

@ -2,9 +2,8 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionValidatingInterceptor;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import org.hl7.fhir.r4.model.Subscription;

View File

@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.subscription.email;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.EmailDetails;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.EmailDetails;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.JavaMailEmailSender;
import com.icegreen.greenmail.util.GreenMail;
import com.icegreen.greenmail.util.GreenMailUtil;
import com.icegreen.greenmail.util.ServerSetup;

View File

@ -9,9 +9,9 @@ import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.InMemorySubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.InMemorySubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.util.CoordCalculatorTest;
import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
import ca.uhn.fhir.rest.param.*;

View File

@ -2,7 +2,6 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update;

View File

@ -8,7 +8,7 @@ import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
import ca.uhn.fhir.jpa.subscription.NotificationServlet;
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update;

View File

@ -4,7 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderDstu2Test;
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.model.dstu2.composite.CodeableConceptDt;
import ca.uhn.fhir.model.dstu2.composite.CodingDt;
import ca.uhn.fhir.model.dstu2.resource.Observation;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.channel.queue;
package ca.uhn.fhir.jpa.subscription.channel.api;
/*-
* #%L
@ -20,16 +20,24 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
* #L%
*/
public class QueueChannelConsumerConfig {
public class ChannelConsumerOptions {
private int myConcurrentConsumers;
/**
* Constructor
*/
public ChannelConsumerOptions() {
super();
}
public int getConcurrentConsumers() {
private Integer myConcurrentConsumers;
public Integer getConcurrentConsumers() {
return myConcurrentConsumers;
}
public void setConcurrentConsumers(int theConcurrentConsumers) {
public ChannelConsumerOptions setConcurrentConsumers(int theConcurrentConsumers) {
myConcurrentConsumers = theConcurrentConsumers;
return this;
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.channel.queue;
package ca.uhn.fhir.jpa.subscription.channel.api;
/*-
* #%L
@ -24,13 +24,13 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
* This interface is the factory for Queue Channels, which are the low level abstraction over a
* queue (e.g. memory queue, JMS queue, Kafka stream, etc.) for any purpose.
*/
public interface IQueueChannelFactory {
public interface IChannelFactory {
/**
* Create a channel that is used to receive messages from the queue.
*
* <p>
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, QueueChannelConsumerConfig)}
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, ChannelConsumerOptions)}
* when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance.
* </p>
*
@ -38,18 +38,18 @@ public interface IQueueChannelFactory {
* @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures.
* @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for
* both {@link #getOrCreateReceiver} and
* {@link #getOrCreateSender(String, Class, QueueChannelConsumerConfig)}
* {@link #getOrCreateSender(String, Class, ChannelConsumerOptions)}
* even though this object is used to configure the sender only. We do this because the factory
* may want to create a single object to be used for both the sender and receiver, so this allows
* the config details to be known regardless of which method is returned first.
*/
IQueueChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, QueueChannelConsumerConfig theConfig);
IChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, ChannelConsumerOptions theConfig);
/**
* Create a channel that is used to send messages to the queue.
*
* <p>
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, QueueChannelConsumerConfig)}
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, ChannelConsumerOptions)}
* when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance.
* </p>
*
@ -57,11 +57,11 @@ public interface IQueueChannelFactory {
* @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures.
* @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for
* both {@link #getOrCreateReceiver} and
* {@link #getOrCreateSender(String, Class, QueueChannelConsumerConfig)}
* {@link #getOrCreateSender(String, Class, ChannelConsumerOptions)}
* even though this object is used to configure the sender only. We do this because the factory
* may want to create a single object to be used for both the sender and receiver, so this allows
* the config details to be known regardless of which method is returned first.
*/
IQueueChannelSender getOrCreateSender(String theChannelName, Class<?> theMessageType, QueueChannelConsumerConfig theConfig);
IChannelProducer getOrCreateSender(String theChannelName, Class<?> theMessageType, ChannelConsumerOptions theConfig);
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.channel.queue;
package ca.uhn.fhir.jpa.subscription.channel.api;
/*-
* #%L
@ -21,8 +21,7 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
*/
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.InterceptableChannel;
public interface IQueueChannelSender extends MessageChannel, InterceptableChannel {
public interface IChannelProducer extends MessageChannel, InterceptableChannel {
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.channel.queue;
package ca.uhn.fhir.jpa.subscription.channel.api;
/*-
* #%L
@ -23,5 +23,5 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.InterceptableChannel;
public interface IQueueChannelReceiver extends SubscribableChannel, InterceptableChannel {
public interface IChannelReceiver extends SubscribableChannel, InterceptableChannel {
}

View File

@ -20,8 +20,8 @@ package ca.uhn.fhir.jpa.subscription.channel.config;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -33,12 +33,12 @@ public class SubscriptionChannelConfig {
* Create a @Primary @Bean if you need a different implementation
*/
@Bean
public IQueueChannelFactory queueChannelFactory() {
return new LinkedBlockingQueueChannelFactory();
public IChannelFactory queueChannelFactory() {
return new LinkedBlockingChannelFactory();
}
@Bean
public SubscriptionChannelFactory subscriptionChannelFactory(IQueueChannelFactory theQueueChannelFactory) {
public SubscriptionChannelFactory subscriptionChannelFactory(IChannelFactory theQueueChannelFactory) {
return new SubscriptionChannelFactory(theQueueChannelFactory);
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.channel.queue;
package ca.uhn.fhir.jpa.subscription.channel.impl;
/*-
* #%L
@ -20,14 +20,29 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
public class LinkedBlockingQueueChannel extends ExecutorSubscribableChannel implements IQueueChannelSender, IQueueChannelReceiver {
public class LinkedBlockingChannel extends ExecutorSubscribableChannel implements IChannelProducer, IChannelReceiver {
public LinkedBlockingQueueChannel(ThreadPoolExecutor theExecutor) {
private final BlockingQueue<?> myQueue;
public LinkedBlockingChannel(ThreadPoolExecutor theExecutor, BlockingQueue<?> theQueue) {
super(theExecutor);
myQueue = theQueue;
}
public int getQueueSizeForUnitTest() {
return myQueue.size();
}
public void clearInterceptorsForUnitTest() {
while (getInterceptors().size() > 0) {
removeInterceptor(0);
}
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.channel.queue;
package ca.uhn.fhir.jpa.subscription.channel.impl;
/*-
* #%L
@ -20,13 +20,15 @@ package ca.uhn.fhir.jpa.subscription.channel.queue;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerOptions;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import java.util.Collections;
import java.util.HashMap;
@ -38,29 +40,29 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
public class LinkedBlockingChannelFactory implements IChannelFactory {
private Map<String, LinkedBlockingQueueChannel> myChannels = Collections.synchronizedMap(new HashMap<>());
private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingQueueChannelFactory.class);
private Map<String, LinkedBlockingChannel> myChannels = Collections.synchronizedMap(new HashMap<>());
private static final Logger ourLog = LoggerFactory.getLogger(LinkedBlockingChannelFactory.class);
/**
* Constructor
*/
public LinkedBlockingQueueChannelFactory() {
public LinkedBlockingChannelFactory() {
super();
}
@Override
public IQueueChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, QueueChannelConsumerConfig theConfig) {
public IChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, ChannelConsumerOptions theConfig) {
return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
}
@Override
public IQueueChannelSender getOrCreateSender(String theChannelName, Class<?> theMessageType, QueueChannelConsumerConfig theConfig) {
public IChannelProducer getOrCreateSender(String theChannelName, Class<?> theMessageType, ChannelConsumerOptions theConfig) {
return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
}
private LinkedBlockingQueueChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) {
private LinkedBlockingChannel getOrCreateChannel(String theChannelName, int theConcurrentConsumers) {
return myChannels.computeIfAbsent(theChannelName, t -> {
String threadNamingPattern = theChannelName + "-%d";
@ -92,7 +94,7 @@ public class LinkedBlockingQueueChannelFactory implements IQueueChannelFactory {
queue,
threadFactory,
rejectedExecutionHandler);
return new LinkedBlockingQueueChannel(executor);
return new LinkedBlockingChannel(executor, queue);
});
}

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -20,13 +20,13 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelSender;
import ca.uhn.fhir.jpa.subscription.channel.queue.QueueChannelConsumerConfig;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerOptions;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import org.apache.commons.lang3.Validate;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.Message;
@ -37,46 +37,46 @@ import org.springframework.messaging.support.ChannelInterceptor;
public class SubscriptionChannelFactory {
private final IQueueChannelFactory myQueueChannelFactory;
private final IChannelFactory myQueueChannelFactory;
/**
* Constructor
*/
public SubscriptionChannelFactory(IQueueChannelFactory theQueueChannelFactory) {
public SubscriptionChannelFactory(IChannelFactory theQueueChannelFactory) {
Validate.notNull(theQueueChannelFactory);
myQueueChannelFactory = theQueueChannelFactory;
}
public IQueueChannelSender newDeliverySendingChannel(String theChannelName) {
QueueChannelConsumerConfig config = newConfigForDeliveryChannel();
public IChannelProducer newDeliverySendingChannel(String theChannelName, ChannelConsumerOptions theOptions) {
ChannelConsumerOptions config = newConfigForDeliveryChannel(theOptions);
return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceDeliveryJsonMessage.class, config);
}
public IQueueChannelReceiver newDeliveryReceivingChannel(String theChannelName) {
QueueChannelConsumerConfig config = newConfigForDeliveryChannel();
IQueueChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, config);
public IChannelReceiver newDeliveryReceivingChannel(String theChannelName, ChannelConsumerOptions theOptions) {
ChannelConsumerOptions config = newConfigForDeliveryChannel(theOptions);
IChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, config);
return new BroadcastingSubscribableChannelWrapper(channel);
}
public IQueueChannelSender newMatchingSendingChannel(String theChannelName) {
QueueChannelConsumerConfig config = newConfigForMatchingChannel();
public IChannelProducer newMatchingSendingChannel(String theChannelName, ChannelConsumerOptions theOptions) {
ChannelConsumerOptions config = newConfigForMatchingChannel(theOptions);
return myQueueChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedJsonMessage.class, config);
}
public IQueueChannelReceiver newMatchingReceivingChannel(String theChannelName) {
QueueChannelConsumerConfig config = newConfigForMatchingChannel();
IQueueChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, config);
public IChannelReceiver newMatchingReceivingChannel(String theChannelName, ChannelConsumerOptions theOptions) {
ChannelConsumerOptions config = newConfigForMatchingChannel(theOptions);
IChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, config);
return new BroadcastingSubscribableChannelWrapper(channel);
}
protected QueueChannelConsumerConfig newConfigForDeliveryChannel() {
QueueChannelConsumerConfig config = new QueueChannelConsumerConfig();
protected ChannelConsumerOptions newConfigForDeliveryChannel(ChannelConsumerOptions theOptions) {
ChannelConsumerOptions config = new ChannelConsumerOptions();
config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers());
return config;
}
protected QueueChannelConsumerConfig newConfigForMatchingChannel() {
QueueChannelConsumerConfig config = new QueueChannelConsumerConfig();
protected ChannelConsumerOptions newConfigForMatchingChannel(ChannelConsumerOptions theOptions) {
ChannelConsumerOptions config = new ChannelConsumerOptions();
config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers());
return config;
}
@ -89,11 +89,11 @@ public class SubscriptionChannelFactory {
return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
}
public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IQueueChannelReceiver, DisposableBean {
public static class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IChannelReceiver, DisposableBean {
private final IQueueChannelReceiver myWrappedChannel;
private final IChannelReceiver myWrappedChannel;
public BroadcastingSubscribableChannelWrapper(IQueueChannelReceiver theChannel) {
public BroadcastingSubscribableChannelWrapper(IChannelReceiver theChannel) {
theChannel.subscribe(message -> send(message));
myWrappedChannel = theChannel;
}

View File

@ -20,8 +20,10 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.slf4j.Logger;
@ -29,7 +31,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import java.util.Map;
import java.util.Optional;
@ -42,7 +43,7 @@ public class SubscriptionChannelRegistry {
// This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it
// Key Channel Name, Value Subscription Id
private final Multimap<String, String> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build();
private final Map<String, MessageChannel> myChannelNameToSender = new ConcurrentHashMap<>();
private final Map<String, IChannelProducer> myChannelNameToSender = new ConcurrentHashMap<>();
@Autowired
private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
@ -59,23 +60,31 @@ public class SubscriptionChannelRegistry {
return;
}
SubscribableChannel deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(channelName);
IChannelReceiver channelReceiver = newReceivingChannel(channelName);
Optional<MessageHandler> deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel);
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, channelReceiver);
deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
myDeliveryReceiverChannels.put(channelName, subscriptionChannelWithHandlers);
MessageChannel sendingChannel = mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(channelName);
IChannelProducer sendingChannel = newSendingChannel(channelName);
myChannelNameToSender.put(channelName, sendingChannel);
}
protected IChannelReceiver newReceivingChannel(String theChannelName) {
return mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(theChannelName, null);
}
protected IChannelProducer newSendingChannel(String theChannelName) {
return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theChannelName, null);
}
public synchronized void remove(ActiveSubscription theActiveSubscription) {
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
if (!removed) {
ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId(), channelName);
}
// This was the last one. Close and remove the channel

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -21,14 +21,12 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
*/
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.SubscriptionDeliveringEmailSubscriber;
import ca.uhn.fhir.jpa.subscription.process.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.SubscriptionDeliveringEmailSubscriber;
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Lookup;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
import java.util.Optional;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.config;
package ca.uhn.fhir.jpa.subscription.match.config;
/*-
* #%L
@ -24,29 +24,25 @@ import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegi
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryHandlerFactory;
import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
import ca.uhn.fhir.jpa.subscription.process.deliver.DaoResourceRetriever;
import ca.uhn.fhir.jpa.subscription.process.deliver.IResourceRetriever;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.process.deliver.email.SubscriptionDeliveringEmailSubscriber;
import ca.uhn.fhir.jpa.subscription.process.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.process.deliver.websocket.WebsocketConnectionValidator;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.CompositeInMemoryDaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.DaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.InMemorySubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.MatchingQueueSubscriberLoader;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionRegisteringSubscriber;
import ca.uhn.fhir.jpa.subscription.process.registry.DaoSubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.process.registry.ISubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import ca.uhn.fhir.jpa.subscription.match.deliver.DaoResourceRetriever;
import ca.uhn.fhir.jpa.subscription.match.deliver.IResourceRetriever;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.SubscriptionDeliveringEmailSubscriber;
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.CompositeInMemoryDaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.DaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.InMemorySubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.MatchingQueueSubscriberLoader;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionRegisteringSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.DaoSubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.match.registry.ISubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.config;
package ca.uhn.fhir.jpa.subscription.match.config;
/*
* #%L
@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.process.config;
*/
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.process.deliver.websocket.SubscriptionWebsocketHandler;
import ca.uhn.fhir.jpa.subscription.process.deliver.websocket.WebsocketConnectionValidator;
import ca.uhn.fhir.jpa.subscription.match.deliver.websocket.SubscriptionWebsocketHandler;
import ca.uhn.fhir.jpa.subscription.match.deliver.websocket.WebsocketConnectionValidator;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver;
package ca.uhn.fhir.jpa.subscription.match.deliver;
/*-
* #%L
@ -26,8 +26,8 @@ import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver;
package ca.uhn.fhir.jpa.subscription.match.deliver;
/*-
* #%L
@ -24,9 +24,8 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.util.FhirTerser;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver.email;
package ca.uhn.fhir.jpa.subscription.match.deliver.email;
/*-
* #%L

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver.email;
package ca.uhn.fhir.jpa.subscription.match.deliver.email;
/*-
* #%L

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver.email;
package ca.uhn.fhir.jpa.subscription.match.deliver.email;
/*-
* #%L
@ -23,7 +23,7 @@ package ca.uhn.fhir.jpa.subscription.process.deliver.email;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.process.deliver.BaseSubscriptionDeliverySubscriber;
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import org.apache.commons.lang3.StringUtils;
@ -31,7 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver.resthook;
package ca.uhn.fhir.jpa.subscription.match.deliver.resthook;
/*-
* #%L
@ -24,8 +24,8 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.process.deliver.BaseSubscriptionDeliverySubscriber;
import ca.uhn.fhir.jpa.subscription.process.deliver.IResourceRetriever;
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
import ca.uhn.fhir.jpa.subscription.match.deliver.IResourceRetriever;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.RequestTypeEnum;
import ca.uhn.fhir.rest.client.api.*;
@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.*;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver.websocket;
package ca.uhn.fhir.jpa.subscription.match.deliver.websocket;
/*
* #%L
@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.subscription.process.deliver.websocket;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelWithHandlers;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver.websocket;
package ca.uhn.fhir.jpa.subscription.match.deliver.websocket;
/*-
* #%L
@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.process.deliver.websocket;
*/
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import com.sun.istack.NotNull;
import org.hl7.fhir.r4.model.IdType;
import org.slf4j.Logger;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver.websocket;
package ca.uhn.fhir.jpa.subscription.match.deliver.websocket;
/*-
* #%L
@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.subscription.process.deliver.websocket;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
public class WebsocketValidationResponse {
private final boolean myValid;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.matcher.matching;
package ca.uhn.fhir.jpa.subscription.match.matcher.matching;
/*-
* #%L
@ -35,9 +35,6 @@ import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
@Autowired

View File

@ -1,19 +1,16 @@
package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber;
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import static ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME;
import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME;
/*-
* #%L
@ -52,7 +49,7 @@ public class MatchingQueueSubscriberLoader {
@EventListener(classes = {ContextRefreshedEvent.class})
public void handleContextRefreshEvent() {
if (myMatchingChannel == null) {
myMatchingChannel = mySubscriptionChannelFactory.newMatchingReceivingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME);
myMatchingChannel = mySubscriptionChannelFactory.newMatchingReceivingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME, null);
}
if (myMatchingChannel != null) {
myMatchingChannel.subscribe(mySubscriptionMatchingSubscriber);

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber;
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
/*-
* #%L
@ -26,10 +26,10 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.SubscriptionUtil;
import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber;
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams;
@ -11,9 +11,9 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.rest.api.EncodingEnum;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber;
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
/*-
* #%L
@ -22,8 +22,8 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.registry;
package ca.uhn.fhir.jpa.subscription.match.registry;
/*-
* #%L
@ -23,7 +23,7 @@ package ca.uhn.fhir.jpa.subscription.process.registry;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.registry;
package ca.uhn.fhir.jpa.subscription.match.registry;
/*-
* #%L
@ -25,7 +25,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.model.dstu2.resource.Subscription;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;

View File

@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.model.config;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

View File

@ -6,12 +6,12 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import com.google.common.annotations.VisibleForTesting;
@ -26,8 +26,6 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.PostConstruct;
/*-
* #%L
* HAPI FHIR Subscription Server
@ -70,7 +68,7 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
@EventListener(classes = {ContextRefreshedEvent.class})
public void startIfNeeded() {
if (myMatchingChannel == null) {
myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME);
myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME, null);
}
}
@ -148,7 +146,7 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
}
@VisibleForTesting
public LinkedBlockingQueueChannel getProcessingChannelForUnitTest() {
return (LinkedBlockingQueueChannel) myMatchingChannel;
public LinkedBlockingChannel getProcessingChannelForUnitTest() {
return (LinkedBlockingChannel) myMatchingChannel;
}
}

View File

@ -27,9 +27,9 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;

View File

@ -33,7 +33,7 @@ import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;

View File

@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -36,7 +36,7 @@ public class SubscriptionChannelFactoryTest {
@Before
public void before() {
mySvc = new SubscriptionChannelFactory(new LinkedBlockingQueueChannelFactory());
mySvc = new SubscriptionChannelFactory(new LinkedBlockingChannelFactory());
}
/**
@ -45,7 +45,7 @@ public class SubscriptionChannelFactoryTest {
@Test
public void testInterceptorsOnChannelWrapperArePropagated() {
IQueueChannelReceiver channel = mySvc.newDeliveryReceivingChannel("CHANNEL_NAME");
IChannelReceiver channel = mySvc.newDeliveryReceivingChannel("CHANNEL_NAME", null);
channel.subscribe(new NpeThrowingHandler());
channel.addInterceptor(myInterceptor);

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.deliver;
package ca.uhn.fhir.jpa.subscription.match.deliver;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
@ -7,8 +7,8 @@ import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.matcher.matching;
package ca.uhn.fhir.jpa.subscription.match.matcher.matching;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.support.IValidationSupport;
@ -10,7 +10,7 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import org.junit.Test;
import org.junit.runner.RunWith;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.subscription.process.registry;
package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.model.primitive.IdDt;

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import com.fasterxml.jackson.databind.ObjectMapper;

View File

@ -23,8 +23,8 @@ package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
@ -49,12 +49,12 @@ public class SubscriptionTestConfig {
}
@Bean
public IQueueChannelFactory subscribableChannelFactory() {
return new LinkedBlockingQueueChannelFactory();
public IChannelFactory subscribableChannelFactory() {
return new LinkedBlockingChannelFactory();
}
@Bean
public SubscriptionChannelFactory subscriptionChannelFactory(IQueueChannelFactory theQueueChannelFactory) {
public SubscriptionChannelFactory subscriptionChannelFactory(IChannelFactory theQueueChannelFactory) {
return new SubscriptionChannelFactory(theQueueChannelFactory);
}

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionDeliveryHandlerFactory;
@ -17,7 +17,6 @@ import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.when;
@RunWith(SpringRunner.class)
public class SubscriptionChannelRegistryTest {

View File

@ -13,11 +13,11 @@ import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.process.deliver.websocket.WebsocketConnectionValidator;
import ca.uhn.fhir.jpa.subscription.process.deliver.websocket.WebsocketValidationResponse;
import ca.uhn.fhir.jpa.subscription.process.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.match.deliver.websocket.WebsocketConnectionValidator;
import ca.uhn.fhir.jpa.subscription.match.deliver.websocket.WebsocketValidationResponse;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import org.hl7.fhir.r4.model.IdType;
import org.junit.Before;
import org.junit.Test;
@ -26,7 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.PlatformTransactionManager;

View File

@ -52,7 +52,7 @@ public class SubscriptionSubmitInterceptorLoaderTest {
/**
* It should be possible to run only the {@link SubscriptionSubmitterConfig} without the
* {@link ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig}
* {@link ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig}
*/
@Test
public void testLoaderCanRunWithoutProcessorConfigLoaded() {

View File

@ -6,7 +6,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor;
import ca.uhn.fhir.jpa.provider.GraphQLProvider;
import ca.uhn.fhir.jpa.provider.JpaConformanceProviderDstu2;

View File

@ -1,8 +1,8 @@
package ca.uhn.fhirtest.config;
import ca.uhn.fhir.jpa.subscription.process.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.match.config.WebsocketDispatcherConfig;
import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import ca.uhn.fhir.rest.server.interceptor.LoggingInterceptor;

View File

@ -31,7 +31,7 @@ import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.provider.BaseJpaProvider;
import ca.uhn.fhir.jpa.provider.BaseJpaSystemProvider;
import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.process.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import ca.uhn.fhir.okhttp.client.OkHttpRestfulClientFactory;
import ca.uhn.fhir.rest.client.apache.ApacheRestfulClientFactory;