thread pool type setting is wrong, fix blocking max setting to have a minimum of 10, use cached TP where needed

This commit is contained in:
kimchy 2010-08-24 15:28:54 +03:00
parent b8ab50828c
commit 7ae8d4c669
11 changed files with 55 additions and 68 deletions

View File

@ -202,13 +202,15 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
listener.clusterChanged(clusterChangedEvent); listener.clusterChanged(clusterChangedEvent);
} }
threadPool.execute(new Runnable() { if (!nodesDelta.removedNodes().isEmpty()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() { @Override public void run() {
for (DiscoveryNode node : nodesDelta.removedNodes()) { for (DiscoveryNode node : nodesDelta.removedNodes()) {
transportService.disconnectFromNode(node); transportService.disconnectFromNode(node);
} }
} }
}); });
}
// if we are the master, publish the new state to all nodes // if we are the master, publish the new state to all nodes
if (clusterState.nodes().localNodeMaster()) { if (clusterState.nodes().localNodeMaster()) {

View File

@ -20,14 +20,22 @@
package org.elasticsearch.common.util.concurrent; package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class EsExecutors { public class EsExecutors {
public static ExecutorService newCachedThreadPool(TimeValue keepAlive, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
keepAlive.millis(), TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) { public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
String name = settings.get("name"); String name = settings.get("name");
if (name == null) { if (name == null) {

View File

@ -32,6 +32,10 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
* A thread pool based on {@link org.elasticsearch.common.util.concurrent.jsr166y.TransferQueue}.
*
* <p>Limited compared to ExecutorServer in what it does, but focused on speed.
*
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class TransferThreadPoolExecutor extends AbstractExecutorService { public class TransferThreadPoolExecutor extends AbstractExecutorService {
@ -247,7 +251,7 @@ public class TransferThreadPoolExecutor extends AbstractExecutorService {
succeeded = workQueue.tryTransfer(command, blockingTime, TimeUnit.NANOSECONDS); succeeded = workQueue.tryTransfer(command, blockingTime, TimeUnit.NANOSECONDS);
if (!succeeded) { if (!succeeded) {
throw new RejectedExecutionException("Rejected execution after waiting " throw new RejectedExecutionException("Rejected execution after waiting "
+ TimeUnit.NANOSECONDS.toSeconds(blockingTime) + "ms for task [" + command.getClass() + "] to be executed."); + TimeUnit.NANOSECONDS.toSeconds(blockingTime) + "s for task [" + command.getClass() + "] to be executed.");
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RejectedExecutionException(e); throw new RejectedExecutionException(e);

View File

@ -21,7 +21,10 @@ package org.elasticsearch.threadpool;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import java.util.concurrent.*; import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -96,18 +99,6 @@ public interface ThreadPool extends Executor {
void execute(Runnable command); void execute(Runnable command);
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task, FutureListener<T> listener);
<T> Future<T> submit(Runnable task, T result, FutureListener<T> listener);
Future<?> submit(Runnable task, FutureListener<?> listener);
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

View File

@ -39,7 +39,7 @@ public class ThreadPoolModule extends AbstractModule implements SpawnModules {
} }
@Override public Iterable<? extends Module> spawnModules() { @Override public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(Modules.createModule(settings.getAsClass("transport.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings)); return ImmutableList.of(Modules.createModule(settings.getAsClass("threadpool.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings));
} }
@Override protected void configure() { @Override protected void configure() {

View File

@ -21,8 +21,7 @@ package org.elasticsearch.threadpool.blocking;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
@ -58,14 +57,21 @@ public class BlockingThreadPool extends AbstractThreadPool {
super(settings); super(settings);
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20); this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
this.min = componentSettings.getAsInt("min", 1); this.min = componentSettings.getAsInt("min", 1);
this.max = componentSettings.getAsInt("max", 100); int max = componentSettings.getAsInt("max", 100);
this.capacity = (int) componentSettings.getAsBytesSize("capacity", new ByteSizeValue(1, ByteSizeUnit.KB)).bytes(); if (max < 10) {
logger.warn("blocking threadpool max threads [{}] must not be lower than 10, setting it to 10", max);
max = 10;
}
this.max = max;
// capacity is set to 0 as it might cause starvation in blocking mode
this.capacity = (int) componentSettings.getAsSize("capacity", new SizeValue(0)).singles();
this.waitTime = componentSettings.getAsTime("wait_time", timeValueSeconds(60)); this.waitTime = componentSettings.getAsTime("wait_time", timeValueSeconds(60));
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60)); this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize); logger.debug("initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize);
executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]")); executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]"));
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
cached = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "[cached]")); cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]"));
started = true; started = true;
} }

View File

@ -57,7 +57,7 @@ public class ScalingThreadPool extends AbstractThreadPool {
logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize); logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize);
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]")); executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]"));
cached = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "[cached]")); cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]"));
started = true; started = true;
} }

View File

@ -114,30 +114,6 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th
return result; return result;
} }
@Override public <T> Future<T> submit(Callable<T> task) {
return executorService.submit(task);
}
@Override public <T> Future<T> submit(Callable<T> task, FutureListener<T> listener) {
return executorService.submit(new FutureCallable<T>(task, listener));
}
@Override public <T> Future<T> submit(Runnable task, T result) {
return executorService.submit(task, result);
}
@Override public <T> Future<T> submit(Runnable task, T result, FutureListener<T> listener) {
return executorService.submit(new FutureRunnable<T>(task, result, listener), result);
}
@Override public Future<?> submit(Runnable task) {
return executorService.submit(task);
}
@Override public Future<?> submit(Runnable task, FutureListener<?> listener) {
return executorService.submit(new FutureRunnable(task, null, listener));
}
@Override public ScheduledFuture<?> schedule(Runnable command, TimeValue delay) { @Override public ScheduledFuture<?> schedule(Runnable command, TimeValue delay) {
return schedule(command, delay.millis(), TimeUnit.MILLISECONDS); return schedule(command, delay.millis(), TimeUnit.MILLISECONDS);
} }

View File

@ -261,7 +261,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} }
@Override public void raiseNodeConnected(final DiscoveryNode node) { @Override public void raiseNodeConnected(final DiscoveryNode node) {
threadPool.execute(new Runnable() { threadPool.cached().execute(new Runnable() {
@Override public void run() { @Override public void run() {
for (TransportConnectionListener connectionListener : connectionListeners) { for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeConnected(node); connectionListener.onNodeConnected(node);

View File

@ -126,6 +126,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e)); handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e));
return; return;
} }
try {
if (handler.spawn()) { if (handler.spawn()) {
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@SuppressWarnings({"unchecked"}) @Override public void run() { @SuppressWarnings({"unchecked"}) @Override public void run() {
@ -137,14 +138,13 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
} }
}); });
} else { } else {
try {
//noinspection unchecked //noinspection unchecked
handler.handleResponse(streamable); handler.handleResponse(streamable);
}
} catch (Exception e) { } catch (Exception e) {
handleException(handler, new ResponseHandlerFailureTransportException("Failed to handle response", e)); handleException(handler, new ResponseHandlerFailureTransportException("Failed to handle response", e));
} }
} }
}
private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) { private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) {
Throwable error; Throwable error;

View File

@ -31,7 +31,7 @@ import static org.hamcrest.Matchers.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
@Test(enabled = false) @Test
public class BlockingThreadPoolTest { public class BlockingThreadPoolTest {
@Test public void testBlocking() throws Exception { @Test public void testBlocking() throws Exception {