Make sure the transport response handler is invoked only once.

There was a small window of time where the transport response handler's handException method was invoked twice. As far as I can tell this happened when node disconnect event was processed just after the request was registered and between a "Node not connected" error was thrown. The TransportService#sendRequest method would invoke the transport response handler's handException method regardless if it was already invoked. This resulted that for one request failure, two retries were executed.

The mpercolate api has an assert that tripped when more than the expected shard level responses were returned. This was caused by the issue described above. For the a single shard level request we had multiple responses and this broke the the the total excepted responses. Also the reduce could be started prematurely, which resulted in an incorrect final response (e.g. total count being incorrect). For example: two shards in total, shard 0 gets reduces twice. The second shard 0 response gets in just before shard 1 response gets in. The reduce starts without shard 1 response.
This commit is contained in:
Martijn van Groningen 2013-09-19 13:24:17 +02:00
parent 1581f25e27
commit e68f99254b
1 changed files with 16 additions and 11 deletions

View File

@ -174,7 +174,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportRequestOptions options, final TransportResponseHandler<T> handler) throws TransportException {
final TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException {
if (node == null) {
throw new ElasticSearchIllegalStateException("can't send request to a null node");
}
@ -190,24 +190,29 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} catch (final Throwable e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
clientHandlers.remove(requestId);
final RequestHolder holderToNotify = clientHandlers.remove(requestId);
if (timeoutHandler != null) {
timeoutHandler.future.cancel(false);
}
// If holderToNotify == null then handler has already been taken care of.
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
holderToNotify.handler().handleException(sendRequestException);
}
});
}
if (throwConnectException) {
if (e instanceof ConnectTransportException) {
throw (ConnectTransportException) e;
}
}
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
handler.handleException(sendRequestException);
}
});
}
}