rename the cached thread pool to generic (from cached), since really, cached is meaningless, and its actually a generic thread pool we use for different operations

This commit is contained in:
Shay Banon 2012-03-09 20:32:33 +02:00
parent 7cc9108192
commit c08b968246
44 changed files with 86 additions and 95 deletions

View File

@ -62,7 +62,7 @@ public class TransportActionNodeProxy<Request extends ActionRequest, Response ex
@Override
public String executor() {
if (request.listenerThreaded()) {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
return ThreadPool.Names.SAME;
}

View File

@ -52,7 +52,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
@Override
protected String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -70,7 +70,7 @@ public class TransportNodesRestartAction extends TransportNodesOperationAction<N
@Override
protected String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -119,7 +119,7 @@ public class TransportNodesRestartAction extends TransportNodesOperationAction<N
return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
}
logger.info("Restarting in [{}]", request.delay);
threadPool.schedule(request.delay, ThreadPool.Names.CACHED, new Runnable() {
threadPool.schedule(request.delay, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
boolean restartWithWrapper = false;

View File

@ -69,7 +69,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
@Override
protected String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -51,7 +51,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Override
protected String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -51,7 +51,7 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct
@Override
protected String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -108,9 +108,9 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
private void executeListener(final Object listener) {
if (listenerThreaded) {
if (listener instanceof Runnable) {
threadPool.cached().execute((Runnable) listener);
threadPool.generic().execute((Runnable) listener);
} else {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
ActionListener<T> lst = (ActionListener<T>) listener;

View File

@ -79,7 +79,7 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
@Override
public void onResponse(final Response response) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try {
@ -93,7 +93,7 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
@Override
public void onFailure(final Throwable e) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
listener.onFailure(e);

View File

@ -127,7 +127,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
private void start() {
if (nodesIds.length == 0) {
// nothing to notify
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
listener.onResponse(newResponse(request, responses));

View File

@ -100,7 +100,7 @@ public class TransportClientNodesService extends AbstractComponent {
} else {
this.nodesSampler = new SimpleNodeSampler();
}
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, new ScheduledNodeSampler());
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
// we want the transport service to throw connect exceptions, so we can retry
transportService.throwConnectException(true);
@ -248,7 +248,7 @@ public class TransportClientNodesService extends AbstractComponent {
try {
nodesSampler.sample();
if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);
}
} catch (Exception e) {
logger.warn("failed to sample", e);

View File

@ -42,8 +42,6 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
*
*
*/
public class MappingUpdatedAction extends TransportMasterNodeOperationAction<MappingUpdatedAction.MappingUpdatedRequest, MappingUpdatedAction.MappingUpdatedResponse> {
@ -63,7 +61,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
@Override
protected String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -64,7 +64,7 @@ public class NodeAliasesUpdatedAction extends AbstractComponent {
public void add(final Listener listener, TimeValue timeout) {
listeners.add(listener);
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
boolean removed = listeners.remove(listener);
@ -82,7 +82,7 @@ public class NodeAliasesUpdatedAction extends AbstractComponent {
public void nodeAliasesUpdated(final NodeAliasesUpdatedResponse response) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerNodeAliasesUpdated(response);

View File

@ -72,7 +72,7 @@ public class NodeIndexCreatedAction extends AbstractComponent {
public void nodeIndexCreated(final String index, final String nodeId) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerNodeIndexCreated(index, nodeId);

View File

@ -72,7 +72,7 @@ public class NodeIndexDeletedAction extends AbstractComponent {
public void nodeIndexDeleted(final String index, final String nodeId) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerNodeIndexDeleted(index, nodeId);

View File

@ -64,7 +64,7 @@ public class NodeMappingCreatedAction extends AbstractComponent {
public void add(final Listener listener, TimeValue timeout) {
listeners.add(listener);
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
boolean removed = listeners.remove(listener);
@ -82,7 +82,7 @@ public class NodeMappingCreatedAction extends AbstractComponent {
public void nodeMappingCreated(final NodeMappingCreatedResponse response) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerNodeIndexCreated(response);

View File

@ -65,7 +65,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
public void nodeMappingRefresh(final NodeMappingRefreshRequest request) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerMappingRefresh(request);

View File

@ -114,7 +114,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
protected void doStart() throws ElasticSearchException {
this.clusterState = newClusterStateBuilder().blocks(initialBlocks).build();
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask"));
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.CACHED, new ReconnectToNodes());
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
}
@Override
@ -181,7 +181,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return;
}
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.CACHED, notifyTimeout);
notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout);
onGoingTimeouts.add(notifyTimeout);
clusterStateListeners.add(listener);
// call the post added notification on the same event thread
@ -304,7 +304,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
if (!nodesDelta.removedNodes().isEmpty()) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (DiscoveryNode node : nodesDelta.removedNodes()) {
@ -387,7 +387,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
if (lifecycle.started()) {
reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.CACHED, this);
reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this);
}
}
}

View File

@ -258,7 +258,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// we are already joining, ignore...
return;
}
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
currentJoinThread = Thread.currentThread();

View File

@ -41,8 +41,6 @@ import static org.elasticsearch.transport.TransportRequestOptions.options;
/**
* A fault detection that pings the master periodically to see if its alive.
*
*
*/
public class MasterFaultDetection extends AbstractComponent {
@ -214,7 +212,7 @@ public class MasterFaultDetection extends AbstractComponent {
}
private void notifyDisconnectedFromMaster() {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (Listener listener : listeners) {
@ -226,7 +224,7 @@ public class MasterFaultDetection extends AbstractComponent {
private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
if (notifiedMasterFailure.compareAndSet(false, true)) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (Listener listener : listeners) {

View File

@ -42,8 +42,6 @@ import static org.elasticsearch.transport.TransportRequestOptions.options;
/**
* A fault detection of multiple nodes.
*
*
*/
public class NodesFaultDetection extends AbstractComponent {
@ -180,7 +178,7 @@ public class NodesFaultDetection extends AbstractComponent {
}
private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (Listener listener : listeners) {

View File

@ -159,7 +159,7 @@ public class MembershipAction extends AbstractComponent {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
}

View File

@ -229,7 +229,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
@Override
public void ping(final PingListener listener, final TimeValue timeout) {
if (!pingEnabled) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
listener.onPing(new PingResponse[0]);
@ -242,7 +242,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() {
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
try {
@ -252,7 +252,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
}
});
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(id);
@ -522,7 +522,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
if (!transportService.nodeConnected(requestingNode)) {
// do the connect and send on a thread pool
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
// connect to the node if possible

View File

@ -168,11 +168,11 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet());
receivedResponses.put(sendPingsHandler.id(), new ConcurrentHashMap<DiscoveryNode, PingResponse>());
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() {
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() {
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);

View File

@ -194,7 +194,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
}
}
final boolean fIgnoreRecoverAfterTime = ignoreRecoverAfterTime;
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
performStateRecovery(fIgnoreRecoverAfterTime);
@ -210,7 +210,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
if (!ignoreRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.debug("delaying initial state recovery for [{}]", recoverAfterTime);
threadPool.schedule(recoverAfterTime, ThreadPool.Names.CACHED, new Runnable() {
threadPool.schedule(recoverAfterTime, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
if (recovered.compareAndSet(false, true)) {

View File

@ -65,7 +65,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
@Override
protected String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -66,7 +66,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
@Override
protected String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -79,7 +79,7 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
@Override
public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
logger.debug("reading state from gateway {} ...", this);

View File

@ -144,7 +144,7 @@ public class SimpleBloomCache extends AbstractIndexComponent implements BloomCac
filter.loading.set(true);
BloomFilterLoader loader = new BloomFilterLoader(reader, fieldName);
if (asyncLoad) {
threadPool.cached().execute(loader);
threadPool.generic().execute(loader);
} else {
loader.run();
filter = fieldCache.get(fieldName);
@ -159,7 +159,7 @@ public class SimpleBloomCache extends AbstractIndexComponent implements BloomCac
// do the async loading
BloomFilterLoader loader = new BloomFilterLoader(reader, fieldName);
if (asyncLoad) {
threadPool.cached().execute(loader);
threadPool.generic().execute(loader);
} else {
loader.run();
filter = fieldCache.get(fieldName);

View File

@ -166,7 +166,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
return;
}
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
recoveryStatus = new RecoveryStatus();

View File

@ -244,7 +244,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
Set<Integer> shardIds = shardIds();
final CountDownLatch latch = new CountDownLatch(shardIds.size());
for (final int shardId : shardIds) {
executor = executor == null ? threadPool.cached() : executor;
executor = executor == null ? threadPool.generic() : executor;
executor.execute(new Runnable() {
@Override
public void run() {

View File

@ -646,7 +646,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override
public void onRetryRecovery(TimeValue retryAfter) {
threadPool.schedule(retryAfter, ThreadPool.Names.CACHED, new Runnable() {
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
recoveryTarget.startRecovery(request, true, PeerRecoveryListener.this);
@ -719,7 +719,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
return;
}
final ShardRouting fShardRouting = shardRouting;
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {

View File

@ -282,7 +282,7 @@ public class RecoverySource extends AbstractComponent {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -148,7 +148,7 @@ public class RecoveryTarget extends AbstractComponent {
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
doRecovery(shard, request, fromRetry, listener);
@ -298,7 +298,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -326,7 +326,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -355,7 +355,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -385,7 +385,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -416,7 +416,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -484,7 +484,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -78,7 +78,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
@Override
protected String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -97,7 +97,7 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
ImmutableSet<RiverName> indices = ImmutableSet.copyOf(this.rivers.keySet());
final CountDownLatch latch = new CountDownLatch(indices.size());
for (final RiverName riverName : indices) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try {

View File

@ -58,7 +58,7 @@ public class ThreadPool extends AbstractComponent {
public static class Names {
public static final String SAME = "same";
public static final String CACHED = "cached";
public static final String GENERIC = "generic";
public static final String INDEX = "index";
public static final String BULK = "bulk";
public static final String SEARCH = "search";
@ -87,7 +87,7 @@ public class ThreadPool extends AbstractComponent {
Map<String, Settings> groupSettings = settings.getGroups("threadpool");
Map<String, ExecutorHolder> executors = Maps.newHashMap();
executors.put(Names.CACHED, build(Names.CACHED, "cached", groupSettings.get(Names.CACHED), settingsBuilder().put("keep_alive", "30s").build()));
executors.put(Names.GENERIC, build(Names.GENERIC, "cached", groupSettings.get(Names.GENERIC), settingsBuilder().put("keep_alive", "30s").build()));
executors.put(Names.INDEX, build(Names.INDEX, "cached", groupSettings.get(Names.INDEX), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.BULK, build(Names.BULK, "cached", groupSettings.get(Names.BULK), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS));
@ -147,8 +147,8 @@ public class ThreadPool extends AbstractComponent {
return new ThreadPoolStats(stats);
}
public Executor cached() {
return executor(Names.CACHED);
public Executor generic() {
return executor(Names.GENERIC);
}
public Executor executor(String name) {

View File

@ -179,7 +179,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
try {
if (options.timeout() != null) {
timeoutHandler = new TimeoutHandler(requestId);
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.CACHED, timeoutHandler);
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
}
clientHandlers.put(requestId, new RequestHolder<T>(handler, node, action, timeoutHandler));
transport.sendRequest(node, requestId, action, message, options);
@ -198,7 +198,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
threadPool.executor(ThreadPool.Names.CACHED).execute(new Runnable() {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
handler.handleException(sendRequestException);
@ -275,7 +275,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
@Override
public void raiseNodeConnected(final DiscoveryNode node) {
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (TransportConnectionListener connectionListener : connectionListeners) {
@ -290,7 +290,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
if (lifecycle.stoppedOrClosed()) {
return;
}
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (TransportConnectionListener connectionListener : connectionListeners) {
@ -304,7 +304,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
holderToNotify.handler().handleException(new NodeDisconnectedException(node, holderToNotify.action()));

View File

@ -28,7 +28,6 @@ import org.elasticsearch.threadpool.ThreadPool;
public class VoidTransportResponseHandler implements TransportResponseHandler<VoidStreamable> {
public static final VoidTransportResponseHandler INSTANCE_SAME = new VoidTransportResponseHandler(ThreadPool.Names.SAME);
public static final VoidTransportResponseHandler INSTANCE_CACHED = new VoidTransportResponseHandler(ThreadPool.Names.CACHED);
private final String executor;

View File

@ -178,7 +178,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
transportServiceAdapter.sent(data.length);
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, requestId);

View File

@ -75,7 +75,7 @@ public class LocalTransportChannel implements TransportChannel {
stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream);
final byte[] data = cachedEntry.bytes().copiedByteArray();
targetTransport.threadPool().cached().execute(new Runnable() {
targetTransport.threadPool().generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, null);
@ -107,7 +107,7 @@ public class LocalTransportChannel implements TransportChannel {
too.close();
}
final byte[] data = stream.copiedByteArray();
targetTransport.threadPool().cached().execute(new Runnable() {
targetTransport.threadPool().generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, null);

View File

@ -77,8 +77,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
* There are 3 types of connections per node, low/med/high. Low if for batch oriented APIs (like recovery or
* batch) with high payload that will cause regular request. (like search or single index) to take
* longer. Med is for the typical search / single doc index. And High is for ping type requests (like FD).
*
*
*/
public class NettyTransport extends AbstractLifecycleComponent<Transport> implements Transport {
@ -315,7 +313,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
protected void doStop() throws ElasticSearchException {
final CountDownLatch latch = new CountDownLatch(1);
// make sure we run it on another thread than a possible IO handler thread
threadPool.cached().execute(new Runnable() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try {

View File

@ -67,7 +67,7 @@ public class BenchmarkNettyLargeMessages {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override

View File

@ -56,7 +56,7 @@ public class TransportBenchmark {
}
public static void main(String[] args) {
final String executor = ThreadPool.Names.CACHED;
final String executor = ThreadPool.Names.GENERIC;
final boolean waitForRequest = true;
final ByteSizeValue payloadSize = new ByteSizeValue(100, ByteSizeUnit.BYTES);
final int NUMBER_OF_CLIENTS = 1;

View File

@ -79,7 +79,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -103,7 +103,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -138,7 +138,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -161,7 +161,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -195,7 +195,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -219,7 +219,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -254,7 +254,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -273,7 +273,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -326,7 +326,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -351,7 +351,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -385,7 +385,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -414,7 +414,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override
@ -450,7 +450,7 @@ public abstract class AbstractSimpleTransportTests {
@Override
public String executor() {
return ThreadPool.Names.CACHED;
return ThreadPool.Names.GENERIC;
}
@Override