From 3ed848a495a494538a9071ccd447f23fa07fb7f2 Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 15 Feb 2011 07:00:24 +0200 Subject: [PATCH] ThreadPool: Refactor into several pools, with configurable types per pool, closes #687. --- .../index/engine/SimpleEngineBenchmark.java | 3 +- .../stress/SingleThreadIndexingStress.java | 1 + .../BenchmarkNettyLargeMessages.java | 19 +- .../transport/TransportBenchmark.java | 33 ++- .../health/TransportClusterHealthAction.java | 4 + .../admin/cluster/node/info/NodeInfo.java | 29 +- .../node/info/TransportNodesInfoAction.java | 6 +- .../restart/TransportNodesRestartAction.java | 8 +- .../TransportNodesShutdownAction.java | 14 +- .../admin/cluster/node/stats/NodeStats.java | 29 +- .../node/stats/TransportNodesStatsAction.java | 6 +- .../TransportBroadcastPingAction.java | 4 + .../TransportShardReplicationPingAction.java | 4 + .../single/TransportSinglePingAction.java | 4 + .../state/TransportClusterStateAction.java | 4 + .../alias/TransportIndicesAliasesAction.java | 4 + .../analyze/TransportAnalyzeAction.java | 4 + .../TransportClearIndicesCacheAction.java | 4 + .../close/TransportCloseIndexAction.java | 4 + .../create/TransportCreateIndexAction.java | 4 + .../delete/TransportDeleteIndexAction.java | 4 + .../indices/flush/TransportFlushAction.java | 4 + .../TransportGatewaySnapshotAction.java | 4 + .../delete/TransportDeleteMappingAction.java | 3 + .../put/TransportPutMappingAction.java | 3 + .../open/TransportOpenIndexAction.java | 4 + .../optimize/TransportOptimizeAction.java | 4 + .../refresh/TransportRefreshAction.java | 4 + .../TransportUpdateSettingsAction.java | 4 + .../status/TransportIndicesStatusAction.java | 4 + .../TransportDeleteIndexTemplateAction.java | 4 + .../put/TransportPutIndexTemplateAction.java | 4 + .../action/bulk/TransportBulkAction.java | 8 +- .../action/bulk/TransportShardBulkAction.java | 4 + .../action/count/TransportCountAction.java | 4 + .../action/delete/TransportDeleteAction.java | 4 + .../index/TransportShardDeleteAction.java | 4 + .../TransportShardDeleteByQueryAction.java | 4 + .../action/get/TransportGetAction.java | 4 + .../action/index/TransportIndexAction.java | 4 + .../mlt/TransportMoreLikeThisAction.java | 5 +- .../percolate/TransportPercolateAction.java | 4 + .../action/search/TransportSearchAction.java | 5 +- .../search/TransportSearchScrollAction.java | 5 + ...TransportSearchDfsQueryAndFetchAction.java | 4 +- ...ransportSearchDfsQueryThenFetchAction.java | 8 +- .../TransportSearchQueryThenFetchAction.java | 4 +- ...nsportSearchScrollQueryAndFetchAction.java | 8 +- ...sportSearchScrollQueryThenFetchAction.java | 8 +- .../type/TransportSearchTypeAction.java | 8 +- .../AbstractListenableActionFuture.java | 6 +- .../TransportBroadcastOperationAction.java | 236 ++-------------- .../TransportMasterNodeOperationAction.java | 63 +++-- .../nodes/TransportNodesOperationAction.java | 42 ++- ...nsportIndexReplicationOperationAction.java | 13 +- ...portIndicesReplicationOperationAction.java | 22 +- ...nsportShardReplicationOperationAction.java | 92 ++++--- .../TransportSingleCustomOperationAction.java | 47 ++-- .../TransportShardSingleOperationAction.java | 48 ++-- .../TransportClientNodesService.java | 25 +- .../support/BaseClientTransportAction.java | 12 +- .../action/index/MappingUpdatedAction.java | 4 + .../action/index/NodeIndexCreatedAction.java | 8 +- .../action/index/NodeIndexDeletedAction.java | 8 +- .../index/NodeMappingCreatedAction.java | 12 +- .../action/shard/ShardStateAction.java | 24 +- .../metadata/MetaDataDeleteIndexService.java | 4 +- .../cluster/routing/RoutingService.java | 2 +- .../service/InternalClusterService.java | 6 +- .../zen/fd/MasterFaultDetection.java | 18 +- .../discovery/zen/fd/NodesFaultDetection.java | 15 +- .../zen/membership/MembershipAction.java | 15 +- .../zen/ping/multicast/MulticastZenPing.java | 16 +- .../zen/ping/unicast/UnicastZenPing.java | 24 +- .../publish/PublishClusterStateAction.java | 10 +- .../elasticsearch/gateway/GatewayService.java | 4 +- .../TransportNodesListGatewayMetaState.java | 4 + ...ransportNodesListGatewayStartedShards.java | 4 + .../org/elasticsearch/http/HttpServer.java | 2 +- .../gateway/IndexShardGatewayService.java | 4 +- .../index/shard/recovery/RecoverySource.java | 37 +-- .../index/shard/recovery/RecoveryTarget.java | 24 ++ .../shard/service/InternalIndexShard.java | 10 +- .../index/translog/TranslogService.java | 8 +- .../cluster/IndicesClusterStateService.java | 8 +- .../TransportNodesListShardStoreMetaData.java | 4 + .../jmx/action/GetJmxServiceUrlAction.java | 5 + .../node/info/RestNodesInfoAction.java | 3 - .../node/stats/RestNodesStatsAction.java | 3 - .../PublishRiverClusterStateAction.java | 14 +- .../action/SearchServiceTransportAction.java | 71 +++-- .../threadpool/FutureListener.java | 30 --- .../elasticsearch/threadpool/ThreadPool.java | 253 +++++++++++++----- .../threadpool/ThreadPoolInfo.java | 141 ---------- .../threadpool/ThreadPoolModule.java | 12 +- .../threadpool/ThreadPoolStats.java | 153 ----------- .../blocking/BlockingThreadPool.java | 114 -------- .../BlockingThreadPoolManagement.java | 83 ------ .../blocking/BlockingThreadPoolModule.java | 34 --- .../threadpool/cached/CachedThreadPool.java | 95 ------- .../cached/CachedThreadPoolManagement.java | 73 ----- .../cached/CachedThreadPoolModule.java | 34 --- .../threadpool/fixed/FixedThreadPool.java | 109 -------- .../fixed/FixedThreadPoolManagement.java | 73 ----- .../fixed/FixedThreadPoolModule.java | 34 --- .../threadpool/scaling/ScalingThreadPool.java | 104 ------- .../scaling/ScalingThreadPoolManagement.java | 83 ------ .../scaling/ScalingThreadPoolModule.java | 34 --- .../support/AbstractThreadPool.java | 222 --------------- .../BaseTransportRequestHandler.java | 3 - .../BaseTransportResponseHandler.java | 3 - .../FutureTransportResponseHandler.java | 5 + .../transport/PlainTransportFuture.java | 8 +- .../transport/TransportRequestHandler.java | 2 +- .../transport/TransportResponseHandler.java | 2 +- .../transport/TransportService.java | 4 +- .../VoidTransportResponseHandler.java | 19 +- .../transport/local/LocalTransport.java | 25 +- .../local/LocalTransportChannel.java | 4 +- .../netty/MessageChannelHandler.java | 87 +++--- .../ping/multicast/MulticastZenPingTests.java | 3 +- .../zen/ping/unicast/UnicastZenPingTests.java | 3 +- .../monitor/jvm/DeadlockSimulator.java | 101 ------- .../AbstractSimpleTransportTests.java | 47 +++- 124 files changed, 993 insertions(+), 2310 deletions(-) delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/FutureListener.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolInfo.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPool.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPoolManagement.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPoolModule.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java delete mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/monitor/jvm/DeadlockSimulator.java diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java index 38b686d5190..b47eea4b50d 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java @@ -46,7 +46,6 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.memory.ByteBufferStore; import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.scaling.ScalingThreadPool; import java.io.File; import java.util.concurrent.*; @@ -305,7 +304,7 @@ public class SimpleEngineBenchmark { store.deleteContent(); - ThreadPool threadPool = new ScalingThreadPool(); + ThreadPool threadPool = new ThreadPool(); SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings)); Engine engine = new RobinEngine(shardId, settings, store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store), new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NonBloomCache(shardId.index())); diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java index 68e2aac69be..d2c42f612e9 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadIndexingStress.java @@ -67,6 +67,7 @@ public class SingleThreadIndexingStress { System.out.println("Indexing [" + COUNT + "] ..."); int i = 1; for (; i <= COUNT; i++) { +// client1.admin().cluster().preparePingSingle("test", "type1", Integer.toString(i)).execute().actionGet(); client1.prepareIndex("test", "type1").setId(Integer.toString(i)).setSource(source(Integer.toString(i), "test" + i)) .setCreate(false).execute().actionGet(); if ((i % 10000) == 0) { diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index 9b43dfd8edb..6eb59325be9 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.cached.CachedThreadPool; import org.elasticsearch.transport.*; import org.elasticsearch.transport.netty.NettyTransport; @@ -49,7 +48,7 @@ public class BenchmarkNettyLargeMessages { Settings settings = ImmutableSettings.settingsBuilder() .build(); - final ThreadPool threadPool = new CachedThreadPool(settings); + final ThreadPool threadPool = new ThreadPool(); final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool).start(); final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool).start(); @@ -65,12 +64,12 @@ public class BenchmarkNettyLargeMessages { return new BenchmarkMessage(); } - @Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception { - channel.sendResponse(request); + @Override public String executor() { + return ThreadPool.Names.CACHED; } - @Override public boolean spawn() { - return true; + @Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception { + channel.sendResponse(request); } }); @@ -85,6 +84,10 @@ public class BenchmarkNettyLargeMessages { return new BenchmarkMessage(); } + @Override public String executor() { + return ThreadPool.Names.SAME; + } + @Override public void handleResponse(BenchmarkMessage response) { } @@ -108,6 +111,10 @@ public class BenchmarkNettyLargeMessages { return new BenchmarkMessage(); } + @Override public String executor() { + return ThreadPool.Names.SAME; + } + @Override public void handleResponse(BenchmarkMessage response) { } diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java index 3001efeceb7..c32ffcdc633 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.fixed.FixedThreadPool; import org.elasticsearch.transport.*; import org.elasticsearch.transport.local.LocalTransport; import org.elasticsearch.transport.netty.NettyTransport; @@ -54,14 +53,8 @@ public class TransportBenchmark { public abstract Transport newTransport(Settings settings, ThreadPool threadPool); } - public static ThreadPool newThreadPool(Settings settings) { -// return new ForkjoinThreadPool(settings); - return new FixedThreadPool(settings); -// return new CachedThreadPool(settings); - } - public static void main(String[] args) { - final boolean spawn = true; + final String executor = ThreadPool.Names.CACHED; final boolean waitForRequest = true; final ByteSizeValue payloadSize = new ByteSizeValue(100, ByteSizeUnit.BYTES); final int NUMBER_OF_CLIENTS = 1; @@ -74,10 +67,10 @@ public class TransportBenchmark { Settings settings = ImmutableSettings.settingsBuilder() .build(); - final ThreadPool serverThreadPool = newThreadPool(settings); + final ThreadPool serverThreadPool = new ThreadPool(); final TransportService serverTransportService = new TransportService(type.newTransport(settings, serverThreadPool), serverThreadPool).start(); - final ThreadPool clientThreadPool = newThreadPool(settings); + final ThreadPool clientThreadPool = new ThreadPool(); final TransportService clientTransportService = new TransportService(type.newTransport(settings, clientThreadPool), clientThreadPool).start(); final DiscoveryNode node = new DiscoveryNode("server", serverTransportService.boundAddress().publishAddress()); @@ -87,12 +80,12 @@ public class TransportBenchmark { return new BenchmarkMessage(); } - @Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception { - channel.sendResponse(request); + @Override public String executor() { + return executor; } - @Override public boolean spawn() { - return spawn; + @Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception { + channel.sendResponse(request); } }); @@ -105,6 +98,10 @@ public class TransportBenchmark { return new BenchmarkMessage(); } + @Override public String executor() { + return ThreadPool.Names.SAME; + } + @Override public void handleResponse(BenchmarkMessage response) { } @@ -128,6 +125,10 @@ public class TransportBenchmark { return new BenchmarkMessage(); } + @Override public String executor() { + return executor; + } + @Override public void handleResponse(BenchmarkMessage response) { if (response.id != id) { System.out.println("NO ID MATCH [" + response.id + "] and [" + id + "]"); @@ -139,10 +140,6 @@ public class TransportBenchmark { exp.printStackTrace(); latch.countDown(); } - - @Override public boolean spawn() { - return spawn; - } }; if (waitForRequest) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index fac28415870..36305ae30bc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -50,6 +50,10 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc this.clusterName = clusterName; } + @Override protected String executor() { + return ThreadPool.Names.CACHED; + } + @Override protected String transportAction() { return TransportActions.Admin.Cluster.HEALTH; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java index 3c60cdc7204..2f311fab919 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java @@ -30,7 +30,6 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.network.NetworkInfo; import org.elasticsearch.monitor.os.OsInfo; import org.elasticsearch.monitor.process.ProcessInfo; -import org.elasticsearch.threadpool.ThreadPoolInfo; import org.elasticsearch.transport.TransportInfo; import java.io.IOException; @@ -55,15 +54,13 @@ public class NodeInfo extends NodeOperationResponse { private NetworkInfo network; - private ThreadPoolInfo threadPool; - private TransportInfo transport; NodeInfo() { } public NodeInfo(DiscoveryNode node, ImmutableMap attributes, Settings settings, - OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network, ThreadPoolInfo threadPool, + OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network, TransportInfo transport) { super(node); this.attributes = attributes; @@ -72,7 +69,6 @@ public class NodeInfo extends NodeOperationResponse { this.process = process; this.jvm = jvm; this.network = network; - this.threadPool = threadPool; this.transport = transport; } @@ -160,20 +156,6 @@ public class NodeInfo extends NodeOperationResponse { return network(); } - /** - * Thread Pool level information. - */ - public ThreadPoolInfo threadPool() { - return threadPool; - } - - /** - * Thread Pool level information. - */ - public ThreadPoolInfo getThreadPool() { - return threadPool(); - } - public TransportInfo transport() { return transport; } @@ -209,9 +191,6 @@ public class NodeInfo extends NodeOperationResponse { if (in.readBoolean()) { network = NetworkInfo.readNetworkInfo(in); } - if (in.readBoolean()) { - threadPool = ThreadPoolInfo.readThreadPoolInfo(in); - } if (in.readBoolean()) { transport = TransportInfo.readTransportInfo(in); } @@ -249,12 +228,6 @@ public class NodeInfo extends NodeOperationResponse { out.writeBoolean(true); network.writeTo(out); } - if (threadPool == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - threadPool.writeTo(out); - } if (transport == null) { out.writeBoolean(false); } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index a933b65becc..3dad2b17898 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -61,6 +61,10 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction().putAll(nodeAttributes).remove(key).immutableMap(); } + @Override protected String executor() { + return ThreadPool.Names.CACHED; + } + @Override protected String transportAction() { return TransportActions.Admin.Cluster.Node.INFO; } @@ -100,7 +104,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction { if (requestsByShard.isEmpty()) { // all failures, no shards to process, send a response if (bulkRequest.listenerThreaded()) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime)); } @@ -264,7 +264,7 @@ public class TransportBulkAction extends BaseAction { private void finishHim() { if (bulkRequest.listenerThreaded()) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime)); } @@ -305,8 +305,8 @@ public class TransportBulkAction extends BaseAction { }); } - @Override public boolean spawn() { - return true; // spawn, we do some work here... + @Override public String executor() { + return ThreadPool.Names.SAME; } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 93b5c8ba493..085be10ede4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -72,6 +72,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation this.mappingUpdatedAction = mappingUpdatedAction; } + @Override protected String executor() { + return ThreadPool.Names.INDEX; + } + @Override protected boolean checkWriteConsistency() { return true; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java index 52ab09912b9..eed72e8a3d8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java @@ -54,6 +54,10 @@ public class TransportCountAction extends TransportBroadcastOperationAction listener) { if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(deleteRequest.index())) { createIndexAction.execute(new CreateIndexRequest(deleteRequest.index()), new ActionListener() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java index be0faed23ca..a33bb99c084 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java @@ -62,6 +62,10 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati return "indices/index/b_shard/delete"; } + @Override protected String executor() { + return ThreadPool.Names.INDEX; + } + @Override protected void checkBlock(ShardDeleteRequest request, ClusterState state) { state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index 01deed399d1..c895b2a4fc4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -48,6 +48,10 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication return true; } + @Override protected String executor() { + return ThreadPool.Names.INDEX; + } + @Override protected ShardDeleteByQueryRequest newRequestInstance() { return new ShardDeleteByQueryRequest(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 7779022b1e7..b891e236801 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -69,6 +69,10 @@ public class TransportGetAction extends TransportShardSingleOperationAction 0) { if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { for (final DfsSearchResult dfsResult : dfsResults) { DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); @@ -115,7 +115,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc if (node.id().equals(nodes.localNodeId())) { final QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs); if (localAsync) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { executeSecondPhase(dfsResult, counter, node, querySearchRequest); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 2d22196e128..29416dc78d0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -104,7 +104,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (localOperations > 0) { if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { for (final DfsSearchResult dfsResult : dfsResults) { DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); @@ -122,7 +122,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (node.id().equals(nodes.localNodeId())) { final QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs); if (localAsync) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { executeQuery(dfsResult, counter, querySearchRequest, node); } @@ -190,7 +190,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (localOperations > 0) { if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { for (final Map.Entry entry : docIdsToLoad.entrySet()) { DiscoveryNode node = nodes.get(entry.getKey().nodeId()); @@ -208,7 +208,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (node.id().equals(nodes.localNodeId())) { final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue()); if (localAsync) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { executeFetch(entry.getKey(), counter, fetchSearchRequest, node); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 6011c1211aa..3c1981237d8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -104,7 +104,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi if (localOperations > 0) { if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { for (final Map.Entry entry : docIdsToLoad.entrySet()) { DiscoveryNode node = nodes.get(entry.getKey().nodeId()); @@ -122,7 +122,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi if (node.id().equals(nodes.localNodeId())) { final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue()); if (localAsync) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { executeFetch(entry.getKey(), counter, fetchSearchRequest, node); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index fca88274c34..9f50d92f887 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -129,7 +129,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent if (localOperations > 0) { if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { for (Tuple target : scrollId.values()) { DiscoveryNode node = nodes.get(target.v1()); @@ -145,7 +145,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { if (localAsync) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { executePhase(node, target.v2()); } @@ -217,7 +217,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent protected void invokeListener(final SearchResponse response) { if (request.listenerThreaded()) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { listener.onResponse(response); } @@ -229,7 +229,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent protected void invokeListener(final Throwable t) { if (request.listenerThreaded()) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { listener.onFailure(t); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index a64f985f9be..b2c7ccf5475 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -135,7 +135,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent if (localOperations > 0) { if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { for (Tuple target : scrollId.values()) { DiscoveryNode node = nodes.get(target.v1()); @@ -151,7 +151,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { if (localAsync) { - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { executeQueryPhase(counter, node, target.v2()); } @@ -246,7 +246,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent protected void invokeListener(final SearchResponse response) { if (request.listenerThreaded()) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { listener.onResponse(response); } @@ -258,7 +258,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent protected void invokeListener(final Throwable t) { if (request.listenerThreaded()) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { listener.onFailure(t); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index d40839ce8b2..b44ea19c75e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -144,7 +144,7 @@ public abstract class TransportSearchTypeAction extends BaseAction 0) { if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { request.beforeLocalFork(); - threadPool.execute(new Runnable() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { for (final ShardIterator shardIt : shardsIts) { final ShardRouting shard = shardIt.reset().nextActiveOrNull(); @@ -166,7 +166,7 @@ public abstract class TransportSearchTypeAction extends BaseAction extends AdapterAction } public boolean listenerThreaded() { - return listenerThreaded; + return false; // we control execution of the listener } public ThreadPool threadPool() { @@ -107,9 +107,9 @@ public abstract class AbstractListenableActionFuture extends AdapterAction private void executeListener(final Object listener) { if (listenerThreaded) { if (listener instanceof Runnable) { - threadPool.execute((Runnable) listener); + threadPool.cached().execute((Runnable) listener); } else { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { ActionListener lst = (ActionListener) listener; try { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java index eaad978b0ff..a4c1b0e2e63 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java @@ -30,24 +30,13 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.ImmutableList; -import org.elasticsearch.common.io.ThrowableObjectInputStream; -import org.elasticsearch.common.io.ThrowableObjectOutputStream; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; -import java.io.IOException; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; -import static org.elasticsearch.common.collect.Lists.*; - /** * @author kimchy (shay.banon) */ @@ -60,14 +49,22 @@ public abstract class TransportBroadcastOperationAction listener) { @@ -78,6 +75,8 @@ public abstract class TransportBroadcastOperationAction 0) { if (request.operationThreading() == BroadcastOperationThreading.SINGLE_THREAD) { request.beforeLocalFork(); - threadPool.execute(new Runnable() { + threadPool.executor(executor).execute(new Runnable() { @Override public void run() { for (final ShardIterator shardIt : shardsIts) { final ShardRouting shard = nextShardOrNull(shardIt.reset()); @@ -225,7 +224,7 @@ public abstract class TransportBroadcastOperationAction() { + transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler() { @Override public ShardResponse newInstance() { return newShardResponse(); } + @Override public String executor() { + return ThreadPool.Names.SAME; + } + @Override public void handleResponse(ShardResponse response) { onOperation(shard, response, false); } @@ -259,11 +262,6 @@ public abstract class TransportBroadcastOperationAction { @@ -392,188 +390,12 @@ public abstract class TransportBroadcastOperationAction { - - @Override public ShardsRequest newInstance() { - return new ShardsRequest(); - } - - @Override public void messageReceived(final ShardsRequest request, final TransportChannel channel) throws Exception { - if (request.operationThreading() == BroadcastOperationThreading.THREAD_PER_SHARD) { - final AtomicInteger counter = new AtomicInteger(request.requests().size()); - final AtomicInteger index = new AtomicInteger(); - final AtomicReferenceArray results = new AtomicReferenceArray(request.requests().size()); - for (final ShardRequest singleRequest : request.requests()) { - threadPool.execute(new Runnable() { - @Override public void run() { - int arrIndex = index.getAndIncrement(); - try { - results.set(arrIndex, shardOperation(singleRequest)); - } catch (Exception e) { - results.set(arrIndex, new BroadcastShardOperationFailedException(new ShardId(singleRequest.index(), singleRequest.shardId()), e)); - } - if (counter.decrementAndGet() == 0) { - // we are done - List responses = newArrayListWithCapacity(request.requests().size()); - List exceptions = null; - for (int i = 0; i < results.length(); i++) { - Object result = results.get(i); - if (result instanceof BroadcastShardOperationFailedException) { - if (exceptions == null) { - exceptions = newArrayList(); - } - exceptions.add((BroadcastShardOperationFailedException) result); - } else { - responses.add((ShardResponse) result); - } - } - try { - channel.sendResponse(new ShardsResponse(responses, exceptions)); - } catch (IOException e) { - logger.warn("Failed to send broadcast response", e); - } - } - } - }); - } - } else { - // single thread - threadPool.execute(new Runnable() { - @Override public void run() { - List responses = newArrayListWithCapacity(request.requests().size()); - List exceptions = null; - for (ShardRequest singleRequest : request.requests()) { - try { - responses.add(shardOperation(singleRequest)); - } catch (Exception e) { - if (exceptions == null) { - exceptions = newArrayList(); - } - exceptions.add(new BroadcastShardOperationFailedException(new ShardId(singleRequest.index(), singleRequest.shardId()), e)); - } - } - try { - channel.sendResponse(new ShardsResponse(responses, exceptions)); - } catch (IOException e) { - logger.warn("Failed to send broadcast response", e); - } - } - }); - } - } - - @Override public boolean spawn() { - // we handle the forking here... - return false; - } - } - - class ShardsResponse implements Streamable { - - private List responses; - - private List exceptions; - - ShardsResponse() { - } - - ShardsResponse(List responses, List exceptions) { - this.responses = responses; - this.exceptions = exceptions; - } - - @Override public void readFrom(StreamInput in) throws IOException { - int size = in.readVInt(); - responses = newArrayListWithCapacity(size); - for (int i = 0; i < size; i++) { - ShardResponse response = newShardResponse(); - response.readFrom(in); - responses.add(response); - } - size = in.readVInt(); - if (size > 0) { - exceptions = newArrayListWithCapacity(size); - ThrowableObjectInputStream toi = new ThrowableObjectInputStream(in); - for (int i = 0; i < size; i++) { - try { - exceptions.add((BroadcastShardOperationFailedException) toi.readObject()); - } catch (ClassNotFoundException e) { - throw new IOException("Failed to load class", e); - } - } - } - } - - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(responses.size()); - for (BroadcastShardOperationResponse response : responses) { - response.writeTo(out); - } - - if (exceptions == null || exceptions.isEmpty()) { - out.writeInt(0); - } else { - out.writeInt(exceptions.size()); - ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(out); - for (BroadcastShardOperationFailedException ex : exceptions) { - too.writeObject(ex); - } - too.flush(); - } - } - } - - class ShardsRequest implements Streamable { - - private BroadcastOperationThreading operationThreading = BroadcastOperationThreading.SINGLE_THREAD; - - private List requests; - - ShardsRequest() { - } - - public List requests() { - return this.requests; - } - - public BroadcastOperationThreading operationThreading() { - return operationThreading; - } - - ShardsRequest(BroadcastOperationThreading operationThreading, List requests) { - this.operationThreading = operationThreading; - this.requests = requests; - } - - @Override public void readFrom(StreamInput in) throws IOException { - operationThreading = BroadcastOperationThreading.fromId(in.readByte()); - int size = in.readVInt(); - if (size == 0) { - requests = ImmutableList.of(); - } else { - requests = newArrayListWithCapacity(in.readVInt()); - for (int i = 0; i < size; i++) { - ShardRequest request = newShardRequest(); - request.readFrom(in); - requests.add(request); - } - } - } - - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeByte(operationThreading.id()); - out.writeVInt(requests.size()); - for (BroadcastShardOperationRequest request : requests) { - request.writeTo(out); - } - } - } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java index 359f73ec738..3a8cc1e5468 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java @@ -49,17 +49,25 @@ public abstract class TransportMasterNodeOperationAction() { + transportService.sendRequest(nodes.masterNode(), transportAction, request, new BaseTransportResponseHandler() { @Override public Response newInstance() { return newResponse(); } @@ -177,6 +185,10 @@ public abstract class TransportMasterNodeOperationAction() { - @Override public Response newInstance() { - return newResponse(); + // we just send back a response, no need to fork a listener + request.listenerThreaded(false); + execute(request, new ActionListener() { + @Override public void onResponse(Response response) { + try { + channel.sendResponse(response); + } catch (Exception e) { + onFailure(e); } + } - @Override public void handleResponse(Response response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - logger.error("Failed to send response", e); - } + @Override public void onFailure(Throwable e) { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn("Failed to send response", e1); } - - @Override public void handleException(TransportException exp) { - try { - channel.sendResponse(exp); - } catch (Exception e) { - logger.error("Failed to send response", e); - } - } - }); - } + } + }); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java index 13dfe347829..876f1d68421 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java @@ -51,6 +51,10 @@ public abstract class TransportNodesOperationAction listener) { @@ -71,6 +79,8 @@ public abstract class TransportNodesOperationAction() { + transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler() { @Override public NodeResponse newInstance() { return newNodeResponse(); } @@ -168,8 +178,8 @@ public abstract class TransportNodesOperationAction indexAction; + + final String transportAction; + @Inject public TransportIndicesReplicationOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportIndexReplicationOperationAction indexAction) { super(settings); @@ -53,7 +56,9 @@ public abstract class TransportIndicesReplicationOperationAction listener) { @@ -79,7 +84,7 @@ public abstract class TransportIndicesReplicationOperationAction shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest); protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest); @@ -150,6 +162,10 @@ public abstract class TransportShardReplicationOperationAction { @@ -185,16 +197,13 @@ public abstract class TransportShardReplicationOperationAction() { + transportService.sendRequest(node, transportAction, request, transportOptions(), new BaseTransportResponseHandler() { @Override public Response newInstance() { return newResponseInstance(); } + @Override public String executor() { + return ThreadPool.Names.SAME; + } + @Override public void handleResponse(Response response) { listener.onResponse(response); } @@ -352,10 +365,6 @@ public abstract class TransportShardReplicationOperationAction listener) { @@ -68,6 +76,8 @@ public abstract class TransportSingleCustomOperationAction() { + transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(request, shard.id()), new BaseTransportResponseHandler() { @Override public Response newInstance() { return newResponse(); } + @Override public String executor() { + return ThreadPool.Names.SAME; + } + @Override public void handleResponse(final Response response) { if (request.listenerThreaded()) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { listener.onResponse(response); } @@ -222,11 +236,6 @@ public abstract class TransportSingleCustomOperationAction listener) { @@ -68,6 +76,8 @@ public abstract class TransportShardSingleOperationAction() { + transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(request, shard.id()), new BaseTransportResponseHandler() { + @Override public Response newInstance() { return newResponse(); } + @Override public String executor() { + return ThreadPool.Names.SAME; + } + @Override public void handleResponse(final Response response) { if (request.listenerThreaded()) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { listener.onResponse(response); } @@ -187,11 +202,6 @@ public abstract class TransportShardSingleOperationAction { @@ -260,7 +270,11 @@ public abstract class TransportShardSingleOperationAction() { + NodesInfoResponse nodeInfo = transportService.submitRequest(node, TransportActions.Admin.Cluster.Node.INFO, Requests.nodesInfoRequest("_local"), new FutureTransportResponseHandler() { @Override public NodesInfoResponse newInstance() { return new NodesInfoResponse(); } - - @Override public void handleResponse(NodesInfoResponse response) { - } - - @Override public void handleException(TransportException exp) { - } }).txGet(); if (!clusterName.equals(nodeInfo.clusterName())) { logger.warn("Node {} not part of the cluster {}, ignoring...", node, clusterName); @@ -198,7 +189,7 @@ public class TransportClientNodesService extends AbstractComponent { nodes = new ImmutableList.Builder().addAll(newNodes).build(); if (!closed) { - nodesSamplerFuture = threadPool.schedule(this, nodesSamplerInterval, ThreadPool.ExecutionType.THREADED); + nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this); } } } @@ -213,7 +204,7 @@ public class TransportClientNodesService extends AbstractComponent { final CountDownLatch latch = new CountDownLatch(listedNodes.size()); final CopyOnWriteArrayList nodesInfoResponses = new CopyOnWriteArrayList(); for (final DiscoveryNode listedNode : listedNodes) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { try { transportService.connectToNode(listedNode); // make sure we are connected to it @@ -223,6 +214,10 @@ public class TransportClientNodesService extends AbstractComponent { return new NodesInfoResponse(); } + @Override public String executor() { + return ThreadPool.Names.SAME; + } + @Override public void handleResponse(NodesInfoResponse response) { nodesInfoResponses.add(response); latch.countDown(); @@ -271,7 +266,7 @@ public class TransportClientNodesService extends AbstractComponent { nodes = new ImmutableList.Builder().addAll(newNodes).build(); if (!closed) { - nodesSamplerFuture = threadPool.schedule(this, nodesSamplerInterval, ThreadPool.ExecutionType.THREADED); + nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/support/BaseClientTransportAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/support/BaseClientTransportAction.java index 31521acc970..e20f1146486 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/support/BaseClientTransportAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/support/BaseClientTransportAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.client.transport.action.ClientTransportAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; @@ -73,6 +74,13 @@ public abstract class BaseClientTransportAction { @@ -198,6 +194,10 @@ public class ShardStateAction extends AbstractComponent { innerShardStarted(request.shardRouting, request.reason); channel.sendResponse(VoidStreamable.INSTANCE); } + + @Override public String executor() { + return ThreadPool.Names.SAME; + } } private static class ShardRoutingEntry implements Streamable { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index e865dff563a..1cce3c73bff 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -129,12 +129,12 @@ public class MetaDataDeleteIndexService extends AbstractComponent { }; nodeIndexDeletedAction.add(nodeIndexDeleteListener); - threadPool.schedule(new Runnable() { + threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() { @Override public void run() { listener.onResponse(new Response(false)); nodeIndexDeletedAction.remove(nodeIndexDeleteListener); } - }, request.timeout, ThreadPool.ExecutionType.DEFAULT); + }); } return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 49973e1a11d..691273197e3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -93,7 +93,7 @@ public class RoutingService extends AbstractLifecycleComponent i // also, if the routing table changed, it means that we have new indices, or shard have started // or failed, we want to apply this as fast as possible routingTableDirty = true; - threadPool.execute(new RoutingTableUpdater()); + threadPool.cached().execute(new RoutingTableUpdater()); } else { if (event.nodesAdded()) { routingTableDirty = true; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index b91b94001e2..f9328ab8f55 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -95,7 +95,7 @@ public class InternalClusterService extends AbstractLifecycleComponent implem sendPingRequest(id, true); // try and send another ping request halfway through (just in case someone woke up during it...) // this can be a good trade-off to nailing the initial lookup or un-delivered messages - threadPool.schedule(new Runnable() { + threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() { @Override public void run() { try { sendPingRequest(id, false); @@ -208,13 +208,13 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem logger.warn("[{}] failed to send second ping request", e, id); } } - }, TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.ExecutionType.THREADED); - threadPool.schedule(new Runnable() { + }); + threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() { @Override public void run() { ConcurrentMap responses = receivedResponses.remove(id); listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); } - }, timeout, ThreadPool.ExecutionType.THREADED); + }); } private void sendPingRequest(int id, boolean remove) { @@ -266,8 +266,8 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem channel.sendResponse(VoidStreamable.INSTANCE); } - @Override public boolean spawn() { - return false; + @Override public String executor() { + return ThreadPool.Names.SAME; } } @@ -352,7 +352,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem // connect to the node if possible try { transportService.connectToNode(requestingNode); - transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) { + transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); } @@ -363,7 +363,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } }); } else { - transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) { + transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index d685cbfbd64..eeb9485571f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -152,13 +152,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen final int id = pingIdGenerator.incrementAndGet(); receivedResponses.put(id, new ConcurrentHashMap()); sendPings(id, timeout, false); - threadPool.schedule(new Runnable() { + threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() { @Override public void run() { sendPings(id, timeout, true); ConcurrentMap responses = receivedResponses.remove(id); listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); } - }, timeout, ThreadPool.ExecutionType.THREADED); + }); } private void sendPings(final int id, TimeValue timeout, boolean wait) { @@ -202,6 +202,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen return new UnicastPingResponse(); } + @Override public String executor() { + return ThreadPool.Names.SAME; + } + @Override public void handleResponse(UnicastPingResponse response) { logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses)); try { @@ -243,10 +247,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen logger.warn("failed to send ping to [{}]", exp, node); } } - - @Override public boolean spawn() { - return false; - } }); } if (wait) { @@ -260,11 +260,11 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) { temporalResponses.add(request.pingResponse); - threadPool.schedule(new Runnable() { + threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, new Runnable() { @Override public void run() { temporalResponses.remove(request.pingResponse); } - }, TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.ExecutionType.DEFAULT); + }); List pingResponses = newArrayList(temporalResponses); DiscoveryNodes discoNodes = nodesProvider.nodes(); @@ -286,12 +286,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen return new UnicastPingRequest(); } - @Override public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception { - channel.sendResponse(handlePingRequest(request)); + @Override public String executor() { + return ThreadPool.Names.SAME; } - @Override public boolean spawn() { - return false; + @Override public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception { + channel.sendResponse(handlePingRequest(request)); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 5a4cae44d6f..a2d12873279 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; @@ -71,7 +72,7 @@ public class PublishClusterStateAction extends AbstractComponent { new PublishClusterStateRequest(clusterState), TransportRequestOptions.options().withHighType(), - new VoidTransportResponseHandler(false) { + new VoidTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node); } @@ -112,11 +113,8 @@ public class PublishClusterStateAction extends AbstractComponent { channel.sendResponse(VoidStreamable.INSTANCE); } - /** - * No need to spawn, we add submit a new cluster state directly. This allows for faster application. - */ - @Override public boolean spawn() { - return false; + @Override public String executor() { + return ThreadPool.Names.SAME; } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 89bb766124d..fa08d13388a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -191,13 +191,13 @@ public class GatewayService extends AbstractLifecycleComponent i if (!ignoreTimeout && recoverAfterTime != null) { if (scheduledRecovery.compareAndSet(false, true)) { logger.debug("delaying initial state recovery for [{}]", recoverAfterTime); - threadPool.schedule(new Runnable() { + threadPool.schedule(recoverAfterTime, ThreadPool.Names.CACHED, new Runnable() { @Override public void run() { if (recovered.compareAndSet(false, true)) { gateway.performStateRecovery(recoveryListener); } } - }, recoverAfterTime, ThreadPool.ExecutionType.THREADED); + }); } } else { if (recovered.compareAndSet(false, true)) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java index 1cd20b1c909..4eff271ccac 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java @@ -61,6 +61,10 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA return execute(new Request(nodesIds).timeout(timeout)); } + @Override protected String executor() { + return ThreadPool.Names.CACHED; + } + @Override protected String transportAction() { return "/gateway/local/meta-state"; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java index 83e7c638950..0c2d0a3a9cd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java @@ -61,6 +61,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat return execute(new Request(nodesIds).timeout(timeout)); } + @Override protected String executor() { + return ThreadPool.Names.CACHED; + } + @Override protected String transportAction() { return "/gateway/local/started-shards"; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServer.java index 1e4d5e2d720..46e8b2ec3e8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpServer.java @@ -119,7 +119,7 @@ public class HttpServer extends AbstractLifecycleComponent { } } else { if (httpHandler.spawn()) { - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { try { httpHandler.handleRequest(request, channel); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 2eadff53313..99f10659da1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -316,7 +316,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem if (logger.isDebugEnabled()) { logger.debug("scheduling snapshot every [{}]", snapshotInterval); } - snapshotScheduleFuture = threadPool.schedule(new SnapshotRunnable(), snapshotInterval, ThreadPool.ExecutionType.THREADED); + snapshotScheduleFuture = threadPool.schedule(snapshotInterval, ThreadPool.Names.SNAPSHOT, new SnapshotRunnable()); } } @@ -332,7 +332,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem } // schedule it again if (indexShard.state() != IndexShardState.CLOSED) { - snapshotScheduleFuture = threadPool.schedule(this, snapshotInterval, ThreadPool.ExecutionType.THREADED); + snapshotScheduleFuture = threadPool.schedule(snapshotInterval, ThreadPool.Names.SNAPSHOT, this); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java index 3d6893a764c..511ea1ef566 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java @@ -144,7 +144,7 @@ public class RecoverySource extends AbstractComponent { RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE_SAME).txGet(); final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size()); final AtomicReference lastException = new AtomicReference(); @@ -167,7 +167,7 @@ public class RecoverySource extends AbstractComponent { long position = indexInput.getFilePointer(); indexInput.readBytes(buf, 0, toRead, false); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead), - TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet(); + TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); readCount += toRead; } indexInput.close(); @@ -195,7 +195,7 @@ public class RecoverySource extends AbstractComponent { // now, set the clean files request Set snapshotFiles = Sets.newHashSet(snapshot.getFiles()); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); stopWatch.stop(); logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); @@ -212,7 +212,7 @@ public class RecoverySource extends AbstractComponent { logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode()); StopWatch stopWatch = new StopWatch().start(); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); int totalOperations = sendSnapshot(snapshot); @@ -229,7 +229,7 @@ public class RecoverySource extends AbstractComponent { logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode()); StopWatch stopWatch = new StopWatch().start(); int totalOperations = sendSnapshot(snapshot); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); if (request.markAsRelocated()) { // TODO what happens if the recovery process fails afterwards, we need to mark this back to started try { @@ -258,7 +258,7 @@ public class RecoverySource extends AbstractComponent { totalOperations++; if (++counter == translogBatchSize) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); counter = 0; operations = Lists.newArrayList(); } @@ -266,7 +266,7 @@ public class RecoverySource extends AbstractComponent { // send the leftover if (!operations.isEmpty()) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); } return totalOperations; } @@ -280,26 +280,13 @@ public class RecoverySource extends AbstractComponent { return new StartRecoveryRequest(); } - @Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception { - // we don't spawn, but we execute the expensive recovery process on a cached thread pool - threadPool.cached().execute(new Runnable() { - @Override public void run() { - try { - RecoveryResponse response = recover(request); - channel.sendResponse(response); - } catch (Exception e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - // ignore - } - } - } - }); + @Override public String executor() { + return ThreadPool.Names.CACHED; } - @Override public boolean spawn() { - return false; + @Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception { + RecoveryResponse response = recover(request); + channel.sendResponse(response); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java index fa936deb565..1e8bcbe9133 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java @@ -278,6 +278,10 @@ public class RecoveryTarget extends AbstractComponent { return new RecoveryPrepareForTranslogOperationsRequest(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); @@ -299,6 +303,10 @@ public class RecoveryTarget extends AbstractComponent { return new RecoveryFinalizeRecoveryRequest(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception { InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); RecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shard.shardId()); @@ -321,6 +329,10 @@ public class RecoveryTarget extends AbstractComponent { return new RecoveryTranslogOperationsRequest(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception { InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); for (Translog.Operation operation : request.operations()) { @@ -344,6 +356,10 @@ public class RecoveryTarget extends AbstractComponent { return new RecoveryFilesInfoRequest(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception { InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId.index().name()).shardSafe(request.shardId.id()); RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId()); @@ -368,6 +384,10 @@ public class RecoveryTarget extends AbstractComponent { return new RecoveryCleanFilesRequest(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception { InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); for (String existingFile : shard.store().directory().listAll()) { @@ -390,6 +410,10 @@ public class RecoveryTarget extends AbstractComponent { return new RecoveryFileChunkRequest(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception { InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 885345e27a8..3eed4a59cc8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -581,7 +581,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private void startScheduledTasksIfNeeded() { if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(new EngineRefresher(), refreshInterval, ThreadPool.ExecutionType.DEFAULT); + refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); logger.debug("scheduling refresher every {}", refreshInterval); } else { logger.debug("scheduled refresher disabled"); @@ -590,7 +590,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I // so, make sure we periodically call it, this need to be a small enough value so mergine will actually // happen and reduce the number of segments if (optimizeInterval.millis() > 0) { - optimizeScheduleFuture = threadPool.schedule(new EngineOptimizer(), optimizeInterval, ThreadPool.ExecutionType.THREADED); + optimizeScheduleFuture = threadPool.schedule(optimizeInterval, ThreadPool.Names.MANAGEMENT, new EngineOptimizer()); logger.debug("scheduling optimizer / merger every {}", optimizeInterval); } else { logger.debug("scheduled optimizer / merger disabled"); @@ -609,7 +609,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I // we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule if (!engine().refreshNeeded()) { if (state != IndexShardState.CLOSED) { - refreshScheduledFuture = threadPool.schedule(this, refreshInterval, ThreadPool.ExecutionType.DEFAULT); + refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this); } return; } @@ -635,7 +635,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I logger.warn("Failed to perform scheduled engine refresh", e); } if (state != IndexShardState.CLOSED) { - refreshScheduledFuture = threadPool.schedule(this, refreshInterval, ThreadPool.ExecutionType.DEFAULT); + refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this); } } }); @@ -663,7 +663,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I logger.warn("Failed to perform scheduled engine optimize/merge", e); } if (state != IndexShardState.CLOSED) { - optimizeScheduleFuture = threadPool.schedule(this, optimizeInterval, ThreadPool.ExecutionType.THREADED); + optimizeScheduleFuture = threadPool.schedule(optimizeInterval, ThreadPool.Names.MANAGEMENT, this); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java index e5711332ccb..80dcd989092 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java @@ -72,7 +72,7 @@ public class TranslogService extends AbstractIndexShardComponent { logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod); - this.future = threadPool.schedule(new TranslogBasedFlush(), interval, ThreadPool.ExecutionType.DEFAULT); + this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush()); } @@ -109,11 +109,11 @@ public class TranslogService extends AbstractIndexShardComponent { return; } - future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.DEFAULT); + future = threadPool.schedule(interval, ThreadPool.Names.SAME, this); } private void asyncFlushAndReschedule() { - threadPool.cached().execute(new Runnable() { + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { @Override public void run() { try { indexShard.flush(new Engine.Flush()); @@ -127,7 +127,7 @@ public class TranslogService extends AbstractIndexShardComponent { lastFlushTime = System.currentTimeMillis(); if (indexShard.state() != IndexShardState.CLOSED) { - future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.DEFAULT); + future = threadPool.schedule(interval, ThreadPool.Names.SAME, this); } } }); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index fd36a03e005..ff9fba6f66e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -178,7 +178,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { @@ -376,6 +385,10 @@ public class SearchServiceTransportAction extends AbstractComponent { QuerySearchResult result = searchService.executeQueryPhase(request); channel.sendResponse(result); } + + @Override public String executor() { + return ThreadPool.Names.SEARCH; + } } private class SearchQueryByIdTransportHandler extends BaseTransportRequestHandler { @@ -390,6 +403,10 @@ public class SearchServiceTransportAction extends AbstractComponent { QuerySearchResult result = searchService.executeQueryPhase(request); channel.sendResponse(result); } + + @Override public String executor() { + return ThreadPool.Names.SEARCH; + } } private class SearchQueryScrollTransportHandler extends BaseTransportRequestHandler { @@ -404,6 +421,10 @@ public class SearchServiceTransportAction extends AbstractComponent { ScrollQuerySearchResult result = searchService.executeQueryPhase(request); channel.sendResponse(result); } + + @Override public String executor() { + return ThreadPool.Names.SEARCH; + } } private class SearchQueryFetchTransportHandler extends BaseTransportRequestHandler { @@ -418,6 +439,10 @@ public class SearchServiceTransportAction extends AbstractComponent { QueryFetchSearchResult result = searchService.executeFetchPhase(request); channel.sendResponse(result); } + + @Override public String executor() { + return ThreadPool.Names.SEARCH; + } } private class SearchQueryQueryFetchTransportHandler extends BaseTransportRequestHandler { @@ -432,6 +457,10 @@ public class SearchServiceTransportAction extends AbstractComponent { QueryFetchSearchResult result = searchService.executeFetchPhase(request); channel.sendResponse(result); } + + @Override public String executor() { + return ThreadPool.Names.SEARCH; + } } private class SearchFetchByIdTransportHandler extends BaseTransportRequestHandler { @@ -446,6 +475,10 @@ public class SearchServiceTransportAction extends AbstractComponent { FetchSearchResult result = searchService.executeFetchPhase(request); channel.sendResponse(result); } + + @Override public String executor() { + return ThreadPool.Names.SEARCH; + } } private class SearchQueryFetchScrollTransportHandler extends BaseTransportRequestHandler { @@ -460,5 +493,9 @@ public class SearchServiceTransportAction extends AbstractComponent { ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request); channel.sendResponse(result); } + + @Override public String executor() { + return ThreadPool.Names.SEARCH; + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/FutureListener.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/FutureListener.java deleted file mode 100644 index 7b9cf5b6d8e..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/FutureListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool; - -/** - * @author kimchy (Shay Banon) - */ -public interface FutureListener { - - void onResult(T result); - - void onException(Exception e); -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 9810c4ec281..1c552bbc86b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -19,103 +19,210 @@ package org.elasticsearch.threadpool; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.DynamicExecutors; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.MoreExecutors; +import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.concurrent.*; + +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.elasticsearch.common.unit.TimeValue.*; /** * @author kimchy (shay.banon) */ -public interface ThreadPool extends Executor { +public class ThreadPool extends AbstractComponent { - ThreadPoolInfo info(); + public static class Names { + public static final String SAME = "same"; + public static final String CACHED = "cached"; + public static final String INDEX = "index"; + public static final String SEARCH = "search"; + public static final String PERCOLATE = "percolate"; + public static final String MANAGEMENT = "management"; + public static final String SNAPSHOT = "snapshot"; + } - ThreadPoolStats stats(); + private final ImmutableMap executors; - /** - * The minimum number of threads in the thread pool. - */ - int getMinThreads(); + private final ScheduledExecutorService scheduler; - /** - * The maximum number of threads in the thread pool. - */ - int getMaxThreads(); + public ThreadPool() { + this(ImmutableSettings.Builder.EMPTY_SETTINGS); + } - /** - * The size of scheduler threads. - */ - int getSchedulerThreads(); + @Inject public ThreadPool(Settings settings) { + super(settings); - /** - * Returns the current number of threads in the pool. - * - * @return the number of threads - */ - int getPoolSize(); + Map groupSettings = settings.getGroups("threadpool"); - /** - * Returns the approximate number of threads that are actively - * executing tasks. - * - * @return the number of threads - */ - int getActiveCount(); + Map executors = Maps.newHashMap(); + executors.put(Names.CACHED, build(Names.CACHED, "cached", groupSettings.get(Names.CACHED), settingsBuilder().put("keep_alive", "30s").build())); + executors.put(Names.INDEX, build(Names.INDEX, "cached", groupSettings.get(Names.INDEX), ImmutableSettings.Builder.EMPTY_SETTINGS)); + executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS)); + executors.put(Names.PERCOLATE, build(Names.PERCOLATE, "cached", groupSettings.get(Names.PERCOLATE), ImmutableSettings.Builder.EMPTY_SETTINGS)); + executors.put(Names.MANAGEMENT, build(Names.MANAGEMENT, "scaling", groupSettings.get(Names.MANAGEMENT), ImmutableSettings.Builder.EMPTY_SETTINGS)); + executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), ImmutableSettings.Builder.EMPTY_SETTINGS)); + executors.put(Names.SAME, MoreExecutors.sameThreadExecutor()); + this.executors = ImmutableMap.copyOf(executors); + this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "[scheduler]")); + } - /** - * The size of the scheduler thread pool. - */ - int getSchedulerPoolSize(); + public Executor cached() { + return executor(Names.CACHED); + } - /** - * The approximate number of threads that are actively executing scheduled - * tasks. - */ - int getSchedulerActiveCount(); + public Executor executor(String name) { + Executor executor = executors.get(name); + if (executor == null) { + throw new ElasticSearchIllegalArgumentException("No executor found for [" + name + "]"); + } + return executor; + } - /** - * Returns true if the thread pool has started. - */ - boolean isStarted(); + public ScheduledFuture scheduleWithFixedDelay(Runnable command, TimeValue interval) { + return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS); + } - /** - * Returns a cached executor that will always allocate threads. - */ - Executor cached(); + public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + if (!Names.SAME.equals(name)) { + command = new ThreadedRunnable(command, executor(name)); + } + return scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS); + } - void shutdownNow(); + public void shutdown() { + scheduler.shutdown(); + for (Executor executor : executors.values()) { + if (executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor) executor).shutdown(); + } + } + } - /** - * Initiates an orderly shutdown in which previously submitted - * tasks are executed, but no new tasks will be accepted. - * Invocation has no additional effect if already shut down. - */ - void shutdown(); + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + boolean result = scheduler.awaitTermination(timeout, unit); + for (Executor executor : executors.values()) { + if (executor instanceof ThreadPoolExecutor) { + result &= ((ThreadPoolExecutor) executor).awaitTermination(timeout, unit); + } + } + return result; + } - boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; + public void shutdownNow() { + scheduler.shutdownNow(); + for (Executor executor : executors.values()) { + if (executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor) executor).shutdownNow(); + } + } + } - void execute(Runnable command); + private Executor build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) { + if (settings == null) { + settings = ImmutableSettings.Builder.EMPTY_SETTINGS; + } + String type = settings.get("type", defaultType); + ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[" + name + "]"); + if ("cached".equals(type)) { + TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); + logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + keepAlive.millis(), TimeUnit.MILLISECONDS, + new SynchronousQueue(), + threadFactory); + } else if ("fixed".equals(type)) { + int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)); + logger.debug("creating thread_pool [{}], type [{}], size [{}]", name, type, size); + return new ThreadPoolExecutor(size, size, + 0L, TimeUnit.MILLISECONDS, + new LinkedTransferQueue(), + threadFactory); + } else if ("scaling".equals(type)) { + TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); + int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); + int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)); + logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); + return DynamicExecutors.newScalingThreadPool(min, size, keepAlive.millis(), threadFactory); + } else if ("blocking".equals(type)) { + TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); + int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); + int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)); + SizeValue capacity = settings.getAsSize("capacity", defaultSettings.getAsSize("capacity", new SizeValue(0))); + TimeValue waitTime = settings.getAsTime("wait_time", defaultSettings.getAsTime("wait_time", timeValueSeconds(60))); + logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, keepAlive, waitTime); + return DynamicExecutors.newBlockingThreadPool(min, size, keepAlive.millis(), (int) capacity.singles(), waitTime.millis(), threadFactory); + } + throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]"); + } - /** - * Scheduled a task. Note, when using {@link ExecutionType#DEFAULT}, make sure to not - * execute long running blocking tasks. - */ - ScheduledFuture schedule(Runnable command, TimeValue delay, ExecutionType executionType); + class LoggingRunnable implements Runnable { - /** - * Schedule a repeating task with a task that is very short lived. - */ - ScheduledFuture scheduleWithFixedDelay(Runnable command, TimeValue interval); + private final Runnable runnable; - /** - * Returns an estimated current time in milliseconds. - */ - long estimatedCurrentTimeInMillis(); + LoggingRunnable(Runnable runnable) { + this.runnable = runnable; + } - static enum ExecutionType { - DEFAULT, - THREADED + @Override public void run() { + try { + runnable.run(); + } catch (Exception e) { + logger.warn("failed to run {}", e, runnable.toString()); + } + } + + @Override public int hashCode() { + return runnable.hashCode(); + } + + @Override public boolean equals(Object obj) { + return runnable.equals(obj); + } + + @Override public String toString() { + return "[threaded] " + runnable.toString(); + } + } + + class ThreadedRunnable implements Runnable { + + private final Runnable runnable; + + private final Executor executor; + + ThreadedRunnable(Runnable runnable, Executor executor) { + this.runnable = runnable; + this.executor = executor; + } + + @Override public void run() { + executor.execute(runnable); + } + + @Override public int hashCode() { + return runnable.hashCode(); + } + + @Override public boolean equals(Object obj) { + return runnable.equals(obj); + } + + @Override public String toString() { + return "[threaded] " + runnable.toString(); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolInfo.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolInfo.java deleted file mode 100644 index 2c33c609fbd..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolInfo.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; - -import java.io.IOException; -import java.io.Serializable; - -/** - * Thread Pool Info. - * - * @author kimchy (shay.banon) - */ -public class ThreadPoolInfo implements Streamable, Serializable, ToXContent { - - private String type; - - private int minThreads; - - private int maxThreads; - - private int schedulerThreads; - - ThreadPoolInfo() { - } - - public ThreadPoolInfo(String type, int minThreads, int maxThreads, int schedulerThreads) { - this.type = type; - this.minThreads = minThreads; - this.maxThreads = maxThreads; - this.schedulerThreads = schedulerThreads; - } - - public static ThreadPoolInfo readThreadPoolInfo(StreamInput in) throws IOException { - ThreadPoolInfo info = new ThreadPoolInfo(); - info.readFrom(in); - return info; - } - - @Override public void readFrom(StreamInput in) throws IOException { - type = in.readUTF(); - minThreads = in.readInt(); - maxThreads = in.readInt(); - schedulerThreads = in.readInt(); - } - - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(type); - out.writeInt(minThreads); - out.writeInt(maxThreads); - out.writeInt(schedulerThreads); - } - - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("thread_pool"); - builder.field("type", type); - builder.field("min_threads", minThreads); - builder.field("max_threads", maxThreads); - builder.field("scheduler_threads", schedulerThreads); - builder.endObject(); - return builder; - } - - /** - * The type of the thread pool. - */ - public String type() { - return type; - } - - /** - * The type of the thread pool. - */ - public String getType() { - return type(); - } - - /** - * The minimum number of threads in the thread pool. - */ - public int minThreads() { - return minThreads; - } - - /** - * The minimum number of threads in the thread pool. - */ - public int getMinThreads() { - return minThreads(); - } - - /** - * The maximum number of threads in the thread pool. - */ - public int maxThreads() { - return maxThreads; - } - - /** - * The maximum number of threads in the thread pool. - */ - public int getMaxThreads() { - return maxThreads(); - } - - /** - * The size of scheduler threads. - */ - public int schedulerThreads() { - return schedulerThreads; - } - - /** - * The size of scheduler threads. - */ - public int getSchedulerThreads() { - return schedulerThreads(); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java index 9c7f662789c..e9746982606 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java @@ -19,18 +19,13 @@ package org.elasticsearch.threadpool; -import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.Modules; -import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.cached.CachedThreadPoolModule; /** * @author kimchy (shay.banon) */ -public class ThreadPoolModule extends AbstractModule implements SpawnModules { +public class ThreadPoolModule extends AbstractModule { private final Settings settings; @@ -38,10 +33,7 @@ public class ThreadPoolModule extends AbstractModule implements SpawnModules { this.settings = settings; } - @Override public Iterable spawnModules() { - return ImmutableList.of(Modules.createModule(settings.getAsClass("threadpool.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings)); - } - @Override protected void configure() { + bind(ThreadPool.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java deleted file mode 100644 index 4674e356525..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; - -import java.io.IOException; -import java.io.Serializable; - -/** - * Thread Pool level stats. - * - * @author kimchy (shay.banon) - */ -public class ThreadPoolStats implements Streamable, Serializable, ToXContent { - - private int poolSize; - - private int activeCount; - - private int schedulerPoolSize; - - private int schedulerActiveCount; - - ThreadPoolStats() { - } - - public ThreadPoolStats(int poolSize, int activeCount, int schedulerPoolSize, int schedulerActiveCount) { - this.poolSize = poolSize; - this.activeCount = activeCount; - this.schedulerPoolSize = schedulerPoolSize; - this.schedulerActiveCount = schedulerActiveCount; - } - - public static ThreadPoolStats readThreadPoolStats(StreamInput in) throws IOException { - ThreadPoolStats stats = new ThreadPoolStats(); - stats.readFrom(in); - return stats; - } - - @Override public void readFrom(StreamInput in) throws IOException { - poolSize = in.readVInt(); - activeCount = in.readVInt(); - schedulerPoolSize = in.readVInt(); - schedulerActiveCount = in.readVInt(); - } - - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(poolSize); - out.writeVInt(activeCount); - out.writeVInt(schedulerPoolSize); - out.writeVInt(schedulerActiveCount); - } - - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("thread_pool"); - builder.field("pool_size", poolSize); - builder.field("active_count", activeCount); - builder.field("scheduler_pool_size", schedulerPoolSize); - builder.field("scheduler_active_count", schedulerActiveCount); - builder.endObject(); - return builder; - } - - /** - * Returns the current number of threads in the pool. - * - * @return the number of threads - */ - public int poolSize() { - return poolSize; - } - - /** - * Returns the current number of threads in the pool. - * - * @return the number of threads - */ - public int getPoolSize() { - return poolSize(); - } - - /** - * Returns the approximate number of threads that are actively - * executing tasks. - * - * @return the number of threads - */ - public int activeCount() { - return activeCount; - } - - /** - * Returns the approximate number of threads that are actively - * executing tasks. - * - * @return the number of threads - */ - public int getActiveCount() { - return activeCount(); - } - - /** - * The size of the scheduler thread pool. - */ - public int schedulerPoolSize() { - return schedulerPoolSize; - } - - /** - * The size of the scheduler thread pool. - */ - public int getSchedulerPoolSize() { - return schedulerPoolSize(); - } - - /** - * The approximate number of threads that are actively executing scheduled - * tasks. - */ - public int schedulerActiveCount() { - return schedulerActiveCount; - } - - /** - * The approximate number of threads that are actively executing scheduled - * tasks. - */ - public int getSchedulerActiveCount() { - return schedulerActiveCount(); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java deleted file mode 100644 index 08888d69126..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPool.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.blocking; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.SizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.DynamicExecutors; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor; -import org.elasticsearch.threadpool.support.AbstractThreadPool; - -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; - -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; -import static org.elasticsearch.common.unit.TimeValue.*; - -/** - * A thread pool that will will block the execute if all threads are busy. - * - * @author kimchy (shay.banon) - */ -public class BlockingThreadPool extends AbstractThreadPool { - - final int min; - final int max; - final int capacity; - final TimeValue waitTime; - final TimeValue keepAlive; - - final int scheduledSize; - - public BlockingThreadPool() { - this(EMPTY_SETTINGS); - } - - @Inject public BlockingThreadPool(Settings settings) { - super(settings); - this.scheduledSize = componentSettings.getAsInt("scheduled_size", 1); - this.min = componentSettings.getAsInt("min", 10); - this.max = componentSettings.getAsInt("max", 100); - - // capacity is set to 0 as it might cause starvation in blocking mode - this.capacity = (int) componentSettings.getAsSize("capacity", new SizeValue(0)).singles(); - this.waitTime = componentSettings.getAsTime("wait_time", timeValueSeconds(60)); - this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueMinutes(5)); - logger.debug("initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize); -// executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]")); - executorService = DynamicExecutors.newBlockingThreadPool(min, max, keepAlive.millis(), capacity, waitTime.millis(), EsExecutors.daemonThreadFactory(settings, "[tp]")); - scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); - cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]")); - started = true; - } - - @Override public String getType() { - return "blocking"; - } - - @Override public int getMinThreads() { - return min; - } - - @Override public int getMaxThreads() { - return max; - } - - @Override public int getSchedulerThreads() { - return scheduledSize; - } - - @Override public int getPoolSize() { - if (executorService instanceof TransferThreadPoolExecutor) { - return ((TransferThreadPoolExecutor) executorService).getPoolSize(); - } else { - return ((ThreadPoolExecutor) executorService).getPoolSize(); - } - } - - @Override public int getActiveCount() { - if (executorService instanceof TransferThreadPoolExecutor) { - return ((TransferThreadPoolExecutor) executorService).getActiveCount(); - } else { - return ((ThreadPoolExecutor) executorService).getActiveCount(); - } - } - - - @Override public int getSchedulerPoolSize() { - return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize(); - } - - @Override public int getSchedulerActiveCount() { - return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java deleted file mode 100644 index 5eb3e108bce..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolManagement.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.blocking; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.jmx.MBean; -import org.elasticsearch.jmx.ManagedAttribute; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * @author kimchy (shay.banon) - */ -@MBean(objectName = "service=threadpool,threadpoolType=blocking", description = "Blocking Thread Pool") -public class BlockingThreadPoolManagement { - - private final BlockingThreadPool threadPool; - - @Inject public BlockingThreadPoolManagement(ThreadPool threadPool) { - this.threadPool = (BlockingThreadPool) threadPool; - } - - @ManagedAttribute(description = "Minimum number Of threads") - public long getMin() { - return threadPool.min; - } - - @ManagedAttribute(description = "Maximum number of threads") - public int getMax() { - return threadPool.max; - } - - @ManagedAttribute(description = "Number of scheduler threads") - public int getScheduleSize() { - return threadPool.scheduledSize; - } - - @ManagedAttribute(description = "Thread keep alive") - public String getKeepAlive() { - return threadPool.keepAlive.format(); - } - - @ManagedAttribute(description = "Thread keep alive (in seconds)") - public long getKeepAliveInSeconds() { - return threadPool.keepAlive.seconds(); - } - - @ManagedAttribute(description = "Current number of threads in the pool") - public long getPoolSize() { - return threadPool.getPoolSize(); - } - - @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks") - public long getActiveCount() { - return threadPool.getActiveCount(); - } - - @ManagedAttribute(description = "Current number of threads in the scheduler pool") - public long getSchedulerPoolSize() { - return threadPool.getSchedulerPoolSize(); - } - - @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks") - public long getSchedulerActiveCount() { - return threadPool.getSchedulerActiveCount(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java deleted file mode 100644 index 2902144ec0c..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/blocking/BlockingThreadPoolModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.blocking; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * @author kimchy (shay.banon) - */ -public class BlockingThreadPoolModule extends AbstractModule { - - @Override protected void configure() { - bind(ThreadPool.class).to(BlockingThreadPool.class).asEagerSingleton(); - bind(BlockingThreadPoolManagement.class).asEagerSingleton(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java deleted file mode 100644 index 955b1c3fa9a..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPool.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.cached; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.threadpool.support.AbstractThreadPool; - -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; -import static org.elasticsearch.common.unit.TimeValue.*; - -/** - * A thread pool that will create an unbounded number of threads. - * - * @author kimchy (shay.banon) - */ -public class CachedThreadPool extends AbstractThreadPool { - - final TimeValue keepAlive; - - final int scheduledSize; - - public CachedThreadPool() { - this(EMPTY_SETTINGS); - } - - @Inject public CachedThreadPool(Settings settings) { - super(settings); - this.scheduledSize = componentSettings.getAsInt("scheduled_size", 1); - this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueMinutes(5)); - logger.debug("Initializing {} thread pool with keep_alive[{}], scheduled_size[{}]", getType(), keepAlive, scheduledSize); - executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, - keepAlive.millis(), TimeUnit.MILLISECONDS, - new SynchronousQueue(), - EsExecutors.daemonThreadFactory(settings, "[tp]")); - scheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); - cached = executorService; - started = true; - } - - @Override public String getType() { - return "cached"; - } - - @Override public int getMinThreads() { - return 0; - } - - @Override public int getMaxThreads() { - return -1; - } - - @Override public int getSchedulerThreads() { - return scheduledSize; - } - - @Override public int getPoolSize() { - return ((ThreadPoolExecutor) executorService).getPoolSize(); - } - - @Override public int getActiveCount() { - return ((ThreadPoolExecutor) executorService).getActiveCount(); - } - - @Override public int getSchedulerPoolSize() { - return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize(); - } - - @Override public int getSchedulerActiveCount() { - return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount(); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java deleted file mode 100644 index eaafb1a969b..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolManagement.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.cached; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.jmx.MBean; -import org.elasticsearch.jmx.ManagedAttribute; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * @author kimchy (shay.banon) - */ -@MBean(objectName = "service=threadpool,threadpoolType=cached", description = "Cached Thread Pool") -public class CachedThreadPoolManagement { - - private final CachedThreadPool threadPool; - - @Inject public CachedThreadPoolManagement(ThreadPool threadPool) { - this.threadPool = (CachedThreadPool) threadPool; - } - - @ManagedAttribute(description = "Number of scheduler threads") - public int getScheduleSize() { - return threadPool.scheduledSize; - } - - @ManagedAttribute(description = "Thread keep alive") - public String getKeepAlive() { - return threadPool.keepAlive.format(); - } - - @ManagedAttribute(description = "Thread keep alive (in seconds)") - public long getKeepAliveInSeconds() { - return threadPool.keepAlive.seconds(); - } - - @ManagedAttribute(description = "Current number of threads in the pool") - public long getPoolSize() { - return threadPool.getPoolSize(); - } - - @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks") - public long getActiveCount() { - return threadPool.getActiveCount(); - } - - @ManagedAttribute(description = "Current number of threads in the scheduler pool") - public long getSchedulerPoolSize() { - return threadPool.getSchedulerPoolSize(); - } - - @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks") - public long getSchedulerActiveCount() { - return threadPool.getSchedulerActiveCount(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java deleted file mode 100644 index bc9860ed285..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/cached/CachedThreadPoolModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.cached; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * @author kimchy (shay.banon) - */ -public class CachedThreadPoolModule extends AbstractModule { - - @Override protected void configure() { - bind(ThreadPool.class).to(CachedThreadPool.class).asEagerSingleton(); - bind(CachedThreadPoolManagement.class).asEagerSingleton(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPool.java deleted file mode 100644 index a08546106fd..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPool.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.fixed; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.jsr166y.ForkJoinPool; -import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; -import org.elasticsearch.threadpool.support.AbstractThreadPool; - -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; -import static org.elasticsearch.common.unit.TimeValue.*; - -/** - * - */ -public class FixedThreadPool extends AbstractThreadPool { - - final int size; - final TimeValue keepAlive; - - final int scheduledSize; - - public FixedThreadPool() { - this(EMPTY_SETTINGS); - } - - @Inject public FixedThreadPool(Settings settings) { - super(settings); - this.size = componentSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5); - this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueMinutes(5)); - this.scheduledSize = componentSettings.getAsInt("scheduled_size", 1); - logger.debug("Initializing {} thread pool with [{}] threads, keep_alive[{}], scheduled_size[{}]", getType(), size, keepAlive, scheduledSize); - scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); - String type = componentSettings.get("type"); - if ("forkjoin".equalsIgnoreCase(type)) { - executorService = new ForkJoinPool(size, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); - } else { - executorService = new ThreadPoolExecutor(size, size, - 0L, TimeUnit.MILLISECONDS, - new LinkedTransferQueue(), - EsExecutors.daemonThreadFactory(settings, "[tp]")); - } - - cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]")); - started = true; - } - - @Override public int getMinThreads() { - return size; - } - - @Override public int getMaxThreads() { - return size; - } - - @Override public int getSchedulerThreads() { - return scheduledSize; - } - - @Override public int getPoolSize() { - if (executorService instanceof ThreadPoolExecutor) { - return ((ThreadPoolExecutor) executorService).getPoolSize(); - } - return -1; - } - - @Override public int getActiveCount() { - if (executorService instanceof ThreadPoolExecutor) { - return ((ThreadPoolExecutor) executorService).getActiveCount(); - } - return -1; - } - - @Override public int getSchedulerPoolSize() { - return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize(); - } - - @Override public int getSchedulerActiveCount() { - return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount(); - } - - @Override public String getType() { - return "fixed"; - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPoolManagement.java deleted file mode 100644 index 5f1fdf85cd6..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPoolManagement.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.fixed; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.jmx.MBean; -import org.elasticsearch.jmx.ManagedAttribute; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * @author kimchy (shay.banon) - */ -@MBean(objectName = "service=threadpool,threadpoolType=fixed", description = "Fixed Thread Pool") -public class FixedThreadPoolManagement { - - private final FixedThreadPool threadPool; - - @Inject public FixedThreadPoolManagement(ThreadPool threadPool) { - this.threadPool = (FixedThreadPool) threadPool; - } - - @ManagedAttribute(description = "Number of scheduler threads") - public int getScheduleSize() { - return threadPool.scheduledSize; - } - - @ManagedAttribute(description = "Thread keep alive") - public String getKeepAlive() { - return threadPool.keepAlive.format(); - } - - @ManagedAttribute(description = "Thread keep alive (in seconds)") - public long getKeepAliveInSeconds() { - return threadPool.keepAlive.seconds(); - } - - @ManagedAttribute(description = "Current number of threads in the pool") - public long getPoolSize() { - return threadPool.getPoolSize(); - } - - @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks") - public long getActiveCount() { - return threadPool.getActiveCount(); - } - - @ManagedAttribute(description = "Current number of threads in the scheduler pool") - public long getSchedulerPoolSize() { - return threadPool.getSchedulerPoolSize(); - } - - @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks") - public long getSchedulerActiveCount() { - return threadPool.getSchedulerActiveCount(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPoolModule.java deleted file mode 100644 index d124d0796c5..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/fixed/FixedThreadPoolModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.fixed; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * @author kimchy (shay.banon) - */ -public class FixedThreadPoolModule extends AbstractModule { - - @Override protected void configure() { - bind(ThreadPool.class).to(FixedThreadPool.class).asEagerSingleton(); - bind(FixedThreadPoolManagement.class).asEagerSingleton(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java deleted file mode 100644 index c51375c4c8b..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPool.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.scaling; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.DynamicExecutors; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor; -import org.elasticsearch.threadpool.support.AbstractThreadPool; - -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; - -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; -import static org.elasticsearch.common.unit.TimeValue.*; - -/** - * @author kimchy (shay.banon) - */ -public class ScalingThreadPool extends AbstractThreadPool { - - final int min; - final int max; - final TimeValue keepAlive; - - final int scheduledSize; - - public ScalingThreadPool() { - this(EMPTY_SETTINGS); - } - - @Inject public ScalingThreadPool(Settings settings) { - super(settings); - this.min = componentSettings.getAsInt("min", 10); - this.max = componentSettings.getAsInt("max", 100); - this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueMinutes(5)); - this.scheduledSize = componentSettings.getAsInt("scheduled_size", 1); - logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize); - scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]")); -// executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]")); - executorService = DynamicExecutors.newScalingThreadPool(min, max, keepAlive.millis(), EsExecutors.daemonThreadFactory(settings, "[tp]")); - cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]")); - started = true; - } - - @Override public int getMinThreads() { - return min; - } - - @Override public int getMaxThreads() { - return max; - } - - @Override public int getSchedulerThreads() { - return scheduledSize; - } - - @Override public int getPoolSize() { - if (executorService instanceof TransferThreadPoolExecutor) { - return ((TransferThreadPoolExecutor) executorService).getPoolSize(); - } else { - return ((ThreadPoolExecutor) executorService).getPoolSize(); - } - } - - @Override public int getActiveCount() { - if (executorService instanceof TransferThreadPoolExecutor) { - return ((TransferThreadPoolExecutor) executorService).getActiveCount(); - } else { - return ((ThreadPoolExecutor) executorService).getActiveCount(); - } - } - - @Override public int getSchedulerPoolSize() { - return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize(); - } - - @Override public int getSchedulerActiveCount() { - return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount(); - } - - @Override public String getType() { - return "scaling"; - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java deleted file mode 100644 index 40d81e342f3..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolManagement.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.scaling; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.jmx.MBean; -import org.elasticsearch.jmx.ManagedAttribute; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * @author kimchy (shay.banon) - */ -@MBean(objectName = "service=threadpool,threadpoolType=scaling", description = "Scaling Thread Pool") -public class ScalingThreadPoolManagement { - - private final ScalingThreadPool threadPool; - - @Inject public ScalingThreadPoolManagement(ThreadPool threadPool) { - this.threadPool = (ScalingThreadPool) threadPool; - } - - @ManagedAttribute(description = "Minimum number Of threads") - public long getMin() { - return threadPool.min; - } - - @ManagedAttribute(description = "Maximum number of threads") - public int getMax() { - return threadPool.max; - } - - @ManagedAttribute(description = "Number of scheduler threads") - public int getScheduleSize() { - return threadPool.scheduledSize; - } - - @ManagedAttribute(description = "Thread keep alive") - public String getKeepAlive() { - return threadPool.keepAlive.format(); - } - - @ManagedAttribute(description = "Thread keep alive (in seconds)") - public long getKeepAliveInSeconds() { - return threadPool.keepAlive.seconds(); - } - - @ManagedAttribute(description = "Current number of threads in the pool") - public long getPoolSize() { - return threadPool.getPoolSize(); - } - - @ManagedAttribute(description = "Approximate number of threads that are actively executing tasks") - public long getActiveCount() { - return threadPool.getActiveCount(); - } - - @ManagedAttribute(description = "Current number of threads in the scheduler pool") - public long getSchedulerPoolSize() { - return threadPool.getSchedulerPoolSize(); - } - - @ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks") - public long getSchedulerActiveCount() { - return threadPool.getSchedulerActiveCount(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java deleted file mode 100644 index 0035999552b..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/scaling/ScalingThreadPoolModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.scaling; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * @author kimchy (shay.banon) - */ -public class ScalingThreadPoolModule extends AbstractModule { - - @Override protected void configure() { - bind(ThreadPool.class).to(ScalingThreadPool.class).asEagerSingleton(); - bind(ScalingThreadPoolManagement.class).asEagerSingleton(); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java deleted file mode 100644 index 79d6b5bab1a..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool.support; - -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.threadpool.FutureListener; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPoolInfo; -import org.elasticsearch.threadpool.ThreadPoolStats; - -import java.util.concurrent.*; - -/** - * @author kimchy (shay.banon) - */ -public abstract class AbstractThreadPool extends AbstractComponent implements ThreadPool { - - protected volatile boolean started; - - protected ExecutorService executorService; - - protected ScheduledExecutorService scheduledExecutorService; - - protected ExecutorService cached; - - protected AbstractThreadPool(Settings settings) { - super(settings); - } - - public abstract String getType(); - - @Override public ThreadPoolInfo info() { - return new ThreadPoolInfo(getType(), getMinThreads(), getMaxThreads(), getSchedulerThreads()); - } - - @Override public ThreadPoolStats stats() { - return new ThreadPoolStats(getPoolSize(), getActiveCount(), getSchedulerPoolSize(), getSchedulerActiveCount()); - } - - @Override public boolean isStarted() { - return started; - } - - @Override public Executor cached() { - return cached; - } - - @Override public void shutdown() { - started = false; - logger.debug("shutting down {} thread pool", getType()); - executorService.shutdown(); - scheduledExecutorService.shutdown(); - cached.shutdown(); - } - - @Override public void shutdownNow() { - started = false; - if (!executorService.isTerminated()) { - executorService.shutdownNow(); - } - if (!scheduledExecutorService.isTerminated()) { - scheduledExecutorService.shutdownNow(); - } - if (cached != executorService) { - if (!cached.isTerminated()) { - cached.shutdownNow(); - } - } - } - - @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - boolean result = executorService.awaitTermination(timeout, unit); - if (cached != executorService) { - result &= cached.awaitTermination(timeout, unit); - } - result &= scheduledExecutorService.awaitTermination(timeout, unit); - return result; - } - - @Override public ScheduledFuture scheduleWithFixedDelay(Runnable command, TimeValue interval) { - return scheduledExecutorService.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS); - } - - @Override public ScheduledFuture schedule(Runnable command, TimeValue delay, ExecutionType executionType) { - if (executionType == ExecutionType.THREADED) { - command = new ThreadedRunnable(command); - } - return scheduledExecutorService.schedule(command, delay.millis(), TimeUnit.MILLISECONDS); - } - - - @Override public long estimatedCurrentTimeInMillis() { - return System.currentTimeMillis(); - } - - @Override public void execute(Runnable command) { - executorService.execute(command); - } - - class LoggingRunnable implements Runnable { - - private final Runnable runnable; - - LoggingRunnable(Runnable runnable) { - this.runnable = runnable; - } - - @Override public void run() { - try { - runnable.run(); - } catch (Exception e) { - logger.warn("failed to run {}", e, runnable.toString()); - } - } - - @Override public int hashCode() { - return runnable.hashCode(); - } - - @Override public boolean equals(Object obj) { - return runnable.equals(obj); - } - - @Override public String toString() { - return "[threaded] " + runnable.toString(); - } - } - - class ThreadedRunnable implements Runnable { - - private final Runnable runnable; - - ThreadedRunnable(Runnable runnable) { - this.runnable = runnable; - } - - @Override public void run() { - cached.execute(runnable); - } - - @Override public int hashCode() { - return runnable.hashCode(); - } - - @Override public boolean equals(Object obj) { - return runnable.equals(obj); - } - - @Override public String toString() { - return "[threaded] " + runnable.toString(); - } - } - - protected static class FutureCallable implements Callable { - - private final Callable callable; - - private final FutureListener listener; - - public FutureCallable(Callable callable, FutureListener listener) { - this.callable = callable; - this.listener = listener; - } - - @Override public T call() throws Exception { - try { - T result = callable.call(); - listener.onResult(result); - return result; - } catch (Exception e) { - listener.onException(e); - throw e; - } - } - } - - protected static class FutureRunnable implements Runnable { - - private final Runnable runnable; - - private final T result; - - private final FutureListener listener; - - private FutureRunnable(Runnable runnable, T result, FutureListener listener) { - this.runnable = runnable; - this.result = result; - this.listener = listener; - } - - @Override public void run() { - try { - runnable.run(); - listener.onResult(result); - } catch (Exception e) { - listener.onException(e); - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - } - } - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/BaseTransportRequestHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/BaseTransportRequestHandler.java index 95c33bec43f..9278eebac11 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/BaseTransportRequestHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/BaseTransportRequestHandler.java @@ -28,7 +28,4 @@ import org.elasticsearch.common.io.stream.Streamable; */ public abstract class BaseTransportRequestHandler implements TransportRequestHandler { - @Override public boolean spawn() { - return true; - } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/BaseTransportResponseHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/BaseTransportResponseHandler.java index fb88ff82d91..97c2f5b2b43 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/BaseTransportResponseHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/BaseTransportResponseHandler.java @@ -28,7 +28,4 @@ import org.elasticsearch.common.io.stream.Streamable; */ public abstract class BaseTransportResponseHandler implements TransportResponseHandler { - @Override public boolean spawn() { - return true; - } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/FutureTransportResponseHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/FutureTransportResponseHandler.java index aec3a057ebd..ad1ffebea4f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/FutureTransportResponseHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/FutureTransportResponseHandler.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.threadpool.ThreadPool; /** * A response handler to be used when all interaction will be done through the {@link TransportFuture}. @@ -33,4 +34,8 @@ public abstract class FutureTransportResponseHandler exten @Override public void handleException(TransportException exp) { } + + @Override public String executor() { + return ThreadPool.Names.SAME; + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java index b61ca97aec7..baefa949ba6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java @@ -74,6 +74,10 @@ public class PlainTransportFuture extends AbstractFuture extends AbstractFuture { void messageReceived(T request, TransportChannel channel) throws Exception; - boolean spawn(); + String executor(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java index 69d70810a55..a8e71f62bd5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java @@ -38,5 +38,5 @@ public interface TransportResponseHandler { void handleException(TransportException exp); - boolean spawn(); + String executor(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index aebe9817fd7..db0d433a2ad 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -172,7 +172,7 @@ public class TransportService extends AbstractLifecycleComponent(handler, node, action, timeoutHandler)); transport.sendRequest(node, requestId, action, message, options); @@ -191,7 +191,7 @@ public class TransportService extends AbstractLifecycleComponent { - public static final VoidTransportResponseHandler INSTANCE = new VoidTransportResponseHandler(true); - public static final VoidTransportResponseHandler INSTANCE_NOSPAWN = new VoidTransportResponseHandler(false); + public static final VoidTransportResponseHandler INSTANCE_SAME = new VoidTransportResponseHandler(ThreadPool.Names.SAME); + public static final VoidTransportResponseHandler INSTANCE_CACHED = new VoidTransportResponseHandler(ThreadPool.Names.CACHED); - private boolean spawn; + private final String executor; - public VoidTransportResponseHandler() { - this(true); - } - - public VoidTransportResponseHandler(boolean spawn) { - this.spawn = spawn; + public VoidTransportResponseHandler(String executor) { + this.executor = executor; } @Override public VoidStreamable newInstance() { @@ -49,7 +46,7 @@ public class VoidTransportResponseHandler implements TransportResponseHandler implem transportServiceAdapter.sent(data.length); - threadPool.execute(new Runnable() { + threadPool.cached().execute(new Runnable() { @Override public void run() { targetTransport.messageReceived(data, action, LocalTransport.this, requestId); } @@ -231,24 +231,15 @@ public class LocalTransport extends AbstractLifecycleComponent implem handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e)); return; } - if (handler.spawn()) { - threadPool.execute(new Runnable() { - @SuppressWarnings({"unchecked"}) @Override public void run() { - try { - handler.handleResponse(streamable); - } catch (Exception e) { - handleException(handler, new ResponseHandlerFailureTransportException(e)); - } + threadPool.executor(handler.executor()).execute(new Runnable() { + @SuppressWarnings({"unchecked"}) @Override public void run() { + try { + handler.handleResponse(streamable); + } catch (Exception e) { + handleException(handler, new ResponseHandlerFailureTransportException(e)); } - }); - } else { - try { - //noinspection unchecked - handler.handleResponse(streamable); - } catch (Exception e) { - handleException(handler, new ResponseHandlerFailureTransportException(e)); } - } + }); } private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index 7a105297e43..dadb731fc8c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -70,7 +70,7 @@ public class LocalTransportChannel implements TransportChannel { stream.writeByte(status); // 0 for request, 1 for response. message.writeTo(stream); final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); - targetTransport.threadPool().execute(new Runnable() { + targetTransport.threadPool().cached().execute(new Runnable() { @Override public void run() { targetTransport.messageReceived(data, action, sourceTransport, null); } @@ -95,7 +95,7 @@ public class LocalTransportChannel implements TransportChannel { too.close(); } final byte[] data = stream.copiedByteArray(); - targetTransport.threadPool().execute(new Runnable() { + targetTransport.threadPool().cached().execute(new Runnable() { @Override public void run() { targetTransport.messageReceived(data, action, sourceTransport, null); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 2654fb80470..35b4595d7c9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -127,19 +127,11 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { return; } try { - if (handler.spawn()) { - threadPool.execute(new Runnable() { - @SuppressWarnings({"unchecked"}) @Override public void run() { - try { - handler.handleResponse(streamable); - } catch (Exception e) { - handleException(handler, new ResponseHandlerFailureTransportException(e)); - } - } - }); - } else { + if (handler.executor() == ThreadPool.Names.SAME) { //noinspection unchecked handler.handleResponse(streamable); + } else { + threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, streamable)); } } catch (Exception e) { handleException(handler, new ResponseHandlerFailureTransportException(e)); @@ -162,8 +154,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { error = new RemoteTransportException(error.getMessage(), error); } final RemoteTransportException rtx = (RemoteTransportException) error; - if (handler.spawn()) { - threadPool.execute(new Runnable() { + if (handler.executor() == ThreadPool.Names.SAME) { + handler.handleException(rtx); + } else { + threadPool.executor(handler.executor()).execute(new Runnable() { @Override public void run() { try { handler.handleException(rtx); @@ -172,8 +166,6 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } } }); - } else { - handler.handleException(rtx); } } @@ -188,24 +180,11 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } final Streamable streamable = handler.newInstance(); streamable.readFrom(buffer); - if (handler.spawn()) { - threadPool.execute(new Runnable() { - @SuppressWarnings({"unchecked"}) @Override public void run() { - try { - handler.messageReceived(streamable, transportChannel); - } catch (Throwable e) { - try { - transportChannel.sendResponse(e); - } catch (IOException e1) { - logger.warn("Failed to send error message back to client for action [" + action + "]", e1); - logger.warn("Actual Exception", e); - } - } - } - }); - } else { + if (handler.executor() == ThreadPool.Names.SAME) { //noinspection unchecked handler.messageReceived(streamable, transportChannel); + } else { + threadPool.executor(handler.executor()).execute(new RequestHandler(handler, streamable, transportChannel, action)); } } catch (Exception e) { try { @@ -221,4 +200,50 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { transport.exceptionCaught(ctx, e); } + + class ResponseHandler implements Runnable { + + private final TransportResponseHandler handler; + private final Streamable streamable; + + public ResponseHandler(TransportResponseHandler handler, Streamable streamable) { + this.handler = handler; + this.streamable = streamable; + } + + @SuppressWarnings({"unchecked"}) @Override public void run() { + try { + handler.handleResponse(streamable); + } catch (Exception e) { + handleException(handler, new ResponseHandlerFailureTransportException(e)); + } + } + } + + class RequestHandler implements Runnable { + private final TransportRequestHandler handler; + private final Streamable streamable; + private final NettyTransportChannel transportChannel; + private final String action; + + public RequestHandler(TransportRequestHandler handler, Streamable streamable, NettyTransportChannel transportChannel, String action) { + this.handler = handler; + this.streamable = streamable; + this.transportChannel = transportChannel; + this.action = action; + } + + @SuppressWarnings({"unchecked"}) @Override public void run() { + try { + handler.messageReceived(streamable, transportChannel); + } catch (Throwable e) { + try { + transportChannel.sendResponse(e); + } catch (IOException e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } + } + } + } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java index ca08e292fae..28b5aff28f0 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.cached.CachedThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.local.LocalTransport; import org.testng.annotations.Test; @@ -41,7 +40,7 @@ import static org.hamcrest.Matchers.*; public class MulticastZenPingTests { @Test public void testSimplePings() { - ThreadPool threadPool = new CachedThreadPool(); + ThreadPool threadPool = new ThreadPool(); ClusterName clusterName = new ClusterName("test"); final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool).start(); final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress()); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java index acc715f2de8..6961eba6e20 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.cached.CachedThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty.NettyTransport; import org.testng.annotations.Test; @@ -43,7 +42,7 @@ import static org.hamcrest.Matchers.*; public class UnicastZenPingTests { @Test public void testSimplePings() { - ThreadPool threadPool = new CachedThreadPool(); + ThreadPool threadPool = new ThreadPool(); ClusterName clusterName = new ClusterName("test"); NettyTransport transportA = new NettyTransport(threadPool); final TransportService transportServiceA = new TransportService(transportA, threadPool).start(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/monitor/jvm/DeadlockSimulator.java b/modules/elasticsearch/src/test/java/org/elasticsearch/monitor/jvm/DeadlockSimulator.java deleted file mode 100644 index fadcb7c6e49..00000000000 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/monitor/jvm/DeadlockSimulator.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.monitor.jvm; - -import org.elasticsearch.monitor.dump.DumpMonitorService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.scaling.ScalingThreadPool; - -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; - -/** - * @author kimchy - */ -public class DeadlockSimulator { - - public static void main(String[] args) { - ThreadPool threadPool = new ScalingThreadPool(); - DumpMonitorService dumpMonitorService = new DumpMonitorService(); - JvmMonitorService jvmMonitorService = new JvmMonitorService(EMPTY_SETTINGS, threadPool, dumpMonitorService).start(); - - //These are the two resource objects - //we'll try to get locks for - final Object resource1 = "resource1"; - final Object resource2 = "resource2"; - //Here's the first thread. - //It tries to lock resource1 then resource2 - Thread t1 = new Thread() { - public void run() { - //Lock resource 1 - synchronized (resource1) { - System.out.println("Thread 1: locked resource 1"); - //Pause for a bit, simulating some file I/O or - //something. Basically, we just want to give the - //other thread a chance to run. Threads and deadlock - //are asynchronous things, but we're trying to force - //deadlock to happen here... - try { - Thread.sleep(50); - } catch (InterruptedException e) { - } - - //Now wait 'till we can get a lock on resource 2 - synchronized (resource2) { - System.out.println("Thread 1: locked resource 2"); - } - } - } - }; - - //Here's the second thread. - //It tries to lock resource2 then resource1 - Thread t2 = new Thread() { - public void run() { - //This thread locks resource 2 right away - synchronized (resource2) { - System.out.println("Thread 2: locked resource 2"); - //Then it pauses, for the same reason as the first - //thread does - try { - Thread.sleep(50); - } catch (InterruptedException e) { - } - - //Then it tries to lock resource1. - //But wait! Thread 1 locked resource1, and - //won't release it till it gets a lock on resource2. - //This thread holds the lock on resource2, and won't - //release it till it gets resource1. - //We're at an impasse. Neither thread can run, - //and the program freezes up. - synchronized (resource1) { - System.out.println("Thread 2: locked resource 1"); - } - } - } - }; - - //Start the two threads. - //If all goes as planned, deadlock will occur, - //and the program will never exit. - t1.start(); - t2.start(); - } -} diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java index 800bdea01cb..70eb4cb9f20 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.cached.CachedThreadPool; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -51,7 +50,7 @@ public abstract class AbstractSimpleTransportTests { protected DiscoveryNode serviceBNode; @BeforeMethod public void setUp() { - threadPool = new CachedThreadPool(); + threadPool = new ThreadPool(); build(); serviceA.connectToNode(serviceBNode); serviceB.connectToNode(serviceANode); @@ -72,6 +71,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(StringMessage request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); try { @@ -89,6 +92,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void handleResponse(StringMessage response) { assertThat("hello moshe", equalTo(response.message)); } @@ -116,6 +123,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(StringMessage request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); try { @@ -133,6 +144,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void handleResponse(StringMessage response) { assertThat("hello moshe", equalTo(response.message)); } @@ -159,6 +174,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception { assertThat("moshe", equalTo(request.message)); throw new RuntimeException("bad message !!!"); @@ -171,6 +190,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void handleResponse(StringMessage response) { assertThat("got response instead of exception", false, equalTo(true)); } @@ -213,6 +236,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(StringMessage request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); // don't send back a response @@ -231,6 +258,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void handleResponse(StringMessage response) { assertThat("got response instead of exception", false, equalTo(true)); } @@ -256,6 +287,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void messageReceived(StringMessage request, TransportChannel channel) { TimeValue sleep = TimeValue.parseTimeValue(request.message, null); try { @@ -278,6 +313,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void handleResponse(StringMessage response) { assertThat("got response instead of exception", false, equalTo(true)); } @@ -306,6 +345,10 @@ public abstract class AbstractSimpleTransportTests { return new StringMessage(); } + @Override public String executor() { + return ThreadPool.Names.CACHED; + } + @Override public void handleResponse(StringMessage response) { assertThat("hello " + counter + "ms", equalTo(response.message)); }