diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index e3beeeafc58..e34ed776f79 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -107,35 +107,31 @@ public abstract class TransportMasterNodeAction listener) { - new AsyncSingleAction(task, request, listener).start(); + ClusterState state = clusterService.state(); + logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); + if (task != null) { + request.setParentTask(clusterService.localNode().getId(), task.getId()); + } + new AsyncSingleAction(task, request, listener).doStart(state); } class AsyncSingleAction { private final ActionListener listener; private final Request request; - private volatile ClusterStateObserver observer; + private ClusterStateObserver observer; + private final long startTime; private final Task task; AsyncSingleAction(Task task, Request request, ActionListener listener) { this.task = task; this.request = request; - if (task != null) { - request.setParentTask(clusterService.localNode().getId(), task.getId()); - } this.listener = listener; - } - - public void start() { - ClusterState state = clusterService.state(); - this.observer - = new ClusterStateObserver(state, clusterService, request.masterNodeTimeout(), logger, threadPool.getThreadContext()); - doStart(state); + this.startTime = threadPool.relativeTimeInMillis(); } protected void doStart(ClusterState clusterState) { try { - final Predicate masterChangePredicate = MasterNodeChangePredicate.build(clusterState); final DiscoveryNodes nodes = clusterState.nodes(); if (nodes.isLocalNodeElectedMaster() || localExecute(request)) { // check for block, if blocked, retry, else, execute locally @@ -144,8 +140,8 @@ public abstract class TransportMasterNodeAction { + logger.debug("can't execute due to a cluster block, retrying", blockException); + retry(clusterState, blockException, newState -> { try { ClusterBlockException newException = checkBlock(request, newState); return (newException == null || !newException.retryable()); @@ -161,7 +157,7 @@ public abstract class TransportMasterNodeAction new ParameterizedMessage("master could not publish cluster state or " + "stepped down before publishing action [{}], scheduling a retry", actionName), t); - retry(t, masterChangePredicate); + retryOnMasterChange(clusterState, t); } else { delegatedListener.onFailure(t); } @@ -172,7 +168,7 @@ public abstract class TransportMasterNodeAction statePredicate) { + private void retryOnMasterChange(ClusterState state, Throwable failure) { + retry(state, failure, MasterNodeChangePredicate.build(state)); + } + + private void retry(ClusterState state, final Throwable failure, final Predicate statePredicate) { + if (observer == null) { + final long remainingTimeoutMS = request.masterNodeTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime); + if (remainingTimeoutMS <= 0) { + logger.debug(() -> new ParameterizedMessage("timed out before retrying [{}] after failure", actionName), failure); + listener.onFailure(new MasterNotDiscoveredException(failure)); + return; + } + this.observer = new ClusterStateObserver( + state, clusterService, TimeValue.timeValueMillis(remainingTimeoutMS), logger, threadPool.getThreadContext()); + } observer.waitForNextChange( new ClusterStateObserver.Listener() { @Override diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 045f016409f..d7aa821b056 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -22,7 +22,9 @@ package org.elasticsearch.transport; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -33,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; public class InboundHandler { @@ -79,6 +82,9 @@ public class InboundHandler { } } + // Empty stream constant to avoid instantiating a new stream for empty messages. + private static final StreamInput EMPTY_STREAM_INPUT = new ByteBufferStreamInput(ByteBuffer.wrap(BytesRef.EMPTY_BYTES)); + private void messageReceived(TcpChannel channel, InboundMessage message) throws IOException { final InetSocketAddress remoteAddress = channel.getRemoteAddress(); final Header header = message.getHeader(); @@ -94,8 +100,6 @@ public class InboundHandler { } else { // Responses do not support short circuiting currently assert message.isShortCircuit() == false; - final StreamInput streamInput = namedWriteableStream(message.openOrGetStreamInput()); - assertRemoteVersion(streamInput, header.getVersion()); final TransportResponseHandler handler; long requestId = header.getRequestId(); if (header.isHandshake()) { @@ -111,17 +115,26 @@ public class InboundHandler { } // ignore if its null, the service logs it if (handler != null) { - if (header.isError()) { - handlerResponseError(streamInput, handler); + final StreamInput streamInput; + if (message.getContentLength() > 0 || header.getVersion().equals(Version.CURRENT) == false) { + streamInput = namedWriteableStream(message.openOrGetStreamInput()); + assertRemoteVersion(streamInput, header.getVersion()); + if (header.isError()) { + handlerResponseError(streamInput, handler); + } else { + handleResponse(remoteAddress, streamInput, handler); + } + // Check the entire message has been read + final int nextByte = streamInput.read(); + // calling read() is useful to make sure the message is fully read, even if there is an EOS marker + if (nextByte != -1) { + throw new IllegalStateException("Message not fully read (response) for requestId [" + + requestId + "], handler [" + handler + "], error [" + header.isError() + + "]; resetting"); + } } else { - handleResponse(remoteAddress, streamInput, handler); - } - // Check the entire message has been read - final int nextByte = streamInput.read(); - // calling read() is useful to make sure the message is fully read, even if there is an EOS marker - if (nextByte != -1) { - throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler [" - + handler + "], error [" + header.isError() + "]; resetting"); + assert header.isError() == false; + handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler); } } } @@ -173,12 +186,20 @@ public class InboundHandler { throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action + "], available [" + stream.available() + "]; resetting"); } - threadPool.executor(reg.getExecutor()).execute(new RequestHandler<>(reg, request, transportChannel)); + final String executor = reg.getExecutor(); + if (ThreadPool.Names.SAME.equals(executor)) { + try { + reg.processMessageReceived(request, transportChannel); + } catch (Exception e) { + sendErrorResponse(reg.getAction(), transportChannel, e); + } + } else { + threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel)); + } } } catch (Exception e) { sendErrorResponse(action, transportChannel, e); } - } } @@ -202,17 +223,20 @@ public class InboundHandler { "Failed to deserialize response from handler [" + handler + "]", e)); return; } - threadPool.executor(handler.executor()).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - handleException(handler, new ResponseHandlerFailureTransportException(e)); - } + final String executor = handler.executor(); + if (ThreadPool.Names.SAME.equals(executor)) { + doHandleResponse(handler, response); + } else { + threadPool.executor(executor).execute(() -> doHandleResponse(handler, response)); + } + } - @Override - protected void doRun() { - handler.handleResponse(response); - } - }); + private void doHandleResponse(TransportResponseHandler handler, T response) { + try { + handler.handleResponse(response); + } catch (Exception e) { + handleException(handler, new ResponseHandlerFailureTransportException(e)); + } } private void handlerResponseError(StreamInput stream, final TransportResponseHandler handler) {