ThreadPool: Refactor into several pools, with configurable types per pool, closes #687.

This commit is contained in:
kimchy 2011-02-15 07:00:24 +02:00
parent 1b5cdb181a
commit 3ed848a495
124 changed files with 993 additions and 2310 deletions

View File

@ -46,7 +46,6 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.memory.ByteBufferStore;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.scaling.ScalingThreadPool;
import java.io.File;
import java.util.concurrent.*;
@ -305,7 +304,7 @@ public class SimpleEngineBenchmark {
store.deleteContent();
ThreadPool threadPool = new ScalingThreadPool();
ThreadPool threadPool = new ThreadPool();
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
Engine engine = new RobinEngine(shardId, settings, store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store),
new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NonBloomCache(shardId.index()));

View File

@ -67,6 +67,7 @@ public class SingleThreadIndexingStress {
System.out.println("Indexing [" + COUNT + "] ...");
int i = 1;
for (; i <= COUNT; i++) {
// client1.admin().cluster().preparePingSingle("test", "type1", Integer.toString(i)).execute().actionGet();
client1.prepareIndex("test", "type1").setId(Integer.toString(i)).setSource(source(Integer.toString(i), "test" + i))
.setCreate(false).execute().actionGet();
if ((i % 10000) == 0) {

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.netty.NettyTransport;
@ -49,7 +48,7 @@ public class BenchmarkNettyLargeMessages {
Settings settings = ImmutableSettings.settingsBuilder()
.build();
final ThreadPool threadPool = new CachedThreadPool(settings);
final ThreadPool threadPool = new ThreadPool();
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
@ -65,12 +64,12 @@ public class BenchmarkNettyLargeMessages {
return new BenchmarkMessage();
}
@Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception {
channel.sendResponse(request);
@Override public String executor() {
return ThreadPool.Names.CACHED;
}
@Override public boolean spawn() {
return true;
@Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception {
channel.sendResponse(request);
}
});
@ -85,6 +84,10 @@ public class BenchmarkNettyLargeMessages {
return new BenchmarkMessage();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(BenchmarkMessage response) {
}
@ -108,6 +111,10 @@ public class BenchmarkNettyLargeMessages {
return new BenchmarkMessage();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(BenchmarkMessage response) {
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.fixed.FixedThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.local.LocalTransport;
import org.elasticsearch.transport.netty.NettyTransport;
@ -54,14 +53,8 @@ public class TransportBenchmark {
public abstract Transport newTransport(Settings settings, ThreadPool threadPool);
}
public static ThreadPool newThreadPool(Settings settings) {
// return new ForkjoinThreadPool(settings);
return new FixedThreadPool(settings);
// return new CachedThreadPool(settings);
}
public static void main(String[] args) {
final boolean spawn = true;
final String executor = ThreadPool.Names.CACHED;
final boolean waitForRequest = true;
final ByteSizeValue payloadSize = new ByteSizeValue(100, ByteSizeUnit.BYTES);
final int NUMBER_OF_CLIENTS = 1;
@ -74,10 +67,10 @@ public class TransportBenchmark {
Settings settings = ImmutableSettings.settingsBuilder()
.build();
final ThreadPool serverThreadPool = newThreadPool(settings);
final ThreadPool serverThreadPool = new ThreadPool();
final TransportService serverTransportService = new TransportService(type.newTransport(settings, serverThreadPool), serverThreadPool).start();
final ThreadPool clientThreadPool = newThreadPool(settings);
final ThreadPool clientThreadPool = new ThreadPool();
final TransportService clientTransportService = new TransportService(type.newTransport(settings, clientThreadPool), clientThreadPool).start();
final DiscoveryNode node = new DiscoveryNode("server", serverTransportService.boundAddress().publishAddress());
@ -87,12 +80,12 @@ public class TransportBenchmark {
return new BenchmarkMessage();
}
@Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception {
channel.sendResponse(request);
@Override public String executor() {
return executor;
}
@Override public boolean spawn() {
return spawn;
@Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception {
channel.sendResponse(request);
}
});
@ -105,6 +98,10 @@ public class TransportBenchmark {
return new BenchmarkMessage();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(BenchmarkMessage response) {
}
@ -128,6 +125,10 @@ public class TransportBenchmark {
return new BenchmarkMessage();
}
@Override public String executor() {
return executor;
}
@Override public void handleResponse(BenchmarkMessage response) {
if (response.id != id) {
System.out.println("NO ID MATCH [" + response.id + "] and [" + id + "]");
@ -139,10 +140,6 @@ public class TransportBenchmark {
exp.printStackTrace();
latch.countDown();
}
@Override public boolean spawn() {
return spawn;
}
};
if (waitForRequest) {

View File

@ -50,6 +50,10 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
this.clusterName = clusterName;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.HEALTH;
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.network.NetworkInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;
import java.io.IOException;
@ -55,15 +54,13 @@ public class NodeInfo extends NodeOperationResponse {
private NetworkInfo network;
private ThreadPoolInfo threadPool;
private TransportInfo transport;
NodeInfo() {
}
public NodeInfo(DiscoveryNode node, ImmutableMap<String, String> attributes, Settings settings,
OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network, ThreadPoolInfo threadPool,
OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network,
TransportInfo transport) {
super(node);
this.attributes = attributes;
@ -72,7 +69,6 @@ public class NodeInfo extends NodeOperationResponse {
this.process = process;
this.jvm = jvm;
this.network = network;
this.threadPool = threadPool;
this.transport = transport;
}
@ -160,20 +156,6 @@ public class NodeInfo extends NodeOperationResponse {
return network();
}
/**
* Thread Pool level information.
*/
public ThreadPoolInfo threadPool() {
return threadPool;
}
/**
* Thread Pool level information.
*/
public ThreadPoolInfo getThreadPool() {
return threadPool();
}
public TransportInfo transport() {
return transport;
}
@ -209,9 +191,6 @@ public class NodeInfo extends NodeOperationResponse {
if (in.readBoolean()) {
network = NetworkInfo.readNetworkInfo(in);
}
if (in.readBoolean()) {
threadPool = ThreadPoolInfo.readThreadPoolInfo(in);
}
if (in.readBoolean()) {
transport = TransportInfo.readTransportInfo(in);
}
@ -249,12 +228,6 @@ public class NodeInfo extends NodeOperationResponse {
out.writeBoolean(true);
network.writeTo(out);
}
if (threadPool == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
threadPool.writeTo(out);
}
if (transport == null) {
out.writeBoolean(false);
} else {

View File

@ -61,6 +61,10 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
nodeAttributes = new MapBuilder<String, String>().putAll(nodeAttributes).remove(key).immutableMap();
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.Node.INFO;
}
@ -100,7 +104,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
return new NodeInfo(clusterService.state().nodes().localNode(), nodeAttributes, settings,
monitorService.osService().info(), monitorService.processService().info(),
monitorService.jvmService().info(), monitorService.networkService().info(),
threadPool.info(), transportService.info());
transportService.info());
}
@Override protected boolean accumulateExceptions() {

View File

@ -67,6 +67,10 @@ public class TransportNodesRestartAction extends TransportNodesOperationAction<N
listener.onFailure(new ElasticSearchIllegalStateException("restart is disabled (for now) ...."));
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.Node.RESTART;
}
@ -110,7 +114,7 @@ public class TransportNodesRestartAction extends TransportNodesOperationAction<N
return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
}
logger.info("Restarting in [{}]", request.delay);
threadPool.schedule(new Runnable() {
threadPool.schedule(request.delay, ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
boolean restartWithWrapper = false;
if (System.getProperty("elasticsearch-service") != null) {
@ -135,7 +139,7 @@ public class TransportNodesRestartAction extends TransportNodesOperationAction<N
}
}
}
}, request.delay, ThreadPool.ExecutionType.THREADED);
});
return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
}

View File

@ -64,6 +64,10 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
this.transportService.registerHandler(NodeShutdownRequestHandler.ACTION, new NodeShutdownRequestHandler());
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.Node.SHUTDOWN;
}
@ -114,7 +118,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
latch.countDown();
} else {
logger.trace("[cluster_shutdown]: sending shutdown request to [{}]", node);
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler() {
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleResponse(VoidStreamable response) {
logger.trace("[cluster_shutdown]: received shutdown response from [{}]", node);
latch.countDown();
@ -136,7 +140,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
// now, kill the master
logger.trace("[cluster_shutdown]: shutting down the master [{}]", state.nodes().masterNode());
transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler() {
transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleResponse(VoidStreamable response) {
logger.trace("[cluster_shutdown]: received shutdown response from master");
}
@ -177,7 +181,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
}
logger.trace("[partial_cluster_shutdown]: sending shutdown request to [{}]", node);
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler() {
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleResponse(VoidStreamable response) {
logger.trace("[partial_cluster_shutdown]: received shutdown response from [{}]", node);
latch.countDown();
@ -212,6 +216,10 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
return VoidStreamable.INSTANCE;
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(VoidStreamable request, TransportChannel channel) throws Exception {
if (disabled) {
throw new ElasticSearchIllegalStateException("Shutdown is disabled");

View File

@ -28,7 +28,6 @@ import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.monitor.network.NetworkStats;
import org.elasticsearch.monitor.os.OsStats;
import org.elasticsearch.monitor.process.ProcessStats;
import org.elasticsearch.threadpool.ThreadPoolStats;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
@ -50,8 +49,6 @@ public class NodeStats extends NodeOperationResponse {
private NetworkStats network;
private ThreadPoolStats threadPool;
private TransportStats transport;
NodeStats() {
@ -59,14 +56,13 @@ public class NodeStats extends NodeOperationResponse {
public NodeStats(DiscoveryNode node, NodeIndicesStats indices,
OsStats os, ProcessStats process, JvmStats jvm, NetworkStats network,
ThreadPoolStats threadPool, TransportStats transport) {
TransportStats transport) {
super(node);
this.indices = indices;
this.os = os;
this.process = process;
this.jvm = jvm;
this.network = network;
this.threadPool = threadPool;
this.transport = transport;
}
@ -140,20 +136,6 @@ public class NodeStats extends NodeOperationResponse {
return network();
}
/**
* Thread Pool level stats.
*/
public ThreadPoolStats threadPool() {
return threadPool;
}
/**
* Thread Pool level stats.
*/
public ThreadPoolStats getThreadPool() {
return threadPool();
}
public TransportStats transport() {
return transport;
}
@ -185,9 +167,6 @@ public class NodeStats extends NodeOperationResponse {
if (in.readBoolean()) {
network = NetworkStats.readNetworkStats(in);
}
if (in.readBoolean()) {
threadPool = ThreadPoolStats.readThreadPoolStats(in);
}
if (in.readBoolean()) {
transport = TransportStats.readTransportStats(in);
}
@ -225,12 +204,6 @@ public class NodeStats extends NodeOperationResponse {
out.writeBoolean(true);
network.writeTo(out);
}
if (threadPool == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
threadPool.writeTo(out);
}
if (transport == null) {
out.writeBoolean(false);
} else {

View File

@ -53,6 +53,10 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.Node.STATS;
}
@ -92,7 +96,7 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
return new NodeStats(clusterService.state().nodes().localNode(), indicesService.stats(),
monitorService.osService().stats(), monitorService.processService().stats(),
monitorService.jvmService().stats(), monitorService.networkService().stats(),
threadPool.stats(), transportService.stats());
transportService.stats());
}
@Override protected boolean accumulateExceptions() {

View File

@ -48,6 +48,10 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
super(settings, threadPool, clusterService, transportService);
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.Ping.BROADCAST;
}

View File

@ -41,6 +41,10 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected boolean checkWriteConsistency() {
return true;
}

View File

@ -37,6 +37,10 @@ public class TransportSinglePingAction extends TransportShardSingleOperationActi
super(settings, threadPool, clusterService, transportService);
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.Ping.SINGLE;
}

View File

@ -49,6 +49,10 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct
this.clusterName = clusterName;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.STATE;
}

View File

@ -51,6 +51,10 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA
this.indexAliasesService = indexAliasesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.ALIASES;
}

View File

@ -57,6 +57,10 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected AnalyzeRequest newRequest() {
return new AnalyzeRequest();
}

View File

@ -56,6 +56,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.Cache.CLEAR;
}

View File

@ -50,6 +50,10 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio
this.stateIndexService = stateIndexService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.CLOSE;
}

View File

@ -50,6 +50,10 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
this.createIndexService = createIndexService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.CREATE;
}

View File

@ -50,6 +50,10 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
this.deleteIndexService = deleteIndexService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.DELETE;
}

View File

@ -56,6 +56,10 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.FLUSH;
}

View File

@ -53,6 +53,10 @@ public class TransportGatewaySnapshotAction extends TransportBroadcastOperationA
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.Gateway.SNAPSHOT;
}

View File

@ -66,6 +66,9 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
this.refreshAction = refreshAction;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.Mapping.DELETE;

View File

@ -51,6 +51,9 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
this.metaDataMappingService = metaDataMappingService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.Mapping.PUT;

View File

@ -50,6 +50,10 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction
this.stateIndexService = stateIndexService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.OPEN;
}

View File

@ -57,6 +57,10 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.OPTIMIZE;
}

View File

@ -57,6 +57,10 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.REFRESH;
}

View File

@ -46,6 +46,10 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA
this.updateSettingsService = updateSettingsService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.UPDATE_SETTINGS;
}

View File

@ -67,6 +67,10 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.STATUS;
}

View File

@ -50,6 +50,10 @@ public class TransportDeleteIndexTemplateAction extends TransportMasterNodeOpera
this.indexTemplateService = indexTemplateService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.DELETE_INDEX_TEMPLATE;
}

