changes needed to support different number of consumers in matching vs delivery

This commit is contained in:
Ken Stevens 2019-02-17 15:23:01 -05:00
parent fb5d94383b
commit 3af0bc7206
6 changed files with 24 additions and 10 deletions

View File

@ -48,7 +48,7 @@ import javax.annotation.PreDestroy;
public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer { public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class); private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
private static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching"; public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
private SubscribableChannel myProcessingChannel; private SubscribableChannel myProcessingChannel;
@Autowired @Autowired

View File

@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.subscription.module;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@ -41,7 +40,7 @@ public class LinkedBlockingQueueSubscribableChannel implements SubscribableChann
private final ExecutorSubscribableChannel mySubscribableChannel; private final ExecutorSubscribableChannel mySubscribableChannel;
private final BlockingQueue<Runnable> myQueue; private final BlockingQueue<Runnable> myQueue;
public LinkedBlockingQueueSubscribableChannel(BlockingQueue<Runnable> theQueue, String theThreadNamingPattern) { public LinkedBlockingQueueSubscribableChannel(BlockingQueue<Runnable> theQueue, String theThreadNamingPattern, int theConcurrentConsumers) {
ThreadFactory threadFactory = new BasicThreadFactory.Builder() ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern(theThreadNamingPattern) .namingPattern(theThreadNamingPattern)
@ -62,7 +61,7 @@ public class LinkedBlockingQueueSubscribableChannel implements SubscribableChann
}; };
ThreadPoolExecutor executor = new ThreadPoolExecutor( ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1,
SubscriptionConstants.EXECUTOR_THREAD_COUNT, theConcurrentConsumers,
0L, 0L,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
theQueue, theQueue,

View File

@ -23,5 +23,9 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
public interface ISubscribableChannelFactory { public interface ISubscribableChannelFactory {
SubscribableChannel createSubscribableChannel(String theChannelName); SubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers);
int getDeliveryChannelConcurrentConsumers();
int getMatchingChannelConcurrentConsumers();
} }

View File

@ -27,7 +27,17 @@ import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueSubscribableChannelFactory implements ISubscribableChannelFactory { public class LinkedBlockingQueueSubscribableChannelFactory implements ISubscribableChannelFactory {
@Override @Override
public SubscribableChannel createSubscribableChannel(String theChannelName) { public SubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers) {
return new LinkedBlockingQueueSubscribableChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d"); return new LinkedBlockingQueueSubscribableChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d", theConcurrentConsumers);
}
@Override
public int getDeliveryChannelConcurrentConsumers() {
return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS;
}
@Override
public int getMatchingChannelConcurrentConsumers() {
return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
} }
} }

View File

@ -39,10 +39,10 @@ public class SubscriptionChannelFactory {
theChannelType + theChannelType +
"-" + "-" +
theSubscriptionId; theSubscriptionId;
return mySubscribableChannelFactory.createSubscribableChannel(channelName); return mySubscribableChannelFactory.createSubscribableChannel(channelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
} }
public SubscribableChannel newMatchingChannel(String theChannelName) { public SubscribableChannel newMatchingChannel(String theChannelName) {
return mySubscribableChannelFactory.createSubscribableChannel(theChannelName); return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers());
} }
} }

View File

@ -79,7 +79,8 @@ public class SubscriptionConstants {
/** /**
* The number of threads used in subscription channel processing * The number of threads used in subscription channel processing
*/ */
public static final int EXECUTOR_THREAD_COUNT = 5; public static final int MATCHING_CHANNEL_CONCURRENT_CONSUMERS = 5;
public static final int DELIVERY_CHANNEL_CONCURRENT_CONSUMERS = 5;
/** /**
* The maximum number of subscriptions that can be active at once * The maximum number of subscriptions that can be active at once