Transport client: Fix remove address to actually work (#21743)

* Transport client: Fix remove address to actually work

The removeTransportAddress method of TransportClient removes the address
from the list of nodes that the client pings to sniff for nodes.
However, it does not remove it from the list of existing connected
nodes. This means removing a node is not possible, as long as that node
is still up.

This change removes the node from the connected nodes list before
triggering sampling (ie sniffing).  While the fix is simple, testing was
not because there were no existing tests for sniffing. This change also
modifies the mocks used by transport client unit tests in order to allow
mocking sniffing.
This commit is contained in:
Ryan Ernst 2016-11-22 22:50:11 -08:00 committed by GitHub
parent 7ad409d5aa
commit d8808210f1
4 changed files with 133 additions and 44 deletions

View File

@ -186,15 +186,25 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
if (closed) {
throw new IllegalStateException("transport client is closed, can't remove an address");
}
List<DiscoveryNode> builder = new ArrayList<>();
List<DiscoveryNode> listNodesBuilder = new ArrayList<>();
for (DiscoveryNode otherNode : listedNodes) {
if (!otherNode.getAddress().equals(transportAddress)) {
builder.add(otherNode);
listNodesBuilder.add(otherNode);
} else {
logger.debug("removing address [{}]", otherNode);
logger.debug("removing address [{}] from listed nodes", otherNode);
}
}
listedNodes = Collections.unmodifiableList(builder);
listedNodes = Collections.unmodifiableList(listNodesBuilder);
List<DiscoveryNode> nodesBuilder = new ArrayList<>();
for (DiscoveryNode otherNode : nodes) {
if (!otherNode.getAddress().equals(transportAddress)) {
nodesBuilder.add(otherNode);
} else {
logger.debug("disconnecting from node with address [{}]", otherNode);
transportService.disconnectFromNode(otherNode);
}
}
nodes = Collections.unmodifiableList(nodesBuilder);
nodesSampler.sample();
}
return this;

View File

@ -547,6 +547,8 @@ public class TransportService extends AbstractLifecycleComponent {
holderToNotify.handler().handleException(sendRequestException);
}
});
} else {
logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
}
}
}

View File

@ -20,8 +20,13 @@
package org.elasticsearch.client.transport;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
@ -64,6 +69,8 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
this.clusterName = clusterName;
}
protected abstract ClusterState getMockClusterState(DiscoveryNode node);
@Override
@SuppressWarnings("unchecked")
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options)
@ -71,9 +78,17 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
if (connectMode) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY),
node));
if (TransportLivenessAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY),
node));
} else if (ClusterStateAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
ClusterState clusterState = getMockClusterState(node);
transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState));
} else {
throw new UnsupportedOperationException("Mock transport does not understand action " + action);
}
return;
}

View File