View File

@ -50,6 +50,10 @@ public class TransportPutIndexTemplateAction extends TransportMasterNodeOperatio
this.indexTemplateService = indexTemplateService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.PUT_INDEX_TEMPLATE;
}

View File

@ -211,7 +211,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
if (requestsByShard.isEmpty()) {
// all failures, no shards to process, send a response
if (bulkRequest.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
}
@ -264,7 +264,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
private void finishHim() {
if (bulkRequest.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
}
@ -305,8 +305,8 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
});
}
@Override public boolean spawn() {
return true; // spawn, we do some work here...
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -72,6 +72,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
this.mappingUpdatedAction = mappingUpdatedAction;
}
@Override protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override protected boolean checkWriteConsistency() {
return true;
}

View File

@ -54,6 +54,10 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.SEARCH;
}
@Override protected String transportAction() {
return TransportActions.COUNT;
}

View File

@ -67,6 +67,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
this.autoCreateIndex = settings.getAsBoolean("action.auto_create_index", true);
}
@Override protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override protected void doExecute(final DeleteRequest deleteRequest, final ActionListener<DeleteResponse> listener) {
if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(deleteRequest.index())) {
createIndexAction.execute(new CreateIndexRequest(deleteRequest.index()), new ActionListener<CreateIndexResponse>() {

View File

@ -62,6 +62,10 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
return "indices/index/b_shard/delete";
}
@Override protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override protected void checkBlock(ShardDeleteRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}

View File

@ -48,6 +48,10 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
return true;
}
@Override protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override protected ShardDeleteByQueryRequest newRequestInstance() {
return new ShardDeleteByQueryRequest();
}

View File

@ -69,6 +69,10 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.SEARCH;
}
@Override protected String transportAction() {
return TransportActions.GET;
}

View File

@ -147,6 +147,10 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
return TransportActions.INDEX;
}
@Override protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override protected void checkBlock(IndexRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.xcontent.BoolQueryBuilder;
import org.elasticsearch.index.query.xcontent.MoreLikeThisFieldQueryBuilder;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
@ -264,8 +265,8 @@ public class TransportMoreLikeThisAction extends BaseAction<MoreLikeThisRequest,
});
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -47,6 +47,10 @@ public class TransportPercolateAction extends TransportSingleCustomOperationActi
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.PERCOLATE;
}
@Override protected PercolateRequest newRequest() {
return new PercolateRequest();
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
@ -134,8 +135,8 @@ public class TransportSearchAction extends BaseAction<SearchRequest, SearchRespo
});
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchA
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
@ -94,5 +95,9 @@ public class TransportSearchScrollAction extends BaseAction<SearchScrollRequest,
}
});
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -97,7 +97,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (final DfsSearchResult dfsResult : dfsResults) {
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
@ -115,7 +115,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs);
if (localAsync) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
executeSecondPhase(dfsResult, counter, node, querySearchRequest);
}

View File

@ -104,7 +104,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (final DfsSearchResult dfsResult : dfsResults) {
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
@ -122,7 +122,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs);
if (localAsync) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
executeQuery(dfsResult, counter, querySearchRequest, node);
}
@ -190,7 +190,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
@ -208,7 +208,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue());
if (localAsync) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}

View File

@ -104,7 +104,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
@ -122,7 +122,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue());
if (localAsync) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}

View File

@ -129,7 +129,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (Tuple<String, Long> target : scrollId.values()) {
DiscoveryNode node = nodes.get(target.v1());
@ -145,7 +145,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
executePhase(node, target.v2());
}
@ -217,7 +217,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
@ -229,7 +229,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
protected void invokeListener(final Throwable t) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(t);
}

View File

@ -135,7 +135,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (Tuple<String, Long> target : scrollId.values()) {
DiscoveryNode node = nodes.get(target.v1());
@ -151,7 +151,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
executeQueryPhase(counter, node, target.v2());
}
@ -246,7 +246,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
@ -258,7 +258,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
protected void invokeListener(final Throwable t) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(t);
}

View File

@ -144,7 +144,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
@ -166,7 +166,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (localAsync) {
threadPool.execute(new Runnable() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
performFirstPhase(shardIt.reset());
}
@ -327,7 +327,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
@ -339,7 +339,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
protected void invokeListener(final Throwable t) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(t);
}

View File

