Saving some cycles here and there on the IO loop:

* Don't instantiate a new `Runnable` just to execute on the `SAME` executor in a few spots
* Don't instantiate the complicated wrapped stream for empty messages
* Stop instantiating the almost-never-used `ClusterStateObserver` up front in two spots
* Some minor cleanup, and avoid pointless `Predicate<>` instantiation in the transport master node action
This commit is contained in:
parent a3a0c63ccf
commit 08dbd6d989
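The recurring idea behind the first bullet: when the target executor is the calling thread (`SAME`), skip the `threadPool.executor(...).execute(...)` hand-off and invoke the handler directly, so no task object is allocated and no queue hop is paid on the IO loop. Below is a minimal standalone sketch of that dispatch decision, using plain JDK executors and hypothetical names (`dispatch`, `handle`) rather than the Elasticsearch `ThreadPool` API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SameExecutorFastPath {

    // Sentinel meaning "run on the calling thread", analogous to ThreadPool.Names.SAME.
    static final String SAME = "same";

    static void handle(String request) {
        System.out.println("handled [" + request + "] on " + Thread.currentThread().getName());
    }

    // Allocate a task object and pay the queue hop only when the work actually forks.
    static void dispatch(String executorName, ExecutorService pool, String request) {
        if (SAME.equals(executorName)) {
            handle(request);                      // inline on the calling (IO) thread
        } else {
            pool.execute(() -> handle(request));  // wrapper allocated only on this path
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService generic = Executors.newFixedThreadPool(1);
        dispatch(SAME, generic, "ping");
        dispatch("generic", generic, "pong");
        generic.shutdown();
        generic.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

The `InboundHandler` hunks below apply this to inbound requests and responses; note that the inline branches there add their own try/catch, because the pooled path previously got its error handling from the `AbstractRunnable`/`RequestHandler` wrappers.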
@@ -107,35 +107,31 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
     @Override
     protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
-        new AsyncSingleAction(task, request, listener).start();
+        ClusterState state = clusterService.state();
+        logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
+        if (task != null) {
+            request.setParentTask(clusterService.localNode().getId(), task.getId());
+        }
+        new AsyncSingleAction(task, request, listener).doStart(state);
     }

     class AsyncSingleAction {

         private final ActionListener<Response> listener;
         private final Request request;
-        private volatile ClusterStateObserver observer;
+        private ClusterStateObserver observer;
+        private final long startTime;
         private final Task task;

         AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
             this.task = task;
             this.request = request;
-            if (task != null) {
-                request.setParentTask(clusterService.localNode().getId(), task.getId());
-            }
             this.listener = listener;
-        }
-
-        public void start() {
-            ClusterState state = clusterService.state();
-            this.observer
-                = new ClusterStateObserver(state, clusterService, request.masterNodeTimeout(), logger, threadPool.getThreadContext());
-            doStart(state);
+            this.startTime = threadPool.relativeTimeInMillis();
         }

         protected void doStart(ClusterState clusterState) {
             try {
-                final Predicate<ClusterState> masterChangePredicate = MasterNodeChangePredicate.build(clusterState);
                 final DiscoveryNodes nodes = clusterState.nodes();
                 if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
                     // check for block, if blocked, retry, else, execute locally
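This hunk also drops the eagerly built `masterChangePredicate`; the predicate is now constructed only when a retry is actually scheduled (see `retryOnMasterChange` further down). A small, self-contained sketch of that "build the helper only on the failure path" pattern with `java.util.function.Predicate`; the names (`process`, `retryWithPredicate`, `buildPredicate`) are illustrative only:

```java
import java.util.function.Predicate;

public class LazyPredicateExample {

    // Pretend this allocation is non-trivial (in the real code it captures cluster state).
    static Predicate<String> buildPredicate(String initialState) {
        System.out.println("building predicate against [" + initialState + "]");
        return next -> !next.equals(initialState);
    }

    static void process(String state, boolean fails) {
        // Hot path: no Predicate is allocated when processing succeeds.
        if (!fails) {
            System.out.println("processed against [" + state + "]");
            return;
        }
        // Failure path only: build the predicate right before it is needed.
        retryWithPredicate(state, buildPredicate(state));
    }

    static void retryWithPredicate(String state, Predicate<String> stateChanged) {
        System.out.println("retry once the state changes; changed now? " + stateChanged.test(state));
    }

    public static void main(String[] args) {
        process("state-1", false); // common case, nothing allocated
        process("state-1", true);  // rare case, predicate built on demand
    }
}
```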
@@ -144,8 +140,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
                     if (!blockException.retryable()) {
                         listener.onFailure(blockException);
                     } else {
-                        logger.trace("can't execute due to a cluster block, retrying", blockException);
-                        retry(blockException, newState -> {
+                        logger.debug("can't execute due to a cluster block, retrying", blockException);
+                        retry(clusterState, blockException, newState -> {
                             try {
                                 ClusterBlockException newException = checkBlock(request, newState);
                                 return (newException == null || !newException.retryable());
@@ -161,7 +157,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
                         if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
                             logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
                                 "stepped down before publishing action [{}], scheduling a retry", actionName), t);
-                            retry(t, masterChangePredicate);
+                            retryOnMasterChange(clusterState, t);
                         } else {
                             delegatedListener.onFailure(t);
                         }
@@ -172,7 +168,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
                 } else {
                     if (nodes.getMasterNode() == null) {
                         logger.debug("no known master node, scheduling a retry");
-                        retry(null, masterChangePredicate);
+                        retryOnMasterChange(clusterState, null);
                     } else {
                         DiscoveryNode masterNode = nodes.getMasterNode();
                         final String actionName = getMasterActionName(masterNode);
@@ -187,7 +183,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
                                 logger.debug("connection exception while trying to forward request with action name [{}] to " +
                                         "master node [{}], scheduling a retry. Error: [{}]",
                                     actionName, nodes.getMasterNode(), exp.getDetailedMessage());
-                                retry(cause, masterChangePredicate);
+                                retryOnMasterChange(clusterState, cause);
                             } else {
                                 listener.onFailure(exp);
                             }
@@ -200,7 +196,21 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
             }
         }

-        private void retry(final Throwable failure, final Predicate<ClusterState> statePredicate) {
+        private void retryOnMasterChange(ClusterState state, Throwable failure) {
+            retry(state, failure, MasterNodeChangePredicate.build(state));
+        }
+
+        private void retry(ClusterState state, final Throwable failure, final Predicate<ClusterState> statePredicate) {
+            if (observer == null) {
+                final long remainingTimeoutMS = request.masterNodeTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime);
+                if (remainingTimeoutMS <= 0) {
+                    logger.debug(() -> new ParameterizedMessage("timed out before retrying [{}] after failure", actionName), failure);
+                    listener.onFailure(new MasterNotDiscoveredException(failure));
+                    return;
+                }
+                this.observer = new ClusterStateObserver(
+                        state, clusterService, TimeValue.timeValueMillis(remainingTimeoutMS), logger, threadPool.getThreadContext());
+            }
             observer.waitForNextChange(
                 new ClusterStateObserver.Listener() {
                     @Override
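That is the other `ClusterStateObserver` change from the commit message: the observer is no longer created per request up front, but lazily on the first retry, with its timeout reduced by the time already spent since the action started. A rough standalone sketch of that "deadline captured cheaply at construction, expensive watcher created on first retry" shape, with a made-up `Watcher` type standing in for the observer:

```java
import java.util.concurrent.TimeUnit;

public class LazyWatcherExample {

    // Stand-in for ClusterStateObserver: assume constructing it is comparatively expensive.
    static final class Watcher {
        Watcher(long timeoutMillis) {
            System.out.println("watcher created with " + timeoutMillis + "ms left");
        }
    }

    private final long timeoutMillis;
    private final long startNanos = System.nanoTime(); // captured cheaply when the action starts
    private Watcher watcher;                            // created only if we ever need to retry

    LazyWatcherExample(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void retry() {
        if (watcher == null) {
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            long remaining = timeoutMillis - elapsed;
            if (remaining <= 0) {
                System.out.println("timed out before the first retry, giving up");
                return;
            }
            watcher = new Watcher(remaining); // the original timeout minus time already spent
        }
        System.out.println("waiting for the next state change");
    }

    public static void main(String[] args) {
        LazyWatcherExample action = new LazyWatcherExample(30_000);
        action.retry(); // first retry creates the watcher
        action.retry(); // later retries reuse it
    }
}
```

The remaining hunks are in `InboundHandler`, per the `@@` context lines.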
@@ -22,7 +22,9 @@ package org.elasticsearch.transport;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -33,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;

 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;

 public class InboundHandler {

@@ -79,6 +82,9 @@ public class InboundHandler {
         }
     }

+    // Empty stream constant to avoid instantiating a new stream for empty messages.
+    private static final StreamInput EMPTY_STREAM_INPUT = new ByteBufferStreamInput(ByteBuffer.wrap(BytesRef.EMPTY_BYTES));
+
     private void messageReceived(TcpChannel channel, InboundMessage message) throws IOException {
         final InetSocketAddress remoteAddress = channel.getRemoteAddress();
         final Header header = message.getHeader();
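Reusing a single stream instance here is safe only because an empty stream is effectively stateless: every `read()` immediately returns -1 and nothing mutates. A tiny sketch of the same trick with plain `java.io` types (the real code uses Elasticsearch's `ByteBufferStreamInput`, and `readPayload` is a made-up name):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class EmptyStreamConstant {

    // One shared, effectively stateless instance instead of a new wrapper per empty message.
    private static final InputStream EMPTY_STREAM = new ByteArrayInputStream(new byte[0]);

    static int readPayload(byte[] payload) throws IOException {
        // Only wrap real content; empty payloads reuse the shared constant.
        InputStream in = payload.length > 0 ? new ByteArrayInputStream(payload) : EMPTY_STREAM;
        return in.read(); // -1 for the empty case, first byte otherwise
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readPayload(new byte[0]));      // -1, no stream allocated
        System.out.println(readPayload(new byte[] {42}));  // 42
    }
}
```

The `messageReceived` hunk further down then routes zero-length, same-version responses to this constant instead of opening and wrapping the real buffer.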
@@ -94,8 +100,6 @@ public class InboundHandler {
         } else {
             // Responses do not support short circuiting currently
             assert message.isShortCircuit() == false;
-            final StreamInput streamInput = namedWriteableStream(message.openOrGetStreamInput());
-            assertRemoteVersion(streamInput, header.getVersion());
             final TransportResponseHandler<?> handler;
             long requestId = header.getRequestId();
             if (header.isHandshake()) {
@@ -111,17 +115,26 @@ public class InboundHandler {
             }
             // ignore if its null, the service logs it
             if (handler != null) {
-                if (header.isError()) {
-                    handlerResponseError(streamInput, handler);
+                final StreamInput streamInput;
+                if (message.getContentLength() > 0 || header.getVersion().equals(Version.CURRENT) == false) {
+                    streamInput = namedWriteableStream(message.openOrGetStreamInput());
+                    assertRemoteVersion(streamInput, header.getVersion());
+                    if (header.isError()) {
+                        handlerResponseError(streamInput, handler);
+                    } else {
+                        handleResponse(remoteAddress, streamInput, handler);
+                    }
+                    // Check the entire message has been read
+                    final int nextByte = streamInput.read();
+                    // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
+                    if (nextByte != -1) {
+                        throw new IllegalStateException("Message not fully read (response) for requestId ["
+                            + requestId + "], handler [" + handler + "], error [" + header.isError()
+                            + "]; resetting");
+                    }
                 } else {
-                    handleResponse(remoteAddress, streamInput, handler);
-                }
-                // Check the entire message has been read
-                final int nextByte = streamInput.read();
-                // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
-                if (nextByte != -1) {
-                    throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
-                        + handler + "], error [" + header.isError() + "]; resetting");
+                    assert header.isError() == false;
+                    handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler);
                 }
             }
         }
@@ -173,12 +186,20 @@ public class InboundHandler {
                         throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
                             + action + "], available [" + stream.available() + "]; resetting");
                     }
-                    threadPool.executor(reg.getExecutor()).execute(new RequestHandler<>(reg, request, transportChannel));
+                    final String executor = reg.getExecutor();
+                    if (ThreadPool.Names.SAME.equals(executor)) {
+                        try {
+                            reg.processMessageReceived(request, transportChannel);
+                        } catch (Exception e) {
+                            sendErrorResponse(reg.getAction(), transportChannel, e);
+                        }
+                    } else {
+                        threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel));
+                    }
                 }
             } catch (Exception e) {
                 sendErrorResponse(action, transportChannel, e);
             }

         }
     }
@@ -202,17 +223,20 @@ public class InboundHandler {
                 "Failed to deserialize response from handler [" + handler + "]", e));
             return;
         }
-        threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
-            @Override
-            public void onFailure(Exception e) {
-                handleException(handler, new ResponseHandlerFailureTransportException(e));
-            }
+        final String executor = handler.executor();
+        if (ThreadPool.Names.SAME.equals(executor)) {
+            doHandleResponse(handler, response);
+        } else {
+            threadPool.executor(executor).execute(() -> doHandleResponse(handler, response));
+        }
+    }

-            @Override
-            protected void doRun() {
-                handler.handleResponse(response);
-            }
-        });
+    private <T extends TransportResponse> void doHandleResponse(TransportResponseHandler<T> handler, T response) {
+        try {
+            handler.handleResponse(response);
+        } catch (Exception e) {
+            handleException(handler, new ResponseHandlerFailureTransportException(e));
+        }
     }

     private void handlerResponseError(StreamInput stream, final TransportResponseHandler<?> handler) {
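The last hunk replaces the per-response `AbstractRunnable` with a plain helper: `doHandleResponse` wraps `handler.handleResponse` in try/catch and reports failures through `handleException`, which is what `onFailure` did before, and both the inline `SAME` path and the forked path now share it. A condensed, self-contained sketch of keeping one shared failure path when adding an inline fast path; `Handler`, `deliver`, and `onHandlerFailure` are illustrative names, not the Elasticsearch API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedFailurePath {

    interface Handler {
        void handleResponse(String response) throws Exception;
    }

    static void onHandlerFailure(Handler handler, Exception e) {
        System.out.println("handler " + handler + " failed: " + e.getMessage());
    }

    // Shared by both dispatch paths, so the inline fast path keeps the exact
    // error handling the pooled AbstractRunnable-style wrapper used to provide.
    static void doHandleResponse(Handler handler, String response) {
        try {
            handler.handleResponse(response);
        } catch (Exception e) {
            onHandlerFailure(handler, e);
        }
    }

    static void deliver(boolean inline, ExecutorService pool, Handler handler, String response) {
        if (inline) {
            doHandleResponse(handler, response);                     // no task object, same thread
        } else {
            pool.execute(() -> doHandleResponse(handler, response)); // forked, same failure handling
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Handler ok = r -> System.out.println("got " + r);
        Handler broken = r -> { throw new IllegalStateException("boom"); };
        deliver(true, pool, ok, "pong");
        deliver(true, pool, broken, "pong");
        deliver(false, pool, ok, "pong");
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```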