From 68db40d057b961f4fe3b91b5d6d99227d1922a71 Mon Sep 17 00:00:00 2001 From: jamesagnew Date: Fri, 3 Apr 2020 08:07:39 -0400 Subject: [PATCH] Queue factory cleanup --- .../subscription/SubscriptionChannelFactory.java | 5 +++-- .../subscriber/MatchingQueueSubscriberLoader.java | 3 ++- .../SubscriptionMatcherInterceptor.java | 14 +++++++++----- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java index b4c18e2bff9..03b5704d650 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/channel/subscription/SubscriptionChannelFactory.java @@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.channel.queue.IQueueChannelFactory; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public class SubscriptionChannelFactory { @@ -35,8 +36,8 @@ public class SubscriptionChannelFactory { return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryMessage.class, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers()); } - public SubscribableChannel newMatchingSendingChannel(String theChannelName) { - return mySubscribableChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers()); + public MessageChannel newMatchingSendingChannel(String theChannelName) { + return mySubscribableChannelFactory.getOrCreateSender(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers()); } public SubscribableChannel newMatchingReceivingChannel(String theChannelName) { diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/MatchingQueueSubscriberLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/MatchingQueueSubscriberLoader.java index a4b7d6ef9cf..82202cb777f 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/MatchingQueueSubscriberLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/MatchingQueueSubscriberLoader.java @@ -10,6 +10,8 @@ import org.springframework.messaging.SubscribableChannel; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import static ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME; + /*- * #%L * HAPI FHIR Subscription Server @@ -32,7 +34,6 @@ import javax.annotation.PreDestroy; public class MatchingQueueSubscriberLoader { private Logger ourLog = LoggerFactory.getLogger(MatchingQueueSubscriberLoader.class); - public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching"; @Autowired private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java index ceb3dccf7d9..d5c2371063b 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/submit/interceptor/SubscriptionMatcherInterceptor.java @@ -1,13 +1,17 @@ package ca.uhn.fhir.jpa.subscription.submit.interceptor; import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.interceptor.api.*; +import ca.uhn.fhir.interceptor.api.Hook; +import ca.uhn.fhir.interceptor.api.HookParams; +import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; +import ca.uhn.fhir.interceptor.api.Interceptor; +import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.subscription.channel.queue.LinkedBlockingQueueChannel; -import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; +import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.process.matcher.matching.IResourceModifiedConsumer; import ca.uhn.fhir.jpa.subscription.process.matcher.subscriber.SubscriptionMatchingSubscriber; -import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster; import ca.uhn.fhir.rest.api.server.RequestDetails; import com.google.common.annotations.VisibleForTesting; @@ -16,7 +20,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.MessageChannel; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationManager; @@ -54,7 +58,7 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer @Autowired private SubscriptionChannelFactory mySubscriptionChannelFactory; - private SubscribableChannel myMatchingChannel; + private MessageChannel myMatchingChannel; /** * Constructor