One more change

This commit is contained in:
jamesagnew 2020-04-07 19:09:44 -04:00
parent edc30568f2
commit 8a77d839c3
5 changed files with 27 additions and 28 deletions

View File

@ -20,12 +20,12 @@ package ca.uhn.fhir.jpa.subscription.channel.api;
* #L% * #L%
*/ */
public class ChannelConsumerOptions { public class ChannelConsumerSettings {
/** /**
* Constructor * Constructor
*/ */
public ChannelConsumerOptions() { public ChannelConsumerSettings() {
super(); super();
} }
@ -35,7 +35,7 @@ public class ChannelConsumerOptions {
return myConcurrentConsumers; return myConcurrentConsumers;
} }
public ChannelConsumerOptions setConcurrentConsumers(int theConcurrentConsumers) { public ChannelConsumerSettings setConcurrentConsumers(int theConcurrentConsumers) {
myConcurrentConsumers = theConcurrentConsumers; myConcurrentConsumers = theConcurrentConsumers;
return this; return this;
} }

View File

@ -30,7 +30,7 @@ public interface IChannelFactory {
* Create a channel that is used to receive messages from the queue. * Create a channel that is used to receive messages from the queue.
* *
* <p> * <p>
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, ChannelConsumerOptions)} * Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, ChannelConsumerSettings)}
* when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance. * when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance.
* </p> * </p>
* *
@ -38,18 +38,18 @@ public interface IChannelFactory {
* @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures. * @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures.
* @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for * @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for
* both {@link #getOrCreateReceiver} and * both {@link #getOrCreateReceiver} and
* {@link #getOrCreateProducer(String, Class, ChannelConsumerOptions)} * {@link #getOrCreateProducer(String, Class, ChannelConsumerSettings)}
* even though this object is used to configure the sender only. We do this because the factory * even though this object is used to configure the sender only. We do this because the factory
* may want to create a single object to be used for both the sender and receiver, so this allows * may want to create a single object to be used for both the sender and receiver, so this allows
* the config details to be known regardless of which method is returned first. * the config details to be known regardless of which method is returned first.
*/ */
IChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, ChannelConsumerOptions theConfig); IChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, ChannelConsumerSettings theConfig);
/** /**
* Create a channel that is used to send messages to the queue. * Create a channel that is used to send messages to the queue.
* *
* <p> * <p>
* Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, ChannelConsumerOptions)} * Implementations can choose to return the same object for multiple invocations of this method (and {@link #getOrCreateReceiver(String, Class, ChannelConsumerSettings)}
* when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance. * when invoked with the same {@literal theChannelName} if they need to, or they can create a new instance.
* </p> * </p>
* *
@ -57,11 +57,11 @@ public interface IChannelFactory {
* @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures. * @param theMessageType The object type that will be placed on this queue. Objects will be Jackson-annotated structures.
* @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for * @param theConfig Contains the configuration for subscribers. Note that this parameter is provided for
* both {@link #getOrCreateReceiver} and * both {@link #getOrCreateReceiver} and
* {@link #getOrCreateProducer(String, Class, ChannelConsumerOptions)} * {@link #getOrCreateProducer(String, Class, ChannelConsumerSettings)}
* even though this object is used to configure the sender only. We do this because the factory * even though this object is used to configure the sender only. We do this because the factory
* may want to create a single object to be used for both the sender and receiver, so this allows * may want to create a single object to be used for both the sender and receiver, so this allows
* the config details to be known regardless of which method is returned first. * the config details to be known regardless of which method is returned first.
*/ */
IChannelProducer getOrCreateProducer(String theChannelName, Class<?> theMessageType, ChannelConsumerOptions theConfig); IChannelProducer getOrCreateProducer(String theChannelName, Class<?> theMessageType, ChannelConsumerSettings theConfig);
} }

View File

@ -20,7 +20,7 @@ package ca.uhn.fhir.jpa.subscription.channel.impl;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerOptions; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
@ -54,12 +54,12 @@ public class LinkedBlockingChannelFactory implements IChannelFactory {
} }
@Override @Override
public IChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, ChannelConsumerOptions theConfig) { public IChannelReceiver getOrCreateReceiver(String theChannelName, Class<?> theMessageType, ChannelConsumerSettings theConfig) {
return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers()); return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
} }
@Override @Override
public IChannelProducer getOrCreateProducer(String theChannelName, Class<?> theMessageType, ChannelConsumerOptions theConfig) { public IChannelProducer getOrCreateProducer(String theChannelName, Class<?> theMessageType, ChannelConsumerSettings theConfig) {
return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers()); return getOrCreateChannel(theChannelName, theConfig.getConcurrentConsumers());
} }

