[CORE] Add AbstractRunnable support to ThreadPool to simplify async operation on bounded threadpools
today we have to catch rejected operation exceptions in various places and notify an ActionListener. This pattern is error prone and adds a lot of boilerplait code. It's also easy to miss catching this exception which only is relevant if nodes are under high load. This commit adds infrastructure that makes ActionListener first class citizen on async actions. Closes #7765
This commit is contained in:
parent
b2477a43c8
commit
d3e348ef90
|
@ -19,11 +19,13 @@
|
|||
|
||||
package org.elasticsearch.action;
|
||||
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
||||
/**
|
||||
* Base class for {@link Runnable}s that need to call {@link ActionListener#onFailure(Throwable)} in case an uncaught
|
||||
* exception or error is thrown while the actual action is run.
|
||||
*/
|
||||
public abstract class ActionRunnable<Response> implements Runnable {
|
||||
public abstract class ActionRunnable<Response> extends AbstractRunnable {
|
||||
|
||||
protected final ActionListener<Response> listener;
|
||||
|
||||
|
@ -31,13 +33,11 @@ public abstract class ActionRunnable<Response> implements Runnable {
|
|||
this.listener = listener;
|
||||
}
|
||||
|
||||
public final void run() {
|
||||
try {
|
||||
doRun();
|
||||
} catch (Throwable t) {
|
||||
listener.onFailure(t);
|
||||
}
|
||||
/**
|
||||
* Calls the action listeners {@link ActionListener#onFailure(Throwable)} method with the given exception.
|
||||
* This method is invoked for all exception thrown by {@link #doRun()}
|
||||
*/
|
||||
public void onFailure(Throwable t) {
|
||||
listener.onFailure(t);
|
||||
}
|
||||
|
||||
protected abstract void doRun();
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.action.search.type;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.search.ReduceSearchPhaseException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
|
@ -41,6 +42,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest;
|
|||
import org.elasticsearch.search.query.QuerySearchRequest;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
|
@ -121,31 +123,28 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
|
|||
}
|
||||
|
||||
private void finishHim() {
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
boolean useScroll = !useSlowScroll && request.scroll() != null;
|
||||
sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults);
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
|
||||
}
|
||||
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
|
||||
} catch (Throwable e) {
|
||||
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to reduce search", failure);
|
||||
}
|
||||
listener.onFailure(failure);
|
||||
}
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) {
|
||||
@Override
|
||||
public void doRun() throws IOException {
|
||||
boolean useScroll = !useSlowScroll && request.scroll() != null;
|
||||
sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults);
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to reduce search", failure);
|
||||
}
|
||||
super.onFailure(t);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.search.type;
|
|||
import com.carrotsearch.hppc.IntArrayList;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.search.ReduceSearchPhaseException;
|
||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
|
@ -47,6 +48,7 @@ import org.elasticsearch.search.query.QuerySearchRequest;
|
|||
import org.elasticsearch.search.query.QuerySearchResult;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
|
@ -193,36 +195,32 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
|
|||
}
|
||||
|
||||
private void finishHim() {
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
|
||||
}
|
||||
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
|
||||
} catch (Throwable e) {
|
||||
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to reduce search", failure);
|
||||
}
|
||||
listener.onFailure(failure);
|
||||
} finally {
|
||||
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
|
||||
}
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
|
||||
@Override
|
||||
public void doRun() throws IOException {
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
try {
|
||||
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
|
||||
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
|
||||
} finally {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
try {
|
||||
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to reduce search", failure);
|
||||
}
|
||||
super.onFailure(failure);
|
||||
} finally {
|
||||
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.action.search.type;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.search.ReduceSearchPhaseException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
|
@ -37,6 +38,8 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
|
|||
import org.elasticsearch.search.internal.ShardSearchRequest;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
|
||||
|
||||
/**
|
||||
|
@ -73,31 +76,28 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
|
|||
|
||||
@Override
|
||||
protected void moveToSecondPhase() throws Exception {
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
boolean useScroll = !useSlowScroll && request.scroll() != null;
|
||||
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
scrollId = buildScrollId(request.searchType(), firstResults, null);
|
||||
}
|
||||
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
|
||||
} catch (Throwable e) {
|
||||
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to reduce search", failure);
|
||||
}
|
||||
listener.onFailure(failure);
|
||||
}
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
|
||||
@Override
|
||||
public void doRun() throws IOException {
|
||||
boolean useScroll = !useSlowScroll && request.scroll() != null;
|
||||
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
scrollId = buildScrollId(request.searchType(), firstResults, null);
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to reduce search", failure);
|
||||
}
|
||||
super.onFailure(failure);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.search.type;
|
|||
import com.carrotsearch.hppc.IntArrayList;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.search.ReduceSearchPhaseException;
|
||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
|
@ -44,6 +45,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest;
|
|||
import org.elasticsearch.search.query.QuerySearchResultProvider;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
|
@ -138,35 +140,31 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
|
|||
}
|
||||
|
||||
private void finishHim() {
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
|
||||
}
|
||||
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
|
||||
} catch (Throwable e) {
|
||||
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to reduce search", failure);
|
||||
}
|
||||
listener.onFailure(failure);
|
||||
} finally {
|
||||
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
|
||||
}
|
||||
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
|
||||
@Override
|
||||
public void doRun() throws IOException {
|
||||
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
|
||||
String scrollId = null;
|
||||
if (request.scroll() != null) {
|
||||
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
try {
|
||||
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
|
||||
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
|
||||
} finally {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
try {
|
||||
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to reduce search", failure);
|
||||
}
|
||||
super.onFailure(failure);
|
||||
} finally {
|
||||
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -733,7 +733,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
try {
|
||||
threadPool.executor(executor).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
protected void doRun() {
|
||||
try {
|
||||
shardOperationOnReplica(shardRequest);
|
||||
} catch (Throwable e) {
|
||||
|
@ -749,6 +749,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
public boolean isForceExecution() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {}
|
||||
});
|
||||
} catch (Throwable e) {
|
||||
failReplicaIfNeeded(shard.index(), shard.id(), e);
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.lucene.util.CollectionUtil;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
|
@ -139,27 +140,16 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
createIndex(request, listener, mdLock);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
|
||||
listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
|
||||
return;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
|
||||
createIndex(request, listener, mdLock);
|
||||
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
|
||||
@Override
|
||||
public void doRun() throws InterruptedException {
|
||||
if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
|
||||
listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
|
||||
return;
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
createIndex(request, listener, mdLock);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void validateIndexName(String index, ClusterState state) throws ElasticsearchException {
|
||||
|
|
|
@ -30,4 +30,34 @@ public abstract class AbstractRunnable implements Runnable {
|
|||
public boolean isForceExecution() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public final void run() {
|
||||
try {
|
||||
doRun();
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.interrupted();
|
||||
onFailure(ex);
|
||||
} catch (Throwable t) {
|
||||
onFailure(t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is invoked for all exception thrown by {@link #doRun()}
|
||||
*/
|
||||
public abstract void onFailure(Throwable t);
|
||||
|
||||
/**
|
||||
* This should be executed if the thread-pool executing this action rejected the execution.
|
||||
* The default implementation forwards to {@link #onFailure(Throwable)}
|
||||
*/
|
||||
public void onRejection(Throwable t) {
|
||||
onFailure(t);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method has the same semantics as {@link Runnable#run()}
|
||||
* @throws InterruptedException if the run method throws an InterruptedException
|
||||
*/
|
||||
protected abstract void doRun() throws Exception;
|
||||
}
|
||||
|
|
|
@ -20,11 +20,9 @@
|
|||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* An extension to thread pool executor, allowing (in the future) to add specific additional stats to it.
|
||||
|
@ -75,4 +73,18 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
|
|||
public void onTerminated();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
try {
|
||||
super.execute(command);
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
if (command instanceof AbstractRunnable) {
|
||||
// If we are an abstract runnable we can handle the rejection
|
||||
// directly and don't need to rethrow it.
|
||||
((AbstractRunnable)command).onRejection(ex);
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -127,7 +129,7 @@ public class ThreadPool extends AbstractComponent {
|
|||
for (Map.Entry<String, Settings> executor : defaultExecutorTypeSettings.entrySet()) {
|
||||
executors.put(executor.getKey(), build(executor.getKey(), groupSettings.get(executor.getKey()), executor.getValue()));
|
||||
}
|
||||
executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(Names.SAME, "same")));
|
||||
executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.directExecutor(), new Info(Names.SAME, "same")));
|
||||
if (!executors.get(Names.GENERIC).info.getType().equals("cached")) {
|
||||
throw new ElasticsearchIllegalArgumentException("generic thread pool must be of type cached");
|
||||
}
|
||||
|
@ -183,8 +185,8 @@ public class ThreadPool extends AbstractComponent {
|
|||
long rejected = -1;
|
||||
int largest = -1;
|
||||
long completed = -1;
|
||||
if (holder.executor instanceof ThreadPoolExecutor) {
|
||||
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor;
|
||||
if (holder.executor() instanceof ThreadPoolExecutor) {
|
||||
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor();
|
||||
threads = threadPoolExecutor.getPoolSize();
|
||||
queue = threadPoolExecutor.getQueue().size();
|
||||
active = threadPoolExecutor.getActiveCount();
|
||||
|
@ -205,7 +207,7 @@ public class ThreadPool extends AbstractComponent {
|
|||
}
|
||||
|
||||
public Executor executor(String name) {
|
||||
Executor executor = executors.get(name).executor;
|
||||
Executor executor = executors.get(name).executor();
|
||||
if (executor == null) {
|
||||
throw new ElasticsearchIllegalArgumentException("No executor found for [" + name + "]");
|
||||
}
|
||||
|
@ -232,8 +234,8 @@ public class ThreadPool extends AbstractComponent {
|
|||
estimatedTimeThread.interrupt();
|
||||
scheduler.shutdown();
|
||||
for (ExecutorHolder executor : executors.values()) {
|
||||
if (executor.executor instanceof ThreadPoolExecutor) {
|
||||
((ThreadPoolExecutor) executor.executor).shutdown();
|
||||
if (executor.executor() instanceof ThreadPoolExecutor) {
|
||||
((ThreadPoolExecutor) executor.executor()).shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -243,24 +245,24 @@ public class ThreadPool extends AbstractComponent {
|
|||
estimatedTimeThread.interrupt();
|
||||
scheduler.shutdownNow();
|
||||
for (ExecutorHolder executor : executors.values()) {
|
||||
if (executor.executor instanceof ThreadPoolExecutor) {
|
||||
((ThreadPoolExecutor) executor.executor).shutdownNow();
|
||||
if (executor.executor() instanceof ThreadPoolExecutor) {
|
||||
((ThreadPoolExecutor) executor.executor()).shutdownNow();
|
||||
}
|
||||
}
|
||||
while (!retiredExecutors.isEmpty()) {
|
||||
((ThreadPoolExecutor) retiredExecutors.remove().executor).shutdownNow();
|
||||
((ThreadPoolExecutor) retiredExecutors.remove().executor()).shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
boolean result = scheduler.awaitTermination(timeout, unit);
|
||||
for (ExecutorHolder executor : executors.values()) {
|
||||
if (executor.executor instanceof ThreadPoolExecutor) {
|
||||
result &= ((ThreadPoolExecutor) executor.executor).awaitTermination(timeout, unit);
|
||||
if (executor.executor() instanceof ThreadPoolExecutor) {
|
||||
result &= ((ThreadPoolExecutor) executor.executor()).awaitTermination(timeout, unit);
|
||||
}
|
||||
}
|
||||
while (!retiredExecutors.isEmpty()) {
|
||||
result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor).awaitTermination(timeout, unit);
|
||||
result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor()).awaitTermination(timeout, unit);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -294,8 +296,8 @@ public class ThreadPool extends AbstractComponent {
|
|||
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
|
||||
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
|
||||
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
|
||||
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, -1, -1, updatedKeepAlive, null));
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
|
||||
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, -1, -1, updatedKeepAlive, null));
|
||||
}
|
||||
return previousExecutorHolder;
|
||||
}
|
||||
|
@ -322,9 +324,9 @@ public class ThreadPool extends AbstractComponent {
|
|||
int updatedSize = settings.getAsInt("size", previousInfo.getMax());
|
||||
if (previousInfo.getMax() != updatedSize) {
|
||||
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedSize);
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
|
||||
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize));
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedSize);
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
|
||||
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize));
|
||||
}
|
||||
return previousExecutorHolder;
|
||||
}
|
||||
|
@ -352,15 +354,15 @@ public class ThreadPool extends AbstractComponent {
|
|||
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) {
|
||||
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
|
||||
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
if (previousInfo.getMin() != updatedMin) {
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedMin);
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedMin);
|
||||
}
|
||||
if (previousInfo.getMax() != updatedSize) {
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
|
||||
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
|
||||
}
|
||||
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null));
|
||||
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null));
|
||||
}
|
||||
return previousExecutorHolder;
|
||||
}
|
||||
|
@ -404,9 +406,9 @@ public class ThreadPool extends AbstractComponent {
|
|||
ExecutorHolder newExecutorHolder = rebuild(executor.getKey(), oldExecutorHolder, updatedSettings, executor.getValue());
|
||||
if (!oldExecutorHolder.equals(newExecutorHolder)) {
|
||||
executors = newMapBuilder(executors).put(executor.getKey(), newExecutorHolder).immutableMap();
|
||||
if (!oldExecutorHolder.executor.equals(newExecutorHolder.executor) && oldExecutorHolder.executor instanceof EsThreadPoolExecutor) {
|
||||
if (!oldExecutorHolder.executor().equals(newExecutorHolder.executor()) && oldExecutorHolder.executor() instanceof EsThreadPoolExecutor) {
|
||||
retiredExecutors.add(oldExecutorHolder);
|
||||
((EsThreadPoolExecutor) oldExecutorHolder.executor).shutdown(new ExecutorShutdownListener(oldExecutorHolder));
|
||||
((EsThreadPoolExecutor) oldExecutorHolder.executor()).shutdown(new ExecutorShutdownListener(oldExecutorHolder));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -535,13 +537,18 @@ public class ThreadPool extends AbstractComponent {
|
|||
}
|
||||
|
||||
static class ExecutorHolder {
|
||||
public final Executor executor;
|
||||
private final Executor executor;
|
||||
public final Info info;
|
||||
|
||||
ExecutorHolder(Executor executor, Info info) {
|
||||
assert executor instanceof EsThreadPoolExecutor || executor == MoreExecutors.directExecutor();
|
||||
this.executor = executor;
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
Executor executor() {
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Info implements Streamable, ToXContent {
|
||||
|
|
|
@ -268,27 +268,28 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
//noinspection unchecked
|
||||
handler.messageReceived(request, transportChannel);
|
||||
} catch (Throwable e) {
|
||||
if (lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
protected void doRun() throws Exception {
|
||||
//noinspection unchecked
|
||||
handler.messageReceived(request, transportChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return handler.isForceExecution();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
if (lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
|
|
|
@ -270,25 +270,26 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
handler.messageReceived(request, transportChannel);
|
||||
} catch (Throwable e) {
|
||||
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
protected void doRun() throws Exception {
|
||||
handler.messageReceived(request, transportChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return handler.isForceExecution();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -73,7 +72,7 @@ public class EsExecutorsTests extends ElasticsearchTestCase {
|
|||
final CountDownLatch exec3Wait = new CountDownLatch(1);
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
protected void doRun() {
|
||||
executed3.set(true);
|
||||
exec3Wait.countDown();
|
||||
}
|
||||
|
@ -82,6 +81,11 @@ public class EsExecutorsTests extends ElasticsearchTestCase {
|
|||
public boolean isForceExecution() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
throw new AssertionError(t);
|
||||
}
|
||||
});
|
||||
|
||||
wait.countDown();
|
||||
|
|
|
@ -237,12 +237,13 @@ public class MockTransportService extends TransportService {
|
|||
|
||||
threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
original.sendRequest(node, requestId, action, clonedRequest, options);
|
||||
} catch (Throwable e) {
|
||||
logger.debug("failed to send delayed request", e);
|
||||
}
|
||||
public void onFailure(Throwable e) {
|
||||
logger.debug("failed to send delayed request", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws IOException {
|
||||
original.sendRequest(node, requestId, action, clonedRequest, options);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -155,26 +155,26 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
handler.messageReceived(request, transportChannel);
|
||||
} catch (Throwable e) {
|
||||
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
protected void doRun() throws Exception {
|
||||
handler.messageReceived(request, transportChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return handler.isForceExecution();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
} }
|
||||
}
|
||||
});
|
||||
return pipeline;
|
||||
|
|
Loading…
Reference in New Issue