zen: Don't join master nodes or accept join requests of old and too new nodes.
If the version of a node is lower than the minimum supported version or higher than the maximum supported version, a node shouldn't be allowed to join and nodes should join that elected master node Closes #11924
This commit is contained in:
parent
410704fd18
commit
a4b99e6291
|
@ -23,6 +23,7 @@ import com.google.common.base.Objects;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.*;
|
import org.elasticsearch.cluster.*;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
@ -886,12 +887,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
|
void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
|
||||||
|
|
||||||
if (!transportService.addressSupported(node.address().getClass())) {
|
if (!transportService.addressSupported(node.address().getClass())) {
|
||||||
// TODO, what should we do now? Maybe inform that node that its crap?
|
// TODO, what should we do now? Maybe inform that node that its crap?
|
||||||
logger.warn("received a wrong address type from [{}], ignoring...", node);
|
logger.warn("received a wrong address type from [{}], ignoring...", node);
|
||||||
} else {
|
} else {
|
||||||
|
// The minimum supported version for a node joining a master:
|
||||||
|
Version minimumNodeJoinVersion = localNode().getVersion().minimumCompatibilityVersion();
|
||||||
|
// Sanity check: maybe we don't end up here, because serialization may have failed.
|
||||||
|
if (node.getVersion().before(minimumNodeJoinVersion)) {
|
||||||
|
callback.onFailure(
|
||||||
|
new IllegalStateException("Can't handle join request from a node with a version [" + node.getVersion() + "] that is lower than the minimum compatible version [" + minimumNodeJoinVersion.minimumCompatibilityVersion() + "]")
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// try and connect to the node, if it fails, we can raise an exception back to the client...
|
// try and connect to the node, if it fails, we can raise an exception back to the client...
|
||||||
transportService.connectToNode(node);
|
transportService.connectToNode(node);
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen.elect;
|
||||||
import com.carrotsearch.hppc.ObjectContainer;
|
import com.carrotsearch.hppc.ObjectContainer;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.lucene.util.CollectionUtil;
|
import org.apache.lucene.util.CollectionUtil;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
@ -36,13 +37,17 @@ public class ElectMasterService extends AbstractComponent {
|
||||||
|
|
||||||
public static final String DISCOVERY_ZEN_MINIMUM_MASTER_NODES = "discovery.zen.minimum_master_nodes";
|
public static final String DISCOVERY_ZEN_MINIMUM_MASTER_NODES = "discovery.zen.minimum_master_nodes";
|
||||||
|
|
||||||
|
// This is the minimum version a master needs to be on, otherwise it gets ignored
|
||||||
|
// This is based on the minimum compatible version of the current version this node is on
|
||||||
|
private final Version minMasterVersion;
|
||||||
private final NodeComparator nodeComparator = new NodeComparator();
|
private final NodeComparator nodeComparator = new NodeComparator();
|
||||||
|
|
||||||
private volatile int minimumMasterNodes;
|
private volatile int minimumMasterNodes;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ElectMasterService(Settings settings) {
|
public ElectMasterService(Settings settings, Version version) {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
this.minMasterVersion = version.minimumCompatibilityVersion();
|
||||||
this.minimumMasterNodes = settings.getAsInt(DISCOVERY_ZEN_MINIMUM_MASTER_NODES, -1);
|
this.minimumMasterNodes = settings.getAsInt(DISCOVERY_ZEN_MINIMUM_MASTER_NODES, -1);
|
||||||
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
|
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
|
||||||
}
|
}
|
||||||
|
@ -108,7 +113,14 @@ public class ElectMasterService extends AbstractComponent {
|
||||||
if (sortedNodes == null || sortedNodes.isEmpty()) {
|
if (sortedNodes == null || sortedNodes.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return sortedNodes.get(0);
|
DiscoveryNode masterNode = sortedNodes.get(0);
|
||||||
|
// Sanity check: maybe we don't end up here, because serialization may have failed.
|
||||||
|
if (masterNode.getVersion().before(minMasterVersion)) {
|
||||||
|
logger.warn("ignoring master [{}], because the version [{}] is lower than the minimum compatible version [{}]", masterNode, masterNode.getVersion(), minMasterVersion);
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return masterNode;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
|
private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
|
||||||
|
|
|
@ -32,7 +32,7 @@ import java.util.*;
|
||||||
public class ElectMasterServiceTest extends ElasticsearchTestCase {
|
public class ElectMasterServiceTest extends ElasticsearchTestCase {
|
||||||
|
|
||||||
ElectMasterService electMasterService() {
|
ElectMasterService electMasterService() {
|
||||||
return new ElectMasterService(Settings.EMPTY);
|
return new ElectMasterService(Settings.EMPTY, Version.CURRENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<DiscoveryNode> generateRandomNodes() {
|
List<DiscoveryNode> generateRandomNodes() {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
@ -35,7 +36,9 @@ import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
|
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
||||||
|
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
||||||
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
|
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||||
|
@ -45,7 +48,10 @@ import org.hamcrest.Matchers;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.ref.Reference;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -215,4 +221,40 @@ public class ZenDiscoveryTests extends ElasticsearchIntegrationTest {
|
||||||
assertThat(reference.get(), notNullValue());
|
assertThat(reference.get(), notNullValue());
|
||||||
assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master then the current one, rejecting "));
|
assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master then the current one, rejecting "));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHandleNodeJoin_incompatibleMinVersion() {
|
||||||
|
Settings nodeSettings = Settings.settingsBuilder()
|
||||||
|
.put("discovery.type", "zen") // <-- To override the local setting if set externally
|
||||||
|
.build();
|
||||||
|
String nodeName = internalCluster().startNode(nodeSettings, Version.V_2_0_0);
|
||||||
|
ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName);
|
||||||
|
|
||||||
|
DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0);
|
||||||
|
final AtomicReference<IllegalStateException> holder = new AtomicReference<>();
|
||||||
|
zenDiscovery.handleJoinRequest(node, new MembershipAction.JoinCallback() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
holder.set((IllegalStateException) t);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(holder.get(), notNullValue());
|
||||||
|
assertThat(holder.get().getMessage(), equalTo("Can't handle join request from a node with a version [1.6.0] that is lower than the minimum compatible version [2.0.0-SNAPSHOT]"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJoinElectedMaster_incompatibleMinVersion() {
|
||||||
|
ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY, Version.V_2_0_0);
|
||||||
|
|
||||||
|
DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_2_0_0);
|
||||||
|
assertThat(electMasterService.electMaster(Collections.singletonList(node)), sameInstance(node));
|
||||||
|
node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0);
|
||||||
|
assertThat("Can't join master because version 1.6.0 is lower than the minimum compatable version 2.0.0 can support", electMasterService.electMaster(Collections.singletonList(node)), nullValue());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
|
||||||
ThreadPool threadPool = new ThreadPool(getClass().getName());
|
ThreadPool threadPool = new ThreadPool(getClass().getName());
|
||||||
ClusterName clusterName = new ClusterName("test");
|
ClusterName clusterName = new ClusterName("test");
|
||||||
NetworkService networkService = new NetworkService(settings);
|
NetworkService networkService = new NetworkService(settings);
|
||||||
ElectMasterService electMasterService = new ElectMasterService(settings);
|
ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT);
|
||||||
|
|
||||||
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
|
||||||
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
|
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
|
||||||
|
|
Loading…
Reference in New Issue