From 784c9e5fb9c055f5bf25a7ba214d6c8644fd463c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 4 May 2016 20:06:47 -0400 Subject: [PATCH] Introduce node handshake This commit introduces a handshake when initiating a light connection. During this handshake, node information, cluster name, and version are received from the target node of the connection. This information can be used to immediately validate that the target node is a member of the same cluster, and used to set the version on the stream. This will allow us to extend APIs that are used during initial cluster recovery without a major version change. Relates #15971 --- .../TransportClientNodesService.java | 5 +- .../zen/ping/unicast/UnicastZenPing.java | 8 +- .../transport/TransportService.java | 133 ++++++++++- .../node/tasks/TaskManagerTestCase.java | 5 +- .../bulk/TransportBulkActionTookTests.java | 2 +- .../TransportBroadcastByNodeActionTests.java | 2 +- .../TransportMasterNodeActionTests.java | 2 +- .../nodes/TransportNodesActionTests.java | 2 +- .../BroadcastReplicationTests.java | 2 +- .../TransportReplicationActionTests.java | 2 +- ...ortInstanceSingleOperationActionTests.java | 2 +- .../transport/FailAndRetryMockTransport.java | 13 +- .../TransportClientHeadersTests.java | 4 +- .../TransportClientNodesServiceTests.java | 18 +- .../cluster/NodeConnectionsServiceTests.java | 2 +- .../action/shard/ShardStateActionTests.java | 2 +- .../health/ClusterStateHealthTests.java | 2 +- .../common/network/NetworkModuleTests.java | 2 +- .../discovery/ZenFaultDetectionTests.java | 8 +- .../zen/ping/unicast/UnicastZenPingIT.java | 138 ++++++++++-- .../PublishClusterStateActionTests.java | 21 +- .../mapper/DynamicMappingDisabledTests.java | 6 +- .../indices/store/IndicesStoreTests.java | 2 +- .../AbstractSimpleTransportTestCase.java | 19 +- .../NettySizeHeaderFrameDecoderTests.java | 4 +- .../NettyTransportServiceHandshakeTests.java | 206 ++++++++++++++++++ .../transport/TransportModuleTests.java | 5 +- .../local/SimpleLocalTransportTests.java | 5 +- .../netty/NettyScheduledPingTests.java | 7 +- .../netty/SimpleNettyTransportTests.java | 5 +- .../messy/tests/IndicesRequestTests.java | 5 +- .../discovery/ec2/Ec2DiscoveryTests.java | 3 +- .../discovery/gce/GceDiscoveryTests.java | 3 +- .../test/transport/MockTransportService.java | 19 +- 34 files changed, 558 insertions(+), 106 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/transport/NettyTransportServiceHandshakeTests.java diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index e407a2e7ada..68ed7c927ac 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -360,9 +360,10 @@ public class TransportClientNodesService extends AbstractComponent { try { // its a listed node, light connect to it... logger.trace("connecting to listed node (light) [{}]", listedNode); - transportService.connectToNodeLight(listedNode); + transportService.connectToNodeLight(listedNode, pingTimeout, !ignoreClusterName); } catch (Throwable e) { logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode); + newFilteredNodes.add(listedNode); continue; } } @@ -434,7 +435,7 @@ public class TransportClientNodesService extends AbstractComponent { } else { // its a listed node, light connect to it... logger.trace("connecting to listed node (light) [{}]", listedNode); - transportService.connectToNodeLight(listedNode); + transportService.connectToNodeLight(listedNode, pingTimeout, !ignoreClusterName); } } catch (Exception e) { logger.debug("failed to connect to node [{}], ignoring...", e, listedNode); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 640582af226..cc37504360c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -402,7 +402,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen // connect to the node, see if we manage to do it, if not, bail if (!nodeFoundByAddress) { logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend); - transportService.connectToNodeLight(finalNodeToSend); + transportService.connectToNodeLight(finalNodeToSend, timeout.getMillis()); } else { logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend); transportService.connectToNode(finalNodeToSend); @@ -473,12 +473,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen // that's us, ignore continue; } - if (!pingResponse.clusterName().equals(clusterName)) { - // not part of the cluster - logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.node(), - pingResponse.clusterName().value()); - continue; - } SendPingsHandler sendPingsHandler = receivedResponses.get(response.id); if (sendPingsHandler == null) { if (!closed) { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index b9663da72c2..89cc68debfd 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -19,12 +19,16 @@ package org.elasticsearch.transport; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.metrics.MeanMetric; @@ -50,6 +54,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; @@ -67,10 +72,12 @@ import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS; public class TransportService extends AbstractLifecycleComponent { public static final String DIRECT_RESPONSE_PROFILE = ".direct"; + private static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1); protected final Transport transport; protected final ThreadPool threadPool; + private final ClusterName clusterName; protected final TaskManager taskManager; volatile Map requestHandlers = Collections.emptyMap(); @@ -110,15 +117,16 @@ public class TransportService extends AbstractLifecycleComponent HandshakeRequest.INSTANCE, + ThreadPool.Names.SAME, + (request, channel) -> channel.sendResponse( + new HandshakeResponse(localNode, clusterName, localNode != null ? localNode.getVersion() : Version.CURRENT))); } @Override @@ -263,11 +277,120 @@ public class TransportService extends AbstractLifecycleComponent() { + @Override + public HandshakeResponse newInstance() { + return new HandshakeResponse(); + } + }).txGet(); + } catch (Exception e) { + throw new ConnectTransportException(node, "handshake failed", e); + } + + if (checkClusterName && !Objects.equals(clusterName, response.clusterName)) { + throw new ConnectTransportException(node, "handshake failed, mismatched cluster name [" + response.clusterName + "]"); + } else if (!isVersionCompatible(response.version)) { + throw new ConnectTransportException(node, "handshake failed, incompatible version [" + response.version + "]"); + } + + return response.discoveryNode; + } + + private boolean isVersionCompatible(Version version) { + return version.minimumCompatibilityVersion().equals( + localNode != null ? localNode.getVersion().minimumCompatibilityVersion() : Version.CURRENT.minimumCompatibilityVersion()); + } + + public static class HandshakeRequest extends TransportRequest { + + public static final HandshakeRequest INSTANCE = new HandshakeRequest(); + + private HandshakeRequest() { + } + + } + + public static class HandshakeResponse extends TransportResponse { + private DiscoveryNode discoveryNode; + private ClusterName clusterName; + private Version version; + + public HandshakeResponse() { + } + + public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) { + this.discoveryNode = discoveryNode; + this.version = version; + this.clusterName = clusterName; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); + clusterName = ClusterName.readClusterName(in); + version = Version.readVersion(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalWriteable(discoveryNode); + clusterName.writeTo(out); + Version.writeVersion(version, out); + } } public void disconnectFromNode(DiscoveryNode node) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 0490dfd96aa..9c0e2bfcafd 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -184,9 +184,11 @@ public abstract class TaskManagerTestCase extends ESTestCase { public static class TestNode implements Releasable { public TestNode(String name, ThreadPool threadPool, Settings settings) { + clusterService = createClusterService(threadPool); + ClusterName clusterName = clusterService.state().getClusterName(); transportService = new TransportService(settings, new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry(), - new NoneCircuitBreakerService()), threadPool) { + new NoneCircuitBreakerService()), threadPool, clusterName) { @Override protected TaskManager createTaskManager() { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { @@ -197,7 +199,6 @@ public abstract class TaskManagerTestCase extends ESTestCase { } }; transportService.start(); - clusterService = createClusterService(threadPool); clusterService.add(transportService.getTaskManager()); discoveryNode = new DiscoveryNode(name, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT); diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 39202fcc43a..db1d09b3aa7 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -86,7 +86,7 @@ public class TransportBulkActionTookTests extends ESTestCase { private TransportBulkAction createAction(boolean controlled, AtomicLong expected) { CapturingTransport capturingTransport = new CapturingTransport(); - TransportService transportService = new TransportService(capturingTransport, threadPool); + TransportService transportService = new TransportService(capturingTransport, threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index f76fdf4fd20..23b0df27480 100644 --- a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -190,7 +190,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); - final TransportService transportService = new TransportService(transport, THREAD_POOL); + final TransportService transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); setClusterState(clusterService, TEST_INDEX); diff --git a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index a35accc5fc5..ccdb13f710a 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -85,7 +85,7 @@ public class TransportMasterNodeActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(threadPool); - transportService = new TransportService(transport, threadPool); + transportService = new TransportService(transport, threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); localNode = new DiscoveryNode("local_node", DummyTransportAddress.INSTANCE, Collections.emptyMap(), diff --git a/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index ad2326c3148..6a7f7ac3398 100644 --- a/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -119,7 +119,7 @@ public class TransportNodesActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); - final TransportService transportService = new TransportService(transport, THREAD_POOL); + final TransportService transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); int numNodes = randomIntBetween(3, 10); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index 0bd7f9bf18a..5253097818e 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -91,7 +91,7 @@ public class BroadcastReplicationTests extends ESTestCase { super.setUp(); LocalTransport transport = new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry(), circuitBreakerService); clusterService = createClusterService(threadPool); - transportService = new TransportService(transport, threadPool); + transportService = new TransportService(transport, threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService, new ActionFilters(new HashSet()), new IndexNameExpressionResolver(Settings.EMPTY), null); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index a10ce35ca41..e9a92ff25b9 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -120,7 +120,7 @@ public class TransportReplicationActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(threadPool); - transportService = new TransportService(transport, threadPool); + transportService = new TransportService(transport, threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool); diff --git a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index 2dd31548cb9..86165461c84 100644 --- a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -141,7 +141,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); - transportService = new TransportService(transport, THREAD_POOL); + transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); action = new TestTransportInstanceSingleOperationAction( diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 22b04a19a26..0aaec63fef0 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -20,7 +20,9 @@ package org.elasticsearch.client.transport; import com.carrotsearch.randomizedtesting.generators.RandomInts; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; +import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.Lifecycle; @@ -34,6 +36,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; @@ -47,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger; abstract class FailAndRetryMockTransport implements Transport { private final Random random; + private final ClusterName clusterName; private boolean connectMode = true; @@ -57,8 +61,9 @@ abstract class FailAndRetryMockTransport imp private final AtomicInteger successes = new AtomicInteger(); private final Set triedNodes = new CopyOnWriteArraySet<>(); - FailAndRetryMockTransport(Random random) { + FailAndRetryMockTransport(Random random, ClusterName clusterName) { this.random = new Random(random.nextLong()); + this.clusterName = clusterName; } @Override @@ -69,7 +74,11 @@ abstract class FailAndRetryMockTransport imp //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info if (connectMode) { TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); - transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.DEFAULT, node)); + if (action.equals(TransportLivenessAction.NAME)) { + transportResponseHandler.handleResponse(new LivenessResponse(clusterName, node)); + } else { + transportResponseHandler.handleResponse(new TransportService.HandshakeResponse(node, clusterName, Version.CURRENT)); + } return; } diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index a7e472da3b4..ec0a9e5cc08 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -123,8 +123,8 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase { CountDownLatch clusterStateLatch = new CountDownLatch(1); @Inject - public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool, ClusterName clusterName) { + super(settings, transport, threadPool, clusterName); } @Override @SuppressWarnings("unchecked") diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 0246ec227dd..b566d764231 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -62,8 +62,9 @@ public class TransportClientNodesServiceTests extends ESTestCase { private final int nodesCount; TestIteration() { + ClusterName clusterName = new ClusterName("test"); threadPool = new ThreadPool("transport-client-nodes-service-tests"); - transport = new FailAndRetryMockTransport(random()) { + transport = new FailAndRetryMockTransport(random(), clusterName) { @Override public List getLocalAddresses() { return Collections.emptyList(); @@ -74,12 +75,12 @@ public class TransportClientNodesServiceTests extends ESTestCase { return new TestResponse(); } }; - transportService = new TransportService(Settings.EMPTY, transport, threadPool) { + transportService = new TransportService(Settings.EMPTY, transport, threadPool, clusterName) { @Override public void sendRequest(DiscoveryNode node, String action, TransportRequest request, final TransportResponseHandler handler) { if (TransportLivenessAction.NAME.equals(action)) { - super.sendRequest(node, action, request, wrapLivenessResponseHandler(handler, node)); + super.sendRequest(node, action, request, wrapLivenessResponseHandler(handler, node, clusterName)); } else { super.sendRequest(node, action, request, handler); } @@ -90,7 +91,7 @@ public class TransportClientNodesServiceTests extends ESTestCase { TransportRequestOptions options, TransportResponseHandler handler) { if (TransportLivenessAction.NAME.equals(action)) { - super.sendRequest(node, action, request, options, wrapLivenessResponseHandler(handler, node)); + super.sendRequest(node, action, request, options, wrapLivenessResponseHandler(handler, node, clusterName)); } else { super.sendRequest(node, action, request, options, handler); } @@ -98,8 +99,8 @@ public class TransportClientNodesServiceTests extends ESTestCase { }; transportService.start(); transportService.acceptIncomingRequests(); - transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, - Version.CURRENT); + transportClientNodesService = + new TransportClientNodesService(Settings.EMPTY, clusterName, transportService, threadPool, Version.CURRENT); this.nodesCount = randomIntBetween(1, 10); for (int i = 0; i < nodesCount; i++) { transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i)); @@ -108,7 +109,8 @@ public class TransportClientNodesServiceTests extends ESTestCase { } private TransportResponseHandler wrapLivenessResponseHandler(TransportResponseHandler handler, - DiscoveryNode node) { + DiscoveryNode node, + ClusterName clusterName) { return new TransportResponseHandler() { @Override public T newInstance() { @@ -118,7 +120,7 @@ public class TransportClientNodesServiceTests extends ESTestCase { @Override @SuppressWarnings("unchecked") public void handleResponse(T response) { - LivenessResponse livenessResponse = new LivenessResponse(ClusterName.DEFAULT, + LivenessResponse livenessResponse = new LivenessResponse(clusterName, new DiscoveryNode(node.getName(), node.getId(), "liveness-hostname" + node.getId(), "liveness-hostaddress" + node.getId(), new LocalTransportAddress("liveness-address-" + node.getId()), node.getAttributes(), node.getRoles(), diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index c263bcbcf37..a3bb73977e7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -149,7 +149,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); this.transport = new MockTransport(); - transportService = new TransportService(transport, THREAD_POOL); + transportService = new TransportService(transport, THREAD_POOL, ClusterName.DEFAULT); transportService.start(); transportService.acceptIncomingRequests(); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index c79d198d350..6085bf92c32 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -106,7 +106,7 @@ public class ShardStateActionTests extends ESTestCase { super.setUp(); this.transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); - transportService = new TransportService(transport, THREAD_POOL); + transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); diff --git a/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java b/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java index bf465d8d44e..298175c41ee 100644 --- a/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java @@ -80,7 +80,7 @@ public class ClusterStateHealthTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); clusterService = createClusterService(threadPool); - transportService = new TransportService(new CapturingTransport(), threadPool); + transportService = new TransportService(new CapturingTransport(), threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); } diff --git a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index cbc517ceb64..d5fc4630bcf 100644 --- a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -52,7 +52,7 @@ public class NetworkModuleTests extends ModuleTestCase { static class FakeTransportService extends TransportService { public FakeTransportService() { - super(null, null); + super(null, null, null); } } diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index ab834ca6ae2..e5199048c35 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -115,8 +115,12 @@ public class ZenFaultDetectionTests extends ESTestCase { protected MockTransportService build(Settings settings, Version version) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); - MockTransportService transportService = new MockTransportService(Settings.EMPTY, - new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService), threadPool); + MockTransportService transportService = + new MockTransportService( + Settings.EMPTY, + new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService), + threadPool, + ClusterName.DEFAULT); transportService.start(); transportService.acceptIncomingRequests(); return transportService; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java index 00e3daf1fc8..7847d7027d0 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.discovery.zen.elect.ElectMasterService; @@ -35,16 +36,22 @@ import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty.NettyTransport; +import org.jboss.netty.util.internal.ConcurrentHashMap; import java.net.InetSocketAddress; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class UnicastZenPingIT extends ESTestCase { public void testSimplePings() throws InterruptedException { @@ -54,36 +61,31 @@ public class UnicastZenPingIT extends ESTestCase { settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build(); ThreadPool threadPool = new ThreadPool(getClass().getName()); - ClusterName clusterName = new ClusterName("test"); + ClusterName test = new ClusterName("test"); + ClusterName mismatch = new ClusterName("mismatch"); NetworkService networkService = new NetworkService(settings); ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT); - NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); - final TransportService transportServiceA = new TransportService(transportA, threadPool).start(); - transportServiceA.acceptIncomingRequests(); - final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), - emptyMap(), emptySet(), Version.CURRENT); - - InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress(); - - NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); - final TransportService transportServiceB = new TransportService(transportB, threadPool).start(); - transportServiceB.acceptIncomingRequests(); - final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), - emptyMap(), emptySet(), Version.CURRENT); - - InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress(); + NetworkHandle handleA = startServices(settings, threadPool, networkService, "UZP_A", test, Version.CURRENT); + NetworkHandle handleB = startServices(settings, threadPool, networkService, "UZP_B", test, Version.CURRENT); + NetworkHandle handleC = startServices(settings, threadPool, networkService, "UZP_C", new ClusterName("mismatch"), Version.CURRENT); + // just fake that no versions are compatible with this node + Version previousVersion = VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()); + Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion); + NetworkHandle handleD = startServices(settings, threadPool, networkService, "UZP_D", test, versionD); Settings hostsSettings = Settings.builder().putArray("discovery.zen.ping.unicast.hosts", - NetworkAddress.format(new InetSocketAddress(addressA.address().getAddress(), addressA.address().getPort())), - NetworkAddress.format(new InetSocketAddress(addressB.address().getAddress(), addressB.address().getPort()))) + NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleD.address.address().getAddress(), handleD.address.address().getPort()))) .build(); - UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null); + UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, test, Version.CURRENT, electMasterService, null); zenPingA.setPingContextProvider(new PingContextProvider() { @Override public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().put(nodeA).localNodeId("UZP_A").build(); + return DiscoveryNodes.builder().put(handleA.node).localNodeId("UZP_A").build(); } @Override @@ -93,11 +95,11 @@ public class UnicastZenPingIT extends ESTestCase { }); zenPingA.start(); - UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, electMasterService, null); + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, test, Version.CURRENT, electMasterService, null); zenPingB.setPingContextProvider(new PingContextProvider() { @Override public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().put(nodeB).localNodeId("UZP_B").build(); + return DiscoveryNodes.builder().put(handleB.node).localNodeId("UZP_B").build(); } @Override @@ -107,12 +109,41 @@ public class UnicastZenPingIT extends ESTestCase { }); zenPingB.start(); + UnicastZenPing zenPingC = new UnicastZenPing(hostsSettings, threadPool, handleC.transportService, mismatch, versionD, electMasterService, null); + zenPingC.setPingContextProvider(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().put(handleC.node).localNodeId("UZP_C").build(); + } + + @Override + public boolean nodeHasJoinedClusterOnce() { + return false; + } + }); + zenPingC.start(); + + UnicastZenPing zenPingD = new UnicastZenPing(hostsSettings, threadPool, handleD.transportService, mismatch, Version.CURRENT, electMasterService, null); + zenPingD.setPingContextProvider(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().put(handleD.node).localNodeId("UZP_D").build(); + } + + @Override + public boolean nodeHasJoinedClusterOnce() { + return false; + } + }); + zenPingD.start(); + try { logger.info("ping from UZP_A"); ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(10)); assertThat(pingResponses.length, equalTo(1)); assertThat(pingResponses[0].node().getId(), equalTo("UZP_B")); assertTrue(pingResponses[0].hasJoinedOnce()); + assertCounters(handleA, handleA, handleB, handleC, handleD); // ping again, this time from B, logger.info("ping from UZP_B"); @@ -120,13 +151,72 @@ public class UnicastZenPingIT extends ESTestCase { assertThat(pingResponses.length, equalTo(1)); assertThat(pingResponses[0].node().getId(), equalTo("UZP_A")); assertFalse(pingResponses[0].hasJoinedOnce()); + assertCounters(handleB, handleA, handleB, handleC, handleD); + logger.info("ping from UZP_C"); + pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(10)); + assertThat(pingResponses.length, equalTo(0)); + assertCounters(handleC, handleA, handleB, handleC, handleD); + + logger.info("ping from UZP_D"); + pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(10)); + assertThat(pingResponses.length, equalTo(0)); + assertCounters(handleD, handleA, handleB, handleC, handleD); } finally { zenPingA.close(); zenPingB.close(); - transportServiceA.close(); - transportServiceB.close(); + zenPingC.close(); + zenPingD.close(); + handleA.transportService.close(); + handleB.transportService.close(); + handleC.transportService.close(); + handleD.transportService.close(); terminate(threadPool); } } + + // assert that we tried to ping each of the configured nodes at least once + private void assertCounters(NetworkHandle that, NetworkHandle...handles) { + for (NetworkHandle handle : handles) { + if (handle != that) { + assertThat(that.counters.get(handle.address).get(), greaterThan(0)); + } + } + } + + private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId, ClusterName clusterName, Version version) { + NettyTransport transport = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, version, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); + final TransportService transportService = new TransportService(transport, threadPool, clusterName); + transportService.start(); + transportService.acceptIncomingRequests(); + ConcurrentMap counters = new ConcurrentHashMap<>(); + transportService.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeConnected(DiscoveryNode node) { + counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger()); + counters.get(node.getAddress()).incrementAndGet(); + } + + @Override + public void onNodeDisconnected(DiscoveryNode node) { + } + }); + final DiscoveryNode node = new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), version); + transportService.setLocalNode(node); + return new NetworkHandle((InetSocketTransportAddress)transport.boundAddress().publishAddress(), transportService, node, counters); + } + + private static class NetworkHandle { + public final InetSocketTransportAddress address; + public final TransportService transportService; + public final DiscoveryNode node; + public final ConcurrentMap counters; + + public NetworkHandle(InetSocketTransportAddress address, TransportService transportService, DiscoveryNode discoveryNode, ConcurrentMap counters) { + this.address = address; + this.transportService = transportService; + this.node = discoveryNode; + this.counters = counters; + } + } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 335d1b3e8fd..e6b160eabf8 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -83,6 +83,9 @@ import static org.hamcrest.Matchers.nullValue; @TestLogging("discovery.zen.publish:TRACE") public class PublishClusterStateActionTests extends ESTestCase { + + private static final ClusterName CLUSTER_NAME = ClusterName.DEFAULT; + protected ThreadPool threadPool; protected Map nodes = new HashMap<>(); @@ -101,7 +104,7 @@ public class PublishClusterStateActionTests extends ESTestCase { this.service = service; this.listener = listener; this.logger = logger; - this.clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder().put(discoveryNode).localNodeId(discoveryNode.getId()).build()).build(); + this.clusterState = ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder().put(discoveryNode).localNodeId(discoveryNode.getId()).build()).build(); } public MockNode setAsMaster() { @@ -229,7 +232,7 @@ public class PublishClusterStateActionTests extends ESTestCase { } protected MockTransportService buildTransportService(Settings settings, Version version) { - MockTransportService transportService = MockTransportService.local(Settings.EMPTY, version, threadPool); + MockTransportService transportService = MockTransportService.local(Settings.EMPTY, version, threadPool, CLUSTER_NAME); transportService.start(); transportService.acceptIncomingRequests(); return transportService; @@ -249,7 +252,7 @@ public class PublishClusterStateActionTests extends ESTestCase { clusterStateSupplier, listener, discoverySettings, - ClusterName.DEFAULT); + CLUSTER_NAME); } public void testSimpleClusterStatePublishing() throws Exception { @@ -343,7 +346,7 @@ public class PublishClusterStateActionTests extends ESTestCase { // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build(); - ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); + ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromFull(nodeB.clusterState, clusterState); @@ -374,7 +377,7 @@ public class PublishClusterStateActionTests extends ESTestCase { // Initial cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.getId()).masterNodeId(nodeA.discoveryNode.getId()).build(); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build(); // cluster state update - add nodeB discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build(); @@ -417,7 +420,7 @@ public class PublishClusterStateActionTests extends ESTestCase { AssertingAckListener[] listeners = new AssertingAckListener[numberOfIterations]; DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); MetaData metaData = MetaData.EMPTY_META_DATA; - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).metaData(metaData).build(); ClusterState previousState; for (int i = 0; i < numberOfIterations; i++) { previousState = clusterState; @@ -451,7 +454,7 @@ public class PublishClusterStateActionTests extends ESTestCase { // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build(); - ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); + ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromFull(nodeB.clusterState, clusterState); @@ -503,7 +506,7 @@ public class PublishClusterStateActionTests extends ESTestCase { discoveryNodesBuilder.localNodeId(master.discoveryNode.getId()).masterNodeId(master.discoveryNode.getId()); DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); MetaData metaData = MetaData.EMPTY_META_DATA; - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).nodes(discoveryNodes).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).metaData(metaData).nodes(discoveryNodes).build(); ClusterState previousState = master.clusterState; try { publishState(master.action, clusterState, previousState, masterNodes + randomIntBetween(1, 5)); @@ -580,7 +583,7 @@ public class PublishClusterStateActionTests extends ESTestCase { discoveryNodesBuilder.localNodeId(master.discoveryNode.getId()).masterNodeId(master.discoveryNode.getId()); DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); MetaData metaData = MetaData.EMPTY_META_DATA; - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).nodes(discoveryNodes).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).metaData(metaData).nodes(discoveryNodes).build(); ClusterState previousState = master.clusterState; try { publishState(master.action, clusterState, previousState, minMasterNodes); diff --git a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java index a7ceec92a61..7f9c4a3bbfe 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java @@ -74,9 +74,9 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase { .put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), false) .build(); clusterService = createClusterService(THREAD_POOL); - transport = new LocalTransport(settings, THREAD_POOL, Version.CURRENT, new NamedWriteableRegistry(), - new NoneCircuitBreakerService()); - transportService = new TransportService(transport, THREAD_POOL); + transport = + new LocalTransport(settings, THREAD_POOL, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); + transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); indicesService = getInstanceFromNode(IndicesService.class); shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, THREAD_POOL); actionFilters = new ActionFilters(Collections.emptySet()); diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java index 721c66c04e1..cf28de64b87 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java @@ -87,7 +87,7 @@ public class IndicesStoreTests extends ESTestCase { public void before() { localNode = new DiscoveryNode("abc", new LocalTransportAddress("abc"), emptyMap(), emptySet(), Version.CURRENT); clusterService = createClusterService(threadPool); - indicesStore = new IndicesStore(Settings.EMPTY, null, clusterService, new TransportService(null, null), null); + indicesStore = new IndicesStore(Settings.EMPTY, null, clusterService, new TransportService(null, null, clusterService.state().getClusterName()), null); } @After diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index edeecd61d8e..45966606d31 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -65,7 +66,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected DiscoveryNode nodeB; protected MockTransportService serviceB; - protected abstract MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry); + protected abstract MockTransportService build(Settings settings, Version version, ClusterName clusterName); @Override @Before @@ -78,8 +79,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), - version0, new NamedWriteableRegistry() - ); + version0, + ClusterName.DEFAULT); serviceA.acceptIncomingRequests(); nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); serviceB = build( @@ -88,8 +89,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), - version1, new NamedWriteableRegistry() - ); + version1, + ClusterName.DEFAULT); serviceB.acceptIncomingRequests(); nodeB = new DiscoveryNode("TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), version1); @@ -1178,7 +1179,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } try { - serviceB.connectToNodeLight(nodeA); + serviceB.connectToNodeLight(nodeA, 100); fail("exception should be thrown"); } catch (ConnectTransportException e) { // all is well @@ -1238,7 +1239,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } try { - serviceB.connectToNodeLight(nodeA); + serviceB.connectToNodeLight(nodeA, 100); fail("exception should be thrown"); } catch (ConnectTransportException e) { // all is well @@ -1298,8 +1299,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), - version0, new NamedWriteableRegistry() - ); + version0, + ClusterName.DEFAULT); AtomicBoolean requestProcessed = new AtomicBoolean(); service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, (request, channel) -> { diff --git a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java index 46c3cdbe3aa..d2d88b87593 100644 --- a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java +++ b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; import org.elasticsearch.cache.recycler.MockPageCacheRecycler; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; @@ -70,7 +71,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase { nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); nettyTransport.start(); - TransportService transportService = new TransportService(nettyTransport, threadPool); + TransportService transportService = new TransportService(nettyTransport, threadPool, ClusterName.DEFAULT); nettyTransport.transportServiceAdapter(transportService.createAdapter()); TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses(); @@ -84,6 +85,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase { public void terminateThreadPool() throws InterruptedException { nettyTransport.stop(); terminate(threadPool); + threadPool = null; } public void testThatTextMessageIsReturnedOnHTTPLikeRequest() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/transport/NettyTransportServiceHandshakeTests.java b/core/src/test/java/org/elasticsearch/transport/NettyTransportServiceHandshakeTests.java new file mode 100644 index 00000000000..b376a55af73 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/transport/NettyTransportServiceHandshakeTests.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.netty.NettyTransport; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.containsString; + +public class NettyTransportServiceHandshakeTests extends ESTestCase { + private static ThreadPool threadPool; + + @BeforeClass + public static void startThreadPool() { + threadPool = new ThreadPool(NettyTransportServiceHandshakeTests.class.getSimpleName()); + } + + private List transportServices = new ArrayList<>(); + + private NetworkHandle startServices(String nodeNameAndId, Settings settings, Version version, ClusterName clusterName) { + NettyTransport transport = + new NettyTransport( + settings, + threadPool, + new NetworkService(settings), + BigArrays.NON_RECYCLING_INSTANCE, + Version.CURRENT, + new NamedWriteableRegistry(), + new NoneCircuitBreakerService()); + TransportService transportService = new MockTransportService(settings, transport, threadPool, clusterName); + transportService.start(); + transportService.acceptIncomingRequests(); + DiscoveryNode node = + new DiscoveryNode( + nodeNameAndId, + nodeNameAndId, + transportService.boundAddress().publishAddress(), + emptyMap(), + emptySet(), + version); + transportService.setLocalNode(node); + transportServices.add(transportService); + return new NetworkHandle(transportService, node); + } + + @After + public void tearDown() throws Exception { + for (TransportService transportService : transportServices) { + transportService.close(); + } + super.tearDown(); + } + + @AfterClass + public static void terminateThreadPool() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + // since static must set to null to be eligible for collection + threadPool = null; + } + + public void testConnectToNodeLight() { + Settings settings = Settings.EMPTY; + + ClusterName test = new ClusterName("test"); + + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT, test); + NetworkHandle handleB = + startServices( + "TS_B", + settings, + VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT), + test); + + DiscoveryNode connectedNode = + handleA.transportService.connectToNodeLight( + new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()), + 100); + assertNotNull(connectedNode); + + // the name and version should be updated + assertEquals(connectedNode.getName(), "TS_B"); + assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); + } + + public void testMismatchedClusterName() { + Settings settings = Settings.EMPTY; + + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT, new ClusterName("a")); + NetworkHandle handleB = startServices("TS_B", settings, Version.CURRENT, new ClusterName("b")); + + try { + handleA.transportService.connectToNodeLight( + new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()), + 100); + fail("expected handshake to fail from mismatched cluster names"); + } catch (ConnectTransportException e) { + assertThat(e.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]")); + } + } + + public void testIncompatibleVersions() { + Settings settings = Settings.EMPTY; + + ClusterName test = new ClusterName("test"); + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT, test); + NetworkHandle handleB = + startServices("TS_B", settings, VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()), test); + + try { + handleA.transportService.connectToNodeLight( + new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()), + 100); + fail("expected handshake to fail from incompatible versions"); + } catch (ConnectTransportException e) { + assertThat(e.getMessage(), containsString("handshake failed, incompatible version")); + } + } + + public void testIgnoreMismatchedClusterName() { + Settings settings = Settings.EMPTY; + + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT, new ClusterName("a")); + NetworkHandle handleB = + startServices( + "TS_B", + settings, + VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT), + new ClusterName("b") + ); + + DiscoveryNode connectedNode = handleA.transportService.connectToNodeLight( + new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()), + 100, + false); + assertNotNull(connectedNode); + assertEquals(connectedNode.getName(), "TS_B"); + assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); + } + + private static class NetworkHandle { + private TransportService transportService; + private DiscoveryNode discoveryNode; + + public NetworkHandle(TransportService transportService, DiscoveryNode discoveryNode) { + this.transportService = transportService; + this.discoveryNode = discoveryNode; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java b/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java index 888a73c9386..4eb14014cfc 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.ModuleTestCase; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -43,8 +44,8 @@ public class TransportModuleTests extends ModuleTestCase { static class FakeTransportService extends TransportService { @Inject - public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool, ClusterName clusterName) { + super(settings, transport, threadPool, clusterName); } } } diff --git a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java index f071d56e3b9..2907a0a6ca8 100644 --- a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport.local; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.transport.MockTransportService; @@ -28,8 +29,8 @@ import org.elasticsearch.transport.AbstractSimpleTransportTestCase; public class SimpleLocalTransportTests extends AbstractSimpleTransportTestCase { @Override - protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) { - MockTransportService transportService = MockTransportService.local(settings, version, threadPool); + protected MockTransportService build(Settings settings, Version version, ClusterName clusterName) { + MockTransportService transportService = MockTransportService.local(settings, version, threadPool, clusterName); transportService.start(); return transportService; } diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java index 49f86b909a6..8c00ae01b74 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; @@ -63,14 +64,16 @@ public class NettyScheduledPingTests extends ESTestCase { NamedWriteableRegistry registryA = new NamedWriteableRegistry(); final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA, circuitBreakerService); - MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool); + ClusterName test = new ClusterName("test"); + MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, test); serviceA.start(); serviceA.acceptIncomingRequests(); NamedWriteableRegistry registryB = new NamedWriteableRegistry(); final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB, circuitBreakerService); - MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool); + MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, test); + serviceB.start(); serviceB.acceptIncomingRequests(); diff --git a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index 7ba50b9c997..dac416128e5 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; @@ -39,9 +40,9 @@ import static org.hamcrest.Matchers.containsString; public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase { @Override - protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) { + protected MockTransportService build(Settings settings, Version version, ClusterName clusterName) { settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build(); - MockTransportService transportService = MockTransportService.nettyFromThreadPool(settings, version, threadPool); + MockTransportService transportService = MockTransportService.nettyFromThreadPool(settings, version, threadPool, clusterName); transportService.start(); return transportService; } diff --git a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java index b3d01e62fae..c2bc877b903 100644 --- a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java +++ b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java @@ -77,6 +77,7 @@ import org.elasticsearch.action.termvectors.TermVectorsRequest; import org.elasticsearch.action.update.UpdateAction; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.network.NetworkModule; @@ -725,8 +726,8 @@ public class IndicesRequestTests extends ESIntegTestCase { private final Map> requests = new HashMap<>(); @Inject - public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool, ClusterName clusterName) { + super(settings, transport, threadPool, clusterName); } synchronized List consumeRequests(String action) { diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java index d86efaa2a85..c4863680613 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java @@ -23,6 +23,7 @@ import com.amazonaws.services.ec2.model.Tag; import org.elasticsearch.Version; import org.elasticsearch.cloud.aws.AwsEc2Service; import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; @@ -65,7 +66,7 @@ public class Ec2DiscoveryTests extends ESTestCase { @Before public void createTransportService() { - transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool); + transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool, ClusterName.DEFAULT); } protected List buildDynamicNodes(Settings nodeSettings, int nodes) { diff --git a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java index 3b265d6a067..4525b1ece1d 100644 --- a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java +++ b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.discovery.gce; import org.elasticsearch.Version; import org.elasticsearch.cloud.gce.GceComputeService; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -93,7 +94,7 @@ public class GceDiscoveryTests extends ESTestCase { @Before public void createTransportService() { - transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool); + transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool, ClusterName.DEFAULT); } @Before diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 8e85134763d..05b8b23e90c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -20,6 +20,9 @@ package org.elasticsearch.test.transport; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.transport.TransportService; + import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; @@ -97,25 +100,29 @@ public class MockTransportService extends TransportService { } } - public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool) { + public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool, ClusterName clusterName) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry, new NoneCircuitBreakerService()); - return new MockTransportService(settings, transport, threadPool); + return new MockTransportService(settings, transport, threadPool, clusterName); } - public static MockTransportService nettyFromThreadPool(Settings settings, Version version, ThreadPool threadPool) { + public static MockTransportService nettyFromThreadPool( + Settings settings, + Version version, + ThreadPool threadPool, + ClusterName clusterName) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, namedWriteableRegistry, new NoneCircuitBreakerService()); - return new MockTransportService(Settings.EMPTY, transport, threadPool); + return new MockTransportService(Settings.EMPTY, transport, threadPool, clusterName); } private final Transport original; @Inject - public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, new LookupTestTransport(transport), threadPool); + public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, ClusterName clusterName) { + super(settings, new LookupTestTransport(transport), threadPool, clusterName); this.original = transport; }