Queue factory cleanup

This commit is contained in:
jamesagnew 2020-04-03 08:07:39 -04:00
parent 7c6024fa14
commit 68db40d057
3 changed files with 14 additions and 8 deletions

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
public class SubscriptionChannelFactory { public class SubscriptionChannelFactory {
@ -35,8 +36,8 @@ public class SubscriptionChannelFactory {
return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers()); return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
} }
public SubscribableChannel newMatchingSendingChannel(String theChannelName) { public MessageChannel newMatchingSendingChannel(String theChannelName) {
return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers()); return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers());
} }
public SubscribableChannel newMatchingReceivingChannel(String theChannelName) { public SubscribableChannel newMatchingReceivingChannel(String theChannelName) {

View File

@ -10,6 +10,8 @@ import org.springframework.messaging.SubscribableChannel;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import static ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME;
/*- /*-
* #%L * #%L
* HAPI FHIR Subscription Server * HAPI FHIR Subscription Server
@ -32,7 +34,6 @@ import javax.annotation.PreDestroy;
public class MatchingQueueSubscriberLoader { public class MatchingQueueSubscriberLoader {
private Logger ourLog = LoggerFactory.getLogger(MatchingQueueSubscriberLoader.class); private Logger ourLog = LoggerFactory.getLogger(MatchingQueueSubscriberLoader.class);
public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
@Autowired @Autowired
private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber; private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;

View File

@ -1,13 +1,17 @@
package ca.uhn.fhir.jpa.subscription.submit.interceptor; package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.*; import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel; import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.matcher.matching.IResourceModifiedConsumer; import ca.uhn.fhir.jpa.subscription.process.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster; import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -16,7 +20,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionSynchronizationManager;
@ -54,7 +58,7 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
@Autowired @Autowired
private SubscriptionChannelFactory mySubscriptionChannelFactory; private SubscriptionChannelFactory mySubscriptionChannelFactory;
private SubscribableChannel myMatchingChannel; private MessageChannel myMatchingChannel;
/** /**
* Constructor * Constructor