From 8f7c5732f19f6b00c607a8e745970c150368df7c Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 27 Mar 2019 18:48:35 +0100 Subject: [PATCH] Use default discovery implementation for single-node discovery (#40036) Switches "discovery.type: single-node" from using a separate implementation for single-node discovery to using the existing standard discovery implementation, with two small adaptions: - auto-bootstrapping, but requiring initial_master_nodes not to be set. - not actively pinging other nodes using the Peerfinder - not allowing other nodes to join its single-node cluster (if they have e.g. been set up using regular discovery and connect to the single-disco node). --- .../coordination/ClusterBootstrapService.java | 30 +++- .../cluster/coordination/Coordinator.java | 30 +++- .../discovery/DiscoveryModule.java | 31 ++-- .../elasticsearch/discovery/PeerFinder.java | 2 +- .../discovery/single/SingleNodeDiscovery.java | 139 ----------------- .../ClusterBootstrapServiceTests.java | 50 ++++++ .../coordination/CoordinatorTests.java | 73 +++++++-- .../single/SingleNodeDiscoveryIT.java | 143 ++++++++++-------- .../single/SingleNodeDiscoveryTests.java | 86 ----------- .../test/InternalTestCluster.java | 2 +- 10 files changed, 258 insertions(+), 328 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java delete mode 100644 server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java index fa58dd240fa..4711b44593c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java @@ -29,10 +29,13 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.node.Node; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; @@ -77,15 +80,28 @@ public class ClusterBootstrapService { public ClusterBootstrapService(Settings settings, TransportService transportService, Supplier> discoveredNodesSupplier, BooleanSupplier isBootstrappedSupplier, Consumer votingConfigurationConsumer) { - - final List initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings); - bootstrapRequirements = unmodifiableSet(new LinkedHashSet<>(initialMasterNodes)); - if (bootstrapRequirements.size() != initialMasterNodes.size()) { - throw new IllegalArgumentException( - "setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] contains duplicates: " + initialMasterNodes); + if (DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))) { + if (INITIAL_MASTER_NODES_SETTING.exists(settings)) { + throw new IllegalArgumentException("setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + + "] is not allowed when [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] is set to [" + + DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "]"); + } + if (DiscoveryNode.isMasterNode(settings) == false) { + throw new IllegalArgumentException("node with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" + + DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] must be master-eligible"); + } + bootstrapRequirements = Collections.singleton(Node.NODE_NAME_SETTING.get(settings)); + unconfiguredBootstrapTimeout = null; + } else { + final List initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings); + bootstrapRequirements = unmodifiableSet(new LinkedHashSet<>(initialMasterNodes)); + if (bootstrapRequirements.size() != initialMasterNodes.size()) { + throw new IllegalArgumentException( + "setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] contains duplicates: " + initialMasterNodes); + } + unconfiguredBootstrapTimeout = discoveryIsConfigured(settings) ? null : UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(settings); } - unconfiguredBootstrapTimeout = discoveryIsConfigured(settings) ? null : UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(settings); this.transportService = transportService; this.discoveredNodesSupplier = discoveredNodesSupplier; this.isBootstrappedSupplier = isBootstrappedSupplier; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 076d617df29..a7826c88307 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -53,12 +53,14 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.HandshakingTransportAddressConnector; import org.elasticsearch.discovery.PeerFinder; @@ -72,6 +74,7 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -99,6 +102,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); private final Settings settings; + private final boolean singleNodeDiscovery; private final TransportService transportService; private final MasterService masterService; private final AllocationService allocationService; @@ -149,6 +153,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.masterService = masterService; this.allocationService = allocationService; this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); + this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings)); this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators); this.persistedStateSupplier = persistedStateSupplier; @@ -448,6 +453,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert Thread.holdsLock(mutex) == false; assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not master-eligible"; logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest); + + if (singleNodeDiscovery && joinRequest.getSourceNode().equals(getLocalNode()) == false) { + joinCallback.onFailure(new IllegalStateException("cannot join node with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + + "] set to [" + DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] discovery")); + return; + } + transportService.connectToNode(joinRequest.getSourceNode()); final ClusterState stateForJoinValidation = getStateForMasterService(); @@ -666,6 +678,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState)); peerFinder.setCurrentTerm(getCurrentTerm()); configuredHostsResolver.start(); + VotingConfiguration votingConfiguration = coordinationState.get().getLastAcceptedState().getLastCommittedConfiguration(); + if (singleNodeDiscovery && + votingConfiguration.isEmpty() == false && + votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) { + throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" + + DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() + + " does not have quorum in voting configuration " + votingConfiguration); + } ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)) .blocks(ClusterBlocks.builder() .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK) @@ -1079,7 +1099,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery CoordinatorPeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { - super(settings, transportService, transportAddressConnector, configuredHostsResolver); + super(settings, transportService, transportAddressConnector, + singleNodeDiscovery ? hostsResolver -> Collections.emptyList() : configuredHostsResolver); } @Override @@ -1090,6 +1111,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } } + @Override + protected void startProbe(TransportAddress transportAddress) { + if (singleNodeDiscovery == false) { + super.startProbe(transportAddress); + } + } + @Override protected void onFoundPeersUpdated() { synchronized (mutex) { diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 10f10e1040d..d000565c3e2 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.discovery.single.SingleNodeDiscovery; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.gateway.GatewayMetaState; import org.elasticsearch.plugins.DiscoveryPlugin; @@ -51,7 +50,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.function.BiConsumer; @@ -70,6 +68,8 @@ public class DiscoveryModule { public static final String ZEN_DISCOVERY_TYPE = "legacy-zen"; public static final String ZEN2_DISCOVERY_TYPE = "zen"; + public static final String SINGLE_NODE_DISCOVERY_TYPE = "single-node"; + public static final Setting DISCOVERY_TYPE_SETTING = new Setting<>("discovery.type", ZEN2_DISCOVERY_TYPE, Function.identity(), Property.NodeScope); public static final Setting> LEGACY_DISCOVERY_HOSTS_PROVIDER_SETTING = @@ -119,6 +119,8 @@ public class DiscoveryModule { List filteredSeedProviders = seedProviderNames.stream() .map(hostProviders::get).map(Supplier::get).collect(Collectors.toList()); + String discoveryType = DISCOVERY_TYPE_SETTING.get(settings); + final SeedHostsProvider seedHostsProvider = hostsResolver -> { final List addresses = new ArrayList<>(); for (SeedHostsProvider provider : filteredSeedProviders) { @@ -127,23 +129,20 @@ public class DiscoveryModule { return Collections.unmodifiableList(addresses); }; - Map> discoveryTypes = new HashMap<>(); - discoveryTypes.put(ZEN_DISCOVERY_TYPE, - () -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, - clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState)); - discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings, - transportService, namedWriteableRegistry, allocationService, masterService, - () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider, clusterApplier, - joinValidators, new Random(Randomness.get().nextLong()))); - discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier, - gatewayMetaState)); - String discoveryType = DISCOVERY_TYPE_SETTING.get(settings); - Supplier discoverySupplier = discoveryTypes.get(discoveryType); - if (discoverySupplier == null) { + if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) { + discovery = new Coordinator(NODE_NAME_SETTING.get(settings), + settings, clusterSettings, + transportService, namedWriteableRegistry, allocationService, masterService, + () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider, + clusterApplier, joinValidators, new Random(Randomness.get().nextLong())); + } else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) { + discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, + clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState); + } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); } + logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames); - discovery = Objects.requireNonNull(discoverySupplier.get()); } private List getSeedProviderNames(Settings settings) { diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 32fd59d8a2c..c2ec9ae1905 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -308,7 +308,7 @@ public abstract class PeerFinder { return peersRemoved; } - private void startProbe(TransportAddress transportAddress) { + protected void startProbe(TransportAddress transportAddress) { assert holdsLock() : "PeerFinder mutex not held"; if (active == false) { logger.trace("startProbe({}) not running", transportAddress); diff --git a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java deleted file mode 100644 index 2a415a74cd0..00000000000 --- a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery.single; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterApplier; -import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; -import org.elasticsearch.cluster.service.ClusterApplierService; -import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.gateway.GatewayMetaState; -import org.elasticsearch.transport.TransportService; - -import java.util.Objects; - -import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; - -/** - * A discovery implementation where the only member of the cluster is the local node. - */ -public class SingleNodeDiscovery extends AbstractLifecycleComponent implements Discovery { - private static final Logger logger = LogManager.getLogger(SingleNodeDiscovery.class); - - private final ClusterName clusterName; - protected final TransportService transportService; - private final ClusterApplier clusterApplier; - private volatile ClusterState clusterState; - - public SingleNodeDiscovery(final Settings settings, final TransportService transportService, - final MasterService masterService, final ClusterApplier clusterApplier, - final GatewayMetaState gatewayMetaState) { - this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); - this.transportService = Objects.requireNonNull(transportService); - masterService.setClusterStateSupplier(() -> clusterState); - this.clusterApplier = clusterApplier; - - if (clusterApplier instanceof ClusterApplierService) { - ((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState); - } - } - - @Override - public synchronized void publish(final ClusterChangedEvent event, ActionListener publishListener, - final AckListener ackListener) { - clusterState = event.state(); - ackListener.onCommit(TimeValue.ZERO); - - clusterApplier.onNewClusterState("apply-locally-on-node[" + event.source() + "]", () -> clusterState, new ClusterApplyListener() { - @Override - public void onSuccess(String source) { - publishListener.onResponse(null); - ackListener.onNodeAck(transportService.getLocalNode(), null); - } - - @Override - public void onFailure(String source, Exception e) { - publishListener.onFailure(e); - ackListener.onNodeAck(transportService.getLocalNode(), e); - logger.warn(() -> new ParameterizedMessage("failed while applying cluster state locally [{}]", event.source()), e); - } - }); - } - - @Override - public DiscoveryStats stats() { - return new DiscoveryStats(null, null); - } - - @Override - public synchronized void startInitialJoin() { - if (lifecycle.started() == false) { - throw new IllegalStateException("can't start initial join when not started"); - } - // apply a fresh cluster state just so that state recovery gets triggered by GatewayService - // TODO: give discovery module control over GatewayService - clusterState = ClusterState.builder(clusterState).build(); - clusterApplier.onNewClusterState("single-node-start-initial-join", () -> clusterState, (source, e) -> {}); - } - - @Override - protected synchronized void doStart() { - // set initial state - DiscoveryNode localNode = transportService.getLocalNode(); - clusterState = createInitialState(localNode); - clusterApplier.setInitialState(clusterState); - } - - protected ClusterState createInitialState(DiscoveryNode localNode) { - ClusterState.Builder builder = ClusterState.builder(clusterName); - return builder.nodes(DiscoveryNodes.builder().add(localNode) - .localNodeId(localNode.getId()) - .masterNodeId(localNode.getId()) - .build()) - .blocks(ClusterBlocks.builder() - .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) - .build(); - } - - @Override - protected void doStop() { - - } - - @Override - protected void doClose() { - - } - -} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java index 7f32c086f97..70c6f5d71bf 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java @@ -23,6 +23,8 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.transport.TransportRequest; @@ -491,4 +493,52 @@ public class ClusterBootstrapServiceTests extends ESTestCase { deterministicTaskQueue.runAllTasks(); assertTrue(bootstrapped.get()); } + + public void testBootstrapsAutomaticallyWithSingleNodeDiscovery() { + final Settings.Builder settings = Settings.builder() + .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE) + .put(NODE_NAME_SETTING.getKey(), localNode.getName()); + final AtomicBoolean bootstrapped = new AtomicBoolean(); + + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(settings.build(), + transportService, () -> emptyList(), () -> false, vc -> { + assertTrue(bootstrapped.compareAndSet(false, true)); + assertThat(vc.getNodeIds(), hasSize(1)); + assertThat(vc.getNodeIds(), hasItem(localNode.getId())); + assertTrue(vc.hasQuorum(singletonList(localNode.getId()))); + }); + + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + assertTrue(bootstrapped.get()); + + bootstrapped.set(false); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + assertFalse(bootstrapped.get()); // should only bootstrap once + } + + public void testFailBootstrapWithBothSingleNodeDiscoveryAndInitialMasterNodes() { + final Settings.Builder settings = Settings.builder() + .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE) + .put(NODE_NAME_SETTING.getKey(), localNode.getName()) + .put(INITIAL_MASTER_NODES_SETTING.getKey(), "test"); + + assertThat(expectThrows(IllegalArgumentException.class, () -> new ClusterBootstrapService(settings.build(), + transportService, () -> emptyList(), () -> false, vc -> fail())).getMessage(), + containsString("setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] is not allowed when [discovery.type] is set " + + "to [single-node]")); + } + + public void testFailBootstrapNonMasterEligibleNodeWithSingleNodeDiscovery() { + final Settings.Builder settings = Settings.builder() + .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE) + .put(NODE_NAME_SETTING.getKey(), localNode.getName()) + .put(Node.NODE_MASTER_SETTING.getKey(), false); + + assertThat(expectThrows(IllegalArgumentException.class, () -> new ClusterBootstrapService(settings.build(), + transportService, () -> emptyList(), () -> false, vc -> fail())).getMessage(), + containsString("node with [discovery.type] set to [single-node] must be master-eligible")); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 16d0161815b..101b5755b4c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -63,6 +63,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver; import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.env.NodeEnvironment; @@ -205,7 +206,7 @@ public class CoordinatorTests extends ESTestCase { } public void testDoesNotElectNonMasterNode() { - final Cluster cluster = new Cluster(randomIntBetween(1, 5), false); + final Cluster cluster = new Cluster(randomIntBetween(1, 5), false, Settings.EMPTY); cluster.runRandomly(); cluster.stabilise(); @@ -905,7 +906,7 @@ public class CoordinatorTests extends ESTestCase { * and join the leader again. */ public void testStayCandidateAfterReceivingFollowerCheckFromKnownMaster() { - final Cluster cluster = new Cluster(2, false); + final Cluster cluster = new Cluster(2, false, Settings.EMPTY); cluster.runRandomly(); cluster.stabilise(); @@ -1030,7 +1031,7 @@ public class CoordinatorTests extends ESTestCase { final ClusterNode shiftedNode = randomFrom(cluster2.clusterNodes).restartedNode(); final ClusterNode newNode = cluster1.new ClusterNode(nextNodeIndex.getAndIncrement(), - shiftedNode.getLocalNode(), n -> shiftedNode.persistedState); + shiftedNode.getLocalNode(), n -> shiftedNode.persistedState, shiftedNode.nodeSettings); cluster1.clusterNodes.add(newNode); MockLogAppender mockAppender = new MockLogAppender(); @@ -1054,7 +1055,7 @@ public class CoordinatorTests extends ESTestCase { final ClusterNode detachedNode = newNode.restartedNode( metaData -> DetachClusterCommand.updateMetaData(metaData), - term -> DetachClusterCommand.updateCurrentTerm()); + term -> DetachClusterCommand.updateCurrentTerm(), newNode.nodeSettings); cluster1.clusterNodes.replaceAll(cn -> cn == newNode ? detachedNode : cn); cluster1.stabilise(); } @@ -1112,6 +1113,43 @@ public class CoordinatorTests extends ESTestCase { + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); } + public void testSingleNodeDiscoveryWithoutQuorum() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode clusterNode = cluster.getAnyNode(); + logger.debug("rebooting [{}]", clusterNode.getId()); + clusterNode.close(); + cluster.clusterNodes.forEach( + cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode( + new Runnable() { + @Override + public void run() { + cn.transportService.disconnectFromNode(clusterNode.getLocalNode()); + } + + @Override + public String toString() { + return "disconnect from " + clusterNode.getLocalNode() + " after shutdown"; + } + }))); + IllegalStateException ise = expectThrows(IllegalStateException.class, + () -> cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ? + cn.restartedNode(Function.identity(), Function.identity(), Settings.builder() + .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE).build()) : + cn)); + assertThat(ise.getMessage(), containsString("cannot start with [discovery.type] set to [single-node] when local node")); + assertThat(ise.getMessage(), containsString("does not have quorum in voting configuration")); + } + + public void testSingleNodeDiscoveryWithQuorum() { + final Cluster cluster = new Cluster(1, randomBoolean(), Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), + DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE).build()); + cluster.runRandomly(); + cluster.stabilise(); + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -1185,10 +1223,10 @@ public class CoordinatorTests extends ESTestCase { private List seedHostsList; Cluster(int initialNodeCount) { - this(initialNodeCount, true); + this(initialNodeCount, true, Settings.EMPTY); } - Cluster(int initialNodeCount, boolean allNodesMasterEligible) { + Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings) { deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); assertThat(initialNodeCount, greaterThan(0)); @@ -1197,7 +1235,7 @@ public class CoordinatorTests extends ESTestCase { clusterNodes = new ArrayList<>(initialNodeCount); for (int i = 0; i < initialNodeCount; i++) { final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), - allNodesMasterEligible || i == 0 || randomBoolean()); + allNodesMasterEligible || i == 0 || randomBoolean(), nodeSettings); clusterNodes.add(clusterNode); if (clusterNode.getLocalNode().isMasterNode()) { masterEligibleNodeIds.add(clusterNode.getId()); @@ -1230,7 +1268,7 @@ public class CoordinatorTests extends ESTestCase { final List addedNodes = new ArrayList<>(); for (int i = 0; i < newNodesCount; i++) { - final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true); + final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true, Settings.EMPTY); addedNodes.add(clusterNode); } clusterNodes.addAll(addedNodes); @@ -1702,6 +1740,7 @@ public class CoordinatorTests extends ESTestCase { private Coordinator coordinator; private final DiscoveryNode localNode; private final MockPersistedState persistedState; + private final Settings nodeSettings; private AckedFakeThreadPoolMasterService masterService; private DisruptableClusterApplierService clusterApplierService; private ClusterService clusterService; @@ -1709,13 +1748,15 @@ public class CoordinatorTests extends ESTestCase { private DisruptableMockTransport mockTransport; private List> extraJoinValidators = new ArrayList<>(); - ClusterNode(int nodeIndex, boolean masterEligible) { - this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier); + ClusterNode(int nodeIndex, boolean masterEligible, Settings nodeSettings) { + this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier, nodeSettings); } - ClusterNode(int nodeIndex, DiscoveryNode localNode, Function persistedStateSupplier) { + ClusterNode(int nodeIndex, DiscoveryNode localNode, Function persistedStateSupplier, + Settings nodeSettings) { this.nodeIndex = nodeIndex; this.localNode = localNode; + this.nodeSettings = nodeSettings; persistedState = persistedStateSupplier.apply(localNode); onNodeLog(localNode, this::setUp).run(); } @@ -1739,7 +1780,8 @@ public class CoordinatorTests extends ESTestCase { } }; - final Settings settings = Settings.builder() + final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ? + nodeSettings : Settings.builder().put(nodeSettings) .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap transportService = mockTransport.createTransportService( @@ -1782,17 +1824,18 @@ public class CoordinatorTests extends ESTestCase { } ClusterNode restartedNode() { - return restartedNode(Function.identity(), Function.identity()); + return restartedNode(Function.identity(), Function.identity(), nodeSettings); } - ClusterNode restartedNode(Function adaptGlobalMetaData, Function adaptCurrentTerm) { + ClusterNode restartedNode(Function adaptGlobalMetaData, Function adaptCurrentTerm, + Settings nodeSettings) { final TransportAddress address = randomBoolean() ? buildNewFakeTransportAddress() : localNode.getAddress(); final DiscoveryNode newLocalNode = new DiscoveryNode(localNode.getName(), localNode.getId(), UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(), localNode.isMasterNode() ? EnumSet.allOf(Role.class) : emptySet(), Version.CURRENT); return new ClusterNode(nodeIndex, newLocalNode, - node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetaData, adaptCurrentTerm)); + node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetaData, adaptCurrentTerm), nodeSettings); } private PersistedState getPersistedState() { diff --git a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java index 13d314c5750..c3cae8f10ff 100644 --- a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java @@ -19,33 +19,27 @@ package org.elasticsearch.discovery.single; -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.coordination.JoinHelper; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.SeedHostsProvider; -import org.elasticsearch.discovery.zen.UnicastZenPing; -import org.elasticsearch.discovery.zen.ZenPing; +import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockHttpTransport; +import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.NodeConfigurationSource; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportService; -import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; -import java.util.Collections; -import java.util.Stack; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; @@ -69,53 +63,6 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase { .build(); } - public void testDoesNotRespondToZenPings() throws Exception { - final Settings settings = - Settings.builder().put("cluster.name", internalCluster().getClusterName()).build(); - final Version version = Version.CURRENT; - final Stack closeables = new Stack<>(); - final TestThreadPool threadPool = new TestThreadPool(getClass().getName()); - try { - final MockTransportService pingTransport = - MockTransportService.createNewService(settings, version, threadPool, null); - pingTransport.start(); - closeables.push(pingTransport); - final TransportService nodeTransport = - internalCluster().getInstance(TransportService.class); - // try to ping the single node directly - final SeedHostsProvider provider = - hostsResolver -> Collections.singletonList(nodeTransport.getLocalNode().getAddress()); - final CountDownLatch latch = new CountDownLatch(1); - final DiscoveryNodes nodes = DiscoveryNodes.builder() - .add(nodeTransport.getLocalNode()) - .add(pingTransport.getLocalNode()) - .localNodeId(pingTransport.getLocalNode().getId()) - .build(); - final ClusterName clusterName = new ClusterName(internalCluster().getClusterName()); - final ClusterState state = ClusterState.builder(clusterName).nodes(nodes).build(); - final UnicastZenPing unicastZenPing = - new UnicastZenPing(settings, threadPool, pingTransport, provider, () -> state) { - @Override - protected void finishPingingRound(PingingRound pingingRound) { - latch.countDown(); - super.finishPingingRound(pingingRound); - } - }; - unicastZenPing.start(); - closeables.push(unicastZenPing); - final CompletableFuture responses = new CompletableFuture<>(); - unicastZenPing.ping(responses::complete, TimeValue.timeValueSeconds(3)); - latch.await(); - responses.get(); - assertThat(responses.get().size(), equalTo(0)); - } finally { - while (!closeables.isEmpty()) { - IOUtils.closeWhileHandlingException(closeables.pop()); - } - terminate(threadPool); - } - } - public void testSingleNodesDoNotDiscoverEachOther() throws IOException, InterruptedException { final TransportService service = internalCluster().getInstance(TransportService.class); final int port = service.boundAddress().publishAddress().getPort(); @@ -167,6 +114,78 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase { } } + public void testCannotJoinNodeWithSingleNodeDiscovery() throws Exception { + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test", + JoinHelper.class.getCanonicalName(), + Level.INFO, + "failed to join") { + + @Override + public boolean innerMatch(final LogEvent event) { + return event.getThrown() != null + && event.getThrown().getClass() == RemoteTransportException.class + && event.getThrown().getCause() != null + && event.getThrown().getCause().getClass() == IllegalStateException.class + && event.getThrown().getCause().getMessage().contains( + "cannot join node with [discovery.type] set to [single-node]"); + } + }); + final TransportService service = internalCluster().getInstance(TransportService.class); + final int port = service.boundAddress().publishAddress().getPort(); + final NodeConfigurationSource configurationSource = new NodeConfigurationSource() { + @Override + public Settings nodeSettings(int nodeOrdinal) { + return Settings + .builder() + .put("discovery.type", "zen") + .put("transport.type", getTestTransportType()) + .put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s") + /* + * We align the port ranges of the two as then with zen discovery these two + * nodes would find each other. + */ + .put("transport.port", port + "-" + (port + 5 - 1)) + .build(); + } + + @Override + public Path nodeConfigPath(int nodeOrdinal) { + return null; + } + }; + try (InternalTestCluster other = + new InternalTestCluster( + randomLong(), + createTempDir(), + false, + false, + 1, + 1, + internalCluster().getClusterName(), + configurationSource, + 0, + "other", + Arrays.asList(getTestTransportPlugin(), MockHttpTransport.TestPlugin.class), + Function.identity())) { + + Logger clusterLogger = LogManager.getLogger(JoinHelper.class); + Loggers.addAppender(clusterLogger, mockAppender); + try { + other.beforeTest(random(), 0); + final ClusterState first = internalCluster().getInstance(ClusterService.class).state(); + assertThat(first.nodes().getSize(), equalTo(1)); + assertBusy(() -> mockAppender.assertAllExpectationsMatched()); + } finally { + Loggers.removeAppender(clusterLogger, mockAppender); + mockAppender.stop(); + } + } + } + public void testStatePersistence() throws Exception { createIndex("test"); internalCluster().fullRestart(); diff --git a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java deleted file mode 100644 index c3dfad2d437..00000000000 --- a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery.single; - -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterApplier; -import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.Closeable; -import java.util.Stack; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - -import static org.elasticsearch.test.ClusterServiceUtils.createMasterService; -import static org.hamcrest.Matchers.equalTo; - -public class SingleNodeDiscoveryTests extends ESTestCase { - - public void testInitialJoin() throws Exception { - final Settings settings = Settings.EMPTY; - final Version version = Version.CURRENT; - final ThreadPool threadPool = new TestThreadPool(getClass().getName()); - final Stack stack = new Stack<>(); - try { - final MockTransportService transportService = - MockTransportService.createNewService(settings, version, threadPool, null); - stack.push(transportService); - transportService.start(); - final DiscoveryNode node = transportService.getLocalNode(); - final MasterService masterService = createMasterService(threadPool, node); - AtomicReference clusterState = new AtomicReference<>(); - final SingleNodeDiscovery discovery = - new SingleNodeDiscovery(Settings.EMPTY, transportService, - masterService, new ClusterApplier() { - @Override - public void setInitialState(ClusterState initialState) { - clusterState.set(initialState); - } - - @Override - public void onNewClusterState(String source, Supplier clusterStateSupplier, - ClusterApplyListener listener) { - clusterState.set(clusterStateSupplier.get()); - listener.onSuccess(source); - } - }, null); - discovery.start(); - discovery.startInitialJoin(); - final DiscoveryNodes nodes = clusterState.get().nodes(); - assertThat(nodes.getSize(), equalTo(1)); - assertThat(nodes.getMasterNode().getId(), equalTo(node.getId())); - } finally { - while (!stack.isEmpty()) { - IOUtils.closeWhileHandlingException(stack.pop()); - } - terminate(threadPool); - } - } - -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 544dea0600b..4d531d57cef 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1215,7 +1215,7 @@ public final class InternalTestCluster extends TestCluster { nextNodeId.set(newSize); assert size() == newSize; - if (newSize > 0) { + if (autoManageMinMasterNodes && newSize > 0) { validateClusterFormed(); } logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]",