Transport: fix racing condition in timeout handling

If a request comes in at the same moment the timeout handler for it runs, we may leak a timeoutInfoHolder and erroneously log "Transport response handler not found of id" . The same issue could cause the request tracer to fire a traceUnresolvedResponse call instead of traceReceivedResponse , causing a failure of testTracerLog ( see #10187 ) .

This commit makes sure timeoutInfoHolder is visible before removing the corresponding RequestHolder. It also unifies the TransportService.Adapter#remove(requestId) with TransportService.Adapter#onResponseReceived(requestId), as they are always called together to indicate a response was received.

Closes #10187
Closes #10220
This commit is contained in:
Boaz Leskes 2015-03-23 10:12:35 +01:00
parent f5f9739117
commit cde7d9af1c
5 changed files with 79 additions and 77 deletions

View File

@ -255,16 +255,22 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
throw new ElasticsearchIllegalStateException("can't send request to a null node");
}
final long requestId = newRequestId();
TimeoutHandler timeoutHandler = null;
final TimeoutHandler timeoutHandler;
try {
if (options.timeout() == null) {
timeoutHandler = null;
} else {
timeoutHandler = new TimeoutHandler(requestId);
}
clientHandlers.put(requestId, new RequestHolder<>(handler, node, action, timeoutHandler));
if (started.get() == false) {
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify the caller.
// it will only notify if the toStop code hasn't done the work yet.
throw new TransportException("TransportService is closed stopped can't send request");
}
if (options.timeout() != null) {
timeoutHandler = new TimeoutHandler(requestId);
if (timeoutHandler != null) {
assert options.timeout() != null;
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
}
transport.sendRequest(node, requestId, action, request, options);
@ -272,13 +278,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
final RequestHolder holderToNotify = clientHandlers.remove(requestId);
// if the scheduler raise a EsRejectedExecutionException (due to shutdown), we may have a timeout handler, but no future
if (timeoutHandler != null) {
FutureUtils.cancel(timeoutHandler.future);
}
// If holderToNotify == null then handler has already been taken care of.
if (holderToNotify != null) {
holderToNotify.cancelTimeout();
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
@ -376,32 +378,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
tracerLog.trace("[{}][{}] sent error response (error: [{}])", requestId, action, t.getMessage());
}
@Override
public void onResponseReceived(long requestId) {
if (traceEnabled()) {
// try to resolve the request
DiscoveryNode sourceNode = null;
String action = null;
RequestHolder holder = clientHandlers.get(requestId);
if (holder != null) {
action = holder.action();
sourceNode = holder.node();
} else {
// lets see if its in the timeout holder
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.get(requestId);
if (timeoutInfoHolder != null) {
action = timeoutInfoHolder.action();
sourceNode = timeoutInfoHolder.node();
}
}
if (action == null) {
traceUnresolvedResponse(requestId);
} else if (shouldTraceAction(action)) {
traceReceivedResponse(requestId, sourceNode, action);
}
}
}
@Override
public void onRequestReceived(long requestId, String action) {
if (traceEnabled() && shouldTraceAction(action)) {
@ -415,21 +391,45 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
@Override
public TransportResponseHandler remove(long requestId) {
public TransportResponseHandler onResponseReceived(final long requestId) {
RequestHolder holder = clientHandlers.remove(requestId);
if (holder == null) {
// lets see if its in the timeout holder
checkForTimeout(requestId);
return null;
}
holder.cancelTimeout();
if (traceEnabled() && shouldTraceAction(holder.action())) {
traceReceivedResponse(requestId, holder.node(), holder.action());
}
return holder.handler();
}
protected void checkForTimeout(long requestId) {
// lets see if its in the timeout holder, but sync on mutex to make sure any ongoing timeout handling has finished
final DiscoveryNode sourceNode;
final String action;
assert clientHandlers.get(requestId) == null;
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
if (timeoutInfoHolder != null) {
long time = System.currentTimeMillis();
logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(), timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
action = timeoutInfoHolder.action();
sourceNode = timeoutInfoHolder.node();
} else {
logger.warn("Transport response handler not found of id [{}]", requestId);
action = null;
sourceNode = null;
}
return null;
// call tracer out of lock
if (traceEnabled() == false) {
return;
}
if (action == null) {
assert sourceNode == null;
traceUnresolvedResponse(requestId);
} else if (shouldTraceAction(action)) {
traceReceivedResponse(requestId, sourceNode, action);
}
holder.cancel();
return holder.handler();
}
@Override
@ -504,31 +504,42 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final long sentTime = System.currentTimeMillis();
ScheduledFuture future;
volatile ScheduledFuture future;
TimeoutHandler(long requestId) {
this.requestId = requestId;
}
public long sentTime() {
return sentTime;
}
@Override
public void run() {
if (future.isCancelled()) {
return;
}
final RequestHolder holder = clientHandlers.remove(requestId);
// we get first to make sure we only add the TimeoutInfoHandler if needed.
final RequestHolder holder = clientHandlers.get(requestId);
if (holder != null) {
// add it to the timeout information holder, in case we are going to get a response later
long timeoutTime = System.currentTimeMillis();
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(holder.node(), holder.action(), sentTime, timeoutTime));
holder.handler().handleException(new ReceiveTimeoutTransportException(holder.node(), holder.action(), "request_id [" + requestId + "] timed out after [" + (timeoutTime - sentTime) + "ms]"));
// now that we have the information visible via timeoutInfoHandlers, we try to remove the request id
final RequestHolder removedHolder = clientHandlers.remove(requestId);
if (removedHolder != null) {
assert removedHolder == holder : "two different holder instances for request [" + requestId + "]";
removedHolder.handler().handleException(new ReceiveTimeoutTransportException(holder.node(), holder.action(), "request_id [" + requestId + "] timed out after [" + (timeoutTime - sentTime) + "ms]"));
} else {
// response was processed, remove timeout info.
timeoutInfoHandlers.remove(requestId);
}
}
}
/**
* cancels timeout handling. this is a best effort only to avoid running it. remove the requestId from {@link #clientHandlers}
* to make sure this doesn't run.
*/
public void cancel() {
assert clientHandlers.get(requestId) == null : "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
FutureUtils.cancel(future);
}
}
static class TimeoutInfoHolder {
private final DiscoveryNode node;
@ -571,13 +582,13 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final String action;
private final TimeoutHandler timeout;
private final TimeoutHandler timeoutHandler;
RequestHolder(TransportResponseHandler<T> handler, DiscoveryNode node, String action, TimeoutHandler timeout) {
RequestHolder(TransportResponseHandler<T> handler, DiscoveryNode node, String action, TimeoutHandler timeoutHandler) {
this.handler = handler;
this.node = node;
this.action = action;
this.timeout = timeout;
this.timeoutHandler = timeoutHandler;
}
public TransportResponseHandler<T> handler() {
@ -592,9 +603,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return this.action;
}
public void cancel() {
if (timeout != null) {
FutureUtils.cancel(timeout.future);
public void cancelTimeout() {
if (timeoutHandler != null) {
timeoutHandler.cancel();
}
}
}

View File

@ -41,9 +41,10 @@ public interface TransportServiceAdapter {
/**
* called by the {@link Transport) implementation when a response or an exception has been recieved for a previously
* sent request (before any processing or deserialization was done
* sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
* found.
*/
void onResponseReceived(long requestId);
TransportResponseHandler onResponseReceived(long requestId);
/**
* called by the {@link Transport) implementation when an incoming request arrives but before
@ -53,8 +54,6 @@ public interface TransportServiceAdapter {
TransportRequestHandler handler(String action);
TransportResponseHandler remove(long requestId);
void raiseNodeConnected(DiscoveryNode node);
void raiseNodeDisconnected(DiscoveryNode node);

View File

@ -235,9 +235,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
if (isRequest) {
handleRequest(stream, requestId, sourceTransport, version);
} else {
// notify with response before we process it and before we remove information about it.
transportServiceAdapter.onResponseReceived(requestId);
final TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
final TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
@ -249,7 +247,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
} catch (Throwable e) {
if (sendRequestId != null) {
TransportResponseHandler handler = transportServiceAdapter.remove(sendRequestId);
TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(sendRequestId);
if (handler != null) {
handleException(handler, new RemoteTransportException(nodeName(), localAddress, action, e));
}

View File

@ -120,9 +120,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
buffer.readerIndex(expectedIndexReader);
}
} else {
// notify with response before we process it and before we remove information about it.
transportServiceAdapter.onResponseReceived(requestId);
TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {

View File

@ -20,11 +20,7 @@
package org.elasticsearch.client.transport;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -65,7 +61,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
if (connectMode) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.remove(requestId);
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.DEFAULT, node));
return;
}
@ -82,7 +78,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
//throw whatever exception that is not a subclass of ConnectTransportException
throw new IllegalStateException();
} else {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.remove(requestId);
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
if (random.nextBoolean()) {
successes.incrementAndGet();
transportResponseHandler.handleResponse(newResponse());