From 199a5a1f04afa23513bee7ec237785a0e599315b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 12 Jul 2016 10:56:20 +0200 Subject: [PATCH] Fix TcpTransport#sendRequest to raise NotConnectedExcepiton if we get disconnected while sending This also fixes a race in AbstractSimpleTransportTestCase where we never wait long enough for all response to finish causing expected failures. --- .../elasticsearch/transport/TcpTransport.java | 13 ++++++++++++- .../org/elasticsearch/transport/Transport.java | 1 + .../AbstractSimpleTransportTestCase.java | 17 +++++++++-------- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index cd41afb763a..b0139902a42 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -918,7 +918,18 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions); } }; - sendMessage(targetChannel, message, onRequestSent, false); + try { + sendMessage(targetChannel, message, onRequestSent, false); + } catch (IOException ex) { + if (nodeConnected(node)) { + throw ex; + } else { + // we might got disconnected in between the nodeChannel(node, options) call and the sending - + // in that case throw a subclass of ConnectTransportException since some code retries based on this + // see TransportMasterNodeAction for instance + throw new NodeNotConnectedException(node, "Node not connected"); + } + } addedReleaseListener = true; } finally { IOUtils.close(stream); diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index 6e36363741d..d0b2edf09bb 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -86,6 +86,7 @@ public interface Transport extends LifecycleComponent { /** * Sends the request to the node. + * @throws NodeNotConnectedException if the given node is not connected */ void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index b65b313ebc7..390eaaa18eb 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -554,6 +554,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testTimeoutSendExceptionWithDelayedResponse() throws Exception { CountDownLatch doneLatch = new CountDownLatch(1); + CountDownLatch allResponded = new CountDownLatch(1); serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override @@ -569,6 +570,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (IOException e) { logger.error("Unexpected failure", e); fail(e.getMessage()); + } finally { + allResponded.countDown(); } } }); @@ -641,6 +644,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceA.removeHandler("sayHelloTimeoutDelayedResponse"); doneLatch.countDown(); + allResponded.await(); } @TestLogging(value = "test. transport.tracer:TRACE") @@ -1086,14 +1090,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testVersionFrom0to0() throws Exception { serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME, - new TransportRequestHandler() { - @Override - public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { - assertThat(request.value1, equalTo(1)); - Version0Response response = new Version0Response(); - response.value1 = 1; - channel.sendResponse(response); - } + (request, channel) -> { + assertThat(request.value1, equalTo(1)); + Version0Response response = new Version0Response(); + response.value1 = 1; + channel.sendResponse(response); }); Version0Request version0Request = new Version0Request();