@ -46,7 +46,7 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
}
public boolean listenerThreaded() {
return listenerThreaded;
return false; // we control execution of the listener
}
public ThreadPool threadPool() {
@ -107,9 +107,9 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
private void executeListener(final Object listener) {
if (listenerThreaded) {
if (listener instanceof Runnable) {
threadPool.execute((Runnable) listener);
threadPool.cached().execute((Runnable) listener);
} else {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
ActionListener<T> lst = (ActionListener<T>) listener;
try {

View File

@ -30,24 +30,13 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.elasticsearch.common.collect.Lists.*;
/**
* @author kimchy (shay.banon)
*/
@ -60,14 +49,22 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected final ThreadPool threadPool;
final String transportAction;
final String transportShardAction;
final String executor;
protected TransportBroadcastOperationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
super(settings);
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
transportService.registerHandler(transportAction(), new TransportHandler());
transportService.registerHandler(transportShardAction(), new ShardTransportHandler());
this.transportAction = transportAction();
this.transportShardAction = transportShardAction();
this.executor = executor();
transportService.registerHandler(transportAction, new TransportHandler());
transportService.registerHandler(transportShardAction, new ShardTransportHandler());
}
@Override protected void doExecute(Request request, ActionListener<Response> listener) {
@ -78,6 +75,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract String transportShardAction();
protected abstract String executor();
protected abstract Request newRequest();
protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState);
@ -187,7 +186,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
if (localOperations > 0) {
if (request.operationThreading() == BroadcastOperationThreading.SINGLE_THREAD) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = nextShardOrNull(shardIt.reset());
@ -225,7 +224,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
final ShardRequest shardRequest = newShardRequest(shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (localAsync) {
threadPool.execute(new Runnable() {
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
try {
onOperation(shard, shardOperation(shardRequest), true);
@ -247,11 +246,15 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// no node connected, act as failure
onOperation(shard, shardIt, null, false);
} else {
transportService.sendRequest(node, transportShardAction(), shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
@Override public ShardResponse newInstance() {
return newShardResponse();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(ShardResponse response) {
onOperation(shard, response, false);
}
@ -259,11 +262,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
@Override public void handleException(TransportException e) {
onOperation(shard, shardIt, e, false);
}
@Override public boolean spawn() {
// we never spawn here, we will span if needed in onOperation
return false;
}
});
}
}
@ -330,7 +328,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// if we need to execute the listener on a thread, and we are not threaded already
// then do it
if (request.listenerThreaded() && !alreadyThreaded) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
listener.onResponse(newResponse(request, shardsResponses, clusterState));
@ -355,6 +353,10 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
return newRequest();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(Request request, final TransportChannel channel) throws Exception {
// we just send back a response, no need to fork a listener
request.listenerThreaded(false);
@ -380,10 +382,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
});
}
@Override public boolean spawn() {
return false;
}
}
class ShardTransportHandler extends BaseTransportRequestHandler<ShardRequest> {
@ -392,188 +390,12 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
return newShardRequest();
}
@Override public void messageReceived(ShardRequest request, TransportChannel channel) throws Exception {
@Override public String executor() {
return executor;
}
@Override public void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
channel.sendResponse(shardOperation(request));
}
}
// FROM HERE: When we move to a single remote call with all shard requests to the same node, then
// the below classes can help
class ShardsTransportHandler extends BaseTransportRequestHandler<ShardsRequest> {
@Override public ShardsRequest newInstance() {
return new ShardsRequest();
}
@Override public void messageReceived(final ShardsRequest request, final TransportChannel channel) throws Exception {
if (request.operationThreading() == BroadcastOperationThreading.THREAD_PER_SHARD) {
final AtomicInteger counter = new AtomicInteger(request.requests().size());
final AtomicInteger index = new AtomicInteger();
final AtomicReferenceArray results = new AtomicReferenceArray(request.requests().size());
for (final ShardRequest singleRequest : request.requests()) {
threadPool.execute(new Runnable() {
@Override public void run() {
int arrIndex = index.getAndIncrement();
try {
results.set(arrIndex, shardOperation(singleRequest));
} catch (Exception e) {
results.set(arrIndex, new BroadcastShardOperationFailedException(new ShardId(singleRequest.index(), singleRequest.shardId()), e));
}
if (counter.decrementAndGet() == 0) {
// we are done
List<ShardResponse> responses = newArrayListWithCapacity(request.requests().size());
List<BroadcastShardOperationFailedException> exceptions = null;
for (int i = 0; i < results.length(); i++) {
Object result = results.get(i);
if (result instanceof BroadcastShardOperationFailedException) {
if (exceptions == null) {
exceptions = newArrayList();
}
exceptions.add((BroadcastShardOperationFailedException) result);
} else {
responses.add((ShardResponse) result);
}
}
try {
channel.sendResponse(new ShardsResponse(responses, exceptions));
} catch (IOException e) {
logger.warn("Failed to send broadcast response", e);
}
}
}
});
}
} else {
// single thread
threadPool.execute(new Runnable() {
@Override public void run() {
List<ShardResponse> responses = newArrayListWithCapacity(request.requests().size());
List<BroadcastShardOperationFailedException> exceptions = null;
for (ShardRequest singleRequest : request.requests()) {
try {
responses.add(shardOperation(singleRequest));
} catch (Exception e) {
if (exceptions == null) {
exceptions = newArrayList();
}
exceptions.add(new BroadcastShardOperationFailedException(new ShardId(singleRequest.index(), singleRequest.shardId()), e));
}
}
try {
channel.sendResponse(new ShardsResponse(responses, exceptions));
} catch (IOException e) {
logger.warn("Failed to send broadcast response", e);
}
}
});
}
}
@Override public boolean spawn() {
// we handle the forking here...
return false;
}
}
class ShardsResponse implements Streamable {
private List<ShardResponse> responses;
private List<BroadcastShardOperationFailedException> exceptions;
ShardsResponse() {
}
ShardsResponse(List<ShardResponse> responses, List<BroadcastShardOperationFailedException> exceptions) {
this.responses = responses;
this.exceptions = exceptions;
}
@Override public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
responses = newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
ShardResponse response = newShardResponse();
response.readFrom(in);
responses.add(response);
}
size = in.readVInt();
if (size > 0) {
exceptions = newArrayListWithCapacity(size);
ThrowableObjectInputStream toi = new ThrowableObjectInputStream(in);
for (int i = 0; i < size; i++) {
try {
exceptions.add((BroadcastShardOperationFailedException) toi.readObject());
} catch (ClassNotFoundException e) {
throw new IOException("Failed to load class", e);
}
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(responses.size());
for (BroadcastShardOperationResponse response : responses) {
response.writeTo(out);
}
if (exceptions == null || exceptions.isEmpty()) {
out.writeInt(0);
} else {
out.writeInt(exceptions.size());
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(out);
for (BroadcastShardOperationFailedException ex : exceptions) {
too.writeObject(ex);
}
too.flush();
}
}
}
class ShardsRequest implements Streamable {
private BroadcastOperationThreading operationThreading = BroadcastOperationThreading.SINGLE_THREAD;
private List<ShardRequest> requests;
ShardsRequest() {
}
public List<ShardRequest> requests() {
return this.requests;
}
public BroadcastOperationThreading operationThreading() {
return operationThreading;
}
ShardsRequest(BroadcastOperationThreading operationThreading, List<ShardRequest> requests) {
this.operationThreading = operationThreading;
this.requests = requests;
}
@Override public void readFrom(StreamInput in) throws IOException {
operationThreading = BroadcastOperationThreading.fromId(in.readByte());
int size = in.readVInt();
if (size == 0) {
requests = ImmutableList.of();
} else {
requests = newArrayListWithCapacity(in.readVInt());
for (int i = 0; i < size; i++) {
ShardRequest request = newShardRequest();
request.readFrom(in);
requests.add(request);
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(operationThreading.id());
out.writeVInt(requests.size());
for (BroadcastShardOperationRequest request : requests) {
request.writeTo(out);
}
}
}
}

View File

@ -49,17 +49,25 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
protected final ThreadPool threadPool;
final String transportAction;
final String executor;
protected TransportMasterNodeOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
transportService.registerHandler(transportAction(), new TransportHandler());
this.transportAction = transportAction();
this.executor = executor();
transportService.registerHandler(transportAction, new TransportHandler());
}
protected abstract String transportAction();
protected abstract String executor();
protected abstract Request newRequest();
protected abstract Response newResponse();
@ -121,7 +129,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
}
});
} else {
threadPool.execute(new Runnable() {
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
try {
Response response = masterOperation(request, clusterState);
@ -168,7 +176,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
return;
}
processBeforeDelegationToMaster(request, clusterState);
transportService.sendRequest(nodes.masterNode(), transportAction(), request, new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(nodes.masterNode(), transportAction, request, new BaseTransportResponseHandler<Response>() {
@Override public Response newInstance() {
return newResponse();
}
@ -177,6 +185,10 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
listener.onResponse(response);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleException(final TransportException exp) {
if (exp.unwrapCause() instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
@ -221,35 +233,30 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
return newRequest();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
final ClusterState clusterState = clusterService.state();
if (clusterState.nodes().localNodeMaster() || localExecute(request)) {
checkBlock(request, clusterState);
Response response = masterOperation(request, clusterState);
channel.sendResponse(response);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(), transportAction(), request, new BaseTransportResponseHandler<Response>() {
@Override public Response newInstance() {
return newResponse();
// we just send back a response, no need to fork a listener
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() {
@Override public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Exception e) {
onFailure(e);
}
}
@Override public void handleResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Exception e) {
logger.error("Failed to send response", e);
}
@Override public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response", e1);
}
@Override public void handleException(TransportException exp) {
try {
channel.sendResponse(exp);
} catch (Exception e) {
logger.error("Failed to send response", e);
}
}
});
}
}
});
}
}
}

View File

@ -51,6 +51,10 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
protected final TransportService transportService;
final String transportAction;
final String transportNodeAction;
final String executor;
@Inject public TransportNodesOperationAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService) {
super(settings);
@ -59,8 +63,12 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
this.clusterService = clusterService;
this.transportService = transportService;
transportService.registerHandler(transportAction(), new TransportHandler());
transportService.registerHandler(transportNodeAction(), new NodeTransportHandler());
this.transportAction = transportAction();
this.transportNodeAction = transportNodeAction();
this.executor = executor();
transportService.registerHandler(transportAction, new TransportHandler());
transportService.registerHandler(transportNodeAction, new NodeTransportHandler());
}
@Override protected void doExecute(Request request, ActionListener<Response> listener) {
@ -71,6 +79,8 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
protected abstract String transportNodeAction();
protected abstract String executor();
protected abstract Request newRequest();
protected abstract Response newResponse(Request request, AtomicReferenceArray nodesResponses);
@ -117,7 +127,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
private void start() {
if (nodesIds.length == 0) {
// nothing to notify
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponse(request, responses));
}
@ -131,7 +141,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
for (final String nodeId : nodesIds) {
final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {
threadPool.execute(new Runnable() {
threadPool.executor(executor()).execute(new Runnable() {
@Override public void run() {
try {
onOperation(nodeOperation(newNodeRequest(clusterState.nodes().localNodeId(), request)));
@ -141,7 +151,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
}
});
} else if (nodeId.equals("_master")) {
threadPool.execute(new Runnable() {
threadPool.executor(executor()).execute(new Runnable() {
@Override public void run() {
try {
onOperation(nodeOperation(newNodeRequest(clusterState.nodes().masterNodeId(), request)));
@ -155,7 +165,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
onFailure(nodeId, new NoSuchNodeException(nodeId));
} else {
NodeRequest nodeRequest = newNodeRequest(nodeId, request);
transportService.sendRequest(node, transportNodeAction(), nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
@Override public NodeResponse newInstance() {
return newNodeResponse();
}
@ -168,8 +178,8 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
onFailure(node.id(), exp);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -200,7 +210,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
private void finishHim() {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponse(request, responses));
}
@ -238,12 +248,12 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
});
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public String toString() {
return transportAction();
return transportAction;
}
}
@ -253,12 +263,16 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
return newNodeRequest();
}
@Override public void messageReceived(NodeRequest request, TransportChannel channel) throws Exception {
@Override public void messageReceived(final NodeRequest request, final TransportChannel channel) throws Exception {
channel.sendResponse(nodeOperation(request));
}
@Override public String toString() {
return transportNodeAction();
return transportNodeAction;
}
@Override public String executor() {
return executor;
}
}
}

View File

@ -92,7 +92,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
shardsResponses.set(indexCounter.getAndIncrement(), result);
if (completionCounter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponseInstance(request, shardsResponses));
}
@ -110,7 +110,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
}
if (completionCounter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponseInstance(request, shardsResponses));
}
@ -146,6 +146,10 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
return newRequestInstance();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
@ -167,10 +171,5 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
}
});
}
@Override public boolean spawn() {
// no need to spawn, since in the doExecute we always execute with threaded operation set to true
return false;
}
}
}

View File

@ -46,6 +46,9 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
final String transportAction;
@Inject public TransportIndicesReplicationOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction) {
super(settings);
@ -53,7 +56,9 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
this.clusterService = clusterService;
this.indexAction = indexAction;
transportService.registerHandler(transportAction(), new TransportHandler());
this.transportAction = transportAction();
transportService.registerHandler(transportAction, new TransportHandler());
}
@Override protected void doExecute(final Request request, final ActionListener<Response> listener) {
@ -79,7 +84,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
indexResponses.set(indexCounter.getAndIncrement(), result);
if (completionCounter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponseInstance(request, indexResponses));
}
@ -98,7 +103,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
}
if (completionCounter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponseInstance(request, indexResponses));
}
@ -132,6 +137,10 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
return newRequestInstance();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// no need for a threaded listener, since we just send a response
request.listenerThreaded(false);
@ -148,15 +157,10 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [" + transportAction() + "] and request [" + request + "]", e1);
logger.warn("Failed to send error response for action [" + transportAction + "] and request [" + request + "]", e1);
}
}
});
}
@Override public boolean spawn() {
// no need to spawn, since we always execute in the index one with threadedOperation set to true
return false;
}
}
}

View File

