diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 142f33f9d07..627f37c61a4 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import org.elasticsearch.transport.support.TransportStatus; @@ -40,6 +41,8 @@ import org.elasticsearch.transport.support.TransportStatus; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; @@ -50,6 +53,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new public class LocalTransport extends AbstractLifecycleComponent implements Transport { private final ThreadPool threadPool; + private final ThreadPoolExecutor workers; private final Version version; private volatile TransportServiceAdapter transportServiceAdapter; private volatile BoundTransportAddress boundAddress; @@ -58,13 +62,20 @@ public class LocalTransport extends AbstractLifecycleComponent implem private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private final ConcurrentMap connectedNodes = newConcurrentMap(); - public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local_address"; + public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address"; + public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers"; + public static final String TRANSPORT_LOCAL_QUEUE = "transport.local.queue"; @Inject public LocalTransport(Settings settings, ThreadPool threadPool, Version version) { super(settings); this.threadPool = threadPool; this.version = version; + + int workerCount = this.settings.getAsInt(TRANSPORT_LOCAL_WORKERS, EsExecutors.boundedNumberOfProcessors(settings)); + int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1); + logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize); + this.workers = EsExecutors.newFixed(workerCount, queueSize, EsExecutors.daemonThreadFactory(this.settings, "local_transport")); } @Override @@ -106,6 +117,13 @@ public class LocalTransport extends AbstractLifecycleComponent implem @Override protected void doClose() throws ElasticsearchException { + workers.shutdown(); + try { + workers.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + workers.shutdownNow(); } @Override @@ -185,7 +203,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem transportServiceAdapter.sent(data.length); - threadPool.generic().execute(new Runnable() { + targetTransport.workers().execute(new Runnable() { @Override public void run() { targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId); @@ -193,8 +211,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem }); } - ThreadPool threadPool() { - return this.threadPool; + ThreadPoolExecutor workers() { + return this.workers; } protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) { diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index f4d5e83053a..f316e9ba69d 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -72,7 +72,7 @@ public class LocalTransportChannel implements TransportChannel { response.writeTo(stream); stream.close(); final byte[] data = bStream.bytes().toBytes(); - targetTransport.threadPool().generic().execute(new Runnable() { + targetTransport.workers().execute(new Runnable() { @Override public void run() { targetTransport.messageReceived(data, action, sourceTransport, version, null); @@ -98,7 +98,7 @@ public class LocalTransportChannel implements TransportChannel { too.close(); } final byte[] data = stream.bytes().toBytes(); - targetTransport.threadPool().generic().execute(new Runnable() { + targetTransport.workers().execute(new Runnable() { @Override public void run() { targetTransport.messageReceived(data, action, sourceTransport, version, null);