From 0c70a9d5bd6c6fe230bdcf1bf33351161504c3b2 Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 29 Mar 2016 18:36:09 +0200 Subject: [PATCH] fix bug introduced with refactoring of DiscoveryNode constructors Transport client was replacing the address of the nodes connecting to with the ones received from the liveness api rather keeping the original listed nodes. Written a test for that. --- .../TransportClientNodesService.java | 4 +- .../cluster/node/DiscoveryNode.java | 17 --- .../zen/ping/unicast/UnicastZenPing.java | 5 +- .../TransportClientNodesServiceTests.java | 137 +++++++++++++----- 4 files changed, 110 insertions(+), 53 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index fafdbec864f..69b4cd03e3b 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -383,7 +383,9 @@ public class TransportClientNodesService extends AbstractComponent { // use discovered information but do keep the original transport address, // so people can control which address is exactly used. DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode(); - newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), nodeWithInfo)); + newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), nodeWithInfo.getHostName(), + nodeWithInfo.getHostAddress(), listedNode.getAddress(), nodeWithInfo.getAttributes(), + nodeWithInfo.getRoles(), nodeWithInfo.getVersion())); } else { // although we asked for one node, our target may not have completed // initialization yet and doesn't have cluster nodes diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 79fa04460b7..2671fb372ff 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -159,23 +159,6 @@ public class DiscoveryNode implements Writeable, ToXContent { this(nodeName, nodeId, address.getHost(), address.getAddress(), address, attributes, roles, version); } - /** - * Creates a new {@link DiscoveryNode}. - *

- * Note: if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current - * version. it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used - * the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered - * and updated. - *

- * - * @param nodeName the nodes name - * @param nodeId the nodes unique id - * @param node the original node to copy all the information from - */ - public DiscoveryNode(String nodeName, String nodeId, DiscoveryNode node) { - this(nodeName, nodeId, node.hostName, node.hostAddress, node.address, node.attributes, node.roles, node.version); - } - /** * Creates a new {@link DiscoveryNode}. *

diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 62b44a31456..4806871be3b 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -381,8 +381,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen if (!nodeToSend.id().startsWith(UNICAST_NODE_PREFIX)) { DiscoveryNode tempNode = new DiscoveryNode("", UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "_" + nodeToSend.id() + "#", - nodeToSend - ); + nodeToSend.getHostName(), nodeToSend.getHostAddress(), nodeToSend.getAddress(), nodeToSend.getAttributes(), + nodeToSend.getRoles(), nodeToSend.getVersion()); + logger.trace("replacing {} with temp node {}", nodeToSend, tempNode); nodeToSend = tempNode; } diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 5e67ac42d0e..1c9e607f363 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -21,9 +21,10 @@ package org.elasticsearch.client.transport; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; +import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.test.ESTestCase; @@ -33,6 +34,7 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.Closeable; @@ -46,6 +48,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; @@ -71,18 +74,70 @@ public class TransportClientNodesServiceTests extends ESTestCase { return new TestResponse(); } }; - transportService = new TransportService(Settings.EMPTY, transport, threadPool); + transportService = new TransportService(Settings.EMPTY, transport, threadPool) { + @Override + public void sendRequest(DiscoveryNode node, String action, + TransportRequest request, final TransportResponseHandler handler) { + if (TransportLivenessAction.NAME.equals(action)) { + super.sendRequest(node, action, request, wrapLivenessResponseHandler(handler, node)); + } else { + super.sendRequest(node, action, request, handler); + } + } + + @Override + public void sendRequest(DiscoveryNode node, String action, TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler handler) { + if (TransportLivenessAction.NAME.equals(action)) { + super.sendRequest(node, action, request, options, wrapLivenessResponseHandler(handler, node)); + } else { + super.sendRequest(node, action, request, options, handler); + } + } + }; transportService.start(); transportService.acceptIncomingRequests(); - transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT); - - nodesCount = randomIntBetween(1, 10); + transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, + Version.CURRENT); + this.nodesCount = randomIntBetween(1, 10); for (int i = 0; i < nodesCount; i++) { transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i)); } transport.endConnectMode(); } + private TransportResponseHandler wrapLivenessResponseHandler(TransportResponseHandler handler, + DiscoveryNode node) { + return new TransportResponseHandler() { + @Override + public T newInstance() { + return handler.newInstance(); + } + + @Override + @SuppressWarnings("unchecked") + public void handleResponse(T response) { + LivenessResponse livenessResponse = new LivenessResponse(ClusterName.DEFAULT, + new DiscoveryNode(node.getName(), node.getId(), "liveness-hostname" + node.getId(), + "liveness-hostaddress" + node.getId(), + new LocalTransportAddress("liveness-address-" + node.getId()), node.getAttributes(), node.getRoles(), + node.getVersion())); + handler.handleResponse((T)livenessResponse); + } + + @Override + public void handleException(TransportException exp) { + handler.handleException(exp); + } + + @Override + public String executor() { + return handler.executor(); + } + }; + } + @Override public void close() { @@ -121,37 +176,35 @@ public class TransportClientNodesServiceTests extends ESTestCase { final AtomicInteger preSendFailures = new AtomicInteger(); - iteration.transportClientNodesService.execute(new TransportClientNodesService.NodeListenerCallback() { - @Override - public void doWithNode(DiscoveryNode node, final ActionListener retryListener) { - if (rarely()) { - preSendFailures.incrementAndGet(); - //throw whatever exception that is not a subclass of ConnectTransportException - throw new IllegalArgumentException(); + iteration.transportClientNodesService.execute((node, retryListener) -> { + if (rarely()) { + preSendFailures.incrementAndGet(); + //throw whatever exception that is not a subclass of ConnectTransportException + throw new IllegalArgumentException(); + } + + iteration.transportService.sendRequest(node, "action", new TestRequest(), + TransportRequestOptions.EMPTY, new BaseTransportResponseHandler() { + @Override + public TestResponse newInstance() { + return new TestResponse(); } - iteration.transportService.sendRequest(node, "action", new TestRequest(), TransportRequestOptions.EMPTY, new BaseTransportResponseHandler() { - @Override - public TestResponse newInstance() { - return new TestResponse(); - } + @Override + public void handleResponse(TestResponse response1) { + retryListener.onResponse(response1); + } - @Override - public void handleResponse(TestResponse response) { - retryListener.onResponse(response); - } + @Override + public void handleException(TransportException exp) { + retryListener.onFailure(exp); + } - @Override - public void handleException(TransportException exp) { - retryListener.onFailure(exp); - } - - @Override - public String executor() { - return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC; - } - }); - } + @Override + public String executor() { + return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC; + } + }); }, actionListener); assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true)); @@ -173,7 +226,25 @@ public class TransportClientNodesServiceTests extends ESTestCase { } assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount)); - assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes())); + assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + + iteration.transport.failures() + iteration.transport.successes())); + } + } + } + + public void testConnectedNodes() { + int iters = iterations(10, 100); + for (int i = 0; i