@ -19,37 +19,44 @@
package org.elasticsearch.client.transport;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.CustomMatcher;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.everyItem;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -62,14 +69,44 @@ public class TransportClientNodesServiceTests extends ESTestCase {
private final FailAndRetryMockTransport<TestResponse> transport;
private final TransportService transportService;
private final TransportClientNodesService transportClientNodesService;
private final int nodesCount;
private final int listNodesCount;
private final int sniffNodesCount;
private TransportAddress livenessAddress = buildNewFakeTransportAddress();
public Set<TransportAddress> nodeAddresses = new HashSet<>();
final List<TransportAddress> listNodeAddresses;
// map for each address of the nodes a cluster state request should respond with
final Map<TransportAddress, DiscoveryNodes> nodeMap;
TestIteration() {
Settings settings = Settings.builder().put("cluster.name", "test").build();
TestIteration(Object... extraSettings) {
Settings settings = Settings.builder().put(extraSettings).put("cluster.name", "test").build();
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
List<TransportAddress> listNodes = new ArrayList<>();
Map<TransportAddress, DiscoveryNodes> nodeMap = new HashMap<>();
this.listNodesCount = randomIntBetween(1, 10);
int sniffNodesCount = 0;
for (int i = 0; i < listNodesCount; i++) {
TransportAddress transportAddress = buildNewFakeTransportAddress();
listNodes.add(transportAddress);
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
discoNodes.add(new DiscoveryNode("#list-node#-" + transportAddress, transportAddress, Version.CURRENT));
if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings)) {
final int numSniffNodes = randomIntBetween(0, 3);
for (int j = 0; j < numSniffNodes; ++j) {
TransportAddress sniffAddress = buildNewFakeTransportAddress();
DiscoveryNode sniffNode = new DiscoveryNode("#sniff-node#-" + sniffAddress, sniffAddress, Version.CURRENT);
discoNodes.add(sniffNode);
// also allow sniffing of the sniff node itself
nodeMap.put(sniffAddress, DiscoveryNodes.builder().add(sniffNode).build());
++sniffNodesCount;
}
}
nodeMap.put(transportAddress, discoNodes.build());
}
listNodeAddresses = listNodes;
this.nodeMap = nodeMap;
this.sniffNodesCount = sniffNodesCount;
threadPool = new TestThreadPool("transport-client-nodes-service-tests");
transport = new FailAndRetryMockTransport<TestResponse>(random(), clusterName) {
@Override
@ -79,7 +116,12 @@ public class TransportClientNodesServiceTests extends ESTestCase {
@Override
protected TestResponse newResponse() {
return new TestResponse();
return new TestResponse();
}
@Override
protected ClusterState getMockClusterState(DiscoveryNode node) {
return ClusterState.builder(clusterName).nodes(TestIteration.this.nodeMap.get(node.getAddress())).build();
}
};
transportService = new TransportService(settings, transport, threadPool, new TransportInterceptor() {
@ -101,14 +143,8 @@ public class TransportClientNodesServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService =
new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {});
this.nodesCount = randomIntBetween(1, 10);
for (int i = 0; i < nodesCount; i++) {
TransportAddress transportAddress = buildNewFakeTransportAddress();
nodeAddresses.add(transportAddress);
transportClientNodesService.addTransportAddresses(transportAddress);
}
transport.endConnectMode();
new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {});
transportClientNodesService.addTransportAddresses(listNodeAddresses.toArray(new TransportAddress[0]));
}
private <T extends TransportResponse> TransportResponseHandler wrapLivenessResponseHandler(TransportResponseHandler<T> handler,
@ -145,7 +181,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
@Override
public void close() {
transport.endConnectMode();
transportService.stop();
transportClientNodesService.close();
try {
@ -160,6 +196,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
int iters = iterations(10, 100);
for (int i = 0; i <iters; i++) {
try(final TestIteration iteration = new TestIteration()) {
iteration.transport.endConnectMode(); // stop transport from responding early
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger finalFailures = new AtomicInteger();
final AtomicReference<Throwable> finalFailure = new AtomicReference<>();
@ -230,7 +267,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
}
}
assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.listNodesCount));
assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() +
iteration.transport.failures() + iteration.transport.successes()));
}
@ -241,17 +278,42 @@ public class TransportClientNodesServiceTests extends ESTestCase {
int iters = iterations(10, 100);
for (int i = 0; i <iters; i++) {
try(final TestIteration iteration = new TestIteration()) {
assertThat(iteration.transportClientNodesService.connectedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
assertThat(iteration.transportClientNodesService.connectedNodes().size(), lessThanOrEqualTo(iteration.listNodesCount));
for (DiscoveryNode discoveryNode : iteration.transportClientNodesService.connectedNodes()) {
assertThat(discoveryNode.getHostName(), startsWith("liveness-"));
assertThat(discoveryNode.getHostAddress(), startsWith("liveness-"));
assertNotEquals(discoveryNode.getAddress(), iteration.livenessAddress);
assertThat(iteration.nodeAddresses, hasItem(discoveryNode.getAddress()));
assertThat(iteration.listNodeAddresses, hasItem(discoveryNode.getAddress()));
}
}
}
}
public void testRemoveAddressSniff() {
checkRemoveAddress(true);
}
public void testRemoveAddressSimple() {
checkRemoveAddress(false);
}
private void checkRemoveAddress(boolean sniff) {
Object[] extraSettings = {TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), sniff};
try(final TestIteration iteration = new TestIteration(extraSettings)) {
final TransportClientNodesService service = iteration.transportClientNodesService;
assertEquals(iteration.listNodesCount + iteration.sniffNodesCount, service.connectedNodes().size());
final TransportAddress addressToRemove = randomFrom(iteration.listNodeAddresses);
service.removeTransportAddress(addressToRemove);
assertThat(service.connectedNodes(), everyItem(not(new CustomMatcher<DiscoveryNode>("removed address") {
@Override
public boolean matches(Object item) {
return item instanceof DiscoveryNode && ((DiscoveryNode)item).getAddress().equals(addressToRemove);
}
})));
assertEquals(iteration.listNodesCount + iteration.sniffNodesCount - 1, service.connectedNodes().size());
}
}
public static class TestRequest extends TransportRequest {
}