diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index e407a2e7ada..68ed7c927ac 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -360,9 +360,10 @@ public class TransportClientNodesService extends AbstractComponent { try { // its a listed node, light connect to it... logger.trace("connecting to listed node (light) [{}]", listedNode); - transportService.connectToNodeLight(listedNode); + transportService.connectToNodeLight(listedNode, pingTimeout, !ignoreClusterName); } catch (Throwable e) { logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode); + newFilteredNodes.add(listedNode); continue; } } @@ -434,7 +435,7 @@ public class TransportClientNodesService extends AbstractComponent { } else { // its a listed node, light connect to it... logger.trace("connecting to listed node (light) [{}]", listedNode); - transportService.connectToNodeLight(listedNode); + transportService.connectToNodeLight(listedNode, pingTimeout, !ignoreClusterName); } } catch (Exception e) { logger.debug("failed to connect to node [{}], ignoring...", e, listedNode); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 640582af226..cc37504360c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -402,7 +402,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen // connect to the node, see if we manage to do it, if not, bail if (!nodeFoundByAddress) { logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend); - transportService.connectToNodeLight(finalNodeToSend); + transportService.connectToNodeLight(finalNodeToSend, timeout.getMillis()); } else { logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend); transportService.connectToNode(finalNodeToSend); @@ -473,12 +473,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen // that's us, ignore continue; } - if (!pingResponse.clusterName().equals(clusterName)) { - // not part of the cluster - logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.node(), - pingResponse.clusterName().value()); - continue; - } SendPingsHandler sendPingsHandler = receivedResponses.get(response.id); if (sendPingsHandler == null) { if (!closed) { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index b9663da72c2..89cc68debfd 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -19,12 +19,16 @@ package org.elasticsearch.transport; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.metrics.MeanMetric; @@ -50,6 +54,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; @@ -67,10 +72,12 @@ import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS; public class TransportService extends AbstractLifecycleComponent { public static final String DIRECT_RESPONSE_PROFILE = ".direct"; + private static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1); protected final Transport transport; protected final ThreadPool threadPool; + private final ClusterName clusterName; protected final TaskManager taskManager; volatile Map requestHandlers = Collections.emptyMap(); @@ -110,15 +117,16 @@ public class TransportService extends AbstractLifecycleComponent HandshakeRequest.INSTANCE, + ThreadPool.Names.SAME, + (request, channel) -> channel.sendResponse( + new HandshakeResponse(localNode, clusterName, localNode != null ? localNode.getVersion() : Version.CURRENT))); } @Override @@ -263,11 +277,120 @@ public class TransportService extends AbstractLifecycleComponent() { + @Override + public HandshakeResponse newInstance() { + return new HandshakeResponse(); + } + }).txGet(); + } catch (Exception e) { + throw new ConnectTransportException(node, "handshake failed", e); + } + + if (checkClusterName && !Objects.equals(clusterName, response.clusterName)) { + throw new ConnectTransportException(node, "handshake failed, mismatched cluster name [" + response.clusterName + "]"); + } else if (!isVersionCompatible(response.version)) { + throw new ConnectTransportException(node, "handshake failed, incompatible version [" + response.version + "]"); + } + + return response.discoveryNode; + } + + private boolean isVersionCompatible(Version version) { + return version.minimumCompatibilityVersion().equals( + localNode != null ? localNode.getVersion().minimumCompatibilityVersion() : Version.CURRENT.minimumCompatibilityVersion()); + } + + public static class HandshakeRequest extends TransportRequest { + + public static final HandshakeRequest INSTANCE = new HandshakeRequest(); + + private HandshakeRequest() { + } + + } + + public static class HandshakeResponse extends TransportResponse { + private DiscoveryNode discoveryNode; + private ClusterName clusterName; + private Version version; + + public HandshakeResponse() { + } + + public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) { + this.discoveryNode = discoveryNode; + this.version = version; + this.clusterName = clusterName; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); + clusterName = ClusterName.readClusterName(in); + version = Version.readVersion(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalWriteable(discoveryNode); + clusterName.writeTo(out); + Version.writeVersion(version, out); + } } public void disconnectFromNode(DiscoveryNode node) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 0490dfd96aa..9c0e2bfcafd 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -184,9 +184,11 @@ public abstract class TaskManagerTestCase extends ESTestCase { public static class TestNode implements Releasable { public TestNode(String name, ThreadPool threadPool, Settings settings) { + clusterService = createClusterService(threadPool); + ClusterName clusterName = clusterService.state().getClusterName(); transportService = new TransportService(settings, new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry(), - new NoneCircuitBreakerService()), threadPool) { + new NoneCircuitBreakerService()), threadPool, clusterName) { @Override protected TaskManager createTaskManager() { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { @@ -197,7 +199,6 @@ public abstract class TaskManagerTestCase extends ESTestCase { } }; transportService.start(); - clusterService = createClusterService(threadPool); clusterService.add(transportService.getTaskManager()); discoveryNode = new DiscoveryNode(name, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT); diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 39202fcc43a..db1d09b3aa7 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -86,7 +86,7 @@ public class TransportBulkActionTookTests extends ESTestCase { private TransportBulkAction createAction(boolean controlled, AtomicLong expected) { CapturingTransport capturingTransport = new CapturingTransport(); - TransportService transportService = new TransportService(capturingTransport, threadPool); + TransportService transportService = new TransportService(capturingTransport, threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY); diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index f76fdf4fd20..23b0df27480 100644 --- a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -190,7 +190,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); - final TransportService transportService = new TransportService(transport, THREAD_POOL); + final TransportService transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); setClusterState(clusterService, TEST_INDEX); diff --git a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index a35accc5fc5..ccdb13f710a 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -85,7 +85,7 @@ public class TransportMasterNodeActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(threadPool); - transportService = new TransportService(transport, threadPool); + transportService = new TransportService(transport, threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); localNode = new DiscoveryNode("local_node", DummyTransportAddress.INSTANCE, Collections.emptyMap(), diff --git a/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index ad2326c3148..6a7f7ac3398 100644 --- a/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -119,7 +119,7 @@ public class TransportNodesActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); - final TransportService transportService = new TransportService(transport, THREAD_POOL); + final TransportService transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); int numNodes = randomIntBetween(3, 10); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index 0bd7f9bf18a..5253097818e 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -91,7 +91,7 @@ public class BroadcastReplicationTests extends ESTestCase { super.setUp(); LocalTransport transport = new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry(), circuitBreakerService); clusterService = createClusterService(threadPool); - transportService = new TransportService(transport, threadPool); + transportService = new TransportService(transport, threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService, new ActionFilters(new HashSet()), new IndexNameExpressionResolver(Settings.EMPTY), null); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index a10ce35ca41..e9a92ff25b9 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -120,7 +120,7 @@ public class TransportReplicationActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(threadPool); - transportService = new TransportService(transport, threadPool); + transportService = new TransportService(transport, threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool); diff --git a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index 2dd31548cb9..86165461c84 100644 --- a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -141,7 +141,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { super.setUp(); transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); - transportService = new TransportService(transport, THREAD_POOL); + transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); action = new TestTransportInstanceSingleOperationAction( diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 22b04a19a26..0aaec63fef0 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -20,7 +20,9 @@ package org.elasticsearch.client.transport; import com.carrotsearch.randomizedtesting.generators.RandomInts; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; +import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.Lifecycle; @@ -34,6 +36,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; @@ -47,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger; abstract class FailAndRetryMockTransport implements Transport { private final Random random; + private final ClusterName clusterName; private boolean connectMode = true; @@ -57,8 +61,9 @@ abstract class FailAndRetryMockTransport imp private final AtomicInteger successes = new AtomicInteger(); private final Set triedNodes = new CopyOnWriteArraySet<>(); - FailAndRetryMockTransport(Random random) { + FailAndRetryMockTransport(Random random, ClusterName clusterName) { this.random = new Random(random.nextLong()); + this.clusterName = clusterName; } @Override @@ -69,7 +74,11 @@ abstract class FailAndRetryMockTransport imp //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info if (connectMode) { TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); - transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.DEFAULT, node)); + if (action.equals(TransportLivenessAction.NAME)) { + transportResponseHandler.handleResponse(new LivenessResponse(clusterName, node)); + } else { + transportResponseHandler.handleResponse(new TransportService.HandshakeResponse(node, clusterName, Version.CURRENT)); + } return; } diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index a7e472da3b4..ec0a9e5cc08 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -123,8 +123,8 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase { CountDownLatch clusterStateLatch = new CountDownLatch(1); @Inject - public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool, ClusterName clusterName) { + super(settings, transport, threadPool, clusterName); } @Override @SuppressWarnings("unchecked") diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 0246ec227dd..b566d764231 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -62,8 +62,9 @@ public class TransportClientNodesServiceTests extends ESTestCase { private final int nodesCount; TestIteration() { + ClusterName clusterName = new ClusterName("test"); threadPool = new ThreadPool("transport-client-nodes-service-tests"); - transport = new FailAndRetryMockTransport(random()) { + transport = new FailAndRetryMockTransport(random(), clusterName) { @Override public List getLocalAddresses() { return Collections.emptyList(); @@ -74,12 +75,12 @@ public class TransportClientNodesServiceTests extends ESTestCase { return new TestResponse(); } }; - transportService = new TransportService(Settings.EMPTY, transport, threadPool) { + transportService = new TransportService(Settings.EMPTY, transport, threadPool, clusterName) { @Override public void sendRequest(DiscoveryNode node, String action, TransportRequest request, final TransportResponseHandler handler) { if (TransportLivenessAction.NAME.equals(action)) { - super.sendRequest(node, action, request, wrapLivenessResponseHandler(handler, node)); + super.sendRequest(node, action, request, wrapLivenessResponseHandler(handler, node, clusterName)); } else { super.sendRequest(node, action, request, handler); } @@ -90,7 +91,7 @@ public class TransportClientNodesServiceTests extends ESTestCase { TransportRequestOptions options, TransportResponseHandler handler) { if (TransportLivenessAction.NAME.equals(action)) { - super.sendRequest(node, action, request, options, wrapLivenessResponseHandler(handler, node)); + super.sendRequest(node, action, request, options, wrapLivenessResponseHandler(handler, node, clusterName)); } else { super.sendRequest(node, action, request, options, handler); } @@ -98,8 +99,8 @@ public class TransportClientNodesServiceTests extends ESTestCase { }; transportService.start(); transportService.acceptIncomingRequests(); - transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, - Version.CURRENT); + transportClientNodesService = + new TransportClientNodesService(Settings.EMPTY, clusterName, transportService, threadPool, Version.CURRENT); this.nodesCount = randomIntBetween(1, 10); for (int i = 0; i < nodesCount; i++) { transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i)); @@ -108,7 +109,8 @@ public class TransportClientNodesServiceTests extends ESTestCase { } private TransportResponseHandler wrapLivenessResponseHandler(TransportResponseHandler handler, - DiscoveryNode node) { + DiscoveryNode node, + ClusterName clusterName) { return new TransportResponseHandler() { @Override public T newInstance() { @@ -118,7 +120,7 @@ public class TransportClientNodesServiceTests extends ESTestCase { @Override @SuppressWarnings("unchecked") public void handleResponse(T response) { - LivenessResponse livenessResponse = new LivenessResponse(ClusterName.DEFAULT, + LivenessResponse livenessResponse = new LivenessResponse(clusterName, new DiscoveryNode(node.getName(), node.getId(), "liveness-hostname" + node.getId(), "liveness-hostaddress" + node.getId(), new LocalTransportAddress("liveness-address-" + node.getId()), node.getAttributes(), node.getRoles(), diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index c263bcbcf37..a3bb73977e7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -149,7 +149,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); this.transport = new MockTransport(); - transportService = new TransportService(transport, THREAD_POOL); + transportService = new TransportService(transport, THREAD_POOL, ClusterName.DEFAULT); transportService.start(); transportService.acceptIncomingRequests(); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index c79d198d350..6085bf92c32 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -106,7 +106,7 @@ public class ShardStateActionTests extends ESTestCase { super.setUp(); this.transport = new CapturingTransport(); clusterService = createClusterService(THREAD_POOL); - transportService = new TransportService(transport, THREAD_POOL); + transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null); diff --git a/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java b/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java index bf465d8d44e..298175c41ee 100644 --- a/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java @@ -80,7 +80,7 @@ public class ClusterStateHealthTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); clusterService = createClusterService(threadPool); - transportService = new TransportService(new CapturingTransport(), threadPool); + transportService = new TransportService(new CapturingTransport(), threadPool, clusterService.state().getClusterName()); transportService.start(); transportService.acceptIncomingRequests(); } diff --git a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index cbc517ceb64..d5fc4630bcf 100644 --- a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -52,7 +52,7 @@ public class NetworkModuleTests extends ModuleTestCase { static class FakeTransportService extends TransportService { public FakeTransportService() { - super(null, null); + super(null, null, null); } } diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index ab834ca6ae2..e5199048c35 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -115,8 +115,12 @@ public class ZenFaultDetectionTests extends ESTestCase { protected MockTransportService build(Settings settings, Version version) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); - MockTransportService transportService = new MockTransportService(Settings.EMPTY, - new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService), threadPool); + MockTransportService transportService = + new MockTransportService( + Settings.EMPTY, + new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService), + threadPool, + ClusterName.DEFAULT); transportService.start(); transportService.acceptIncomingRequests(); return transportService; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java index 00e3daf1fc8..7847d7027d0 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.discovery.zen.elect.ElectMasterService; @@ -35,16 +36,22 @@ import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty.NettyTransport; +import org.jboss.netty.util.internal.ConcurrentHashMap; import java.net.InetSocketAddress; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class UnicastZenPingIT extends ESTestCase { public void testSimplePings() throws InterruptedException { @@ -54,36 +61,31 @@ public class UnicastZenPingIT extends ESTestCase { settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build(); ThreadPool threadPool = new ThreadPool(getClass().getName()); - ClusterName clusterName = new ClusterName("test"); + ClusterName test = new ClusterName("test"); + ClusterName mismatch = new ClusterName("mismatch"); NetworkService networkService = new NetworkService(settings); ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT); - NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); - final TransportService transportServiceA = new TransportService(transportA, threadPool).start(); - transportServiceA.acceptIncomingRequests(); - final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), - emptyMap(), emptySet(), Version.CURRENT); - - InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress(); - - NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); - final TransportService transportServiceB = new TransportService(transportB, threadPool).start(); - transportServiceB.acceptIncomingRequests(); - final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), - emptyMap(), emptySet(), Version.CURRENT); - - InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress(); + NetworkHandle handleA = startServices(settings, threadPool, networkService, "UZP_A", test, Version.CURRENT); + NetworkHandle handleB = startServices(settings, threadPool, networkService, "UZP_B", test, Version.CURRENT); + NetworkHandle handleC = startServices(settings, threadPool, networkService, "UZP_C", new ClusterName("mismatch"), Version.CURRENT); + // just fake that no versions are compatible with this node + Version previousVersion = VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()); + Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion); + NetworkHandle handleD = startServices(settings, threadPool, networkService, "UZP_D", test, versionD); Settings hostsSettings = Settings.builder().putArray("discovery.zen.ping.unicast.hosts", - NetworkAddress.format(new InetSocketAddress(addressA.address().getAddress(), addressA.address().getPort())), - NetworkAddress.format(new InetSocketAddress(addressB.address().getAddress(), addressB.address().getPort()))) + NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort())), + NetworkAddress.format(new InetSocketAddress(handleD.address.address().getAddress(), handleD.address.address().getPort()))) .build(); - UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null); + UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, test, Version.CURRENT, electMasterService, null); zenPingA.setPingContextProvider(new PingContextProvider() { @Override public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().put(nodeA).localNodeId("UZP_A").build(); + return DiscoveryNodes.builder().put(handleA.node).localNodeId("UZP_A").build(); } @Override @@ -93,11 +95,11 @@ public class UnicastZenPingIT extends ESTestCase { }); zenPingA.start(); - UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, electMasterService, null); + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, test, Version.CURRENT, electMasterService, null); zenPingB.setPingContextProvider(new PingContextProvider() { @Override public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().put(nodeB).localNodeId("UZP_B").build(); + return DiscoveryNodes.builder().put(handleB.node).localNodeId("UZP_B").build(); } @Override @@ -107,12 +109,41 @@ public class UnicastZenPingIT extends ESTestCase { }); zenPingB.start(); + UnicastZenPing zenPingC = new UnicastZenPing(hostsSettings, threadPool, handleC.transportService, mismatch, versionD, electMasterService, null); + zenPingC.setPingContextProvider(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().put(handleC.node).localNodeId("UZP_C").build(); + } + + @Override + public boolean nodeHasJoinedClusterOnce() { + return false; + } + }); + zenPingC.start(); + + UnicastZenPing zenPingD = new UnicastZenPing(hostsSettings, threadPool, handleD.transportService, mismatch, Version.CURRENT, electMasterService, null); + zenPingD.setPingContextProvider(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().put(handleD.node).localNodeId("UZP_D").build(); + } + + @Override + public boolean nodeHasJoinedClusterOnce() { + return false; + } + }); + zenPingD.start(); + try { logger.info("ping from UZP_A"); ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(10)); assertThat(pingResponses.length, equalTo(1)); assertThat(pingResponses[0].node().getId(), equalTo("UZP_B")); assertTrue(pingResponses[0].hasJoinedOnce()); + assertCounters(handleA, handleA, handleB, handleC, handleD); // ping again, this time from B, logger.info("ping from UZP_B"); @@ -120,13 +151,72 @@ public class UnicastZenPingIT extends ESTestCase { assertThat(pingResponses.length, equalTo(1)); assertThat(pingResponses[0].node().getId(), equalTo("UZP_A")); assertFalse(pingResponses[0].hasJoinedOnce()); + assertCounters(handleB, handleA, handleB, handleC, handleD); + logger.info("ping from UZP_C"); + pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(10)); + assertThat(pingResponses.length, equalTo(0)); + assertCounters(handleC, handleA, handleB, handleC, handleD); + + logger.info("ping from UZP_D"); + pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(10)); + assertThat(pingResponses.length, equalTo(0)); + assertCounters(handleD, handleA, handleB, handleC, handleD); } finally { zenPingA.close(); zenPingB.close(); - transportServiceA.close(); - transportServiceB.close(); + zenPingC.close(); + zenPingD.close(); + handleA.transportService.close(); + handleB.transportService.close(); + handleC.transportService.close(); + handleD.transportService.close(); terminate(threadPool); } } + + // assert that we tried to ping each of the configured nodes at least once + private void assertCounters(NetworkHandle that, NetworkHandle...handles) { + for (NetworkHandle handle : handles) { + if (handle != that) { + assertThat(that.counters.get(handle.address).get(), greaterThan(0)); + } + } + } + + private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId, ClusterName clusterName, Version version) { + NettyTransport transport = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, version, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); + final TransportService transportService = new TransportService(transport, threadPool, clusterName); + transportService.start(); + transportService.acceptIncomingRequests(); + ConcurrentMap counters = new ConcurrentHashMap<>(); + transportService.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeConnected(DiscoveryNode node) { + counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger()); + counters.get(node.getAddress()).incrementAndGet(); + } + + @Override + public void onNodeDisconnected(DiscoveryNode node) { + } + }); + final DiscoveryNode node = new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), version); + transportService.setLocalNode(node); + return new NetworkHandle((InetSocketTransportAddress)transport.boundAddress().publishAddress(), transportService, node, counters); + } + + private static class NetworkHandle { + public final InetSocketTransportAddress address; + public final TransportService transportService; + public final DiscoveryNode node; + public final ConcurrentMap counters; + + public NetworkHandle(InetSocketTransportAddress address, TransportService transportService, DiscoveryNode discoveryNode, ConcurrentMap counters) { + this.address = address; + this.transportService = transportService; + this.node = discoveryNode; + this.counters = counters; + } + } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 335d1b3e8fd..e6b160eabf8 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -83,6 +83,9 @@ import static org.hamcrest.Matchers.nullValue; @TestLogging("discovery.zen.publish:TRACE") public class PublishClusterStateActionTests extends ESTestCase { + + private static final ClusterName CLUSTER_NAME = ClusterName.DEFAULT; + protected ThreadPool threadPool; protected Map nodes = new HashMap<>(); @@ -101,7 +104,7 @@ public class PublishClusterStateActionTests extends ESTestCase { this.service = service; this.listener = listener; this.logger = logger; - this.clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder().put(discoveryNode).localNodeId(discoveryNode.getId()).build()).build(); + this.clusterState = ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder().put(discoveryNode).localNodeId(discoveryNode.getId()).build()).build(); } public MockNode setAsMaster() { @@ -229,7 +232,7 @@ public class PublishClusterStateActionTests extends ESTestCase { } protected MockTransportService buildTransportService(Settings settings, Version version) { - MockTransportService transportService = MockTransportService.local(Settings.EMPTY, version, threadPool); + MockTransportService transportService = MockTransportService.local(Settings.EMPTY, version, threadPool, CLUSTER_NAME); transportService.start(); transportService.acceptIncomingRequests(); return transportService; @@ -249,7 +252,7 @@ public class PublishClusterStateActionTests extends ESTestCase { clusterStateSupplier, listener, discoverySettings, - ClusterName.DEFAULT); + CLUSTER_NAME); } public void testSimpleClusterStatePublishing() throws Exception { @@ -343,7 +346,7 @@ public class PublishClusterStateActionTests extends ESTestCase { // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build(); - ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); + ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromFull(nodeB.clusterState, clusterState); @@ -374,7 +377,7 @@ public class PublishClusterStateActionTests extends ESTestCase { // Initial cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.getId()).masterNodeId(nodeA.discoveryNode.getId()).build(); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build(); // cluster state update - add nodeB discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build(); @@ -417,7 +420,7 @@ public class PublishClusterStateActionTests extends ESTestCase { AssertingAckListener[] listeners = new AssertingAckListener[numberOfIterations]; DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); MetaData metaData = MetaData.EMPTY_META_DATA; - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).metaData(metaData).build(); ClusterState previousState; for (int i = 0; i < numberOfIterations; i++) { previousState = clusterState; @@ -451,7 +454,7 @@ public class PublishClusterStateActionTests extends ESTestCase { // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build(); - ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); + ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); assertSameStateFromFull(nodeB.clusterState, clusterState); @@ -503,7 +506,7 @@ public class PublishClusterStateActionTests extends ESTestCase { discoveryNodesBuilder.localNodeId(master.discoveryNode.getId()).masterNodeId(master.discoveryNode.getId()); DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); MetaData metaData = MetaData.EMPTY_META_DATA; - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).nodes(discoveryNodes).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).metaData(metaData).nodes(discoveryNodes).build(); ClusterState previousState = master.clusterState; try { publishState(master.action, clusterState, previousState, masterNodes + randomIntBetween(1, 5)); @@ -580,7 +583,7 @@ public class PublishClusterStateActionTests extends ESTestCase { discoveryNodesBuilder.localNodeId(master.discoveryNode.getId()).masterNodeId(master.discoveryNode.getId()); DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); MetaData metaData = MetaData.EMPTY_META_DATA; - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).nodes(discoveryNodes).build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).metaData(metaData).nodes(discoveryNodes).build(); ClusterState previousState = master.clusterState; try { publishState(master.action, clusterState, previousState, minMasterNodes); diff --git a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java index a7ceec92a61..7f9c4a3bbfe 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java @@ -74,9 +74,9 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase { .put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), false) .build(); clusterService = createClusterService(THREAD_POOL); - transport = new LocalTransport(settings, THREAD_POOL, Version.CURRENT, new NamedWriteableRegistry(), - new NoneCircuitBreakerService()); - transportService = new TransportService(transport, THREAD_POOL); + transport = + new LocalTransport(settings, THREAD_POOL, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); + transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName()); indicesService = getInstanceFromNode(IndicesService.class); shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, THREAD_POOL); actionFilters = new ActionFilters(Collections.emptySet()); diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java index 721c66c04e1..cf28de64b87 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java @@ -87,7 +87,7 @@ public class IndicesStoreTests extends ESTestCase { public void before() { localNode = new DiscoveryNode("abc", new LocalTransportAddress("abc"), emptyMap(), emptySet(), Version.CURRENT); clusterService = createClusterService(threadPool); - indicesStore = new IndicesStore(Settings.EMPTY, null, clusterService, new TransportService(null, null), null); + indicesStore = new IndicesStore(Settings.EMPTY, null, clusterService, new TransportService(null, null, clusterService.state().getClusterName()), null); } @After diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index edeecd61d8e..45966606d31 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -65,7 +66,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected DiscoveryNode nodeB; protected MockTransportService serviceB; - protected abstract MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry); + protected abstract MockTransportService build(Settings settings, Version version, ClusterName clusterName); @Override @Before @@ -78,8 +79,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), - version0, new NamedWriteableRegistry() - ); + version0, + ClusterName.DEFAULT); serviceA.acceptIncomingRequests(); nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); serviceB = build( @@ -88,8 +89,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), - version1, new NamedWriteableRegistry() - ); + version1, + ClusterName.DEFAULT); serviceB.acceptIncomingRequests(); nodeB = new DiscoveryNode("TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), version1); @@ -1178,7 +1179,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } try { - serviceB.connectToNodeLight(nodeA); + serviceB.connectToNodeLight(nodeA, 100); fail("exception should be thrown"); } catch (ConnectTransportException e) { // all is well @@ -1238,7 +1239,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } try { - serviceB.connectToNodeLight(nodeA); + serviceB.connectToNodeLight(nodeA, 100); fail("exception should be thrown"); } catch (ConnectTransportException e) { // all is well @@ -1298,8 +1299,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), - version0, new NamedWriteableRegistry() - ); + version0, + ClusterName.DEFAULT); AtomicBoolean requestProcessed = new AtomicBoolean(); service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, (request, channel) -> { diff --git a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java index 46c3cdbe3aa..d2d88b87593 100644 --- a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java +++ b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; import org.elasticsearch.cache.recycler.MockPageCacheRecycler; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; @@ -70,7 +71,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase { nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); nettyTransport.start(); - TransportService transportService = new TransportService(nettyTransport, threadPool); + TransportService transportService = new TransportService(nettyTransport, threadPool, ClusterName.DEFAULT); nettyTransport.transportServiceAdapter(transportService.createAdapter()); TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses(); @@ -84,6 +85,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase { public void terminateThreadPool() throws InterruptedException { nettyTransport.stop(); terminate(threadPool); + threadPool = null; } public void testThatTextMessageIsReturnedOnHTTPLikeRequest() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/transport/NettyTransportServiceHandshakeTests.java b/core/src/test/java/org/elasticsearch/transport/NettyTransportServiceHandshakeTests.java new file mode 100644 index 00000000000..b376a55af73 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/transport/NettyTransportServiceHandshakeTests.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.netty.NettyTransport; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.containsString; + +public class NettyTransportServiceHandshakeTests extends ESTestCase { + private static ThreadPool threadPool; + + @BeforeClass + public static void startThreadPool() { + threadPool = new ThreadPool(NettyTransportServiceHandshakeTests.class.getSimpleName()); + } + + private List transportServices = new ArrayList<>(); + + private NetworkHandle startServices(String nodeNameAndId, Settings settings, Version version, ClusterName clusterName) { + NettyTransport transport = + new NettyTransport( + settings, + threadPool, + new NetworkService(settings), + BigArrays.NON_RECYCLING_INSTANCE, + Version.CURRENT, + new NamedWriteableRegistry(), + new NoneCircuitBreakerService()); + TransportService transportService = new MockTransportService(settings, transport, threadPool, clusterName); + transportService.start(); + transportService.acceptIncomingRequests(); + DiscoveryNode node = + new DiscoveryNode( + nodeNameAndId, + nodeNameAndId, + transportService.boundAddress().publishAddress(), + emptyMap(), + emptySet(), + version); + transportService.setLocalNode(node); + transportServices.add(transportService); + return new NetworkHandle(transportService, node); + } + + @After + public void tearDown() throws Exception { + for (TransportService transportService : transportServices) { + transportService.close(); + } + super.tearDown(); + } + + @AfterClass + public static void terminateThreadPool() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + // since static must set to null to be eligible for collection + threadPool = null; + } + + public void testConnectToNodeLight() { + Settings settings = Settings.EMPTY; + + ClusterName test = new ClusterName("test"); + + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT, test); + NetworkHandle handleB = + startServices( + "TS_B", + settings, + VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT), + test); + + DiscoveryNode connectedNode = + handleA.transportService.connectToNodeLight( + new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()), + 100); + assertNotNull(connectedNode); + + // the name and version should be updated + assertEquals(connectedNode.getName(), "TS_B"); + assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); + } + + public void testMismatchedClusterName() { + Settings settings = Settings.EMPTY; + + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT, new ClusterName("a")); + NetworkHandle handleB = startServices("TS_B", settings, Version.CURRENT, new ClusterName("b")); + + try { + handleA.transportService.connectToNodeLight( + new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()), + 100); + fail("expected handshake to fail from mismatched cluster names"); + } catch (ConnectTransportException e) { + assertThat(e.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]")); + } + } + + public void testIncompatibleVersions() { + Settings settings = Settings.EMPTY; + + ClusterName test = new ClusterName("test"); + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT, test); + NetworkHandle handleB = + startServices("TS_B", settings, VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()), test); + + try { + handleA.transportService.connectToNodeLight( + new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()), + 100); + fail("expected handshake to fail from incompatible versions"); + } catch (ConnectTransportException e) { + assertThat(e.getMessage(), containsString("handshake failed, incompatible version")); + } + } + + public void testIgnoreMismatchedClusterName() { + Settings settings = Settings.EMPTY; + + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT, new ClusterName("a")); + NetworkHandle handleB = + startServices( + "TS_B", + settings, + VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT), + new ClusterName("b") + ); + + DiscoveryNode connectedNode = handleA.transportService.connectToNodeLight( + new DiscoveryNode( + "", + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()), + 100, + false); + assertNotNull(connectedNode); + assertEquals(connectedNode.getName(), "TS_B"); + assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); + } + + private static class NetworkHandle { + private TransportService transportService; + private DiscoveryNode discoveryNode; + + public NetworkHandle(TransportService transportService, DiscoveryNode discoveryNode) { + this.transportService = transportService; + this.discoveryNode = discoveryNode; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java b/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java index 888a73c9386..4eb14014cfc 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.ModuleTestCase; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -43,8 +44,8 @@ public class TransportModuleTests extends ModuleTestCase { static class FakeTransportService extends TransportService { @Inject - public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool, ClusterName clusterName) { + super(settings, transport, threadPool, clusterName); } } } diff --git a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java index f071d56e3b9..2907a0a6ca8 100644 --- a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport.local; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.transport.MockTransportService; @@ -28,8 +29,8 @@ import org.elasticsearch.transport.AbstractSimpleTransportTestCase; public class SimpleLocalTransportTests extends AbstractSimpleTransportTestCase { @Override - protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) { - MockTransportService transportService = MockTransportService.local(settings, version, threadPool); + protected MockTransportService build(Settings settings, Version version, ClusterName clusterName) { + MockTransportService transportService = MockTransportService.local(settings, version, threadPool, clusterName); transportService.start(); return transportService; } diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java index 49f86b909a6..8c00ae01b74 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; @@ -63,14 +64,16 @@ public class NettyScheduledPingTests extends ESTestCase { NamedWriteableRegistry registryA = new NamedWriteableRegistry(); final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA, circuitBreakerService); - MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool); + ClusterName test = new ClusterName("test"); + MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, test); serviceA.start(); serviceA.acceptIncomingRequests(); NamedWriteableRegistry registryB = new NamedWriteableRegistry(); final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB, circuitBreakerService); - MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool); + MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, test); + serviceB.start(); serviceB.acceptIncomingRequests(); diff --git a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index 7ba50b9c997..dac416128e5 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; @@ -39,9 +40,9 @@ import static org.hamcrest.Matchers.containsString; public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase { @Override - protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) { + protected MockTransportService build(Settings settings, Version version, ClusterName clusterName) { settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build(); - MockTransportService transportService = MockTransportService.nettyFromThreadPool(settings, version, threadPool); + MockTransportService transportService = MockTransportService.nettyFromThreadPool(settings, version, threadPool, clusterName); transportService.start(); return transportService; } diff --git a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java index b3d01e62fae..c2bc877b903 100644 --- a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java +++ b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java @@ -77,6 +77,7 @@ import org.elasticsearch.action.termvectors.TermVectorsRequest; import org.elasticsearch.action.update.UpdateAction; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.network.NetworkModule; @@ -725,8 +726,8 @@ public class IndicesRequestTests extends ESIntegTestCase { private final Map> requests = new HashMap<>(); @Inject - public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool, ClusterName clusterName) { + super(settings, transport, threadPool, clusterName); } synchronized List consumeRequests(String action) { diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java index d86efaa2a85..c4863680613 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java @@ -23,6 +23,7 @@ import com.amazonaws.services.ec2.model.Tag; import org.elasticsearch.Version; import org.elasticsearch.cloud.aws.AwsEc2Service; import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; @@ -65,7 +66,7 @@ public class Ec2DiscoveryTests extends ESTestCase { @Before public void createTransportService() { - transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool); + transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool, ClusterName.DEFAULT); } protected List buildDynamicNodes(Settings nodeSettings, int nodes) { diff --git a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java index 3b265d6a067..4525b1ece1d 100644 --- a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java +++ b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.discovery.gce; import org.elasticsearch.Version; import org.elasticsearch.cloud.gce.GceComputeService; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -93,7 +94,7 @@ public class GceDiscoveryTests extends ESTestCase { @Before public void createTransportService() { - transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool); + transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool, ClusterName.DEFAULT); } @Before diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 8e85134763d..05b8b23e90c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -20,6 +20,9 @@ package org.elasticsearch.test.transport; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.transport.TransportService; + import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; @@ -97,25 +100,29 @@ public class MockTransportService extends TransportService { } } - public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool) { + public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool, ClusterName clusterName) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry, new NoneCircuitBreakerService()); - return new MockTransportService(settings, transport, threadPool); + return new MockTransportService(settings, transport, threadPool, clusterName); } - public static MockTransportService nettyFromThreadPool(Settings settings, Version version, ThreadPool threadPool) { + public static MockTransportService nettyFromThreadPool( + Settings settings, + Version version, + ThreadPool threadPool, + ClusterName clusterName) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, namedWriteableRegistry, new NoneCircuitBreakerService()); - return new MockTransportService(Settings.EMPTY, transport, threadPool); + return new MockTransportService(Settings.EMPTY, transport, threadPool, clusterName); } private final Transport original; @Inject - public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, new LookupTestTransport(transport), threadPool); + public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, ClusterName clusterName) { + super(settings, new LookupTestTransport(transport), threadPool, clusterName); this.original = transport; }