View File

@ -23,7 +23,7 @@ package ca.uhn.fhir.jpa.subscription.channel.subscription;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerOptions; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
@ -47,36 +47,36 @@ public class SubscriptionChannelFactory {
myQueueChannelFactory = theQueueChannelFactory; myQueueChannelFactory = theQueueChannelFactory;
} }
public IChannelProducer newDeliverySendingChannel(String theChannelName, ChannelConsumerOptions theOptions) { public IChannelProducer newDeliverySendingChannel(String theChannelName, ChannelConsumerSettings theOptions) {
ChannelConsumerOptions config = newConfigForDeliveryChannel(theOptions); ChannelConsumerSettings config = newConfigForDeliveryChannel(theOptions);
return myQueueChannelFactory.getOrCreateProducer(theChannelName, ResourceDeliveryJsonMessage.class, config); return myQueueChannelFactory.getOrCreateProducer(theChannelName, ResourceDeliveryJsonMessage.class, config);
} }
public IChannelReceiver newDeliveryReceivingChannel(String theChannelName, ChannelConsumerOptions theOptions) { public IChannelReceiver newDeliveryReceivingChannel(String theChannelName, ChannelConsumerSettings theOptions) {
ChannelConsumerOptions config = newConfigForDeliveryChannel(theOptions); ChannelConsumerSettings config = newConfigForDeliveryChannel(theOptions);
IChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, config); IChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceDeliveryJsonMessage.class, config);
return new BroadcastingSubscribableChannelWrapper(channel); return new BroadcastingSubscribableChannelWrapper(channel);
} }
public IChannelProducer newMatchingSendingChannel(String theChannelName, ChannelConsumerOptions theOptions) { public IChannelProducer newMatchingSendingChannel(String theChannelName, ChannelConsumerSettings theOptions) {
ChannelConsumerOptions config = newConfigForMatchingChannel(theOptions); ChannelConsumerSettings config = newConfigForMatchingChannel(theOptions);
return myQueueChannelFactory.getOrCreateProducer(theChannelName, ResourceModifiedJsonMessage.class, config); return myQueueChannelFactory.getOrCreateProducer(theChannelName, ResourceModifiedJsonMessage.class, config);
} }
public IChannelReceiver newMatchingReceivingChannel(String theChannelName, ChannelConsumerOptions theOptions) { public IChannelReceiver newMatchingReceivingChannel(String theChannelName, ChannelConsumerSettings theOptions) {
ChannelConsumerOptions config = newConfigForMatchingChannel(theOptions); ChannelConsumerSettings config = newConfigForMatchingChannel(theOptions);
IChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, config); IChannelReceiver channel = myQueueChannelFactory.getOrCreateReceiver(theChannelName, ResourceModifiedJsonMessage.class, config);
return new BroadcastingSubscribableChannelWrapper(channel); return new BroadcastingSubscribableChannelWrapper(channel);
} }
protected ChannelConsumerOptions newConfigForDeliveryChannel(ChannelConsumerOptions theOptions) { protected ChannelConsumerSettings newConfigForDeliveryChannel(ChannelConsumerSettings theOptions) {
ChannelConsumerOptions config = new ChannelConsumerOptions(); ChannelConsumerSettings config = new ChannelConsumerSettings();
config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers()); config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers());
return config; return config;
} }
protected ChannelConsumerOptions newConfigForMatchingChannel(ChannelConsumerOptions theOptions) { protected ChannelConsumerSettings newConfigForMatchingChannel(ChannelConsumerSettings theOptions) {
ChannelConsumerOptions config = new ChannelConsumerOptions(); ChannelConsumerSettings config = new ChannelConsumerSettings();
config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers()); config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers());
return config; return config;
} }

View File

@ -4,7 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerOptions; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionLoader;
@ -46,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
@ -57,7 +56,7 @@ import java.util.List;
public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends BaseSubscriptionDstu3Test { public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends BaseSubscriptionDstu3Test {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class);
public static final ChannelConsumerOptions CONSUMER_OPTIONS = new ChannelConsumerOptions().setConcurrentConsumers(1); public static final ChannelConsumerSettings CONSUMER_OPTIONS = new ChannelConsumerSettings().setConcurrentConsumers(1);
protected static ObservationListener ourObservationListener; protected static ObservationListener ourObservationListener;
@Autowired @Autowired