diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index cd23fc9b074..2e8cb4f65ce 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -148,9 +148,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope); - // test-setting only - static final Setting CONNECTION_HANDSHAKE = Setting.boolSetting("transport.tcp.handshake", true); - private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final int PING_DATA_SIZE = -1; protected final boolean blockingClient; @@ -161,7 +158,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected final ThreadPool threadPool; private final BigArrays bigArrays; protected final NetworkService networkService; - private final boolean doHandshakes; protected volatile TransportServiceAdapter transportServiceAdapter; // node id to actual channel @@ -200,7 +196,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i this.transportName = transportName; this.blockingClient = TCP_BLOCKING_CLIENT.get(settings); defaultConnectionProfile = buildDefaultConnectionProfile(settings); - this.doHandshakes = CONNECTION_HANDSHAKE.get(settings); } static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { @@ -463,21 +458,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i "failed to connect to [{}], cleaning dangling connections", node), e); throw e; } - if (doHandshakes) { // some tests need to disable this - Channel channel = nodeChannels.channel(TransportRequestOptions.Type.PING); - final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ? - defaultConnectionProfile.getConnectTimeout(): - connectionProfile.getConnectTimeout(); - final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ? - connectTimeout : connectionProfile.getHandshakeTimeout(); - Version version = executeHandshake(node, channel, handshakeTimeout); - if (version != null) { - // this is a BWC layer, if we talk to a pre 5.2 node then the handshake is not supported - // this will go away in master once it's all ported to 5.2 but for now we keep this to make - // the backport straight forward - nodeChannels = new NodeChannels(nodeChannels, version); - } - } + Channel channel = nodeChannels.channel(TransportRequestOptions.Type.PING); + final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ? + defaultConnectionProfile.getConnectTimeout() : + connectionProfile.getConnectTimeout(); + final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ? + connectTimeout : connectionProfile.getHandshakeTimeout(); + Version version = executeHandshake(node, channel, handshakeTimeout); // we acquire a connection lock, so no way there is an existing connection connectedNodes.put(node, nodeChannels); if (logger.isDebugEnabled()) { @@ -1130,7 +1117,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i * @param length the payload length in bytes * @see TcpHeader */ - private BytesReference buildHeader(long requestId, byte status, Version protocolVersion, int length) throws IOException { + final BytesReference buildHeader(long requestId, byte status, Version protocolVersion, int length) throws IOException { try (BytesStreamOutput headerOutput = new BytesStreamOutput(TcpHeader.HEADER_SIZE)) { headerOutput.setVersion(protocolVersion); TcpHeader.writeHeader(headerOutput, requestId, status, protocolVersion, length); @@ -1306,7 +1293,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status); } else { final TransportResponseHandler handler; - if (TransportStatus.isHandshake(status) && doHandshakes) { + if (TransportStatus.isHandshake(status)) { handler = pendingHandshakes.remove(requestId); } else { TransportResponseHandler theHandler = transportServiceAdapter.onResponseReceived(requestId); @@ -1398,7 +1385,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i transportServiceAdapter.onRequestReceived(requestId, action); TransportChannel transportChannel = null; try { - if (TransportStatus.isHandshake(status) && doHandshakes) { + if (TransportStatus.isHandshake(status)) { final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion()); sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, TransportStatus.setHandshake((byte)0)); @@ -1509,8 +1496,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } } - // pkg private for testing - final Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, InterruptedException { + protected Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, InterruptedException { numHandshakes.inc(); final long requestId = newRequestId(); final HandshakeResponseHandler handler = new HandshakeResponseHandler(channel); @@ -1520,7 +1506,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i boolean success = false; try { if (isOpen(channel) == false) { - // we have to protect ourself here since sendRequestToChannel won't barf if the channel is closed. + // we have to protect us here since sendRequestToChannel won't barf if the channel is closed. // it's weird but to change it will cause a lot of impact on the exception handling code all over the codebase. // yet, if we don't check the state here we might have registered a pending handshake handler but the close // listener calling #onChannelClosed might have already run and we are waiting on the latch below unitl we time out. diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index a7a674007ba..3875cea31ac 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.netty4; +import io.netty.channel.Channel; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -26,6 +27,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; @@ -38,6 +40,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; @@ -49,10 +52,21 @@ import static org.hamcrest.Matchers.containsString; public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase { public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version, - ClusterSettings clusterSettings) { + ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()), BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { + + @Override + protected Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, + InterruptedException { + if (doHandshake) { + return super.executeHandshake(node, channel, timeout); + } else { + return version.minimumCompatibilityVersion(); + } + } + @Override protected Version getCurrentVersion() { return version; @@ -63,9 +77,9 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase } @Override - protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings) { + protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) { settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build(); - MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings); + MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake); transportService.start(); return transportService; } @@ -92,7 +106,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase .build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> { - MockTransportService transportService = nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings); + MockTransportService transportService = nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true); try { transportService.start(); } finally { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 283ae288d29..540420f35e7 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -29,15 +29,21 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -48,13 +54,15 @@ import org.junit.After; import org.junit.Before; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.UncheckedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; -import java.nio.channels.ClosedChannelException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -70,7 +78,6 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -94,7 +101,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected volatile DiscoveryNode nodeB; protected volatile MockTransportService serviceB; - protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings); + protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake); @Override @Before @@ -149,7 +156,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings, - Settings settings, boolean acceptRequests) { + Settings settings, boolean acceptRequests, boolean doHandshake) { MockTransportService service = build( Settings.builder() .put(settings) @@ -158,7 +165,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), version, - clusterSettings); + clusterSettings, doHandshake); if (acceptRequests) { service.acceptIncomingRequests(); } @@ -166,7 +173,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) { - return buildService(name, version, clusterSettings, Settings.EMPTY, true); + return buildService(name, version, clusterSettings, Settings.EMPTY, true, true); } @Override @@ -1463,7 +1470,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testBlockingIncomingRequests() throws Exception { try (TransportService service = buildService("TS_TEST", version0, null, - Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), false)) { + Settings.EMPTY, false, false)) { AtomicBoolean requestProcessed = new AtomicBoolean(false); service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, (request, channel) -> { @@ -1475,7 +1482,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); serviceA.close(); serviceA = buildService("TS_A", version0, null, - Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true); + Settings.EMPTY, true, false); serviceA.connectToNode(node); CountDownLatch latch = new CountDownLatch(1); @@ -1583,7 +1590,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), version0, - null); + null, true); DiscoveryNode nodeC = new DiscoveryNode("TS_C", "TS_C", serviceC.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); serviceC.acceptIncomingRequests(); @@ -1786,7 +1793,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { TransportRequestOptions.Type.STATE); // connection with one connection and a large timeout -- should consume the one spot in the backlog queue try (TransportService service = buildService("TS_TPC", Version.CURRENT, null, - Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true)) { + Settings.EMPTY, true, false)) { service.connectToNode(first, builder.build()); builder.setConnectTimeout(TimeValue.timeValueMillis(1)); final ConnectionProfile profile = builder.build(); @@ -1805,11 +1812,23 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testTcpHandshake() throws IOException, InterruptedException { assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport); TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport(); - try (TransportService service = buildService("TS_TPC", Version.CURRENT, null, - Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true)) { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); + + try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList())){ + @Override + protected String handleRequest(MockChannel mockChannel, String profileName, StreamInput stream, long requestId, + int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status) + throws IOException { + return super.handleRequest(mockChannel, profileName, stream, requestId, messageLengthBytes, version, remoteAddress, + (byte)(status & ~(1<<3))); // we flip the isHanshake bit back and ackt like the handler is not found + } + }) { + transport.transportServiceAdapter(serviceA.new Adapter()); + transport.start(); // this acts like a node that doesn't have support for handshakes DiscoveryNode node = - new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); + new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); ConnectTransportException exception = expectThrows(ConnectTransportException.class, () -> serviceA.connectToNode(node)); assertTrue(exception.getCause() instanceof IllegalStateException); assertEquals("handshake failed", exception.getCause().getMessage()); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 7ed12d249a5..2cc84c4c0cd 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -19,22 +19,35 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.transport.MockTransportService; +import java.io.IOException; import java.util.Collections; public class MockTcpTransportTests extends AbstractSimpleTransportTestCase { @Override - protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings) { + protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, - new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version); + new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version) { + @Override + protected Version executeHandshake(DiscoveryNode node, MockChannel mockChannel, TimeValue timeout) throws IOException, + InterruptedException { + if (doHandshake) { + return super.executeHandshake(node, mockChannel, timeout); + } else { + return version.minimumCompatibilityVersion(); + } + } + }; MockTransportService mockTransportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, clusterSettings); mockTransportService.start();