@ -78,6 +78,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
final String transportAction;
final String transportReplicaAction;
final String executor;
final boolean checkWriteConsistency;
protected TransportShardReplicationOperationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction) {
@ -88,8 +93,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.threadPool = threadPool;
this.shardStateAction = shardStateAction;
transportService.registerHandler(transportAction(), new OperationTransportHandler());
transportService.registerHandler(transportReplicaAction(), new ReplicaOperationTransportHandler());
this.transportAction = transportAction();
this.transportReplicaAction = transportReplicaAction();
this.executor = executor();
this.checkWriteConsistency = checkWriteConsistency();
transportService.registerHandler(transportAction, new OperationTransportHandler());
transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
this.defaultReplicationType = ReplicationType.fromString(settings.get("action.replication_type", "sync"));
this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
@ -105,6 +115,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract String transportAction();
protected abstract String executor();
protected abstract PrimaryResponse<Response> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest);
protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest);
@ -150,6 +162,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return newRequestInstance();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
@ -168,15 +184,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for " + transportAction(), e1);
logger.warn("Failed to send response for " + transportAction, e1);
}
}
});
}
@Override public boolean spawn() {
return false;
}
}
class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ShardOperationRequest> {
@ -185,16 +197,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return new ShardOperationRequest();
}
@Override public void messageReceived(ShardOperationRequest request, TransportChannel channel) throws Exception {
shardOperationOnReplica(request);
channel.sendResponse(VoidStreamable.INSTANCE);
@Override public String executor() {
return executor;
}
/**
* We spawn, since we want to perform the operation on the replica on a different thread.
*/
@Override public boolean spawn() {
return true;
@Override public void messageReceived(final ShardOperationRequest request, final TransportChannel channel) throws Exception {
shardOperationOnReplica(request);
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
@ -294,7 +303,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
// check here for consistency
if (checkWriteConsistency()) {
if (checkWriteConsistency) {
WriteConsistencyLevel consistencyLevel = defaultWriteConsistencyLevel;
if (request.consistencyLevel() != WriteConsistencyLevel.DEFAULT) {
consistencyLevel = request.consistencyLevel();
@ -320,7 +329,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (request.operationThreaded()) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
performOnPrimary(shard.id(), fromClusterEvent, true, shard, clusterState);
}
@ -330,12 +339,16 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportAction(), request, transportOptions(), new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(node, transportAction, request, transportOptions(), new BaseTransportResponseHandler<Response>() {
@Override public Response newInstance() {
return newResponseInstance();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(Response response) {
listener.onResponse(response);
}
@ -352,10 +365,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
listener.onFailure(exp);
}
}
@Override public boolean spawn() {
return request.listenerThreaded();
}
});
}
break;
@ -364,7 +373,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (!foundPrimary) {
final UnavailableShardsException failure = new UnavailableShardsException(shardIt.shardId(), request.toString());
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(failure);
}
@ -410,7 +419,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
clusterService.remove(this);
final UnavailableShardsException failure = new UnavailableShardsException(shardId, "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(failure);
}
@ -446,7 +455,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
@ -486,7 +495,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
@ -501,7 +510,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
@ -549,7 +558,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
@ -566,7 +575,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
@ -579,15 +588,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
final ShardOperationRequest shardRequest = new ShardOperationRequest(shardIt.shardId().id(), request);
if (!nodeId.equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(nodeId);
transportService.sendRequest(node, transportReplicaAction(), shardRequest, transportOptions(), new VoidTransportResponseHandler() {
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions(), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleResponse(VoidStreamable vResponse) {
finishIfPossible();
}
@Override public void handleException(TransportException exp) {
if (!ignoreReplicaException(exp.unwrapCause())) {
logger.warn("Failed to perform " + transportAction() + " on replica " + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on replica, message [" + detailedMessage(exp) + "]");
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]");
}
finishIfPossible();
}
@ -595,7 +604,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private void finishIfPossible() {
if (counter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
@ -605,23 +614,18 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
}
@Override public boolean spawn() {
// don't spawn, we will call the listener on a thread pool if needed
return false;
}
});
} else {
if (request.operationThreaded()) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
try {
shardOperationOnReplica(shardRequest);
} catch (Exception e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction() + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on replica, message [" + detailedMessage(e) + "]");
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {
@ -634,13 +638,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
shardOperationOnReplica(shardRequest);
} catch (Exception e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction() + " on replica" + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on replica, message [" + detailedMessage(e) + "]");
logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}

View File

@ -50,14 +50,22 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
protected final ThreadPool threadPool;
final String transportAction;
final String transportShardAction;
final String executor;
protected TransportSingleCustomOperationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
super(settings);
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
transportService.registerHandler(transportAction(), new TransportHandler());
transportService.registerHandler(transportShardAction(), new ShardTransportHandler());
this.transportAction = transportAction();
this.transportShardAction = transportShardAction();
this.executor = executor();
transportService.registerHandler(transportAction, new TransportHandler());
transportService.registerHandler(transportShardAction, new ShardTransportHandler());
}
@Override protected void doExecute(Request request, ActionListener<Response> listener) {
@ -68,6 +76,8 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
protected abstract String transportShardAction();
protected abstract String executor();
protected abstract ShardsIterator shards(ClusterState state, Request request);
protected abstract Response shardOperation(Request request, int shardId) throws ElasticSearchException;
@ -122,7 +132,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (request.operationThreaded()) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
threadPool.executor(executor()).execute(new Runnable() {
@Override public void run() {
try {
Response response = shardOperation(request, shard.id());
@ -137,7 +147,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
try {
final Response response = shardOperation(request, shard.id());
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
@ -171,7 +181,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
if (!request.preferLocalShard()) {
if (request.operationThreaded()) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
try {
Response response = shardOperation(request, shard.id());
@ -186,7 +196,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
try {
final Response response = shardOperation(request, shard.id());
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
@ -202,14 +212,18 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportShardAction(), new ShardSingleOperationRequest(request, shard.id()), new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(request, shard.id()), new BaseTransportResponseHandler<Response>() {
@Override public Response newInstance() {
return newResponse();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(final Response response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
@ -222,11 +236,6 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
@Override public void handleException(TransportException exp) {
onFailure(shard, exp);
}
@Override public boolean spawn() {
// no need to spawn, we will execute the listener on a different thread if needed in handleResponse
return false;
}
});
return;
}
@ -242,7 +251,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
}
if (request.listenerThreaded()) {
final Exception fFailure = failure;
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(fFailure);
}
@ -284,8 +293,8 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
});
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
@ -295,7 +304,11 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
return new ShardSingleOperationRequest();
}
@Override public void messageReceived(ShardSingleOperationRequest request, TransportChannel channel) throws Exception {
@Override public String executor() {
return executor;
}
@Override public void messageReceived(final ShardSingleOperationRequest request, final TransportChannel channel) throws Exception {
Response response = shardOperation(request.request(), request.shardId());
channel.sendResponse(response);
}

View File

@ -50,14 +50,22 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
protected final ThreadPool threadPool;
final String transportAction;
final String transportShardAction;
final String executor;
protected TransportShardSingleOperationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
super(settings);
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
transportService.registerHandler(transportAction(), new TransportHandler());
transportService.registerHandler(transportShardAction(), new ShardTransportHandler());
this.transportAction = transportAction();
this.transportShardAction = transportShardAction();
this.executor = executor();
transportService.registerHandler(transportAction, new TransportHandler());
transportService.registerHandler(transportShardAction, new ShardTransportHandler());
}
@Override protected void doExecute(Request request, ActionListener<Response> listener) {
@ -68,6 +76,8 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
protected abstract String transportShardAction();
protected abstract String executor();
protected abstract Response shardOperation(Request request, int shardId) throws ElasticSearchException;
protected abstract Request newRequest();
@ -124,7 +134,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
final ShardRouting shard = shardIt.nextActive();
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (request.operationThreaded()) {
threadPool.execute(new Runnable() {
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
try {
Response response = shardOperation(request, shard.id());
@ -139,7 +149,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
try {
final Response response = shardOperation(request, shard.id());
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
@ -167,14 +177,19 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
// no need to check for local nodes, we tried them already in performFirstGet
if (!shard.currentNodeId().equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportShardAction(), new ShardSingleOperationRequest(request, shard.id()), new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(request, shard.id()), new BaseTransportResponseHandler<Response>() {
@Override public Response newInstance() {
return newResponse();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(final Response response) {
if (request.listenerThreaded()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
@ -187,11 +202,6 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
@Override public void handleException(TransportException exp) {
onFailure(shard, exp);
}
@Override public boolean spawn() {
// no need to spawn, we will execute the listener on a different thread if needed in handleResponse
return false;
}
});
return;
}
@ -207,7 +217,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
if (request.listenerThreaded()) {
final Exception fFailure = failure;
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(fFailure);
}
@ -225,6 +235,10 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
return newRequest();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(Request request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
@ -248,10 +262,6 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
});
}
@Override public boolean spawn() {
return false;
}
}
private class ShardTransportHandler extends BaseTransportRequestHandler<ShardSingleOperationRequest> {
@ -260,7 +270,11 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
return new ShardSingleOperationRequest();
}
@Override public void messageReceived(ShardSingleOperationRequest request, TransportChannel channel) throws Exception {
@Override public String executor() {
return executor;
}
@Override public void messageReceived(final ShardSingleOperationRequest request, final TransportChannel channel) throws Exception {
Response response = shardOperation(request.request(), request.shardId());
channel.sendResponse(response);
}

View File

@ -33,10 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import java.util.HashSet;
import java.util.concurrent.CopyOnWriteArrayList;
@ -94,7 +91,7 @@ public class TransportClientNodesService extends AbstractComponent {
} else {
this.nodesSampler = new ScheduledConnectNodeSampler();
}
this.nodesSamplerFuture = threadPool.schedule(nodesSampler, nodesSamplerInterval, ThreadPool.ExecutionType.THREADED);
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, nodesSampler);
// we want the transport service to throw connect exceptions, so we can retry
transportService.throwConnectException(true);
@ -175,16 +172,10 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
try {
NodesInfoResponse nodeInfo = transportService.submitRequest(node, TransportActions.Admin.Cluster.Node.INFO, Requests.nodesInfoRequest("_local"), new BaseTransportResponseHandler<NodesInfoResponse>() {
NodesInfoResponse nodeInfo = transportService.submitRequest(node, TransportActions.Admin.Cluster.Node.INFO, Requests.nodesInfoRequest("_local"), new FutureTransportResponseHandler<NodesInfoResponse>() {
@Override public NodesInfoResponse newInstance() {
return new NodesInfoResponse();
}
@Override public void handleResponse(NodesInfoResponse response) {
}
@Override public void handleException(TransportException exp) {
}
}).txGet();
if (!clusterName.equals(nodeInfo.clusterName())) {
logger.warn("Node {} not part of the cluster {}, ignoring...", node, clusterName);
@ -198,7 +189,7 @@ public class TransportClientNodesService extends AbstractComponent {
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
if (!closed) {
nodesSamplerFuture = threadPool.schedule(this, nodesSamplerInterval, ThreadPool.ExecutionType.THREADED);
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
}
}
}
@ -213,7 +204,7 @@ public class TransportClientNodesService extends AbstractComponent {
final CountDownLatch latch = new CountDownLatch(listedNodes.size());
final CopyOnWriteArrayList<NodesInfoResponse> nodesInfoResponses = new CopyOnWriteArrayList<NodesInfoResponse>();
for (final DiscoveryNode listedNode : listedNodes) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
transportService.connectToNode(listedNode); // make sure we are connected to it
@ -223,6 +214,10 @@ public class TransportClientNodesService extends AbstractComponent {
return new NodesInfoResponse();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(NodesInfoResponse response) {
nodesInfoResponses.add(response);
latch.countDown();
@ -271,7 +266,7 @@ public class TransportClientNodesService extends AbstractComponent {
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
if (!closed) {
nodesSamplerFuture = threadPool.schedule(this, nodesSamplerInterval, ThreadPool.ExecutionType.THREADED);
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
}
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.client.transport.action.ClientTransportAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
@ -73,6 +74,13 @@ public abstract class BaseClientTransportAction<Request extends ActionRequest, R
return BaseClientTransportAction.this.newInstance();
}
@Override public String executor() {
if (request.listenerThreaded()) {
return ThreadPool.Names.CACHED;
}
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(Response response) {
listener.onResponse(response);
}
@ -80,10 +88,6 @@ public abstract class BaseClientTransportAction<Request extends ActionRequest, R
@Override public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override public boolean spawn() {
return request.listenerThreaded();
}
});
}

View File

@ -59,6 +59,10 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
return "cluster/mappingUpdated";
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected MappingUpdatedRequest newRequest() {
return new MappingUpdatedRequest();
}

View File

@ -71,14 +71,14 @@ public class NodeIndexCreatedAction extends AbstractComponent {
public void nodeIndexCreated(final String index, final String nodeId) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
innerNodeIndexCreated(index, nodeId);
}
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeIndexCreatedTransportHandler.ACTION, new NodeIndexCreatedMessage(index, nodeId), VoidTransportResponseHandler.INSTANCE);
NodeIndexCreatedTransportHandler.ACTION, new NodeIndexCreatedMessage(index, nodeId), VoidTransportResponseHandler.INSTANCE_SAME);
}
}
@ -104,6 +104,10 @@ public class NodeIndexCreatedAction extends AbstractComponent {
innerNodeIndexCreated(message.index, message.nodeId);
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
private static class NodeIndexCreatedMessage implements Streamable {

View File

@ -71,14 +71,14 @@ public class NodeIndexDeletedAction extends AbstractComponent {
public void nodeIndexDeleted(final String index, final String nodeId) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
innerNodeIndexDeleted(index, nodeId);
}
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeIndexDeletedTransportHandler.ACTION, new NodeIndexDeletedMessage(index, nodeId), VoidTransportResponseHandler.INSTANCE);
NodeIndexDeletedTransportHandler.ACTION, new NodeIndexDeletedMessage(index, nodeId), VoidTransportResponseHandler.INSTANCE_SAME);
}
}
@ -104,6 +104,10 @@ public class NodeIndexDeletedAction extends AbstractComponent {
innerNodeIndexDeleted(message.index, message.nodeId);
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
private static class NodeIndexDeletedMessage implements Streamable {

View File

@ -63,14 +63,14 @@ public class NodeMappingCreatedAction extends AbstractComponent {
public void add(final Listener listener, TimeValue timeout) {
listeners.add(listener);
threadPool.schedule(new Runnable() {
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
boolean removed = listeners.remove(listener);
if (removed) {
listener.onTimeout();
}
}
}, timeout, ThreadPool.ExecutionType.THREADED);
});
}
public void remove(Listener listener) {
@ -80,14 +80,14 @@ public class NodeMappingCreatedAction extends AbstractComponent {
public void nodeMappingCreated(final NodeMappingCreatedResponse response) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
innerNodeIndexCreated(response);
}
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeMappingCreatedTransportHandler.ACTION, response, VoidTransportResponseHandler.INSTANCE);
NodeMappingCreatedTransportHandler.ACTION, response, VoidTransportResponseHandler.INSTANCE_SAME);
}
}
@ -116,6 +116,10 @@ public class NodeMappingCreatedAction extends AbstractComponent {
innerNodeIndexCreated(response);
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
public static class NodeMappingCreatedResponse implements Streamable {

View File

@ -75,14 +75,10 @@ public class ShardStateAction extends AbstractComponent {
logger.warn("sending failed shard for {}, reason [{}]", shardRouting, reason);
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.execute(new Runnable() {
@Override public void run() {
innerShardFailed(shardRouting, reason);
}
});
innerShardFailed(shardRouting, reason);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
ShardFailedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new VoidTransportResponseHandler() {
ShardFailedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to [{}]", exp, clusterService.state().nodes().masterNode());
}
@ -96,14 +92,10 @@ public class ShardStateAction extends AbstractComponent {
}
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.execute(new Runnable() {
@Override public void run() {
innerShardStarted(shardRouting, reason);
}
});
innerShardStarted(shardRouting, reason);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new VoidTransportResponseHandler() {
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, reason), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, clusterService.state().nodes().masterNode());
}
@ -184,6 +176,10 @@ public class ShardStateAction extends AbstractComponent {
innerShardFailed(request.shardRouting, request.reason);
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
private class ShardStartedTransportHandler extends BaseTransportRequestHandler<ShardRoutingEntry> {
@ -198,6 +194,10 @@ public class ShardStateAction extends AbstractComponent {
innerShardStarted(request.shardRouting, request.reason);
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
private static class ShardRoutingEntry implements Streamable {

View File

@ -129,12 +129,12 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
};
nodeIndexDeletedAction.add(nodeIndexDeleteListener);
threadPool.schedule(new Runnable() {
threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() {
@Override public void run() {
listener.onResponse(new Response(false));
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
}
}, request.timeout, ThreadPool.ExecutionType.DEFAULT);
});
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();

View File

@ -93,7 +93,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
// also, if the routing table changed, it means that we have new indices, or shard have started
// or failed, we want to apply this as fast as possible
routingTableDirty = true;
threadPool.execute(new RoutingTableUpdater());
threadPool.cached().execute(new RoutingTableUpdater());
} else {
if (event.nodesAdded()) {
routingTableDirty = true;

View File

@ -95,7 +95,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override protected void doStart() throws ElasticSearchException {
this.clusterState = newClusterStateBuilder().blocks(initialBlocks).build();
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask"));
this.reconnectToNodes = threadPool.schedule(new ReconnectToNodes(), reconnectInterval, ThreadPool.ExecutionType.THREADED);
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.CACHED, new ReconnectToNodes());
}
@Override protected void doStop() throws ElasticSearchException {
@ -148,7 +148,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return;
}
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
notifyTimeout.future = threadPool.schedule(notifyTimeout, timeout, ThreadPool.ExecutionType.THREADED);
notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.CACHED, notifyTimeout);
onGoingTimeouts.add(notifyTimeout);
clusterStateListeners.add(listener);
// call the post added notification on the same event thread
@ -311,7 +311,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
if (lifecycle.started()) {
reconnectToNodes = threadPool.schedule(this, reconnectInterval, ThreadPool.ExecutionType.THREADED);
reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.CACHED, this);
}
}
}

View File

@ -157,7 +157,7 @@ public class MasterFaultDetection extends AbstractComponent {
}
this.masterPinger = new MasterPinger();
// start the ping process
threadPool.schedule(masterPinger, pingInterval, ThreadPool.ExecutionType.DEFAULT);
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
}
public void stop(String reason) {
@ -201,7 +201,7 @@ public class MasterFaultDetection extends AbstractComponent {
masterPinger.stop();
}
this.masterPinger = new MasterPinger();
threadPool.schedule(masterPinger, pingInterval, ThreadPool.ExecutionType.DEFAULT);
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
} catch (Exception e) {
logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
@ -261,7 +261,7 @@ public class MasterFaultDetection extends AbstractComponent {
final DiscoveryNode masterToPing = masterNode;
if (masterToPing == null) {
// master is null, should not happen, but we are still running, so reschedule
threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.ExecutionType.DEFAULT);
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
return;
}
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout),
@ -283,7 +283,7 @@ public class MasterFaultDetection extends AbstractComponent {
notifyDisconnectedFromMaster();
}
// we don't stop on disconnection from master, we keep pinging it
threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.ExecutionType.DEFAULT);
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
}
}
@ -312,8 +312,8 @@ public class MasterFaultDetection extends AbstractComponent {
}
}
@Override public boolean spawn() {
return false; // no need to spawn, we hardly do anything
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -338,10 +338,8 @@ public class MasterFaultDetection extends AbstractComponent {
channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.nodeId)));
}
@Override public boolean spawn() {
// no need to spawn here, we just send a response
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}

View File

@ -122,7 +122,7 @@ public class NodesFaultDetection extends AbstractComponent {
}
if (!nodesFD.containsKey(newNode)) {
nodesFD.put(newNode, new NodeFD());
threadPool.schedule(new SendPingRequest(newNode), pingInterval, ThreadPool.ExecutionType.DEFAULT);
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
}
}
for (DiscoveryNode removedNode : delta.removedNodes()) {
@ -168,7 +168,7 @@ public class NodesFaultDetection extends AbstractComponent {
try {
transportService.connectToNode(node);
nodesFD.put(node, new NodeFD());
threadPool.schedule(new SendPingRequest(node), pingInterval, ThreadPool.ExecutionType.DEFAULT);
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(node));
} catch (Exception e) {
logger.trace("[node ] [{}] transport disconnected (with verified connect)", node);
notifyNodeFailure(node, "transport disconnected (with verified connect)");
@ -217,7 +217,7 @@ public class NodesFaultDetection extends AbstractComponent {
return;
}
nodeFD.retryCount = 0;
threadPool.schedule(SendPingRequest.this, pingInterval, ThreadPool.ExecutionType.DEFAULT);
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, SendPingRequest.this);
}
}
@ -251,8 +251,8 @@ public class NodesFaultDetection extends AbstractComponent {
}
}
@Override public boolean spawn() {
return false; // no need to spawn, we hardly do anything
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -290,9 +290,8 @@ public class NodesFaultDetection extends AbstractComponent {
channel.sendResponse(new PingResponse());
}
@Override public boolean spawn() {
// no need to spawn here, we just send a response
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
@ -68,15 +69,15 @@ public class MembershipAction extends AbstractComponent {
}
public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(node, LeaveRequestRequestHandler.ACTION, new LeaveRequest(masterNode), VoidTransportResponseHandler.INSTANCE_NOSPAWN);
transportService.sendRequest(node, LeaveRequestRequestHandler.ACTION, new LeaveRequest(masterNode), VoidTransportResponseHandler.INSTANCE_SAME);
}
public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException {
transportService.submitRequest(masterNode, LeaveRequestRequestHandler.ACTION, new LeaveRequest(node), VoidTransportResponseHandler.INSTANCE_NOSPAWN).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
transportService.submitRequest(masterNode, LeaveRequestRequestHandler.ACTION, new LeaveRequest(node), VoidTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
}
public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), VoidTransportResponseHandler.INSTANCE_NOSPAWN);
transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), VoidTransportResponseHandler.INSTANCE_SAME);
}
public ClusterState sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException {
@ -148,6 +149,10 @@ public class MembershipAction extends AbstractComponent {
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
private static class LeaveRequest implements Streamable {
@ -182,5 +187,9 @@ public class MembershipAction extends AbstractComponent {
listener.onLeave(request.node);
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -200,7 +200,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
sendPingRequest(id, true);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(new Runnable() {
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
try {
sendPingRequest(id, false);
@ -208,13 +208,13 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
logger.warn("[{}] failed to send second ping request", e, id);
}
}
}, TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.ExecutionType.THREADED);
threadPool.schedule(new Runnable() {
});
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(id);
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
}
}, timeout, ThreadPool.ExecutionType.THREADED);
});
}
private void sendPingRequest(int id, boolean remove) {
@ -266,8 +266,8 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
@ -352,7 +352,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
@ -363,7 +363,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
});
} else {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}

