From a4b99e629132abafa21403003bf6a667a10315bb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 1 Jul 2015 13:28:28 +0200 Subject: [PATCH] zen: Don't join master nodes or accept join requests of old and too new nodes. If the version of a node is lower than the minimum supported version or higher than the maximum supported version, a node shouldn't be allowed to join and nodes should join that elected master node Closes #11924 --- .../discovery/zen/ZenDiscovery.java | 13 +++++- .../zen/elect/ElectMasterService.java | 16 ++++++- .../discovery/zen/ElectMasterServiceTest.java | 2 +- .../discovery/zen/ZenDiscoveryTests.java | 42 +++++++++++++++++++ .../zen/ping/unicast/UnicastZenPingTests.java | 2 +- 5 files changed, 70 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 7abda98d780..baf6a4a7fdb 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -23,6 +23,7 @@ import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -886,12 +887,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } - private void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) { + void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) { if (!transportService.addressSupported(node.address().getClass())) { // TODO, what should we do now? Maybe inform that node that its crap? logger.warn("received a wrong address type from [{}], ignoring...", node); } else { + // The minimum supported version for a node joining a master: + Version minimumNodeJoinVersion = localNode().getVersion().minimumCompatibilityVersion(); + // Sanity check: maybe we don't end up here, because serialization may have failed. + if (node.getVersion().before(minimumNodeJoinVersion)) { + callback.onFailure( + new IllegalStateException("Can't handle join request from a node with a version [" + node.getVersion() + "] that is lower than the minimum compatible version [" + minimumNodeJoinVersion.minimumCompatibilityVersion() + "]") + ); + return; + } + // try and connect to the node, if it fails, we can raise an exception back to the client... transportService.connectToNode(node); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java index 9ba26387ec5..6d80d2589e0 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java @@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen.elect; import com.carrotsearch.hppc.ObjectContainer; import com.google.common.collect.Lists; import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -36,13 +37,17 @@ public class ElectMasterService extends AbstractComponent { public static final String DISCOVERY_ZEN_MINIMUM_MASTER_NODES = "discovery.zen.minimum_master_nodes"; + // This is the minimum version a master needs to be on, otherwise it gets ignored + // This is based on the minimum compatible version of the current version this node is on + private final Version minMasterVersion; private final NodeComparator nodeComparator = new NodeComparator(); private volatile int minimumMasterNodes; @Inject - public ElectMasterService(Settings settings) { + public ElectMasterService(Settings settings, Version version) { super(settings); + this.minMasterVersion = version.minimumCompatibilityVersion(); this.minimumMasterNodes = settings.getAsInt(DISCOVERY_ZEN_MINIMUM_MASTER_NODES, -1); logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes); } @@ -108,7 +113,14 @@ public class ElectMasterService extends AbstractComponent { if (sortedNodes == null || sortedNodes.isEmpty()) { return null; } - return sortedNodes.get(0); + DiscoveryNode masterNode = sortedNodes.get(0); + // Sanity check: maybe we don't end up here, because serialization may have failed. + if (masterNode.getVersion().before(minMasterVersion)) { + logger.warn("ignoring master [{}], because the version [{}] is lower than the minimum compatible version [{}]", masterNode, masterNode.getVersion(), minMasterVersion); + return null; + } else { + return masterNode; + } } private List sortedMasterNodes(Iterable nodes) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTest.java b/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTest.java index a4ac5ff023d..4552109bd4a 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTest.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTest.java @@ -32,7 +32,7 @@ import java.util.*; public class ElectMasterServiceTest extends ElasticsearchTestCase { ElectMasterService electMasterService() { - return new ElectMasterService(Settings.EMPTY); + return new ElectMasterService(Settings.EMPTY, Version.CURRENT); } List generateRandomNodes() { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java index 565c964bf52..f0fe007a089 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen; +import com.google.common.collect.Iterables; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -35,7 +36,9 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; +import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -45,7 +48,10 @@ import org.hamcrest.Matchers; import org.junit.Test; import java.io.IOException; +import java.lang.ref.Reference; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -215,4 +221,40 @@ public class ZenDiscoveryTests extends ElasticsearchIntegrationTest { assertThat(reference.get(), notNullValue()); assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master then the current one, rejecting ")); } + + @Test + public void testHandleNodeJoin_incompatibleMinVersion() { + Settings nodeSettings = Settings.settingsBuilder() + .put("discovery.type", "zen") // <-- To override the local setting if set externally + .build(); + String nodeName = internalCluster().startNode(nodeSettings, Version.V_2_0_0); + ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName); + + DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0); + final AtomicReference holder = new AtomicReference<>(); + zenDiscovery.handleJoinRequest(node, new MembershipAction.JoinCallback() { + @Override + public void onSuccess() { + } + + @Override + public void onFailure(Throwable t) { + holder.set((IllegalStateException) t); + } + }); + + assertThat(holder.get(), notNullValue()); + assertThat(holder.get().getMessage(), equalTo("Can't handle join request from a node with a version [1.6.0] that is lower than the minimum compatible version [2.0.0-SNAPSHOT]")); + } + + @Test + public void testJoinElectedMaster_incompatibleMinVersion() { + ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY, Version.V_2_0_0); + + DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_2_0_0); + assertThat(electMasterService.electMaster(Collections.singletonList(node)), sameInstance(node)); + node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0); + assertThat("Can't join master because version 1.6.0 is lower than the minimum compatable version 2.0.0 can support", electMasterService.electMaster(Collections.singletonList(node)), nullValue()); + } + } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java index 8c77529dee9..f7c897de140 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -57,7 +57,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase { ThreadPool threadPool = new ThreadPool(getClass().getName()); ClusterName clusterName = new ClusterName("test"); NetworkService networkService = new NetworkService(settings); - ElectMasterService electMasterService = new ElectMasterService(settings); + ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT); NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); final TransportService transportServiceA = new TransportService(transportA, threadPool).start();