From 7ae8d4c669d6afdcc8c5d1fb1773374bf523874e Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 24 Aug 2010 15:28:54 +0300 Subject: [PATCH] thread pool type setting is wrong, fix blocking max setting to have a minimum of 10, use cached TP where needed --- .../service/InternalClusterService.java | 14 +++++----- .../common/util/concurrent/EsExecutors.java | 10 ++++++- .../TransferThreadPoolExecutor.java | 6 ++++- .../elasticsearch/threadpool/ThreadPool.java | 17 +++--------- .../threadpool/ThreadPoolModule.java | 2 +- .../blocking/BlockingThreadPool.java | 18 ++++++++----- .../threadpool/scaling/ScalingThreadPool.java | 2 +- .../support/AbstractThreadPool.java | 24 ----------------- .../transport/TransportService.java | 2 +- .../netty/MessageChannelHandler.java | 26 +++++++++---------- .../concurrent/BlockingThreadPoolTest.java | 2 +- 11 files changed, 55 insertions(+), 68 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index dbbfe4726a7..40b8f5deec5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -202,13 +202,15 @@ public class InternalClusterService extends AbstractLifecycleComponent(), + threadFactory); + } + public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) { String name = settings.get("name"); if (name == null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/TransferThreadPoolExecutor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/TransferThreadPoolExecutor.java index be00eb50860..16d6f0b97b8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/TransferThreadPoolExecutor.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/util/concurrent/TransferThreadPoolExecutor.java @@ -32,6 +32,10 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** + * A thread pool based on {@link org.elasticsearch.common.util.concurrent.jsr166y.TransferQueue}. + * + *

Limited compared to ExecutorServer in what it does, but focused on speed. + * * @author kimchy (shay.banon) */ public class TransferThreadPoolExecutor extends AbstractExecutorService { @@ -247,7 +251,7 @@ public class TransferThreadPoolExecutor extends AbstractExecutorService { succeeded = workQueue.tryTransfer(command, blockingTime, TimeUnit.NANOSECONDS); if (!succeeded) { throw new RejectedExecutionException("Rejected execution after waiting " - + TimeUnit.NANOSECONDS.toSeconds(blockingTime) + "ms for task [" + command.getClass() + "] to be executed."); + + TimeUnit.NANOSECONDS.toSeconds(blockingTime) + "s for task [" + command.getClass() + "] to be executed."); } } catch (InterruptedException e) { throw new RejectedExecutionException(e); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 648953669d5..47000341601 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -21,7 +21,10 @@ package org.elasticsearch.threadpool; import org.elasticsearch.common.unit.TimeValue; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * @author kimchy (shay.banon) @@ -96,18 +99,6 @@ public interface ThreadPool extends Executor { void execute(Runnable command); - Future submit(Callable task); - - Future submit(Runnable task, T result); - - Future submit(Runnable task); - - Future submit(Callable task, FutureListener listener); - - Future submit(Runnable task, T result, FutureListener listener); - - Future submit(Runnable task, FutureListener listener); - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java index fd15002cbd3..9c7f662789c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java @@ -39,7 +39,7 @@ public class ThreadPoolModule extends AbstractModule implements SpawnModules { } @Override public Iterable spawnModules() { - return ImmutableList.of(Modules.createModule(settings.getAsClass("transport.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings)); + return ImmutableList.of(Modules.createModule(settings.getAsClass("threadpool.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings)); } @Override protected void configure() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java index 927054b411f..0f7aa5a8dc7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java @@ -21,8 +21,7 @@ package org.elasticsearch.threadpool.blocking; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor; @@ -58,14 +57,21 @@ public class BlockingThreadPool extends AbstractThreadPool { super(settings); this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20); this.min = componentSettings.getAsInt("min", 1); - this.max = componentSettings.getAsInt("max", 100); - this.capacity = (int) componentSettings.getAsBytesSize("capacity", new ByteSizeValue(1, ByteSizeUnit.KB)).bytes(); + int max = componentSettings.getAsInt("max", 100); + if (max < 10) { + logger.warn("blocking threadpool max threads [{}] must not be lower than 10, setting it to 10", max); + max = 10; + } + this.max = max; + + // capacity is set to 0 as it might cause starvation in blocking mode + this.capacity = (int) componentSettings.getAsSize("capacity", new SizeValue(0)).singles(); this.waitTime = componentSettings.getAsTime("wait_time", timeValueSeconds(60)); this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60)); - logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize); + logger.debug("initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize); executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]")); scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); - cached = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "[cached]")); + cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]")); started = true; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java index cae8cd9f5df..d2f0d4de985 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java @@ -57,7 +57,7 @@ public class ScalingThreadPool extends AbstractThreadPool { logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize); scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]")); - cached = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "[cached]")); + cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]")); started = true; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java index 83dd5ebdb66..f03ee88177a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java @@ -114,30 +114,6 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th return result; } - @Override public Future submit(Callable task) { - return executorService.submit(task); - } - - @Override public Future submit(Callable task, FutureListener listener) { - return executorService.submit(new FutureCallable(task, listener)); - } - - @Override public Future submit(Runnable task, T result) { - return executorService.submit(task, result); - } - - @Override public Future submit(Runnable task, T result, FutureListener listener) { - return executorService.submit(new FutureRunnable(task, result, listener), result); - } - - @Override public Future submit(Runnable task) { - return executorService.submit(task); - } - - @Override public Future submit(Runnable task, FutureListener listener) { - return executorService.submit(new FutureRunnable(task, null, listener)); - } - @Override public ScheduledFuture schedule(Runnable command, TimeValue delay) { return schedule(command, delay.millis(), TimeUnit.MILLISECONDS); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index 2c237ca619b..316b0fe6864 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -261,7 +261,7 @@ public class TransportService extends AbstractLifecycleComponent