View File

@ -152,13 +152,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
final int id = pingIdGenerator.incrementAndGet();
receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>());
sendPings(id, timeout, false);
threadPool.schedule(new Runnable() {
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
sendPings(id, timeout, true);
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(id);
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
}
}, timeout, ThreadPool.ExecutionType.THREADED);
});
}
private void sendPings(final int id, TimeValue timeout, boolean wait) {
@ -202,6 +202,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
return new UnicastPingResponse();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(UnicastPingResponse response) {
logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses));
try {
@ -243,10 +247,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
logger.warn("failed to send ping to [{}]", exp, node);
}
}
@Override public boolean spawn() {
return false;
}
});
}
if (wait) {
@ -260,11 +260,11 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {
temporalResponses.add(request.pingResponse);
threadPool.schedule(new Runnable() {
threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, new Runnable() {
@Override public void run() {
temporalResponses.remove(request.pingResponse);
}
}, TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.ExecutionType.DEFAULT);
});
List<PingResponse> pingResponses = newArrayList(temporalResponses);
DiscoveryNodes discoNodes = nodesProvider.nodes();
@ -286,12 +286,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
return new UnicastPingRequest();
}
@Override public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(handlePingRequest(request));
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public boolean spawn() {
return false;
@Override public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(handlePingRequest(request));
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
@ -71,7 +72,7 @@ public class PublishClusterStateAction extends AbstractComponent {
new PublishClusterStateRequest(clusterState),
TransportRequestOptions.options().withHighType(),
new VoidTransportResponseHandler(false) {
new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
}
@ -112,11 +113,8 @@ public class PublishClusterStateAction extends AbstractComponent {
channel.sendResponse(VoidStreamable.INSTANCE);
}
/**
* No need to spawn, we add submit a new cluster state directly. This allows for faster application.
*/
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -191,13 +191,13 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
if (!ignoreTimeout && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.debug("delaying initial state recovery for [{}]", recoverAfterTime);
threadPool.schedule(new Runnable() {
threadPool.schedule(recoverAfterTime, ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
if (recovered.compareAndSet(false, true)) {
gateway.performStateRecovery(recoveryListener);
}
}
}, recoverAfterTime, ThreadPool.ExecutionType.THREADED);
});
}
} else {
if (recovered.compareAndSet(false, true)) {

View File

@ -61,6 +61,10 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
return execute(new Request(nodesIds).timeout(timeout));
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return "/gateway/local/meta-state";
}

View File

@ -61,6 +61,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
return execute(new Request(nodesIds).timeout(timeout));
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return "/gateway/local/started-shards";
}

