Use higher thread count for subscription processor by default

This commit is contained in:
James Agnew 2017-09-06 11:08:43 -07:00
parent 989246f0da
commit df3aa86471
2 changed files with 39 additions and 12 deletions

View File

@ -23,12 +23,14 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
import ca.uhn.fhir.jpa.util.StopWatch;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Subscription;
@ -62,13 +64,21 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
private SubscribableChannel myProcessingChannel;
private SubscribableChannel myDeliveryChannel;
private ExecutorService myExecutor;
private int myExecutorThreadCount = 1;
private int myExecutorThreadCount;
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
private MessageHandler mySubscriptionCheckingSubscriber;
private ConcurrentHashMap<String, IBaseResource> myIdToSubscription = new ConcurrentHashMap<>();
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
private BlockingQueue<Runnable> myExecutorQueue;
/**
* Constructor
*/
public BaseSubscriptionInterceptor() {
super();
setExecutorThreadCount(5);
}
public abstract Subscription.SubscriptionChannelType getChannelType();
public SubscribableChannel getDeliveryChannel() {
@ -83,6 +93,15 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
return myExecutorQueue;
}
public int getExecutorThreadCount() {
return myExecutorThreadCount;
}
public void setExecutorThreadCount(int theExecutorThreadCount) {
Validate.inclusiveBetween(1, Integer.MAX_VALUE, theExecutorThreadCount);
myExecutorThreadCount = theExecutorThreadCount;
}
public ConcurrentHashMap<String, IBaseResource> getIdToSubscription() {
return myIdToSubscription;
}
@ -97,13 +116,6 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
protected abstract IFhirResourceDao<?> getSubscriptionDao();
/**
* Constructor
*/
public BaseSubscriptionInterceptor() {
super();
}
/**
* Read the existing subscriptions from the database
*/
@ -147,15 +159,28 @@ public abstract class BaseSubscriptionInterceptor extends ServerOperationInterce
public void postConstruct() {
myExecutorQueue = new LinkedBlockingQueue<>(1000);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myExecutorQueue.size());
StopWatch sw = new StopWatch();
try {
myExecutorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
}
};
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("subscription-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
myExecutor = new ThreadPoolExecutor(
myExecutorThreadCount,
myExecutorThreadCount,
1,
getExecutorThreadCount(),
0L,
TimeUnit.MILLISECONDS,
myExecutorQueue,

View File

@ -67,7 +67,9 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriptionSubscriber
getSubscriptionDao().update(theSubscription);
getIdToSubscription().put(theSubscription.getIdElement().getIdPart(), theSubscription);
} else if (activeStatus.equals(statusString)) {
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
if (!getIdToSubscription().containsKey(theSubscription.getIdElement().getIdPart())) {
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
}
getIdToSubscription().put(theSubscription.getIdElement().getIdPart(), theSubscription);
} else {
if (getIdToSubscription().containsKey(theSubscription.getIdElement().getIdPart())) {