Execute listeners on current thread if threadpool is shutting down

This commit is contained in:
Simon Willnauer 2013-08-29 16:39:54 +02:00
parent f408673bd4
commit 7113731022
1 changed files with 43 additions and 20 deletions

View File

@ -22,7 +22,9 @@ package org.elasticsearch.action.support;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture; import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
@ -50,7 +52,7 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
public void execute(Request request, ActionListener<Response> listener) { public void execute(Request request, ActionListener<Response> listener) {
if (request.listenerThreaded()) { if (request.listenerThreaded()) {
listener = new ThreadedActionListener<Response>(threadPool, listener); listener = new ThreadedActionListener<Response>(threadPool, listener, logger);
} }
ActionRequestValidationException validationException = request.validate(); ActionRequestValidationException validationException = request.validate();
if (validationException != null) { if (validationException != null) {
@ -67,19 +69,23 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
protected abstract void doExecute(Request request, ActionListener<Response> listener); protected abstract void doExecute(Request request, ActionListener<Response> listener);
static class ThreadedActionListener<Response> implements ActionListener<Response> { static final class ThreadedActionListener<Response> implements ActionListener<Response> {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ActionListener<Response> listener; private final ActionListener<Response> listener;
ThreadedActionListener(ThreadPool threadPool, ActionListener<Response> listener) { private final ESLogger logger;
ThreadedActionListener(ThreadPool threadPool, ActionListener<Response> listener, ESLogger logger) {
this.threadPool = threadPool; this.threadPool = threadPool;
this.listener = listener; this.listener = listener;
this.logger = logger;
} }
@Override @Override
public void onResponse(final Response response) { public void onResponse(final Response response) {
try {
threadPool.generic().execute(new Runnable() { threadPool.generic().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -90,16 +96,33 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
} }
} }
}); });
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not run threaded action, exectuion rejected [{}] running on current thread", listener);
/* we don't care if that takes long since we are shutting down. But if we not respond somebody could wait
* for the response on the listener side which could be a remote machine so make sure we push it out there.*/
try {
listener.onResponse(response);
} catch (Throwable e) {
listener.onFailure(e);
}
}
} }
@Override @Override
public void onFailure(final Throwable e) { public void onFailure(final Throwable e) {
try {
threadPool.generic().execute(new Runnable() { threadPool.generic().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
listener.onFailure(e); listener.onFailure(e);
} }
}); });
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not run threaded action, exectuion rejected for listener [{}] running on current thread", listener);
/* we don't care if that takes long since we are shutting down. But if we not respond somebody could wait
* for the response on the listener side which could be a remote machine so make sure we push it out there.*/
listener.onFailure(e);
}
} }
} }
} }