View File

@ -119,7 +119,7 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
}
} else {
if (httpHandler.spawn()) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
httpHandler.handleRequest(request, channel);

View File

@ -316,7 +316,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (logger.isDebugEnabled()) {
logger.debug("scheduling snapshot every [{}]", snapshotInterval);
}
snapshotScheduleFuture = threadPool.schedule(new SnapshotRunnable(), snapshotInterval, ThreadPool.ExecutionType.THREADED);
snapshotScheduleFuture = threadPool.schedule(snapshotInterval, ThreadPool.Names.SNAPSHOT, new SnapshotRunnable());
}
}
@ -332,7 +332,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
}
// schedule it again
if (indexShard.state() != IndexShardState.CLOSED) {
snapshotScheduleFuture = threadPool.schedule(this, snapshotInterval, ThreadPool.ExecutionType.THREADED);
snapshotScheduleFuture = threadPool.schedule(snapshotInterval, ThreadPool.Names.SNAPSHOT, this);
}
}
}

View File

@ -144,7 +144,7 @@ public class RecoverySource extends AbstractComponent {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes,
response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE_SAME).txGet();
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
@ -167,7 +167,7 @@ public class RecoverySource extends AbstractComponent {
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead),
TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet();
TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
readCount += toRead;
}
indexInput.close();
@ -195,7 +195,7 @@ public class RecoverySource extends AbstractComponent {
// now, set the clean files request
Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
@ -212,7 +212,7 @@ public class RecoverySource extends AbstractComponent {
logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
int totalOperations = sendSnapshot(snapshot);
@ -229,7 +229,7 @@ public class RecoverySource extends AbstractComponent {
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
@ -258,7 +258,7 @@ public class RecoverySource extends AbstractComponent {
totalOperations++;
if (++counter == translogBatchSize) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
counter = 0;
operations = Lists.newArrayList();
}
@ -266,7 +266,7 @@ public class RecoverySource extends AbstractComponent {
// send the leftover
if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
}
return totalOperations;
}
@ -280,26 +280,13 @@ public class RecoverySource extends AbstractComponent {
return new StartRecoveryRequest();
}
@Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
// we don't spawn, but we execute the expensive recovery process on a cached thread pool
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
RecoveryResponse response = recover(request);
channel.sendResponse(response);
} catch (Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
// ignore
}
}
}
});
@Override public String executor() {
return ThreadPool.Names.CACHED;
}
@Override public boolean spawn() {
return false;
@Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
RecoveryResponse response = recover(request);
channel.sendResponse(response);
}
}
}

View File

@ -278,6 +278,10 @@ public class RecoveryTarget extends AbstractComponent {
return new RecoveryPrepareForTranslogOperationsRequest();
}
@Override public String executor() {
return ThreadPool.Names.CACHED;
}
@Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
@ -299,6 +303,10 @@ public class RecoveryTarget extends AbstractComponent {
return new RecoveryFinalizeRecoveryRequest();
}
@Override public String executor() {
return ThreadPool.Names.CACHED;
}
@Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
RecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shard.shardId());
@ -321,6 +329,10 @@ public class RecoveryTarget extends AbstractComponent {
return new RecoveryTranslogOperationsRequest();
}
@Override public String executor() {
return ThreadPool.Names.CACHED;
}
@Override public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (Translog.Operation operation : request.operations()) {
@ -344,6 +356,10 @@ public class RecoveryTarget extends AbstractComponent {
return new RecoveryFilesInfoRequest();
}
@Override public String executor() {
return ThreadPool.Names.CACHED;
}
@Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId.index().name()).shardSafe(request.shardId.id());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
@ -368,6 +384,10 @@ public class RecoveryTarget extends AbstractComponent {
return new RecoveryCleanFilesRequest();
}
@Override public String executor() {
return ThreadPool.Names.CACHED;
}
@Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (String existingFile : shard.store().directory().listAll()) {
@ -390,6 +410,10 @@ public class RecoveryTarget extends AbstractComponent {
return new RecoveryFileChunkRequest();
}
@Override public String executor() {
return ThreadPool.Names.CACHED;
}
@Override public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());

View File

@ -581,7 +581,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private void startScheduledTasksIfNeeded() {
if (refreshInterval.millis() > 0) {
refreshScheduledFuture = threadPool.schedule(new EngineRefresher(), refreshInterval, ThreadPool.ExecutionType.DEFAULT);
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
logger.debug("scheduling refresher every {}", refreshInterval);
} else {
logger.debug("scheduled refresher disabled");
@ -590,7 +590,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
// so, make sure we periodically call it, this need to be a small enough value so mergine will actually
// happen and reduce the number of segments
if (optimizeInterval.millis() > 0) {
optimizeScheduleFuture = threadPool.schedule(new EngineOptimizer(), optimizeInterval, ThreadPool.ExecutionType.THREADED);
optimizeScheduleFuture = threadPool.schedule(optimizeInterval, ThreadPool.Names.MANAGEMENT, new EngineOptimizer());
logger.debug("scheduling optimizer / merger every {}", optimizeInterval);
} else {
logger.debug("scheduled optimizer / merger disabled");
@ -609,7 +609,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
// we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule
if (!engine().refreshNeeded()) {
if (state != IndexShardState.CLOSED) {
refreshScheduledFuture = threadPool.schedule(this, refreshInterval, ThreadPool.ExecutionType.DEFAULT);
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this);
}
return;
}
@ -635,7 +635,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
logger.warn("Failed to perform scheduled engine refresh", e);
}
if (state != IndexShardState.CLOSED) {
refreshScheduledFuture = threadPool.schedule(this, refreshInterval, ThreadPool.ExecutionType.DEFAULT);
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, this);
}
}
});
@ -663,7 +663,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
logger.warn("Failed to perform scheduled engine optimize/merge", e);
}
if (state != IndexShardState.CLOSED) {
optimizeScheduleFuture = threadPool.schedule(this, optimizeInterval, ThreadPool.ExecutionType.THREADED);
optimizeScheduleFuture = threadPool.schedule(optimizeInterval, ThreadPool.Names.MANAGEMENT, this);
}
}
}

View File

