From d3e348ef90cccb1b82802089baa6938c45beab29 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 17 Sep 2014 10:25:49 +0200 Subject: [PATCH] [CORE] Add AbstractRunnable support to ThreadPool to simplify async operation on bounded threadpools today we have to catch rejected operation exceptions in various places and notify an ActionListener. This pattern is error prone and adds a lot of boilerplait code. It's also easy to miss catching this exception which only is relevant if nodes are under high load. This commit adds infrastructure that makes ActionListener first class citizen on async actions. Closes #7765 --- .../elasticsearch/action/ActionRunnable.java | 18 +++--- ...TransportSearchDfsQueryAndFetchAction.java | 47 ++++++++-------- ...ransportSearchDfsQueryThenFetchAction.java | 52 +++++++++--------- .../TransportSearchQueryAndFetchAction.java | 48 ++++++++-------- .../TransportSearchQueryThenFetchAction.java | 50 ++++++++--------- ...nsportShardReplicationOperationAction.java | 5 +- .../metadata/MetaDataCreateIndexService.java | 30 ++++------ .../util/concurrent/AbstractRunnable.java | 30 ++++++++++ .../util/concurrent/EsThreadPoolExecutor.java | 20 +++++-- .../elasticsearch/threadpool/ThreadPool.java | 55 +++++++++++-------- .../transport/local/LocalTransport.java | 31 ++++++----- .../netty/MessageChannelHandler.java | 29 +++++----- .../util/concurrent/EsExecutorsTests.java | 8 ++- .../test/transport/MockTransportService.java | 13 +++-- .../test/transport/NettyTransportTests.java | 28 +++++----- 15 files changed, 254 insertions(+), 210 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/ActionRunnable.java b/src/main/java/org/elasticsearch/action/ActionRunnable.java index 3b21cd87751..58e79a37701 100644 --- a/src/main/java/org/elasticsearch/action/ActionRunnable.java +++ b/src/main/java/org/elasticsearch/action/ActionRunnable.java @@ -19,11 +19,13 @@ package org.elasticsearch.action; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; + /** * Base class for {@link Runnable}s that need to call {@link ActionListener#onFailure(Throwable)} in case an uncaught * exception or error is thrown while the actual action is run. */ -public abstract class ActionRunnable implements Runnable { +public abstract class ActionRunnable extends AbstractRunnable { protected final ActionListener listener; @@ -31,13 +33,11 @@ public abstract class ActionRunnable implements Runnable { this.listener = listener; } - public final void run() { - try { - doRun(); - } catch (Throwable t) { - listener.onFailure(t); - } + /** + * Calls the action listeners {@link ActionListener#onFailure(Throwable)} method with the given exception. + * This method is invoked for all exception thrown by {@link #doRun()} + */ + public void onFailure(Throwable t) { + listener.onFailure(t); } - - protected abstract void doRun(); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index 56cbca515d3..1b1dea36ec9 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.search.ReduceSearchPhaseException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -41,6 +42,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -121,31 +123,28 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc } private void finishHim() { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - boolean useScroll = !useSlowScroll && request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - } catch (Throwable e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - listener.onFailure(failure); - } + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + boolean useScroll = !useSlowScroll && request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); } - }); - } catch (EsRejectedExecutionException ex) { - listener.onFailure(ex); - } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); + } + + @Override + public void onFailure(Throwable t) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(t); + } + }); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 1d46f003686..839691e97a1 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.search.type; import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.search.ReduceSearchPhaseException; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; @@ -47,6 +48,7 @@ import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -193,36 +195,32 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA } private void finishHim() { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - } catch (Throwable e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - listener.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); - } + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); } - }); - } catch (EsRejectedExecutionException ex) { - try { + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); - } finally { - listener.onFailure(ex); } - } + @Override + public void onFailure(Throwable t) { + try { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } finally { + releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); + } + } + }); + + } } - } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java index 167c9924924..bf71d893e0c 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.search.ReduceSearchPhaseException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -37,6 +38,8 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; + import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId; /** @@ -73,31 +76,28 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio @Override protected void moveToSecondPhase() throws Exception { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - boolean useScroll = !useSlowScroll && request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - } catch (Throwable e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - listener.onFailure(failure); - } + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + boolean useScroll = !useSlowScroll && request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = buildScrollId(request.searchType(), firstResults, null); } - }); - } catch (EsRejectedExecutionException ex) { - listener.onFailure(ex); - } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); + } + + @Override + public void onFailure(Throwable t) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } + }); } } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index a8452ed6c77..386911ef1c5 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.search.type; import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.search.ReduceSearchPhaseException; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; @@ -44,6 +45,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -138,35 +140,31 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi } private void finishHim() { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - } catch (Throwable e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - listener.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); } - }); - } catch (EsRejectedExecutionException ex) { - try { + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } finally { - listener.onFailure(ex); } - } + + @Override + public void onFailure(Throwable t) { + try { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } finally { + releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); + } + } + }); } } } diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index eed327e6f6b..bffc97fa763 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -733,7 +733,7 @@ public abstract class TransportShardReplicationOperationAction executor : defaultExecutorTypeSettings.entrySet()) { executors.put(executor.getKey(), build(executor.getKey(), groupSettings.get(executor.getKey()), executor.getValue())); } - executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(Names.SAME, "same"))); + executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.directExecutor(), new Info(Names.SAME, "same"))); if (!executors.get(Names.GENERIC).info.getType().equals("cached")) { throw new ElasticsearchIllegalArgumentException("generic thread pool must be of type cached"); } @@ -183,8 +185,8 @@ public class ThreadPool extends AbstractComponent { long rejected = -1; int largest = -1; long completed = -1; - if (holder.executor instanceof ThreadPoolExecutor) { - ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor; + if (holder.executor() instanceof ThreadPoolExecutor) { + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor(); threads = threadPoolExecutor.getPoolSize(); queue = threadPoolExecutor.getQueue().size(); active = threadPoolExecutor.getActiveCount(); @@ -205,7 +207,7 @@ public class ThreadPool extends AbstractComponent { } public Executor executor(String name) { - Executor executor = executors.get(name).executor; + Executor executor = executors.get(name).executor(); if (executor == null) { throw new ElasticsearchIllegalArgumentException("No executor found for [" + name + "]"); } @@ -232,8 +234,8 @@ public class ThreadPool extends AbstractComponent { estimatedTimeThread.interrupt(); scheduler.shutdown(); for (ExecutorHolder executor : executors.values()) { - if (executor.executor instanceof ThreadPoolExecutor) { - ((ThreadPoolExecutor) executor.executor).shutdown(); + if (executor.executor() instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor) executor.executor()).shutdown(); } } } @@ -243,24 +245,24 @@ public class ThreadPool extends AbstractComponent { estimatedTimeThread.interrupt(); scheduler.shutdownNow(); for (ExecutorHolder executor : executors.values()) { - if (executor.executor instanceof ThreadPoolExecutor) { - ((ThreadPoolExecutor) executor.executor).shutdownNow(); + if (executor.executor() instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor) executor.executor()).shutdownNow(); } } while (!retiredExecutors.isEmpty()) { - ((ThreadPoolExecutor) retiredExecutors.remove().executor).shutdownNow(); + ((ThreadPoolExecutor) retiredExecutors.remove().executor()).shutdownNow(); } } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { boolean result = scheduler.awaitTermination(timeout, unit); for (ExecutorHolder executor : executors.values()) { - if (executor.executor instanceof ThreadPoolExecutor) { - result &= ((ThreadPoolExecutor) executor.executor).awaitTermination(timeout, unit); + if (executor.executor() instanceof ThreadPoolExecutor) { + result &= ((ThreadPoolExecutor) executor.executor()).awaitTermination(timeout, unit); } } while (!retiredExecutors.isEmpty()) { - result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor).awaitTermination(timeout, unit); + result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor()).awaitTermination(timeout, unit); } return result; } @@ -294,8 +296,8 @@ public class ThreadPool extends AbstractComponent { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); - ((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); - return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, -1, -1, updatedKeepAlive, null)); + ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); + return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, -1, -1, updatedKeepAlive, null)); } return previousExecutorHolder; } @@ -322,9 +324,9 @@ public class ThreadPool extends AbstractComponent { int updatedSize = settings.getAsInt("size", previousInfo.getMax()); if (previousInfo.getMax() != updatedSize) { logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize); - ((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedSize); - ((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize); - return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize)); + ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedSize); + ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); + return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize)); } return previousExecutorHolder; } @@ -352,15 +354,15 @@ public class ThreadPool extends AbstractComponent { if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) { logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { - ((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); + ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); } if (previousInfo.getMin() != updatedMin) { - ((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedMin); + ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedMin); } if (previousInfo.getMax() != updatedSize) { - ((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize); + ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); } - return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null)); + return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null)); } return previousExecutorHolder; } @@ -404,9 +406,9 @@ public class ThreadPool extends AbstractComponent { ExecutorHolder newExecutorHolder = rebuild(executor.getKey(), oldExecutorHolder, updatedSettings, executor.getValue()); if (!oldExecutorHolder.equals(newExecutorHolder)) { executors = newMapBuilder(executors).put(executor.getKey(), newExecutorHolder).immutableMap(); - if (!oldExecutorHolder.executor.equals(newExecutorHolder.executor) && oldExecutorHolder.executor instanceof EsThreadPoolExecutor) { + if (!oldExecutorHolder.executor().equals(newExecutorHolder.executor()) && oldExecutorHolder.executor() instanceof EsThreadPoolExecutor) { retiredExecutors.add(oldExecutorHolder); - ((EsThreadPoolExecutor) oldExecutorHolder.executor).shutdown(new ExecutorShutdownListener(oldExecutorHolder)); + ((EsThreadPoolExecutor) oldExecutorHolder.executor()).shutdown(new ExecutorShutdownListener(oldExecutorHolder)); } } } @@ -535,13 +537,18 @@ public class ThreadPool extends AbstractComponent { } static class ExecutorHolder { - public final Executor executor; + private final Executor executor; public final Info info; ExecutorHolder(Executor executor, Info info) { + assert executor instanceof EsThreadPoolExecutor || executor == MoreExecutors.directExecutor(); this.executor = executor; this.info = info; } + + Executor executor() { + return executor; + } } public static class Info implements Streamable, ToXContent { diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 627f37c61a4..74b0d278bbe 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -268,27 +268,28 @@ public class LocalTransport extends AbstractLifecycleComponent implem } else { threadPool.executor(handler.executor()).execute(new AbstractRunnable() { @Override - public void run() { - try { - //noinspection unchecked - handler.messageReceived(request, transportChannel); - } catch (Throwable e) { - if (lifecycleState() == Lifecycle.State.STARTED) { - // we can only send a response transport is started.... - try { - transportChannel.sendResponse(e); - } catch (Throwable e1) { - logger.warn("Failed to send error message back to client for action [" + action + "]", e1); - logger.warn("Actual Exception", e); - } - } - } + protected void doRun() throws Exception { + //noinspection unchecked + handler.messageReceived(request, transportChannel); } @Override public boolean isForceExecution() { return handler.isForceExecution(); } + + @Override + public void onFailure(Throwable e) { + if (lifecycleState() == Lifecycle.State.STARTED) { + // we can only send a response transport is started.... + try { + transportChannel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } + } + } }); } } catch (Throwable e) { diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 3f489c23013..b1063304473 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -270,25 +270,26 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { @SuppressWarnings({"unchecked"}) @Override - public void run() { - try { - handler.messageReceived(request, transportChannel); - } catch (Throwable e) { - if (transport.lifecycleState() == Lifecycle.State.STARTED) { - // we can only send a response transport is started.... - try { - transportChannel.sendResponse(e); - } catch (Throwable e1) { - logger.warn("Failed to send error message back to client for action [" + action + "]", e1); - logger.warn("Actual Exception", e); - } - } - } + protected void doRun() throws Exception { + handler.messageReceived(request, transportChannel); } @Override public boolean isForceExecution() { return handler.isForceExecution(); } + + @Override + public void onFailure(Throwable e) { + if (transport.lifecycleState() == Lifecycle.State.STARTED) { + // we can only send a response transport is started.... + try { + transportChannel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } + } + } } } diff --git a/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index d35adf63687..e7a018bb783 100644 --- a/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.common.util.concurrent; -import com.google.common.base.Predicate; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; @@ -73,7 +72,7 @@ public class EsExecutorsTests extends ElasticsearchTestCase { final CountDownLatch exec3Wait = new CountDownLatch(1); executor.execute(new AbstractRunnable() { @Override - public void run() { + protected void doRun() { executed3.set(true); exec3Wait.countDown(); } @@ -82,6 +81,11 @@ public class EsExecutorsTests extends ElasticsearchTestCase { public boolean isForceExecution() { return true; } + + @Override + public void onFailure(Throwable t) { + throw new AssertionError(t); + } }); wait.countDown(); diff --git a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java index cf088bab476..37f7fb92d01 100644 --- a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java @@ -237,12 +237,13 @@ public class MockTransportService extends TransportService { threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override - public void run() { - try { - original.sendRequest(node, requestId, action, clonedRequest, options); - } catch (Throwable e) { - logger.debug("failed to send delayed request", e); - } + public void onFailure(Throwable e) { + logger.debug("failed to send delayed request", e); + } + + @Override + protected void doRun() throws IOException { + original.sendRequest(node, requestId, action, clonedRequest, options); } }); } diff --git a/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java index 9feec938cb0..82e4a1f846c 100644 --- a/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java +++ b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java @@ -155,26 +155,26 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest { @SuppressWarnings({"unchecked"}) @Override - public void run() { - try { - handler.messageReceived(request, transportChannel); - } catch (Throwable e) { - if (transport.lifecycleState() == Lifecycle.State.STARTED) { - // we can only send a response transport is started.... - try { - transportChannel.sendResponse(e); - } catch (Throwable e1) { - logger.warn("Failed to send error message back to client for action [" + action + "]", e1); - logger.warn("Actual Exception", e); - } - } - } + protected void doRun() throws Exception { + handler.messageReceived(request, transportChannel); } @Override public boolean isForceExecution() { return handler.isForceExecution(); } + + @Override + public void onFailure(Throwable e) { + if (transport.lifecycleState() == Lifecycle.State.STARTED) { + // we can only send a response transport is started.... + try { + transportChannel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } + } } } }); return pipeline;