Fix TcpTransport#sendRequest to raise NotConnectedExcepiton if we get disconnected while sending

This also fixes a race in AbstractSimpleTransportTestCase where we never wait long enough
for all response to finish causing expected failures.
This commit is contained in:
Simon Willnauer 2016-07-12 10:56:20 +02:00
parent d2ac7d450b
commit 199a5a1f04
3 changed files with 22 additions and 9 deletions

View File

@ -918,7 +918,18 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions);
}
};
sendMessage(targetChannel, message, onRequestSent, false);
try {
sendMessage(targetChannel, message, onRequestSent, false);
} catch (IOException ex) {
if (nodeConnected(node)) {
throw ex;
} else {
// we might got disconnected in between the nodeChannel(node, options) call and the sending -
// in that case throw a subclass of ConnectTransportException since some code retries based on this
// see TransportMasterNodeAction for instance
throw new NodeNotConnectedException(node, "Node not connected");
}
}
addedReleaseListener = true;
} finally {
IOUtils.close(stream);

View File

@ -86,6 +86,7 @@ public interface Transport extends LifecycleComponent {
/**
* Sends the request to the node.
* @throws NodeNotConnectedException if the given node is not connected
*/
void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws
IOException, TransportException;

View File

@ -554,6 +554,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testTimeoutSendExceptionWithDelayedResponse() throws Exception {
CountDownLatch doneLatch = new CountDownLatch(1);
CountDownLatch allResponded = new CountDownLatch(1);
serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() {
@Override
@ -569,6 +570,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
} finally {
allResponded.countDown();
}
}
});
@ -641,6 +644,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
doneLatch.countDown();
allResponded.await();
}
@TestLogging(value = "test. transport.tracer:TRACE")
@ -1086,14 +1090,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testVersionFrom0to0() throws Exception {
serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME,
new TransportRequestHandler<Version0Request>() {
@Override
public void messageReceived(Version0Request request, TransportChannel channel) throws Exception {
assertThat(request.value1, equalTo(1));
Version0Response response = new Version0Response();
response.value1 = 1;
channel.sendResponse(response);
}
(request, channel) -> {
assertThat(request.value1, equalTo(1));
Version0Response response = new Version0Response();
response.value1 = 1;
channel.sendResponse(response);
});
Version0Request version0Request = new Version0Request();