@ -72,7 +72,7 @@ public class TranslogService extends AbstractIndexShardComponent {
logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod);
this.future = threadPool.schedule(new TranslogBasedFlush(), interval, ThreadPool.ExecutionType.DEFAULT);
this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush());
}
@ -109,11 +109,11 @@ public class TranslogService extends AbstractIndexShardComponent {
return;
}
future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.DEFAULT);
future = threadPool.schedule(interval, ThreadPool.Names.SAME, this);
}
private void asyncFlushAndReschedule() {
threadPool.cached().execute(new Runnable() {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override public void run() {
try {
indexShard.flush(new Engine.Flush());
@ -127,7 +127,7 @@ public class TranslogService extends AbstractIndexShardComponent {
lastFlushTime = System.currentTimeMillis();
if (indexShard.state() != IndexShardState.CLOSED) {
future = threadPool.schedule(this, interval, ThreadPool.ExecutionType.DEFAULT);
future = threadPool.schedule(interval, ThreadPool.Names.SAME, this);
}
}
});

View File

@ -178,7 +178,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
try {
indicesService.deleteIndex(index, "deleting index");
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
nodeIndexDeletedAction.nodeIndexDeleted(index, event.state().nodes().localNodeId());
}
@ -241,7 +241,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}] creating index", indexMetaData.index());
}
indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), event.state().nodes().localNode().id());
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
nodeIndexCreatedAction.nodeIndexCreated(indexMetaData.index(), event.state().nodes().localNodeId());
}
@ -478,11 +478,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
@Override public void onRetryRecovery(TimeValue retryAfter) {
threadPool.schedule(new Runnable() {
threadPool.schedule(retryAfter, ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
recoveryTarget.startRecovery(request, true, PeerRecoveryListener.this);
}
}, retryAfter, ThreadPool.ExecutionType.THREADED);
});
}
@Override public void onIgnoreRecovery(boolean removeShard, String reason) {

View File

@ -76,6 +76,10 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
return execute(new Request(shardId, onlyUnallocated, nodesIds).timeout(timeout));
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return "/cluster/nodes/indices/shard/store";
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StringStreamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.jmx.JmxService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -74,6 +75,10 @@ public class GetJmxServiceUrlAction extends AbstractComponent {
return VoidStreamable.INSTANCE;
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(VoidStreamable request, TransportChannel channel) throws Exception {
channel.sendResponse(new StringStreamable(jmxService.publishUrl()));
}

View File

@ -101,9 +101,6 @@ public class RestNodesInfoAction extends BaseRestHandler {
if (nodeInfo.network() != null) {
nodeInfo.network().toXContent(builder, request);
}
if (nodeInfo.threadPool() != null) {
nodeInfo.threadPool().toXContent(builder, request);
}
if (nodeInfo.transport() != null) {
nodeInfo.transport().toXContent(builder, request);
}

View File

@ -77,9 +77,6 @@ public class RestNodesStatsAction extends BaseRestHandler {
if (nodeStats.network() != null) {
nodeStats.network().toXContent(builder, request);
}
if (nodeStats.threadPool() != null) {
nodeStats.threadPool().toXContent(builder, request);
}
if (nodeStats.transport() != null) {
nodeStats.transport().toXContent(builder, request);
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
@ -76,7 +77,7 @@ public class PublishRiverClusterStateAction extends AbstractComponent {
continue;
}
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequest(clusterState), new VoidTransportResponseHandler(false) {
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequest(clusterState), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
}
@ -112,16 +113,13 @@ public class PublishRiverClusterStateAction extends AbstractComponent {
return new PublishClusterStateRequest();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
listener.onNewClusterState(request.clusterState);
channel.sendResponse(VoidStreamable.INSTANCE);
}
/**
* No need to spawn, we add submit a new cluster state directly. This allows for faster application.
*/
@Override public boolean spawn() {
return false;
}
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
/**
@ -53,7 +54,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
private final ESLogger logger;
FreeContextResponseHandler(ESLogger logger) {
super(false);
super(ThreadPool.Names.SAME);
this.logger = logger;
}
@ -118,8 +119,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
listener.onFailure(exp);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -148,8 +149,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
listener.onFailure(exp);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -178,8 +179,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
listener.onFailure(exp);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -208,8 +209,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
listener.onFailure(exp);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -238,8 +239,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
listener.onFailure(exp);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -268,8 +269,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
listener.onFailure(exp);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -298,8 +299,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
listener.onFailure(exp);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -328,8 +329,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
listener.onFailure(exp);
}
@Override public boolean spawn() {
return false;
@Override public String executor() {
return ThreadPool.Names.SAME;
}
});
}
@ -347,6 +348,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
searchService.freeContext(request.get());
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
@ -362,6 +367,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
DfsSearchResult result = searchService.executeDfsPhase(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
private class SearchQueryTransportHandler extends BaseTransportRequestHandler<InternalSearchRequest> {
@ -376,6 +385,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
QuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
private class SearchQueryByIdTransportHandler extends BaseTransportRequestHandler<QuerySearchRequest> {
@ -390,6 +403,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
QuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
private class SearchQueryScrollTransportHandler extends BaseTransportRequestHandler<InternalScrollSearchRequest> {
@ -404,6 +421,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
private class SearchQueryFetchTransportHandler extends BaseTransportRequestHandler<InternalSearchRequest> {
@ -418,6 +439,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
private class SearchQueryQueryFetchTransportHandler extends BaseTransportRequestHandler<QuerySearchRequest> {
@ -432,6 +457,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
private class SearchFetchByIdTransportHandler extends BaseTransportRequestHandler<FetchSearchRequest> {
@ -446,6 +475,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
FetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
private class SearchQueryFetchScrollTransportHandler extends BaseTransportRequestHandler<InternalScrollSearchRequest> {
@ -460,5 +493,9 @@ public class SearchServiceTransportAction extends AbstractComponent {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
@Override public String executor() {
return ThreadPool.Names.SEARCH;
}
}
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
/**
* @author kimchy (Shay Banon)
*/
public interface FutureListener<T> {
void onResult(T result);
void onException(Exception e);
}

View File

@ -19,103 +19,210 @@
package org.elasticsearch.threadpool;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.MoreExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.Map;
import java.util.concurrent.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
public interface ThreadPool extends Executor {
public class ThreadPool extends AbstractComponent {
ThreadPoolInfo info();
public static class Names {
public static final String SAME = "same";
public static final String CACHED = "cached";
public static final String INDEX = "index";
public static final String SEARCH = "search";
public static final String PERCOLATE = "percolate";
public static final String MANAGEMENT = "management";
public static final String SNAPSHOT = "snapshot";
}
ThreadPoolStats stats();
private final ImmutableMap<String, Executor> executors;
/**
* The minimum number of threads in the thread pool.
*/
int getMinThreads();
private final ScheduledExecutorService scheduler;
/**
* The maximum number of threads in the thread pool.
*/
int getMaxThreads();
public ThreadPool() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
}
/**
* The size of scheduler threads.
*/
int getSchedulerThreads();
@Inject public ThreadPool(Settings settings) {
super(settings);
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
int getPoolSize();
Map<String, Settings> groupSettings = settings.getGroups("threadpool");
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
int getActiveCount();
Map<String, Executor> executors = Maps.newHashMap();
executors.put(Names.CACHED, build(Names.CACHED, "cached", groupSettings.get(Names.CACHED), settingsBuilder().put("keep_alive", "30s").build()));
executors.put(Names.INDEX, build(Names.INDEX, "cached", groupSettings.get(Names.INDEX), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.PERCOLATE, build(Names.PERCOLATE, "cached", groupSettings.get(Names.PERCOLATE), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.MANAGEMENT, build(Names.MANAGEMENT, "scaling", groupSettings.get(Names.MANAGEMENT), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.SAME, MoreExecutors.sameThreadExecutor());
this.executors = ImmutableMap.copyOf(executors);
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "[scheduler]"));
}
/**
* The size of the scheduler thread pool.
*/
int getSchedulerPoolSize();
public Executor cached() {
return executor(Names.CACHED);
}
/**
* The approximate number of threads that are actively executing scheduled
* tasks.
*/
int getSchedulerActiveCount();
public Executor executor(String name) {
Executor executor = executors.get(name);
if (executor == null) {
throw new ElasticSearchIllegalArgumentException("No executor found for [" + name + "]");
}
return executor;
}
/**
* Returns <tt>true</tt> if the thread pool has started.
*/
boolean isStarted();
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
}
/**
* Returns a cached executor that will always allocate threads.
*/
Executor cached();
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
if (!Names.SAME.equals(name)) {
command = new ThreadedRunnable(command, executor(name));
}
return scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS);
}
void shutdownNow();
public void shutdown() {
scheduler.shutdown();
for (Executor executor : executors.values()) {
if (executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor).shutdown();
}
}
}
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*/
void shutdown();
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
boolean result = scheduler.awaitTermination(timeout, unit);
for (Executor executor : executors.values()) {
if (executor instanceof ThreadPoolExecutor) {
result &= ((ThreadPoolExecutor) executor).awaitTermination(timeout, unit);
}
}
return result;
}
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
public void shutdownNow() {
scheduler.shutdownNow();
for (Executor executor : executors.values()) {
if (executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor).shutdownNow();
}
}
}
void execute(Runnable command);
private Executor build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) {
if (settings == null) {
settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
}
String type = settings.get("type", defaultType);
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[" + name + "]");
if ("cached".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)));
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
keepAlive.millis(), TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
} else if ("fixed".equals(type)) {
int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5));
logger.debug("creating thread_pool [{}], type [{}], size [{}]", name, type, size);
return new ThreadPoolExecutor(size, size,
0L, TimeUnit.MILLISECONDS,
new LinkedTransferQueue<Runnable>(),
threadFactory);
} else if ("scaling".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)));
int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1));
int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5));
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
return DynamicExecutors.newScalingThreadPool(min, size, keepAlive.millis(), threadFactory);
} else if ("blocking".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)));
int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1));
int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5));
SizeValue capacity = settings.getAsSize("capacity", defaultSettings.getAsSize("capacity", new SizeValue(0)));
TimeValue waitTime = settings.getAsTime("wait_time", defaultSettings.getAsTime("wait_time", timeValueSeconds(60)));
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, keepAlive, waitTime);
return DynamicExecutors.newBlockingThreadPool(min, size, keepAlive.millis(), (int) capacity.singles(), waitTime.millis(), threadFactory);
}
throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]");
}
/**
* Scheduled a task. Note, when using {@link ExecutionType#DEFAULT}, make sure to not
* execute long running blocking tasks.
*/
ScheduledFuture<?> schedule(Runnable command, TimeValue delay, ExecutionType executionType);
class LoggingRunnable implements Runnable {
/**
* Schedule a repeating task with a task that is very short lived.
*/
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval);
private final Runnable runnable;
/**
* Returns an estimated current time in milliseconds.
*/
long estimatedCurrentTimeInMillis();
LoggingRunnable(Runnable runnable) {
this.runnable = runnable;
}
static enum ExecutionType {
DEFAULT,
THREADED
@Override public void run() {
try {
runnable.run();
} catch (Exception e) {
logger.warn("failed to run {}", e, runnable.toString());
}
}
@Override public int hashCode() {
return runnable.hashCode();
}
@Override public boolean equals(Object obj) {
return runnable.equals(obj);
}
@Override public String toString() {
return "[threaded] " + runnable.toString();
}
}
class ThreadedRunnable implements Runnable {
private final Runnable runnable;
private final Executor executor;
ThreadedRunnable(Runnable runnable, Executor executor) {
this.runnable = runnable;
this.executor = executor;
}
@Override public void run() {
executor.execute(runnable);
}
@Override public int hashCode() {
return runnable.hashCode();
}
@Override public boolean equals(Object obj) {
return runnable.equals(obj);
}
@Override public String toString() {
return "[threaded] " + runnable.toString();
}
}
}

