Remove `doHandshake` test-only settings from TcpTransport (#22241)

In #22094 we introduce a test-only setting to simulate transport
impls that don't support handshakes. This commit implements the same logic
without a setting.
This commit is contained in:
Simon Willnauer 2016-12-18 09:26:53 +01:00 committed by GitHub
parent b78f7bc51d
commit ccfeac8dd5
4 changed files with 77 additions and 45 deletions

View File

@ -148,9 +148,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE,
Setting.Property.NodeScope);
// test-setting only
static final Setting<Boolean> CONNECTION_HANDSHAKE = Setting.boolSetting("transport.tcp.handshake", true);
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final int PING_DATA_SIZE = -1;
protected final boolean blockingClient;
@ -161,7 +158,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected final ThreadPool threadPool;
private final BigArrays bigArrays;
protected final NetworkService networkService;
private final boolean doHandshakes;
protected volatile TransportServiceAdapter transportServiceAdapter;
// node id to actual channel
@ -200,7 +196,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
this.transportName = transportName;
this.blockingClient = TCP_BLOCKING_CLIENT.get(settings);
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
this.doHandshakes = CONNECTION_HANDSHAKE.get(settings);
}
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
@ -463,7 +458,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
"failed to connect to [{}], cleaning dangling connections", node), e);
throw e;
}
if (doHandshakes) { // some tests need to disable this
Channel channel = nodeChannels.channel(TransportRequestOptions.Type.PING);
final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
defaultConnectionProfile.getConnectTimeout() :
@ -471,13 +465,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
connectTimeout : connectionProfile.getHandshakeTimeout();
Version version = executeHandshake(node, channel, handshakeTimeout);
if (version != null) {
// this is a BWC layer, if we talk to a pre 5.2 node then the handshake is not supported
// this will go away in master once it's all ported to 5.2 but for now we keep this to make
// the backport straight forward
nodeChannels = new NodeChannels(nodeChannels, version);
}
}
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
@ -1130,7 +1117,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
* @param length the payload length in bytes
* @see TcpHeader
*/
private BytesReference buildHeader(long requestId, byte status, Version protocolVersion, int length) throws IOException {
final BytesReference buildHeader(long requestId, byte status, Version protocolVersion, int length) throws IOException {
try (BytesStreamOutput headerOutput = new BytesStreamOutput(TcpHeader.HEADER_SIZE)) {
headerOutput.setVersion(protocolVersion);
TcpHeader.writeHeader(headerOutput, requestId, status, protocolVersion, length);
@ -1306,7 +1293,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);
} else {
final TransportResponseHandler<?> handler;
if (TransportStatus.isHandshake(status) && doHandshakes) {
if (TransportStatus.isHandshake(status)) {
handler = pendingHandshakes.remove(requestId);
} else {
TransportResponseHandler theHandler = transportServiceAdapter.onResponseReceived(requestId);
@ -1398,7 +1385,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
transportServiceAdapter.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
if (TransportStatus.isHandshake(status) && doHandshakes) {
if (TransportStatus.isHandshake(status)) {
final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion());
sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
TransportStatus.setHandshake((byte)0));
@ -1509,8 +1496,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
// pkg private for testing
final Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, InterruptedException {
protected Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, InterruptedException {
numHandshakes.inc();
final long requestId = newRequestId();
final HandshakeResponseHandler handler = new HandshakeResponseHandler(channel);
@ -1520,7 +1506,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
boolean success = false;
try {
if (isOpen(channel) == false) {
// we have to protect ourself here since sendRequestToChannel won't barf if the channel is closed.
// we have to protect us here since sendRequestToChannel won't barf if the channel is closed.
// it's weird but to change it will cause a lot of impact on the exception handling code all over the codebase.
// yet, if we don't check the state here we might have registered a pending handshake handler but the close
// listener calling #onChannelClosed might have already run and we are waiting on the latch below unitl we time out.

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -26,6 +27,7 @@ import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
@ -38,6 +40,7 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
@ -49,10 +52,21 @@ import static org.hamcrest.Matchers.containsString;
public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase {
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
ClusterSettings clusterSettings) {
ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
@Override
protected Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException,
InterruptedException {
if (doHandshake) {
return super.executeHandshake(node, channel, timeout);
} else {
return version.minimumCompatibilityVersion();
}
}
@Override
protected Version getCurrentVersion() {
return version;
@ -63,9 +77,9 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
}
@Override
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings) {
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings);
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake);
transportService.start();
return transportService;
}
@ -92,7 +106,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
.build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> {
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings);
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true);
try {
transportService.start();
} finally {

View File

@ -29,15 +29,21 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -48,13 +54,15 @@ import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -70,7 +78,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
@ -94,7 +101,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
protected volatile DiscoveryNode nodeB;
protected volatile MockTransportService serviceB;
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings);
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);
@Override
@Before
@ -149,7 +156,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
Settings settings, boolean acceptRequests) {
Settings settings, boolean acceptRequests, boolean doHandshake) {
MockTransportService service = build(
Settings.builder()
.put(settings)
@ -158,7 +165,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.build(),
version,
clusterSettings);
clusterSettings, doHandshake);
if (acceptRequests) {
service.acceptIncomingRequests();
}
@ -166,7 +173,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
return buildService(name, version, clusterSettings, Settings.EMPTY, true);
return buildService(name, version, clusterSettings, Settings.EMPTY, true, true);
}
@Override
@ -1463,7 +1470,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testBlockingIncomingRequests() throws Exception {
try (TransportService service = buildService("TS_TEST", version0, null,
Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), false)) {
Settings.EMPTY, false, false)) {
AtomicBoolean requestProcessed = new AtomicBoolean(false);
service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
@ -1475,7 +1482,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceA.close();
serviceA = buildService("TS_A", version0, null,
Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true);
Settings.EMPTY, true, false);
serviceA.connectToNode(node);
CountDownLatch latch = new CountDownLatch(1);
@ -1583,7 +1590,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.build(),
version0,
null);
null, true);
DiscoveryNode nodeC =
new DiscoveryNode("TS_C", "TS_C", serviceC.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceC.acceptIncomingRequests();
@ -1786,7 +1793,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
TransportRequestOptions.Type.STATE);
// connection with one connection and a large timeout -- should consume the one spot in the backlog queue
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null,
Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true)) {
Settings.EMPTY, true, false)) {
service.connectToNode(first, builder.build());
builder.setConnectTimeout(TimeValue.timeValueMillis(1));
final ConnectionProfile profile = builder.build();
@ -1805,11 +1812,23 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testTcpHandshake() throws IOException, InterruptedException {
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null,
Settings.builder().put(TcpTransport.CONNECTION_HANDSHAKE.getKey(), false).build(), true)) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList())){
@Override
protected String handleRequest(MockChannel mockChannel, String profileName, StreamInput stream, long requestId,
int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
throws IOException {
return super.handleRequest(mockChannel, profileName, stream, requestId, messageLengthBytes, version, remoteAddress,
(byte)(status & ~(1<<3))); // we flip the isHanshake bit back and ackt like the handler is not found
}
}) {
transport.transportServiceAdapter(serviceA.new Adapter());
transport.start();
// this acts like a node that doesn't have support for handshakes
DiscoveryNode node =
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
ConnectTransportException exception = expectThrows(ConnectTransportException.class, () -> serviceA.connectToNode(node));
assertTrue(exception.getCause() instanceof IllegalStateException);
assertEquals("handshake failed", exception.getCause().getMessage());

View File

@ -19,22 +19,35 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.transport.MockTransportService;
import java.io.IOException;
import java.util.Collections;
public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
@Override
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings) {
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version);
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version) {
@Override
protected Version executeHandshake(DiscoveryNode node, MockChannel mockChannel, TimeValue timeout) throws IOException,
InterruptedException {
if (doHandshake) {
return super.executeHandshake(node, mockChannel, timeout);
} else {
return version.minimumCompatibilityVersion();
}
}
};
MockTransportService mockTransportService = new MockTransportService(Settings.EMPTY, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, clusterSettings);
mockTransportService.start();