diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index e5504e73683..d4896e841ab 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -234,6 +234,12 @@ public abstract class TransportShardReplicationOperationAction { + class AllocateDangledRequestHandler extends BaseTransportRequestHandler { public static final String ACTION = "/gateway/local/allocate_dangled"; diff --git a/src/main/java/org/elasticsearch/transport/BaseTransportRequestHandler.java b/src/main/java/org/elasticsearch/transport/BaseTransportRequestHandler.java index 7cc4f163de4..ef601c6a1ee 100644 --- a/src/main/java/org/elasticsearch/transport/BaseTransportRequestHandler.java +++ b/src/main/java/org/elasticsearch/transport/BaseTransportRequestHandler.java @@ -24,4 +24,11 @@ package org.elasticsearch.transport; */ public abstract class BaseTransportRequestHandler implements TransportRequestHandler { + /** + * Default force execution to false. + */ + @Override + public boolean isForceExecution() { + return false; + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/transport/TransportRequestHandler.java b/src/main/java/org/elasticsearch/transport/TransportRequestHandler.java index 7bb82e9a0a4..e7337e7a368 100644 --- a/src/main/java/org/elasticsearch/transport/TransportRequestHandler.java +++ b/src/main/java/org/elasticsearch/transport/TransportRequestHandler.java @@ -29,4 +29,9 @@ public interface TransportRequestHandler { void messageReceived(T request, TransportChannel channel) throws Exception; String executor(); + + /** + * See {@link org.elasticsearch.common.util.concurrent.AbstractRunnable#isForceExecution()}. + */ + boolean isForceExecution(); } diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index f3d83f9dc32..9dc9fbfcdf9 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -24,6 +24,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.ThrowableObjectInputStream; import org.elasticsearch.common.io.stream.*; @@ -31,6 +32,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import org.elasticsearch.transport.support.TransportStatus; @@ -186,7 +188,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem return this.threadPool; } - protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) { + protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) { try { transportServiceAdapter.received(data.length); StreamInput stream = new BytesStreamInput(data, false); @@ -232,7 +234,35 @@ public class LocalTransport extends AbstractLifecycleComponent implem } final TransportRequest request = handler.newInstance(); request.readFrom(stream); - handler.messageReceived(request, transportChannel); + if (handler.executor() == ThreadPool.Names.SAME) { + //noinspection unchecked + handler.messageReceived(request, transportChannel); + } else { + threadPool.executor(handler.executor()).execute(new AbstractRunnable() { + @Override + public void run() { + try { + //noinspection unchecked + handler.messageReceived(request, transportChannel); + } catch (Throwable e) { + if (lifecycleState() == Lifecycle.State.STARTED) { + // we can only send a response transport is started.... + try { + transportChannel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } + } + } + } + + @Override + public boolean isForceExecution() { + return handler.isForceExecution(); + } + }); + } } catch (Throwable e) { try { transportChannel.sendResponse(e); @@ -254,7 +284,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem } handleParsedRespone(response, handler); } - + protected void handleParsedRespone(final TransportResponse response, final TransportResponseHandler handler) { threadPool.executor(handler.executor()).execute(new Runnable() { @SuppressWarnings({"unchecked"}) diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index a3b9bc0a068..71284942fa6 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.ThrowableObjectInputStream; import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import org.elasticsearch.transport.support.TransportStatus; @@ -249,7 +250,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } } - class RequestHandler implements Runnable { + class RequestHandler extends AbstractRunnable { private final TransportRequestHandler handler; private final TransportRequest request; private final NettyTransportChannel transportChannel; @@ -279,5 +280,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } } } + + @Override + public boolean isForceExecution() { + return handler.isForceExecution(); + } } }