View File

@ -1,141 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.io.Serializable;
/**
* Thread Pool Info.
*
* @author kimchy (shay.banon)
*/
public class ThreadPoolInfo implements Streamable, Serializable, ToXContent {
private String type;
private int minThreads;
private int maxThreads;
private int schedulerThreads;
ThreadPoolInfo() {
}
public ThreadPoolInfo(String type, int minThreads, int maxThreads, int schedulerThreads) {
this.type = type;
this.minThreads = minThreads;
this.maxThreads = maxThreads;
this.schedulerThreads = schedulerThreads;
}
public static ThreadPoolInfo readThreadPoolInfo(StreamInput in) throws IOException {
ThreadPoolInfo info = new ThreadPoolInfo();
info.readFrom(in);
return info;
}
@Override public void readFrom(StreamInput in) throws IOException {
type = in.readUTF();
minThreads = in.readInt();
maxThreads = in.readInt();
schedulerThreads = in.readInt();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(type);
out.writeInt(minThreads);
out.writeInt(maxThreads);
out.writeInt(schedulerThreads);
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("thread_pool");
builder.field("type", type);
builder.field("min_threads", minThreads);
builder.field("max_threads", maxThreads);
builder.field("scheduler_threads", schedulerThreads);
builder.endObject();
return builder;
}
/**
* The type of the thread pool.
*/
public String type() {
return type;
}
/**
* The type of the thread pool.
*/
public String getType() {
return type();
}
/**
* The minimum number of threads in the thread pool.
*/
public int minThreads() {
return minThreads;
}
/**
* The minimum number of threads in the thread pool.
*/
public int getMinThreads() {
return minThreads();
}
/**
* The maximum number of threads in the thread pool.
*/
public int maxThreads() {
return maxThreads;
}
/**
* The maximum number of threads in the thread pool.
*/
public int getMaxThreads() {
return maxThreads();
}
/**
* The size of scheduler threads.
*/
public int schedulerThreads() {
return schedulerThreads;
}
/**
* The size of scheduler threads.
*/
public int getSchedulerThreads() {
return schedulerThreads();
}
}

View File

@ -19,18 +19,13 @@
package org.elasticsearch.threadpool;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.cached.CachedThreadPoolModule;
/**
* @author kimchy (shay.banon)
*/
public class ThreadPoolModule extends AbstractModule implements SpawnModules {
public class ThreadPoolModule extends AbstractModule {
private final Settings settings;
@ -38,10 +33,7 @@ public class ThreadPoolModule extends AbstractModule implements SpawnModules {
this.settings = settings;
}
@Override public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(Modules.createModule(settings.getAsClass("threadpool.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings));
}
@Override protected void configure() {
bind(ThreadPool.class).asEagerSingleton();
}
}

View File

@ -1,153 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.io.Serializable;
/**
* Thread Pool level stats.
*
* @author kimchy (shay.banon)
*/
public class ThreadPoolStats implements Streamable, Serializable, ToXContent {
private int poolSize;
private int activeCount;
private int schedulerPoolSize;
private int schedulerActiveCount;
ThreadPoolStats() {
}
public ThreadPoolStats(int poolSize, int activeCount, int schedulerPoolSize, int schedulerActiveCount) {
this.poolSize = poolSize;
this.activeCount = activeCount;
this.schedulerPoolSize = schedulerPoolSize;
this.schedulerActiveCount = schedulerActiveCount;
}
public static ThreadPoolStats readThreadPoolStats(StreamInput in) throws IOException {
ThreadPoolStats stats = new ThreadPoolStats();
stats.readFrom(in);
return stats;
}
@Override public void readFrom(StreamInput in) throws IOException {
poolSize = in.readVInt();
activeCount = in.readVInt();
schedulerPoolSize = in.readVInt();
schedulerActiveCount = in.readVInt();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(poolSize);
out.writeVInt(activeCount);
out.writeVInt(schedulerPoolSize);
out.writeVInt(schedulerActiveCount);
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("thread_pool");
builder.field("pool_size", poolSize);
builder.field("active_count", activeCount);
builder.field("scheduler_pool_size", schedulerPoolSize);
builder.field("scheduler_active_count", schedulerActiveCount);
builder.endObject();
return builder;
}
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int poolSize() {
return poolSize;
}
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
return poolSize();
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int activeCount() {
return activeCount;
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int getActiveCount() {
return activeCount();
}
/**
* The size of the scheduler thread pool.
*/
public int schedulerPoolSize() {
return schedulerPoolSize;
}
/**
* The size of the scheduler thread pool.
*/
public int getSchedulerPoolSize() {
return schedulerPoolSize();
}
/**
* The approximate number of threads that are actively executing scheduled
* tasks.
*/
public int schedulerActiveCount() {
return schedulerActiveCount;
}
/**
* The approximate number of threads that are actively executing scheduled
* tasks.
*/
public int getSchedulerActiveCount() {
return schedulerActiveCount();
}
}

View File

@ -1,114 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.blocking;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* A thread pool that will will block the execute if all threads are busy.
*
* @author kimchy (shay.banon)
*/
public class BlockingThreadPool extends AbstractThreadPool {
final int min;
final int max;
final int capacity;
final TimeValue waitTime;
final TimeValue keepAlive;
final int scheduledSize;
public BlockingThreadPool() {
this(EMPTY_SETTINGS);
}
@Inject public BlockingThreadPool(Settings settings) {
super(settings);
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 1);
this.min = componentSettings.getAsInt("min", 10);
this.max = componentSettings.getAsInt("max", 100);
// capacity is set to 0 as it might cause starvation in blocking mode
this.capacity = (int) componentSettings.getAsSize("capacity", new SizeValue(0)).singles();
this.waitTime = componentSettings.getAsTime("wait_time", timeValueSeconds(60));
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueMinutes(5));
logger.debug("initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize);
// executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]"));
executorService = DynamicExecutors.newBlockingThreadPool(min, max, keepAlive.millis(), capacity, waitTime.millis(), EsExecutors.daemonThreadFactory(settings, "[tp]"));
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]"));
started = true;
}
@Override public String getType() {
return "blocking";
}
@Override public int getMinThreads() {
return min;
}
@Override public int getMaxThreads() {
return max;
}
@Override public int getSchedulerThreads() {
return scheduledSize;
}
@Override public int getPoolSize() {
if (executorService instanceof TransferThreadPoolExecutor) {
return ((TransferThreadPoolExecutor) executorService).getPoolSize();
} else {
return ((ThreadPoolExecutor) executorService).getPoolSize();
}
}
@Override public int getActiveCount() {
if (executorService instanceof TransferThreadPoolExecutor) {
return ((TransferThreadPoolExecutor) executorService).getActiveCount();
} else {
return ((ThreadPoolExecutor) executorService).getActiveCount();
}
}
@Override public int getSchedulerPoolSize() {
return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize();
}
@Override public int getSchedulerActiveCount() {
return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount();
}
}

View File

@ -1,83 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.blocking;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.jmx.MBean;
import org.elasticsearch.jmx.ManagedAttribute;
import org.elasticsearch.threadpool.ThreadPool;
/**
* @author kimchy (shay.banon)
*/
@MBean(objectName = "service=threadpool,threadpoolType=blocking", description = "Blocking Thread Pool")
public class BlockingThreadPoolManagement {
private final BlockingThreadPool threadPool;
@Inject public BlockingThreadPoolManagement(ThreadPool threadPool) {
this.threadPool = (BlockingThreadPool) threadPool;
}
@ManagedAttribute(description = "Minimum number Of threads")
public long getMin() {
return threadPool.min;
}
@ManagedAttribute(description = "Maximum number of threads")
public int getMax() {
return threadPool.max;
}
@ManagedAttribute(description = "Number of scheduler threads")
public int getScheduleSize() {
return threadPool.scheduledSize;
}
@ManagedAttribute(description = "Thread keep alive")
public String getKeepAlive() {
return threadPool.keepAlive.format();
}
@ManagedAttribute(description = "Thread keep alive (in seconds)")
public long getKeepAliveInSeconds() {
return threadPool.keepAlive.seconds();
}
@ManagedAttribute(description = "Current number of threads in the pool")
public long getPoolSize() {
return threadPool.getPoolSize();
}
@ManagedAttribute(description = "Approximate number of threads that are actively executing tasks")
public long getActiveCount() {
return threadPool.getActiveCount();
}
@ManagedAttribute(description = "Current number of threads in the scheduler pool")
public long getSchedulerPoolSize() {
return threadPool.getSchedulerPoolSize();
}
@ManagedAttribute(description = "Approximate number of threads that are actively executing scheduled tasks")
public long getSchedulerActiveCount() {
return threadPool.getSchedulerActiveCount();
}
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.blocking;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.threadpool.ThreadPool;
/**
* @author kimchy (shay.banon)
*/
public class BlockingThreadPoolModule extends AbstractModule {
@Override protected void configure() {
bind(ThreadPool.class).to(BlockingThreadPool.class).asEagerSingleton();
bind(BlockingThreadPoolManagement.class).asEagerSingleton();
}
}

View File

@ -1,95 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool.cached;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* A thread pool that will create an unbounded number of threads.
*
* @author kimchy (shay.banon)
*/
public class CachedThreadPool extends AbstractThreadPool {
final TimeValue keepAlive;
final int scheduledSize;
public CachedThreadPool() {
this(EMPTY_SETTINGS);
}
@Inject public CachedThreadPool(Settings settings) {
super(settings);
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 1);
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueMinutes(5));
logger.debug("Initializing {} thread pool with keep_alive[{}], scheduled_size[{}]", getType(), keepAlive, scheduledSize);
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
keepAlive.millis(), TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
EsExecutors.daemonThreadFactory(settings, "[tp]"));
scheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
cached = executorService;
started = true;
}
@Override public String getType() {
return "cached";
}
@Override public int getMinThreads() {
return 0;
}
@Override public int getMaxThreads() {
return -1;
}
@Override public int getSchedulerThreads() {
return scheduledSize;
}
@Override public int getPoolSize() {
return ((ThreadPoolExecutor) executorService).getPoolSize();
}
@Override public int getActiveCount() {
return ((ThreadPoolExecutor) executorService).getActiveCount();
}
@Override public int getSchedulerPoolSize() {
return ((ThreadPoolExecutor) scheduledExecutorService).getPoolSize();
}
@Override public int getSchedulerActiveCount() {
return ((ThreadPoolExecutor) scheduledExecutorService).getActiveCount();
}
}

Some files were not shown because too many files have changed in this diff Show More