Handle connection close / reset events gracefully during handshake (#22178)

Low level handshake code doesn't handle situations gracefully if the connection
is concurrently closed or reset by peer. This commit adds the relevant code to
fail the handshake if the connection is closed.
This commit is contained in:
Simon Willnauer 2016-12-14 23:04:14 +01:00 committed by GitHub
parent 749039ad4f
commit 80d6539e9c
4 changed files with 118 additions and 56 deletions

View File

@ -20,8 +20,6 @@ package org.elasticsearch.transport;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import com.carrotsearch.hppc.LongObjectHashMap;
import com.carrotsearch.hppc.LongObjectMap;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
@ -88,7 +86,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -180,7 +180,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
private final String transportName;
protected final ConnectionProfile defaultConnectionProfile;
private final LongObjectMap<TransportResponseHandler<?>> pendingHandshakes = new LongObjectHashMap<>();
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
private final AtomicLong requestIdGenerator = new AtomicLong();
private final CounterMetric numHandshakes = new CounterMetric();
private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
@ -242,6 +242,51 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
this.transportServiceAdapter = service;
}
private static class HandshakeResponseHandler<Channel> implements TransportResponseHandler<VersionHandshakeResponse> {
final AtomicReference<Version> versionRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean handshakeNotSupported = new AtomicBoolean(false);
final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
final Channel channel;
public HandshakeResponseHandler(Channel channel) {
this.channel = channel;
}
@Override
public VersionHandshakeResponse newInstance() {
return new VersionHandshakeResponse();
}
@Override
public void handleResponse(VersionHandshakeResponse response) {
final boolean success = versionRef.compareAndSet(null, response.version);
assert success;
latch.countDown();
}
@Override
public void handleException(TransportException exp) {
Throwable cause = exp.getCause();
if (cause != null
&& cause instanceof ActionNotFoundTransportException
// this will happen if we talk to a node (pre 5.2) that doesn't have a handshake handler
// we will just treat the node as a 5.0.0 node unless the discovery node that is used to connect has a higher version.
&& cause.getMessage().equals("No handler for action [internal:tcp/handshake]")) {
handshakeNotSupported.set(true);
} else {
final boolean success = exceptionRef.compareAndSet(null, exp);
assert success;
}
latch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
public class ScheduledPing extends AbstractLifecycleRunnable {
/**
@ -462,9 +507,17 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
NodeChannels nodeChannels = connectToChannels(node, profile);
transportServiceAdapter.onConnectionOpened(node);
return nodeChannels;
try {
NodeChannels nodeChannels = connectToChannels(node, profile);
transportServiceAdapter.onConnectionOpened(node);
return nodeChannels;
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
// ConnectTransportExceptions are handled specifically on the caller end - we wrap the actual exception to ensure
// only relevant exceptions are logged on the caller end.. this is the same as in connectToNode
throw new ConnectTransportException(node, "general node connection failure", e);
}
}
/**
@ -1466,47 +1519,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
// pkg private for testing
final Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Version> versionRef = new AtomicReference<>();
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
AtomicBoolean handshakeNotSupported = new AtomicBoolean(false);
numHandshakes.inc();
final long requestId = newRequestId();
pendingHandshakes.put(requestId, new TransportResponseHandler<VersionHandshakeResponse>() {
@Override
public VersionHandshakeResponse newInstance() {
return new VersionHandshakeResponse();
}
@Override
public void handleResponse(VersionHandshakeResponse response) {
final boolean success = versionRef.compareAndSet(null, response.version);
assert success;
latch.countDown();
}
@Override
public void handleException(TransportException exp) {
Throwable cause = exp.getCause();
if (cause != null
&& cause instanceof ActionNotFoundTransportException
// this will happen if we talk to a node (pre 5.2) that doesn't haven a handshake handler
// we will just treat the node as a 5.0.0 node unless the discovery node that is used to connect has a higher version.
&& cause.getMessage().equals("No handler for action [internal:tcp/handshake]")) {
handshakeNotSupported.set(true);
} else {
final boolean success = exceptionRef.compareAndSet(null, exp);
assert success;
}
latch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
final HandshakeResponseHandler handler = new HandshakeResponseHandler(channel);
AtomicReference<Version> versionRef = handler.versionRef;
AtomicReference<Exception> exceptionRef = handler.exceptionRef;
pendingHandshakes.put(requestId, handler);
boolean success = false;
try {
// for the request we use the minCompatVersion since we don't know what's the version of the node we talk to
@ -1515,11 +1533,11 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
final Version minCompatVersion = getCurrentVersion().minimumCompatibilityVersion();
sendRequestToChannel(node, channel, requestId, HANDSHAKE_ACTION_NAME, TransportRequest.Empty.INSTANCE,
TransportRequestOptions.EMPTY, minCompatVersion, TransportStatus.setHandshake((byte)0));
if (latch.await(timeout.millis(), TimeUnit.MILLISECONDS) == false) {
if (handler.latch.await(timeout.millis(), TimeUnit.MILLISECONDS) == false) {
throw new ConnectTransportException(node, "handshake_timeout[" + timeout + "]");
}
success = true;
if (handshakeNotSupported.get()) {
if (handler.handshakeNotSupported.get()) {
// this is a BWC layer, if we talk to a pre 5.2 node then the handshake is not supported
// this will go away in master once it's all ported to 5.2 but for now we keep this to make
// the backport straight forward
@ -1555,4 +1573,18 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
public long newRequestId() {
return requestIdGenerator.incrementAndGet();
}
/**
* Called by sub-classes for each channel that is closed
*/
protected final void onChannelClosed(Channel channel) {
Optional<Map.Entry<Long, HandshakeResponseHandler>> first = pendingHandshakes.entrySet().stream()
.filter((entry) -> entry.getValue().channel == channel).findFirst();
if(first.isPresent()) {
final Long requestId = first.get().getKey();
HandshakeResponseHandler handler = first.get().getValue();
pendingHandshakes.remove(requestId);
handler.handleException(new TransportException("connection reset"));
}
}
}

View File

@ -385,7 +385,6 @@ public class Netty4Transport extends TcpTransport<Channel> {
}
throw e;
}
onAfterChannelsConnected(nodeChannels);
success = true;
} finally {
if (success == false) {
@ -399,14 +398,6 @@ public class Netty4Transport extends TcpTransport<Channel> {
return nodeChannels;
}
/**
* Allows for logic to be executed after a connection has been made on all channels. While this method is being executed, the node is
* not listed as being connected to.
* @param nodeChannels the {@link NodeChannels} that have been connected
*/
protected void onAfterChannelsConnected(NodeChannels nodeChannels) {
}
private class ChannelCloseListener implements ChannelFutureListener {
private final DiscoveryNode node;
@ -417,6 +408,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
onChannelClosed(future.channel());
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(future.channel())) {
threadPool.generic().execute(() -> disconnectFromNode(node, future.channel(), "channel closed event"));

View File

@ -46,9 +46,11 @@ import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -1847,4 +1849,39 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertEquals("[][" + dummy.getAddress() +"] handshake_timeout[1ms]", ex.getMessage());
}
}
public void testTcpHandshakeConnectionReset() throws IOException, InterruptedException {
try (ServerSocket socket = new ServerSocket()) {
socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1);
socket.setReuseAddress(true);
DiscoveryNode dummy = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
socket.getLocalPort()), emptyMap(),
emptySet(), version0);
Thread t = new Thread() {
@Override
public void run() {
try {
Socket accept = socket.accept();
accept.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
};
t.start();
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
builder.setHandshakeTimeout(TimeValue.timeValueHours(1));
ConnectTransportException ex = expectThrows(ConnectTransportException.class,
() -> serviceA.connectToNode(dummy, builder.build()));
assertEquals("[][" + dummy.getAddress() +"] general node connection failure", ex.getMessage());
assertEquals("handshake failed", ex.getCause().getMessage());
t.join();
}
}
}

View File

@ -336,6 +336,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
if (isOpen.compareAndSet(true, false)) {
//establish a happens-before edge between closing and accepting a new connection
synchronized (this) {
onChannelClosed(this);
IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels.keySet()),
() -> cancellableThreads.cancel("channel closed"), onClose);
}