fix bug introduced with refactoring of DiscoveryNode constructors

Transport client was replacing the address of the nodes connecting to with the ones received from the liveness api rather keeping the original listed nodes. Written a test for that.
This commit is contained in:
javanna 2016-03-29 18:36:09 +02:00 committed by Luca Cavanna
parent 8fc9dbbb99
commit 0c70a9d5bd
4 changed files with 110 additions and 53 deletions

View File

@ -383,7 +383,9 @@ public class TransportClientNodesService extends AbstractComponent {
// use discovered information but do keep the original transport address,
// so people can control which address is exactly used.
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), nodeWithInfo));
newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), nodeWithInfo.getHostName(),
nodeWithInfo.getHostAddress(), listedNode.getAddress(), nodeWithInfo.getAttributes(),
nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
} else {
// although we asked for one node, our target may not have completed
// initialization yet and doesn't have cluster nodes

View File

@ -159,23 +159,6 @@ public class DiscoveryNode implements Writeable<DiscoveryNode>, ToXContent {
this(nodeName, nodeId, address.getHost(), address.getAddress(), address, attributes, roles, version);
}
/**
* Creates a new {@link DiscoveryNode}.
* <p>
* <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current
* version. it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated.
* </p>
*
* @param nodeName the nodes name
* @param nodeId the nodes unique id
* @param node the original node to copy all the information from
*/
public DiscoveryNode(String nodeName, String nodeId, DiscoveryNode node) {
this(nodeName, nodeId, node.hostName, node.hostAddress, node.address, node.attributes, node.roles, node.version);
}
/**
* Creates a new {@link DiscoveryNode}.
* <p>

View File

@ -381,8 +381,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
if (!nodeToSend.id().startsWith(UNICAST_NODE_PREFIX)) {
DiscoveryNode tempNode = new DiscoveryNode("",
UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "_" + nodeToSend.id() + "#",
nodeToSend
);
nodeToSend.getHostName(), nodeToSend.getHostAddress(), nodeToSend.getAddress(), nodeToSend.getAttributes(),
nodeToSend.getRoles(), nodeToSend.getVersion());
logger.trace("replacing {} with temp node {}", nodeToSend, tempNode);
nodeToSend = tempNode;
}

View File

@ -21,9 +21,10 @@ package org.elasticsearch.client.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.ESTestCase;
@ -33,6 +34,7 @@ import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
@ -46,6 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
@ -71,18 +74,70 @@ public class TransportClientNodesServiceTests extends ESTestCase {
return new TestResponse();
}
};
transportService = new TransportService(Settings.EMPTY, transport, threadPool);
transportService = new TransportService(Settings.EMPTY, transport, threadPool) {
@Override
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action,
TransportRequest request, final TransportResponseHandler<T> handler) {
if (TransportLivenessAction.NAME.equals(action)) {
super.sendRequest(node, action, request, wrapLivenessResponseHandler(handler, node));
} else {
super.sendRequest(node, action, request, handler);
}
}
@Override
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler) {
if (TransportLivenessAction.NAME.equals(action)) {
super.sendRequest(node, action, request, options, wrapLivenessResponseHandler(handler, node));
} else {
super.sendRequest(node, action, request, options, handler);
}
}
};
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);
nodesCount = randomIntBetween(1, 10);
transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool,
Version.CURRENT);
this.nodesCount = randomIntBetween(1, 10);
for (int i = 0; i < nodesCount; i++) {
transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i));
}
transport.endConnectMode();
}
private <T extends TransportResponse> TransportResponseHandler wrapLivenessResponseHandler(TransportResponseHandler<T> handler,
DiscoveryNode node) {
return new TransportResponseHandler<T>() {
@Override
public T newInstance() {
return handler.newInstance();
}
@Override
@SuppressWarnings("unchecked")
public void handleResponse(T response) {
LivenessResponse livenessResponse = new LivenessResponse(ClusterName.DEFAULT,
new DiscoveryNode(node.getName(), node.getId(), "liveness-hostname" + node.getId(),
"liveness-hostaddress" + node.getId(),
new LocalTransportAddress("liveness-address-" + node.getId()), node.getAttributes(), node.getRoles(),
node.getVersion()));
handler.handleResponse((T)livenessResponse);
}
@Override
public void handleException(TransportException exp) {
handler.handleException(exp);
}
@Override
public String executor() {
return handler.executor();
}
};
}
@Override
public void close() {
@ -121,37 +176,35 @@ public class TransportClientNodesServiceTests extends ESTestCase {
final AtomicInteger preSendFailures = new AtomicInteger();
iteration.transportClientNodesService.execute(new TransportClientNodesService.NodeListenerCallback<TestResponse>() {
@Override
public void doWithNode(DiscoveryNode node, final ActionListener<TestResponse> retryListener) {
if (rarely()) {
preSendFailures.incrementAndGet();
//throw whatever exception that is not a subclass of ConnectTransportException
throw new IllegalArgumentException();
iteration.transportClientNodesService.execute((node, retryListener) -> {
if (rarely()) {
preSendFailures.incrementAndGet();
//throw whatever exception that is not a subclass of ConnectTransportException
throw new IllegalArgumentException();
}
iteration.transportService.sendRequest(node, "action", new TestRequest(),
TransportRequestOptions.EMPTY, new BaseTransportResponseHandler<TestResponse>() {
@Override
public TestResponse newInstance() {
return new TestResponse();
}
iteration.transportService.sendRequest(node, "action", new TestRequest(), TransportRequestOptions.EMPTY, new BaseTransportResponseHandler<TestResponse>() {
@Override
public TestResponse newInstance() {
return new TestResponse();
}
@Override
public void handleResponse(TestResponse response1) {
retryListener.onResponse(response1);
}
@Override
public void handleResponse(TestResponse response) {
retryListener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
retryListener.onFailure(exp);
}
@Override
public void handleException(TransportException exp) {
retryListener.onFailure(exp);
}
@Override
public String executor() {
return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
}
});
}
@Override
public String executor() {
return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
}
});
}, actionListener);
assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
@ -173,7 +226,25 @@ public class TransportClientNodesServiceTests extends ESTestCase {
}
assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes()));
assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() +
iteration.transport.failures() + iteration.transport.successes()));
}
}
}
public void testConnectedNodes() {
int iters = iterations(10, 100);
for (int i = 0; i <iters; i++) {
try(final TestIteration iteration = new TestIteration()) {
assertThat(iteration.transportClientNodesService.connectedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
for (DiscoveryNode discoveryNode : iteration.transportClientNodesService.connectedNodes()) {
assertThat(discoveryNode.getHostName(), startsWith("liveness-"));
assertThat(discoveryNode.getHostAddress(), startsWith("liveness-"));
assertThat(discoveryNode.getAddress(), instanceOf(LocalTransportAddress.class));
LocalTransportAddress localTransportAddress = (LocalTransportAddress) discoveryNode.getAddress();
//the original listed transport address is kept rather than the one returned from the liveness api
assertThat(localTransportAddress.id(), startsWith("node"));
}
}
}
}