add optional queue_size to fixed thread pool, and reject_policy

This commit is contained in:
Shay Banon 2011-09-01 13:26:41 +03:00
parent d98ac9b6fc
commit cc051014d3
1 changed files with 14 additions and 11 deletions

View File

@ -36,14 +36,7 @@ import org.elasticsearch.common.util.concurrent.MoreExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.common.unit.TimeValue.*;
@ -177,11 +170,21 @@ public class ThreadPool extends AbstractComponent {
threadFactory);
} else if ("fixed".equals(type)) {
int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5));
logger.debug("creating thread_pool [{}], type [{}], size [{}]", name, type, size);
int queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", -1));
RejectedExecutionHandler rejectedExecutionHandler;
String rejectSetting = settings.get("reject_policy", defaultSettings.get("reject_policy", "abort"));
if ("abort".equals(rejectSetting)) {
rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
} else if ("caller".equals(rejectSetting)) {
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
} else {
throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool");
}
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}]", name, type, size, queueSize, rejectSetting);
return new ThreadPoolExecutor(size, size,
0L, TimeUnit.MILLISECONDS,
new LinkedTransferQueue<Runnable>(),
threadFactory);
queueSize <= 0 ? new LinkedTransferQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queueSize),
threadFactory, rejectedExecutionHandler);
} else if ("scaling".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)));
int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1));