SniffNodesSampler should close connection after handling responses (#24632)

With the current implementation, SniffNodesSampler might close the
current connection right after a request is sent but before the response
is correctly handled. This causes to timeouts in the transport client
when the sniffing is activated.

closes #24575
closes #24557
This commit is contained in:
Tanguy Leroux 2017-05-12 16:38:50 +02:00 committed by GitHub
parent 878ae8eb3c
commit f8df2a22e9
2 changed files with 185 additions and 19 deletions

View File

@ -469,14 +469,17 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
*/
Transport.Connection connectionToClose = null;
@Override
public void onAfter() {
IOUtils.closeWhileHandlingException(connectionToClose);
void onDone() {
try {
IOUtils.closeWhileHandlingException(connectionToClose);
} finally {
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
latch.countDown();
onDone();
if (e instanceof ConnectTransportException) {
logger.debug((Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", nodeToPing), e);
@ -522,7 +525,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
@Override
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.put(nodeToPing, response);
latch.countDown();
onDone();
}
@Override
@ -532,9 +535,8 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
"failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
try {
hostFailureListener.onNodeDisconnected(nodeToPing, e);
}
finally {
latch.countDown();
} finally {
onDone();
}
}
});

View File

@ -19,21 +19,13 @@
package org.elasticsearch.client.transport;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -41,19 +33,40 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.CustomMatcher;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.transport.MockTransportService.createNewService;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.everyItem;
import static org.hamcrest.CoreMatchers.hasItem;
@ -322,6 +335,157 @@ public class TransportClientNodesServiceTests extends ESTestCase {
}
}
public void testSniffNodesSamplerClosesConnections() throws Exception {
final TestThreadPool threadPool = new TestThreadPool("testSniffNodesSamplerClosesConnections");
Settings remoteSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "remote").build();
try (MockTransportService remoteService = createNewService(remoteSettings, Version.CURRENT, threadPool, null)) {
final MockHandler handler = new MockHandler(remoteService);
remoteService.registerRequestHandler(ClusterStateAction.NAME, ClusterStateRequest::new, ThreadPool.Names.SAME, handler);
remoteService.start();
remoteService.acceptIncomingRequests();
Settings clientSettings = Settings.builder()
.put(TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), true)
.put(TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.getKey(), TimeValue.timeValueSeconds(1))
.put(TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.getKey(), TimeValue.timeValueSeconds(30))
.build();
try (MockTransportService clientService = createNewService(clientSettings, Version.CURRENT, threadPool, null)) {
final List<MockConnection> establishedConnections = new CopyOnWriteArrayList<>();
final List<MockConnection> reusedConnections = new CopyOnWriteArrayList<>();
clientService.addDelegate(remoteService, new MockTransportService.DelegateTransport(clientService.original()) {
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
MockConnection connection = new MockConnection(super.openConnection(node, profile));
establishedConnections.add(connection);
return connection;
}
@Override
public Connection getConnection(DiscoveryNode node) {
MockConnection connection = new MockConnection(super.getConnection(node));
reusedConnections.add(connection);
return connection;
}
});
clientService.start();
clientService.acceptIncomingRequests();
try (TransportClientNodesService transportClientNodesService =
new TransportClientNodesService(clientSettings, clientService, threadPool, (a, b) -> {})) {
assertEquals(0, transportClientNodesService.connectedNodes().size());
assertEquals(0, establishedConnections.size());
assertEquals(0, reusedConnections.size());
transportClientNodesService.addTransportAddresses(remoteService.getLocalDiscoNode().getAddress());
assertEquals(1, transportClientNodesService.connectedNodes().size());
assertClosedConnections(establishedConnections, 1);
transportClientNodesService.doSample();
assertClosedConnections(establishedConnections, 2);
assertOpenConnections(reusedConnections, 1);
handler.blockRequest();
Thread thread = new Thread(transportClientNodesService::doSample);
thread.start();
assertBusy(() -> assertEquals(3, establishedConnections.size()));
assertFalse("Temporary ping connection must be opened", establishedConnections.get(2).isClosed());
handler.releaseRequest();
thread.join();
assertClosedConnections(establishedConnections, 3);
}
}
} finally {
terminate(threadPool);
}
}
private void assertClosedConnections(final List<MockConnection> connections, final int size) {
assertEquals("Expecting " + size + " closed connections but got " + connections.size(), size, connections.size());
connections.forEach(c -> assertConnection(c, true));
}
private void assertOpenConnections(final List<MockConnection> connections, final int size) {
assertEquals("Expecting " + size + " open connections but got " + connections.size(), size, connections.size());
connections.forEach(c -> assertConnection(c, false));
}
private static void assertConnection(final MockConnection connection, final boolean closed) {
assertEquals("Connection [" + connection + "] must be " + (closed ? "closed" : "open"), closed, connection.isClosed());
}
class MockConnection implements Transport.Connection {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Transport.Connection connection;
private MockConnection(Transport.Connection connection) {
this.connection = connection;
}
@Override
public DiscoveryNode getNode() {
return connection.getNode();
}
@Override
public Version getVersion() {
return connection.getVersion();
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connection.close();
}
}
boolean isClosed() {
return closed.get();
}
}
class MockHandler implements TransportRequestHandler<ClusterStateRequest> {
private final AtomicBoolean block = new AtomicBoolean(false);
private final CountDownLatch release = new CountDownLatch(1);
private final MockTransportService transportService;
MockHandler(MockTransportService transportService) {
this.transportService = transportService;
}
@Override
public void messageReceived(ClusterStateRequest request, TransportChannel channel) throws Exception {
if (block.get()) {
release.await();
return;
}
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(transportService.getLocalDiscoNode()).build();
ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L));
}
void blockRequest() {
if (block.compareAndSet(false, true) == false) {
throw new AssertionError("Request handler is already marked as blocking");
}
}
void releaseRequest() {
release.countDown();
}
}
public static class TestRequest extends TransportRequest {
}