From 7a9b667e988f3af048374ed660b4594c6ae96b8f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Dec 2016 21:06:23 +0100 Subject: [PATCH] Introduce a low level protocol handshake (#22094) Today we rely on the version that the API user passes in together with the DiscoveryNode. This commit introduces a low level handshake, executed every time a connection to a node is established, in which nodes exchange the version to be used with the transport protocol. This allows us to change the wire protocol based on the version of the node we are talking to, even without a full cluster restart. Today we would need to carry a BWC layer across major versions, but with a handshake we can rely on the fact that the latest version of the previous minor executes a handshake and uses the latest protocol version across all communication with the N+1 version nodes. This change is still fully backwards compatible; a follow-up PR will remove the BWC layer in 6.0 once this has been back-ported to the 5.x branch. --- .../transport/ConnectionProfile.java | 29 +- .../elasticsearch/transport/TcpTransport.java | 249 +++++++++++++++--- .../elasticsearch/transport/Transport.java | 13 + .../transport/TransportService.java | 8 +- .../transport/TransportStatus.java | 12 + .../transport/FailAndRetryMockTransport.java | 8 + .../cluster/NodeConnectionsServiceTests.java | 8 +- .../transport/TCPTransportTests.java | 2 +- .../netty4/Netty4MessageChannelHandler.java | 6 +- .../transport/netty4/Netty4TransportIT.java | 4 +- .../test/transport/CapturingTransport.java | 8 +- .../test/transport/MockTransportService.java | 13 + .../AbstractSimpleTransportTestCase.java | 212 ++++++++++----- .../transport/MockTcpTransport.java | 6 +- 14 files changed, 449 insertions(+), 129 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index f6aa1e8445b..92421adea6a 100644 --- 
a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -44,16 +44,19 @@ public final class ConnectionProfile { TransportRequestOptions.Type.PING, TransportRequestOptions.Type.RECOVERY, TransportRequestOptions.Type.REG, - TransportRequestOptions.Type.STATE))), 1, null); + TransportRequestOptions.Type.STATE))), 1, null, null); private final List handles; private final int numConnections; private final TimeValue connectTimeout; + private final TimeValue handshakeTimeout; - private ConnectionProfile(List handles, int numConnections, TimeValue connectTimeout) { + private ConnectionProfile(List handles, int numConnections, TimeValue connectTimeout, TimeValue handshakeTimeout) + { this.handles = handles; this.numConnections = numConnections; this.connectTimeout = connectTimeout; + this.handshakeTimeout = handshakeTimeout; } /** @@ -64,9 +67,10 @@ public final class ConnectionProfile { private final Set addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class); private int offset = 0; private TimeValue connectTimeout; + private TimeValue handshakeTimeout; /** - * Sets a connect connectTimeout for this connection profile + * Sets a connect timeout for this connection profile */ public void setConnectTimeout(TimeValue connectTimeout) { if (connectTimeout.millis() < 0) { @@ -75,6 +79,16 @@ public final class ConnectionProfile { this.connectTimeout = connectTimeout; } + /** + * Sets a handshake timeout for this connection profile + */ + public void setHandshakeTimeout(TimeValue handshakeTimeout) { + if (handshakeTimeout.millis() < 0) { + throw new IllegalArgumentException("handshakeTimeout must be non-negative but was: " + handshakeTimeout); + } + this.handshakeTimeout = handshakeTimeout; + } + /** * Adds a number of connections for one or more types. Each type can only be added once. 
* @param numConnections the number of connections to use in the pool for the given connection types @@ -104,7 +118,7 @@ public final class ConnectionProfile { if (types.isEmpty() == false) { throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types); } - return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout); + return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout, handshakeTimeout); } } @@ -116,6 +130,13 @@ public final class ConnectionProfile { return connectTimeout; } + /** + * Returns the handshake timeout or null if no explicit timeout is set on this profile. + */ + public TimeValue getHandshakeTimeout() { + return handshakeTimeout; + } + /** * Returns the total number of connections for this profile */ diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 7522ab98ed4..a40284463a6 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -20,6 +20,8 @@ package org.elasticsearch.transport; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; +import com.carrotsearch.hppc.LongObjectHashMap; +import com.carrotsearch.hppc.LongObjectMap; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.IOUtils; @@ -91,6 +93,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -145,9 +148,11 @@ public abstract class 
TcpTransport extends AbstractLifecycleComponent i Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope); + // test-setting only + static final Setting CONNECTION_HANDSHAKE = Setting.boolSetting("transport.tcp.handshake", true); + private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final int PING_DATA_SIZE = -1; - protected final boolean blockingClient; private final CircuitBreakerService circuitBreakerService; // package visibility for tests @@ -156,6 +161,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected final ThreadPool threadPool; private final BigArrays bigArrays; protected final NetworkService networkService; + private final boolean doHandshakes; protected volatile TransportServiceAdapter transportServiceAdapter; // node id to actual channel @@ -174,6 +180,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i private final String transportName; protected final ConnectionProfile defaultConnectionProfile; + private final LongObjectMap> pendingHandshakes = new LongObjectHashMap<>(); + private final AtomicLong requestIdGenerator = new AtomicLong(); + private final CounterMetric numHandshakes = new CounterMetric(); + private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake"; + public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { @@ -189,6 +200,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i this.transportName = transportName; this.blockingClient = TCP_BLOCKING_CLIENT.get(settings); defaultConnectionProfile = buildDefaultConnectionProfile(settings); + this.doHandshakes = CONNECTION_HANDSHAKE.get(settings); } static 
ConnectionProfile buildDefaultConnectionProfile(Settings settings) { @@ -224,17 +236,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i @Override public void transportServiceAdapter(TransportServiceAdapter service) { + if (service.getRequestHandler(HANDSHAKE_ACTION_NAME) != null) { + throw new IllegalStateException(HANDSHAKE_ACTION_NAME + " is a reserved request handler and must not be registered"); + } this.transportServiceAdapter = service; } - public Settings settings() { - return this.settings; - } - - public boolean isCompressed() { - return compress; - } - public class ScheduledPing extends AbstractLifecycleRunnable { /** @@ -312,23 +319,36 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } public final class NodeChannels implements Connection { - private final Map typeMapping - = new EnumMap<>(TransportRequestOptions.Type.class); + private final Map typeMapping; private final Channel[] channels; private final DiscoveryNode node; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Version version; public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile connectionProfile) { this.node = node; this.channels = channels; assert channels.length == connectionProfile.getNumConnections() : "expected channels size to be == " + connectionProfile.getNumConnections() + " but was: [" + channels.length + "]"; + typeMapping = new EnumMap<>(TransportRequestOptions.Type.class); for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) { for (TransportRequestOptions.Type type : handle.getTypes()) typeMapping.put(type, handle); } + version = node.getVersion(); } + NodeChannels(NodeChannels channels, Version handshakeVersion) { + this.node = channels.node; + this.channels = channels.channels; + this.typeMapping = channels.typeMapping; + this.version = handshakeVersion; + } + + @Override + public Version getVersion() { + return version; + } public 
boolean hasChannel(Channel channel) { for (Channel channel1 : channels) { @@ -370,7 +390,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i throw new NodeNotConnectedException(node, "connection already closed"); } Channel channel = channel(options.type()); - sendRequestToChannel(this.node, channel, requestId, action, request, options); + sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte)0); } } @@ -408,6 +428,22 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i "failed to connect to [{}], cleaning dangling connections", node), e); throw e; } + if (doHandshakes) { // some tests need to disable this + Channel channel = nodeChannels.channel(TransportRequestOptions.Type.PING); + final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ? + defaultConnectionProfile.getConnectTimeout(): + connectionProfile.getConnectTimeout(); + final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ? 
+ connectTimeout : connectionProfile.getHandshakeTimeout(); + Version version = executeHandshake(node, channel, handshakeTimeout); + if (version != null) { + // this is a BWC layer, if we talk to a pre 5.2 node then the handshake is not supported + // this will go away in master once it's all ported to 5.2 but for now we keep this to make + // the backport straight forward + nodeChannels = new NodeChannels(nodeChannels, version); + } + } + // we acquire a connection lock, so no way there is an existing connection connectedNodes.put(node, nodeChannels); if (logger.isDebugEnabled()) { logger.debug("connected to node [{}]", node); @@ -486,7 +522,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } @Override - public Connection getConnection(DiscoveryNode node) { + public NodeChannels getConnection(DiscoveryNode node) { NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels == null) { throw new NodeNotConnectedException(node, "Node not connected"); @@ -521,7 +557,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected Map buildProfileSettings() { // extract default profile first and create standard bootstrap - Map profiles = TransportSettings.TRANSPORT_PROFILES_SETTING.get(settings()).getAsGroups(true); + Map profiles = TransportSettings.TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups(true); if (!profiles.containsKey(TransportSettings.DEFAULT_PROFILE)) { profiles = new HashMap<>(profiles); profiles.put(TransportSettings.DEFAULT_PROFILE, Settings.EMPTY); @@ -894,14 +930,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return compress && (!(request instanceof BytesTransportRequest)); } - - protected void sendRequestToChannel(DiscoveryNode node, Channel targetChannel, final long requestId, final String action, - final TransportRequest request, TransportRequestOptions options) throws IOException, + private void sendRequestToChannel(DiscoveryNode node, final Channel 
targetChannel, final long requestId, final String action, + final TransportRequest request, TransportRequestOptions options, Version channelVersion, + byte status) throws IOException, TransportException { if (compress) { options = TransportRequestOptions.builder(options).withCompress(true).build(); } - byte status = 0; status = TransportStatus.setRequest(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); // we wrap this in a release once since if the onRequestSent callback throws an exception @@ -920,7 +955,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i // we pick the smallest of the 2, to support both backward and forward compatibility // note, this is the only place we need to do this, since from here on, we use the serialized version // as the version to use also when the node receiving this request will send the response with - Version version = Version.min(getCurrentVersion(), node.getVersion()); + Version version = Version.min(getCurrentVersion(), channelVersion); stream.setVersion(version); threadPool.getThreadContext().writeTo(stream); @@ -995,10 +1030,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i */ public void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId, final String action, TransportResponseOptions options) throws IOException { + sendResponse(nodeVersion, channel, response, requestId, action, options, (byte)0); + } + + private void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId, + final String action, TransportResponseOptions options, byte status) throws IOException { if (compress) { options = TransportResponseOptions.builder(options).withCompress(true).build(); } - byte status = 0; status = TransportStatus.setResponse(status); // TODO share some code with sendRequest ReleasableBytesStreamOutput bStream = new 
ReleasableBytesStreamOutput(bigArrays); // we wrap this in a release once since if the onRequestSent callback throws an exception @@ -1129,6 +1168,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return false; } } + if (dataLen <= 0) { throw new StreamCorruptedException("invalid data length: " + dataLen); } @@ -1218,9 +1258,19 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i streamIn.setVersion(version); threadPool.getThreadContext().readHeaders(streamIn); if (TransportStatus.isRequest(status)) { - handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress); + handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status); } else { - final TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId); + final TransportResponseHandler handler; + if (TransportStatus.isHandshake(status) && doHandshakes) { + handler = pendingHandshakes.remove(requestId); + } else { + TransportResponseHandler theHandler = transportServiceAdapter.onResponseReceived(requestId); + if (theHandler == null && TransportStatus.isError(status)) { + handler = pendingHandshakes.remove(requestId); + } else { + handler = theHandler; + } + } // ignore if its null, the adapter logs it if (handler != null) { if (TransportStatus.isError(status)) { @@ -1297,29 +1347,35 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i }); } - protected String handleRequest(Channel channel, String profileName, final StreamInput stream, long requestId, - int messageLengthBytes, Version version, InetSocketAddress remoteAddress) throws IOException { + protected String handleRequest(Channel channel, String profileName, final StreamInput stream, long requestId, int messageLengthBytes, + Version version, InetSocketAddress remoteAddress, byte status) throws IOException { final String action = stream.readString(); 
transportServiceAdapter.onRequestReceived(requestId, action); TransportChannel transportChannel = null; try { - final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); - if (reg == null) { - throw new ActionNotFoundTransportException(action); - } - if (reg.canTripCircuitBreaker()) { - getInFlightRequestBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + if (TransportStatus.isHandshake(status) && doHandshakes) { + final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion()); + sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, + TransportStatus.setHandshake((byte)0)); } else { - getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes); + final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); + if (reg == null) { + throw new ActionNotFoundTransportException(action); + } + if (reg.canTripCircuitBreaker()) { + getInFlightRequestBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + } else { + getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes); + } + transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName, + messageLengthBytes); + final TransportRequest request = reg.newRequest(); + request.remoteAddress(new TransportAddress(remoteAddress)); + request.readFrom(stream); + // in case we throw an exception, i.e. 
when the limit is hit, we don't want to verify + validateRequest(stream, requestId, action); + threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); } - transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName, - messageLengthBytes); - final TransportRequest request = reg.newRequest(); - request.remoteAddress(new TransportAddress(remoteAddress)); - request.readFrom(stream); - // in case we throw an exception, i.e. when the limit is hit, we don't want to verify - validateRequest(stream, requestId, action); - threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); } catch (Exception e) { // the circuit breaker tripped if (transportChannel == null) { @@ -1384,4 +1440,119 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } } } + + private static final class VersionHandshakeResponse extends TransportResponse { + private Version version; + + private VersionHandshakeResponse(Version version) { + this.version = version; + } + + private VersionHandshakeResponse() {} + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + version = Version.readVersion(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + assert version != null; + Version.writeVersion(version, out); + } + } + + // pkg private for testing + final Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference versionRef = new AtomicReference<>(); + AtomicReference exceptionRef = new AtomicReference<>(); + AtomicBoolean handshakeNotSupported = new AtomicBoolean(false); + numHandshakes.inc(); + final long requestId = newRequestId(); + pendingHandshakes.put(requestId, new TransportResponseHandler() { + + @Override + public 
VersionHandshakeResponse newInstance() { + return new VersionHandshakeResponse(); + } + + @Override + public void handleResponse(VersionHandshakeResponse response) { + final boolean success = versionRef.compareAndSet(null, response.version); + assert success; + latch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + Throwable cause = exp.getCause(); + if (cause != null + && cause instanceof ActionNotFoundTransportException + // this will happen if we talk to a node (pre 5.2) that doesn't haven a handshake handler + // we will just treat the node as a 5.0.0 node unless the discovery node that is used to connect has a higher version. + && cause.getMessage().equals("No handler for action [internal:tcp/handshake]")) { + handshakeNotSupported.set(true); + } else { + final boolean success = exceptionRef.compareAndSet(null, exp); + assert success; + } + latch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + boolean success = false; + try { + // for the request we use the minCompatVersion since we don't know what's the version of the node we talk to + // we also have no payload on the request but the response will contain the actual version of the node we talk + // to as the payload. 
+ final Version minCompatVersion = getCurrentVersion().minimumCompatibilityVersion(); + sendRequestToChannel(node, channel, requestId, HANDSHAKE_ACTION_NAME, TransportRequest.Empty.INSTANCE, + TransportRequestOptions.EMPTY, minCompatVersion, TransportStatus.setHandshake((byte)0)); + if (latch.await(timeout.millis(), TimeUnit.MILLISECONDS) == false) { + throw new ConnectTransportException(node, "handshake_timeout[" + timeout + "]"); + } + success = true; + if (handshakeNotSupported.get()) { + // this is a BWC layer, if we talk to a pre 5.2 node then the handshake is not supported + // this will go away in master once it's all ported to 5.2 but for now we keep this to make + // the backport straight forward + return null; + } + if (exceptionRef.get() != null) { + throw new IllegalStateException("handshake failed", exceptionRef.get()); + } else { + Version version = versionRef.get(); + if (getCurrentVersion().isCompatible(version) == false) { + throw new IllegalStateException("Received message from unsupported version: [" + version + + "] minimal compatible version is: [" + getCurrentVersion().minimumCompatibilityVersion() + "]"); + } + return version; + } + } finally { + final TransportResponseHandler removedHandler = pendingHandshakes.remove(requestId); + // in the case of a timeout or an exception on the send part the handshake has not been removed yet. + // but the timeout is tricky since it's basically a race condition so we only assert on the success case. 
+ assert success && removedHandler == null || success == false : "handler for requestId [" + requestId + "] is not been removed"; + } + } + + final int getNumPendingHandshakes() { // for testing + return pendingHandshakes.size(); + } + + final long getNumHandshakes() { + return numHandshakes.count(); // for testing + } + + @Override + public long newRequestId() { + return requestIdGenerator.incrementAndGet(); + } } diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index b0821c609c0..e337aaf41b2 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; @@ -83,6 +84,11 @@ public interface Transport extends LifecycleComponent { return new NoopCircuitBreaker("in-flight-noop"); } + /** + * Returns a new request ID to use when sending a message via {@link Connection#sendRequest(long, String, + * TransportRequest, TransportRequestOptions)} + */ + long newRequestId(); /** * Returns a connection for the given node if the node is connected. * Connections returned from this method must not be closed. The lifecylce of this connection is maintained by the Transport @@ -116,5 +122,12 @@ public interface Transport extends LifecycleComponent { */ void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException; + + /** + * Returns the version of the node this connection was established with. 
+ */ + default Version getVersion() { + return getNode().getVersion(); + } } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 543cb5c940e..a02b763f2d9 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -86,8 +86,6 @@ public class TransportService extends AbstractLifecycleComponent { final ConcurrentMapLong clientHandlers = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); - private final AtomicLong requestIds = new AtomicLong(); - final CopyOnWriteArrayList connectionListeners = new CopyOnWriteArrayList<>(); private final TransportInterceptor interceptor; @@ -520,7 +518,7 @@ public class TransportService extends AbstractLifecycleComponent { throw new IllegalStateException("can't send request to a null connection"); } DiscoveryNode node = connection.getNode(); - final long requestId = newRequestId(); + final long requestId = transport.newRequestId(); final TimeoutHandler timeoutHandler; try { @@ -643,10 +641,6 @@ public class TransportService extends AbstractLifecycleComponent { return true; } - private long newRequestId() { - return requestIds.getAndIncrement(); - } - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return transport.addressesFromString(address, perAddressLimit); } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportStatus.java b/core/src/main/java/org/elasticsearch/transport/TransportStatus.java index 42e0e962a62..39472cbe3cd 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportStatus.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportStatus.java @@ -24,6 +24,7 @@ final class TransportStatus { private static final byte STATUS_REQRES = 1 << 0; private static final byte STATUS_ERROR = 1 << 1; private 
static final byte STATUS_COMPRESS = 1 << 2; + private static final byte STATUS_HANDSHAKE = 1 << 3; public static boolean isRequest(byte value) { return (value & STATUS_REQRES) == 0; @@ -56,4 +57,15 @@ final class TransportStatus { value |= STATUS_COMPRESS; return value; } + + static boolean isHandshake(byte value) { // pkg private since it's only used internally + return (value & STATUS_HANDSHAKE) != 0; + } + + static byte setHandshake(byte value) { // pkg private since it's only used internally + value |= STATUS_HANDSHAKE; + return value; + } + + } diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index fabd0c70f16..de75b920ce3 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -48,7 +48,9 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; abstract class FailAndRetryMockTransport implements Transport { @@ -63,6 +65,7 @@ abstract class FailAndRetryMockTransport imp private final AtomicInteger failures = new AtomicInteger(); private final AtomicInteger successes = new AtomicInteger(); private final Set triedNodes = new CopyOnWriteArraySet<>(); + private final AtomicLong requestId = new AtomicLong(); FailAndRetryMockTransport(Random random, ClusterName clusterName) { this.random = new Random(random.nextLong()); @@ -221,4 +224,9 @@ abstract class FailAndRetryMockTransport imp public Map profileBoundAddresses() { return Collections.emptyMap(); } + + @Override + public long newRequestId() { + return requestId.incrementAndGet(); + } } diff --git 
a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index d5d3099c4d0..2b8333700a3 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.equalTo; @@ -170,7 +171,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { final class MockTransport implements Transport { - + private final AtomicLong requestId = new AtomicLong(); Set connectedNodes = ConcurrentCollections.newConcurrentSet(); volatile boolean randomConnectionExceptions = false; @@ -249,6 +250,11 @@ public class NodeConnectionsServiceTests extends ESTestCase { return null; } + @Override + public long newRequestId() { + return requestId.incrementAndGet(); + } + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index f081cfa45e4..aabc58887b2 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -207,7 +207,7 @@ public class TCPTransportTests extends ESTestCase { } @Override - public Connection getConnection(DiscoveryNode node) { + public NodeChannels getConnection(DiscoveryNode node) { return new NodeChannels(node, new Object[ConnectionProfile.LIGHT_PROFILE.getNumConnections()], ConnectionProfile.LIGHT_PROFILE); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java 
b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index 9741266a485..e83cfc62fda 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -23,9 +23,9 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.util.ReferenceCountUtil; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpHeader; +import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.Transports; @@ -66,10 +66,10 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { final ByteBuf buffer = (ByteBuf) msg; final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE); final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize; - InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); try { + InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh - // buffer, or in the cumulation buffer, which is cleaned each time so it could be bigger than the actual size + // buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize); transport.messageReceived(reference, ctx.channel(), profileName, remoteAddress, remainingMessageSize); } finally { diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java 
b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java index fe6498b08c1..7e54b53de49 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java @@ -111,9 +111,9 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase { protected String handleRequest(Channel channel, String profileName, StreamInput stream, long requestId, int messageLengthBytes, Version version, - InetSocketAddress remoteAddress) throws IOException { + InetSocketAddress remoteAddress, byte status) throws IOException { String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version, - remoteAddress); + remoteAddress, status); channelProfileName = TransportSettings.DEFAULT_PROFILE; return action; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 809fff8bc0d..ffccdaac722 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -51,6 +51,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.lucene.util.LuceneTestCase.rarely; @@ -75,6 +76,8 @@ public class CapturingTransport implements Transport { private ConcurrentMap> requests = new ConcurrentHashMap<>(); private BlockingQueue capturedRequests = ConcurrentCollections.newBlockingQueue(); + private final AtomicLong requestId = new AtomicLong(); + /** returns all requests captured so far. Doesn't clear the captured request list. 
See {@link #clear()} */ public CapturedRequest[] capturedRequests() { @@ -279,6 +282,10 @@ public class CapturingTransport implements Transport { } @Override + public long newRequestId() { + return requestId.incrementAndGet(); + } + public Connection getConnection(DiscoveryNode node) { try { return openConnection(node, null); @@ -286,5 +293,4 @@ public class CapturingTransport implements Transport { throw new UncheckedIOException(e); } } - } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index fad7203283a..dd05457cec1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -500,6 +500,11 @@ public final class MockTransportService extends TransportService { return transport.getLocalAddresses(); } + @Override + public long newRequestId() { + return transport.newRequestId(); + } + @Override public Connection getConnection(DiscoveryNode node) { return new FilteredConnection(transport.getConnection(node)) { @@ -688,4 +693,12 @@ public final class MockTransportService extends TransportService { connection.close(); } } + + public Transport getOriginalTransport() { + Transport transport = transport(); + while (transport instanceof DelegateTransport) { + transport = ((DelegateTransport) transport).transport; + } + return transport; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index aa2271c9655..529013cbaca 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -22,6 +22,7 @@ 
package org.elasticsearch.transport; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.Constants; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -117,7 +118,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }; serviceA.addConnectionListener(waitForConnection); serviceB.addConnectionListener(waitForConnection); - + int numHandshakes = 1; if (useLocalNode) { logger.info("--> using local node optimization"); serviceA.setLocalNode(nodeA); @@ -126,36 +127,69 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { logger.info("--> actively connecting to local node"); serviceA.connectToNode(nodeA); serviceB.connectToNode(nodeB); + assertNumHandshakes(numHandshakes, serviceA.getOriginalTransport()); + assertNumHandshakes(numHandshakes, serviceB.getOriginalTransport()); + numHandshakes++; } - serviceA.connectToNode(nodeB); serviceB.connectToNode(nodeA); + assertNumHandshakes(numHandshakes, serviceA.getOriginalTransport()); + assertNumHandshakes(numHandshakes, serviceB.getOriginalTransport()); assertThat("failed to wait for all nodes to connect", latch.await(5, TimeUnit.SECONDS), equalTo(true)); serviceA.removeConnectionListener(waitForConnection); serviceB.removeConnectionListener(waitForConnection); } - private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) { + private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings, + Settings settings, boolean acceptRequests) { MockTransportService service = build( Settings.builder() + .put(settings) .put(Node.NODE_NAME_SETTING.getKey(), name) .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") 
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), version, clusterSettings); - service.acceptIncomingRequests(); + if (acceptRequests) { + service.acceptIncomingRequests(); + } return service; } + private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) { + return buildService(name, version, clusterSettings, Settings.EMPTY, true); + } + @Override @After public void tearDown() throws Exception { super.tearDown(); - serviceA.close(); - serviceB.close(); - terminate(threadPool); + try { + assertNoPendingHandshakes(serviceA.getOriginalTransport()); + assertNoPendingHandshakes(serviceB.getOriginalTransport()); + } finally { + IOUtils.close(serviceA, serviceB, () -> { + try { + terminate(threadPool); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + } + + public void assertNumHandshakes(long expected, Transport transport) { + if (transport instanceof TcpTransport) { + assertEquals(expected, ((TcpTransport) transport).getNumHandshakes()); + } + } + + public void assertNoPendingHandshakes(Transport transport) { + if (transport instanceof TcpTransport) { + assertEquals(0, ((TcpTransport) transport).getNumPendingHandshakes()); + } } public void testHelloWorld() { @@ -1426,56 +1460,52 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testBlockingIncomingRequests() throws Exception { - TransportService service = build( - Settings.builder() - .put("name", "TS_TEST") - .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") - .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") - .build(), - version0, - null); - AtomicBoolean requestProcessed = new AtomicBoolean(); - service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, - (request, channel) -> { - requestProcessed.set(true); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + try (TransportService 
service = buildService("TS_TEST", version0, null, + Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), false)) { + AtomicBoolean requestProcessed = new AtomicBoolean(false); + service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + requestProcessed.set(true); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + }); + + DiscoveryNode node = + new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); + serviceA.close(); + serviceA = buildService("TS_A", version0, null, + Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true); + serviceA.connectToNode(node); + + CountDownLatch latch = new CountDownLatch(1); + serviceA.sendRequest(node, "action", new TestRequest(), new TransportResponseHandler() { + @Override + public TestResponse newInstance() { + return new TestResponse(); + } + + @Override + public void handleResponse(TestResponse response) { + latch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + latch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } }); - DiscoveryNode node = - new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); - serviceA.connectToNode(node); + assertFalse(requestProcessed.get()); - CountDownLatch latch = new CountDownLatch(1); - serviceA.sendRequest(node, "action", new TestRequest(), new TransportResponseHandler() { - @Override - public TestResponse newInstance() { - return new TestResponse(); - } - - @Override - public void handleResponse(TestResponse response) { - latch.countDown(); - } - - @Override - public void handleException(TransportException exp) { - latch.countDown(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - - assertFalse(requestProcessed.get()); - - 
service.acceptIncomingRequests(); - assertBusy(() -> assertTrue(requestProcessed.get())); - - latch.await(); - service.close(); + service.acceptIncomingRequests(); + assertBusy(() -> assertTrue(requestProcessed.get())); + latch.await(); + } } public static class TestRequest extends TransportRequest { @@ -1752,21 +1782,69 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { TransportRequestOptions.Type.RECOVERY, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE); - // connection with one connection and a large timeout -- should consume the one spot in the backlog queue - serviceA.connectToNode(first, builder.build()); - builder.setConnectTimeout(TimeValue.timeValueMillis(1)); - final ConnectionProfile profile = builder.build(); - // now with the 1ms timeout we got and test that is it's applied - long startTime = System.nanoTime(); - ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> { - serviceA.connectToNode(second, profile); - }); - final long now = System.nanoTime(); - final long timeTaken = TimeValue.nsecToMSec(now - startTime); - assertTrue("test didn't timeout quick enough, time taken: [" + timeTaken + "]", - timeTaken < TimeValue.timeValueSeconds(5).millis()); - assertEquals(ex.getMessage(), "[][" + second.getAddress() + "] connect_timeout[1ms]"); + try (TransportService service = buildService("TS_TPC", Version.CURRENT, null, + Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true)) { + service.connectToNode(first, builder.build()); + builder.setConnectTimeout(TimeValue.timeValueMillis(1)); + final ConnectionProfile profile = builder.build(); + // now with the 1ms timeout we got and test that is it's applied + long startTime = System.nanoTime(); + ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> service.connectToNode(second, profile)); + final long now = System.nanoTime(); + final long timeTaken = 
TimeValue.nsecToMSec(now - startTime); + assertTrue("test didn't timeout quick enough, time taken: [" + timeTaken + "]", + timeTaken < TimeValue.timeValueSeconds(5).millis()); + assertEquals(ex.getMessage(), "[][" + second.getAddress() + "] connect_timeout[1ms]"); + } + } + } + + public void testTcpHandshake() throws IOException, InterruptedException { + assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport); + TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport(); + try (TransportService service = buildService("TS_TPC", Version.CURRENT, null, + Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true)) { + // this acts like a node that doesn't have support for handshakes + DiscoveryNode node = + new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); + serviceA.connectToNode(node); + TcpTransport.NodeChannels connection = originalTransport.getConnection(node); + Version version = originalTransport.executeHandshake(node, connection.channel(TransportRequestOptions.Type.PING), + TimeValue.timeValueSeconds(10)); + assertNull(version); + serviceA.disconnectFromNode(node); + } + + try (TransportService service = buildService("TS_TPC", Version.CURRENT, null)) { + DiscoveryNode node = + new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); + serviceA.connectToNode(node); + TcpTransport.NodeChannels connection = originalTransport.getConnection(node); + Version version = originalTransport.executeHandshake(node, connection.channel(TransportRequestOptions.Type.PING), + TimeValue.timeValueSeconds(10)); + assertEquals(version, Version.CURRENT); + } + } + + public void testTcpHandshakeTimeout() throws IOException { + try (ServerSocket socket = new ServerSocket()) { + socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1); + 
socket.setReuseAddress(true); + DiscoveryNode dummy = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(), + socket.getLocalPort()), emptyMap(), + emptySet(), version0); + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + builder.setHandshakeTimeout(TimeValue.timeValueMillis(1)); + ConnectTransportException ex = expectThrows(ConnectTransportException.class, + () -> serviceA.connectToNode(dummy, builder.build())); + assertEquals("[][" + dummy.getAddress() +"] handshake_timeout[1ms]", ex.getMessage()); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 0778c344bcc..056daf417bf 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -22,7 +22,6 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -70,7 +69,6 @@ public class MockTcpTransport extends TcpTransport private final ExecutorService executor; private final Version mockVersion; - @Inject public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { @@ -97,7 +95,7 @@ public class 
MockTcpTransport extends TcpTransport protected MockChannel bind(final String name, InetSocketAddress address) throws IOException { ServerSocket socket = new ServerSocket(); socket.bind(address); - socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings())); + socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings); if (tcpReceiveBufferSize.getBytes() > 0) { socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt()); @@ -212,7 +210,7 @@ public class MockTcpTransport extends TcpTransport if (tcpReceiveBufferSize.getBytes() > 0) { socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt()); } - socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings())); + socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); } @Override