Use default discovery implementation for single-node discovery (#40036)

Switches "discovery.type: single-node" from using a separate implementation for single-node discovery to using the existing standard discovery implementation, with two small adaptions:

-  auto-bootstrapping, but requiring initial_master_nodes not to be set.
- not actively pinging other nodes using the Peerfinder
- not allowing other nodes to join its single-node cluster (if they have e.g. been set up using regular discovery and connect to the single-disco node).
This commit is contained in:
Yannick Welsch 2019-03-27 18:48:35 +01:00
parent 3860ddd1a4
commit 8f7c5732f1
10 changed files with 258 additions and 328 deletions

View File

@ -29,10 +29,13 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
@ -77,15 +80,28 @@ public class ClusterBootstrapService {
public ClusterBootstrapService(Settings settings, TransportService transportService,
Supplier<Iterable<DiscoveryNode>> discoveredNodesSupplier, BooleanSupplier isBootstrappedSupplier,
Consumer<VotingConfiguration> votingConfigurationConsumer) {
final List<String> initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings);
bootstrapRequirements = unmodifiableSet(new LinkedHashSet<>(initialMasterNodes));
if (bootstrapRequirements.size() != initialMasterNodes.size()) {
throw new IllegalArgumentException(
"setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] contains duplicates: " + initialMasterNodes);
if (DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))) {
if (INITIAL_MASTER_NODES_SETTING.exists(settings)) {
throw new IllegalArgumentException("setting [" + INITIAL_MASTER_NODES_SETTING.getKey() +
"] is not allowed when [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] is set to [" +
DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "]");
}
if (DiscoveryNode.isMasterNode(settings) == false) {
throw new IllegalArgumentException("node with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +
DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] must be master-eligible");
}
bootstrapRequirements = Collections.singleton(Node.NODE_NAME_SETTING.get(settings));
unconfiguredBootstrapTimeout = null;
} else {
final List<String> initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings);
bootstrapRequirements = unmodifiableSet(new LinkedHashSet<>(initialMasterNodes));
if (bootstrapRequirements.size() != initialMasterNodes.size()) {
throw new IllegalArgumentException(
"setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] contains duplicates: " + initialMasterNodes);
}
unconfiguredBootstrapTimeout = discoveryIsConfigured(settings) ? null : UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(settings);
}
unconfiguredBootstrapTimeout = discoveryIsConfigured(settings) ? null : UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
this.discoveredNodesSupplier = discoveredNodesSupplier;
this.isBootstrappedSupplier = isBootstrappedSupplier;

View File

@ -53,12 +53,14 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.HandshakingTransportAddressConnector;
import org.elasticsearch.discovery.PeerFinder;
@ -72,6 +74,7 @@ import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@ -99,6 +102,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
private final Settings settings;
private final boolean singleNodeDiscovery;
private final TransportService transportService;
private final MasterService masterService;
private final AllocationService allocationService;
@ -149,6 +153,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.masterService = masterService;
this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
this.persistedStateSupplier = persistedStateSupplier;
@ -448,6 +453,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert Thread.holdsLock(mutex) == false;
assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not master-eligible";
logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest);
if (singleNodeDiscovery && joinRequest.getSourceNode().equals(getLocalNode()) == false) {
joinCallback.onFailure(new IllegalStateException("cannot join node with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() +
"] set to [" + DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] discovery"));
return;
}
transportService.connectToNode(joinRequest.getSourceNode());
final ClusterState stateForJoinValidation = getStateForMasterService();
@ -666,6 +678,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
VotingConfiguration votingConfiguration = coordinationState.get().getLastAcceptedState().getLastCommittedConfiguration();
if (singleNodeDiscovery &&
votingConfiguration.isEmpty() == false &&
votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) {
throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +
DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() +
" does not have quorum in voting configuration " + votingConfiguration);
}
ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
@ -1079,7 +1099,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
CoordinatorPeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver) {
super(settings, transportService, transportAddressConnector, configuredHostsResolver);
super(settings, transportService, transportAddressConnector,
singleNodeDiscovery ? hostsResolver -> Collections.emptyList() : configuredHostsResolver);
}
@Override
@ -1090,6 +1111,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
@Override
protected void startProbe(TransportAddress transportAddress) {
if (singleNodeDiscovery == false) {
super.startProbe(transportAddress);
}
}
@Override
protected void onFoundPeersUpdated() {
synchronized (mutex) {

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.plugins.DiscoveryPlugin;
@ -51,7 +50,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.BiConsumer;
@ -70,6 +68,8 @@ public class DiscoveryModule {
public static final String ZEN_DISCOVERY_TYPE = "legacy-zen";
public static final String ZEN2_DISCOVERY_TYPE = "zen";
public static final String SINGLE_NODE_DISCOVERY_TYPE = "single-node";
public static final Setting<String> DISCOVERY_TYPE_SETTING =
new Setting<>("discovery.type", ZEN2_DISCOVERY_TYPE, Function.identity(), Property.NodeScope);
public static final Setting<List<String>> LEGACY_DISCOVERY_HOSTS_PROVIDER_SETTING =
@ -119,6 +119,8 @@ public class DiscoveryModule {
List<SeedHostsProvider> filteredSeedProviders = seedProviderNames.stream()
.map(hostProviders::get).map(Supplier::get).collect(Collectors.toList());
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
final SeedHostsProvider seedHostsProvider = hostsResolver -> {
final List<TransportAddress> addresses = new ArrayList<>();
for (SeedHostsProvider provider : filteredSeedProviders) {
@ -127,23 +129,20 @@ public class DiscoveryModule {
return Collections.unmodifiableList(addresses);
};
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put(ZEN_DISCOVERY_TYPE,
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState));
discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider, clusterApplier,
joinValidators, new Random(Randomness.get().nextLong())));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier,
gatewayMetaState));
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
Supplier<Discovery> discoverySupplier = discoveryTypes.get(discoveryType);
if (discoverySupplier == null) {
if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()));
} else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}
logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames);
discovery = Objects.requireNonNull(discoverySupplier.get());
}
private List<String> getSeedProviderNames(Settings settings) {

View File

@ -308,7 +308,7 @@ public abstract class PeerFinder {
return peersRemoved;
}
private void startProbe(TransportAddress transportAddress) {
protected void startProbe(TransportAddress transportAddress) {
assert holdsLock() : "PeerFinder mutex not held";
if (active == false) {
logger.trace("startProbe({}) not running", transportAddress);

View File

@ -1,139 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.single;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.transport.TransportService;
import java.util.Objects;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
/**
* A discovery implementation where the only member of the cluster is the local node.
*/
public class SingleNodeDiscovery extends AbstractLifecycleComponent implements Discovery {
private static final Logger logger = LogManager.getLogger(SingleNodeDiscovery.class);
private final ClusterName clusterName;
protected final TransportService transportService;
private final ClusterApplier clusterApplier;
private volatile ClusterState clusterState;
public SingleNodeDiscovery(final Settings settings, final TransportService transportService,
final MasterService masterService, final ClusterApplier clusterApplier,
final GatewayMetaState gatewayMetaState) {
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = Objects.requireNonNull(transportService);
masterService.setClusterStateSupplier(() -> clusterState);
this.clusterApplier = clusterApplier;
if (clusterApplier instanceof ClusterApplierService) {
((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState);
}
}
@Override
public synchronized void publish(final ClusterChangedEvent event, ActionListener<Void> publishListener,
final AckListener ackListener) {
clusterState = event.state();
ackListener.onCommit(TimeValue.ZERO);
clusterApplier.onNewClusterState("apply-locally-on-node[" + event.source() + "]", () -> clusterState, new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
publishListener.onResponse(null);
ackListener.onNodeAck(transportService.getLocalNode(), null);
}
@Override
public void onFailure(String source, Exception e) {
publishListener.onFailure(e);
ackListener.onNodeAck(transportService.getLocalNode(), e);
logger.warn(() -> new ParameterizedMessage("failed while applying cluster state locally [{}]", event.source()), e);
}
});
}
@Override
public DiscoveryStats stats() {
return new DiscoveryStats(null, null);
}
@Override
public synchronized void startInitialJoin() {
if (lifecycle.started() == false) {
throw new IllegalStateException("can't start initial join when not started");
}
// apply a fresh cluster state just so that state recovery gets triggered by GatewayService
// TODO: give discovery module control over GatewayService
clusterState = ClusterState.builder(clusterState).build();
clusterApplier.onNewClusterState("single-node-start-initial-join", () -> clusterState, (source, e) -> {});
}
@Override
protected synchronized void doStart() {
// set initial state
DiscoveryNode localNode = transportService.getLocalNode();
clusterState = createInitialState(localNode);
clusterApplier.setInitialState(clusterState);
}
protected ClusterState createInitialState(DiscoveryNode localNode) {
ClusterState.Builder builder = ClusterState.builder(clusterName);
return builder.nodes(DiscoveryNodes.builder().add(localNode)
.localNodeId(localNode.getId())
.masterNodeId(localNode.getId())
.build())
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.build();
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
}

View File

@ -23,6 +23,8 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.TransportRequest;
@ -491,4 +493,52 @@ public class ClusterBootstrapServiceTests extends ESTestCase {
deterministicTaskQueue.runAllTasks();
assertTrue(bootstrapped.get());
}
public void testBootstrapsAutomaticallyWithSingleNodeDiscovery() {
final Settings.Builder settings = Settings.builder()
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE)
.put(NODE_NAME_SETTING.getKey(), localNode.getName());
final AtomicBoolean bootstrapped = new AtomicBoolean();
ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(settings.build(),
transportService, () -> emptyList(), () -> false, vc -> {
assertTrue(bootstrapped.compareAndSet(false, true));
assertThat(vc.getNodeIds(), hasSize(1));
assertThat(vc.getNodeIds(), hasItem(localNode.getId()));
assertTrue(vc.hasQuorum(singletonList(localNode.getId())));
});
transportService.start();
clusterBootstrapService.onFoundPeersUpdated();
deterministicTaskQueue.runAllTasks();
assertTrue(bootstrapped.get());
bootstrapped.set(false);
clusterBootstrapService.onFoundPeersUpdated();
deterministicTaskQueue.runAllTasks();
assertFalse(bootstrapped.get()); // should only bootstrap once
}
public void testFailBootstrapWithBothSingleNodeDiscoveryAndInitialMasterNodes() {
final Settings.Builder settings = Settings.builder()
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE)
.put(NODE_NAME_SETTING.getKey(), localNode.getName())
.put(INITIAL_MASTER_NODES_SETTING.getKey(), "test");
assertThat(expectThrows(IllegalArgumentException.class, () -> new ClusterBootstrapService(settings.build(),
transportService, () -> emptyList(), () -> false, vc -> fail())).getMessage(),
containsString("setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] is not allowed when [discovery.type] is set " +
"to [single-node]"));
}
public void testFailBootstrapNonMasterEligibleNodeWithSingleNodeDiscovery() {
final Settings.Builder settings = Settings.builder()
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE)
.put(NODE_NAME_SETTING.getKey(), localNode.getName())
.put(Node.NODE_MASTER_SETTING.getKey(), false);
assertThat(expectThrows(IllegalArgumentException.class, () -> new ClusterBootstrapService(settings.build(),
transportService, () -> emptyList(), () -> false, vc -> fail())).getMessage(),
containsString("node with [discovery.type] set to [single-node] must be master-eligible"));
}
}

View File

@ -63,6 +63,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.env.NodeEnvironment;
@ -205,7 +206,7 @@ public class CoordinatorTests extends ESTestCase {
}
public void testDoesNotElectNonMasterNode() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5), false);
final Cluster cluster = new Cluster(randomIntBetween(1, 5), false, Settings.EMPTY);
cluster.runRandomly();
cluster.stabilise();
@ -905,7 +906,7 @@ public class CoordinatorTests extends ESTestCase {
* and join the leader again.
*/
public void testStayCandidateAfterReceivingFollowerCheckFromKnownMaster() {
final Cluster cluster = new Cluster(2, false);
final Cluster cluster = new Cluster(2, false, Settings.EMPTY);
cluster.runRandomly();
cluster.stabilise();
@ -1030,7 +1031,7 @@ public class CoordinatorTests extends ESTestCase {
final ClusterNode shiftedNode = randomFrom(cluster2.clusterNodes).restartedNode();
final ClusterNode newNode = cluster1.new ClusterNode(nextNodeIndex.getAndIncrement(),
shiftedNode.getLocalNode(), n -> shiftedNode.persistedState);
shiftedNode.getLocalNode(), n -> shiftedNode.persistedState, shiftedNode.nodeSettings);
cluster1.clusterNodes.add(newNode);
MockLogAppender mockAppender = new MockLogAppender();
@ -1054,7 +1055,7 @@ public class CoordinatorTests extends ESTestCase {
final ClusterNode detachedNode = newNode.restartedNode(
metaData -> DetachClusterCommand.updateMetaData(metaData),
term -> DetachClusterCommand.updateCurrentTerm());
term -> DetachClusterCommand.updateCurrentTerm(), newNode.nodeSettings);
cluster1.clusterNodes.replaceAll(cn -> cn == newNode ? detachedNode : cn);
cluster1.stabilise();
}
@ -1112,6 +1113,43 @@ public class CoordinatorTests extends ESTestCase {
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
}
public void testSingleNodeDiscoveryWithoutQuorum() {
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode clusterNode = cluster.getAnyNode();
logger.debug("rebooting [{}]", clusterNode.getId());
clusterNode.close();
cluster.clusterNodes.forEach(
cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode(
new Runnable() {
@Override
public void run() {
cn.transportService.disconnectFromNode(clusterNode.getLocalNode());
}
@Override
public String toString() {
return "disconnect from " + clusterNode.getLocalNode() + " after shutdown";
}
})));
IllegalStateException ise = expectThrows(IllegalStateException.class,
() -> cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ?
cn.restartedNode(Function.identity(), Function.identity(), Settings.builder()
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE).build()) :
cn));
assertThat(ise.getMessage(), containsString("cannot start with [discovery.type] set to [single-node] when local node"));
assertThat(ise.getMessage(), containsString("does not have quorum in voting configuration"));
}
public void testSingleNodeDiscoveryWithQuorum() {
final Cluster cluster = new Cluster(1, randomBoolean(), Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(),
DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE).build());
cluster.runRandomly();
cluster.stabilise();
}
private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
@ -1185,10 +1223,10 @@ public class CoordinatorTests extends ESTestCase {
private List<TransportAddress> seedHostsList;
Cluster(int initialNodeCount) {
this(initialNodeCount, true);
this(initialNodeCount, true, Settings.EMPTY);
}
Cluster(int initialNodeCount, boolean allNodesMasterEligible) {
Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings) {
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
assertThat(initialNodeCount, greaterThan(0));
@ -1197,7 +1235,7 @@ public class CoordinatorTests extends ESTestCase {
clusterNodes = new ArrayList<>(initialNodeCount);
for (int i = 0; i < initialNodeCount; i++) {
final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(),
allNodesMasterEligible || i == 0 || randomBoolean());
allNodesMasterEligible || i == 0 || randomBoolean(), nodeSettings);
clusterNodes.add(clusterNode);
if (clusterNode.getLocalNode().isMasterNode()) {
masterEligibleNodeIds.add(clusterNode.getId());
@ -1230,7 +1268,7 @@ public class CoordinatorTests extends ESTestCase {
final List<ClusterNode> addedNodes = new ArrayList<>();
for (int i = 0; i < newNodesCount; i++) {
final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true);
final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true, Settings.EMPTY);
addedNodes.add(clusterNode);
}
clusterNodes.addAll(addedNodes);
@ -1702,6 +1740,7 @@ public class CoordinatorTests extends ESTestCase {
private Coordinator coordinator;
private final DiscoveryNode localNode;
private final MockPersistedState persistedState;
private final Settings nodeSettings;
private AckedFakeThreadPoolMasterService masterService;
private DisruptableClusterApplierService clusterApplierService;
private ClusterService clusterService;
@ -1709,13 +1748,15 @@ public class CoordinatorTests extends ESTestCase {
private DisruptableMockTransport mockTransport;
private List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();
ClusterNode(int nodeIndex, boolean masterEligible) {
this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier);
ClusterNode(int nodeIndex, boolean masterEligible, Settings nodeSettings) {
this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier, nodeSettings);
}
ClusterNode(int nodeIndex, DiscoveryNode localNode, Function<DiscoveryNode, MockPersistedState> persistedStateSupplier) {
ClusterNode(int nodeIndex, DiscoveryNode localNode, Function<DiscoveryNode, MockPersistedState> persistedStateSupplier,
Settings nodeSettings) {
this.nodeIndex = nodeIndex;
this.localNode = localNode;
this.nodeSettings = nodeSettings;
persistedState = persistedStateSupplier.apply(localNode);
onNodeLog(localNode, this::setUp).run();
}
@ -1739,7 +1780,8 @@ public class CoordinatorTests extends ESTestCase {
}
};
final Settings settings = Settings.builder()
final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ?
nodeSettings : Settings.builder().put(nodeSettings)
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
transportService = mockTransport.createTransportService(
@ -1782,17 +1824,18 @@ public class CoordinatorTests extends ESTestCase {
}
ClusterNode restartedNode() {
return restartedNode(Function.identity(), Function.identity());
return restartedNode(Function.identity(), Function.identity(), nodeSettings);
}
ClusterNode restartedNode(Function<MetaData, MetaData> adaptGlobalMetaData, Function<Long, Long> adaptCurrentTerm) {
ClusterNode restartedNode(Function<MetaData, MetaData> adaptGlobalMetaData, Function<Long, Long> adaptCurrentTerm,
Settings nodeSettings) {
final TransportAddress address = randomBoolean() ? buildNewFakeTransportAddress() : localNode.getAddress();
final DiscoveryNode newLocalNode = new DiscoveryNode(localNode.getName(), localNode.getId(),
UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
localNode.isMasterNode() ? EnumSet.allOf(Role.class) : emptySet(), Version.CURRENT);
return new ClusterNode(nodeIndex, newLocalNode,
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetaData, adaptCurrentTerm));
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetaData, adaptCurrentTerm), nodeSettings);
}
private PersistedState getPersistedState() {

View File

@ -19,33 +19,27 @@
package org.elasticsearch.discovery.single;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.coordination.JoinHelper;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.SeedHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
@ -69,53 +63,6 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase {
.build();
}
public void testDoesNotRespondToZenPings() throws Exception {
final Settings settings =
Settings.builder().put("cluster.name", internalCluster().getClusterName()).build();
final Version version = Version.CURRENT;
final Stack<Closeable> closeables = new Stack<>();
final TestThreadPool threadPool = new TestThreadPool(getClass().getName());
try {
final MockTransportService pingTransport =
MockTransportService.createNewService(settings, version, threadPool, null);
pingTransport.start();
closeables.push(pingTransport);
final TransportService nodeTransport =
internalCluster().getInstance(TransportService.class);
// try to ping the single node directly
final SeedHostsProvider provider =
hostsResolver -> Collections.singletonList(nodeTransport.getLocalNode().getAddress());
final CountDownLatch latch = new CountDownLatch(1);
final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(nodeTransport.getLocalNode())
.add(pingTransport.getLocalNode())
.localNodeId(pingTransport.getLocalNode().getId())
.build();
final ClusterName clusterName = new ClusterName(internalCluster().getClusterName());
final ClusterState state = ClusterState.builder(clusterName).nodes(nodes).build();
final UnicastZenPing unicastZenPing =
new UnicastZenPing(settings, threadPool, pingTransport, provider, () -> state) {
@Override
protected void finishPingingRound(PingingRound pingingRound) {
latch.countDown();
super.finishPingingRound(pingingRound);
}
};
unicastZenPing.start();
closeables.push(unicastZenPing);
final CompletableFuture<ZenPing.PingCollection> responses = new CompletableFuture<>();
unicastZenPing.ping(responses::complete, TimeValue.timeValueSeconds(3));
latch.await();
responses.get();
assertThat(responses.get().size(), equalTo(0));
} finally {
while (!closeables.isEmpty()) {
IOUtils.closeWhileHandlingException(closeables.pop());
}
terminate(threadPool);
}
}
public void testSingleNodesDoNotDiscoverEachOther() throws IOException, InterruptedException {
final TransportService service = internalCluster().getInstance(TransportService.class);
final int port = service.boundAddress().publishAddress().getPort();
@ -167,6 +114,78 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase {
}
}
public void testCannotJoinNodeWithSingleNodeDiscovery() throws Exception {
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test",
JoinHelper.class.getCanonicalName(),
Level.INFO,
"failed to join") {
@Override
public boolean innerMatch(final LogEvent event) {
return event.getThrown() != null
&& event.getThrown().getClass() == RemoteTransportException.class
&& event.getThrown().getCause() != null
&& event.getThrown().getCause().getClass() == IllegalStateException.class
&& event.getThrown().getCause().getMessage().contains(
"cannot join node with [discovery.type] set to [single-node]");
}
});
final TransportService service = internalCluster().getInstance(TransportService.class);
final int port = service.boundAddress().publishAddress().getPort();
final NodeConfigurationSource configurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings
.builder()
.put("discovery.type", "zen")
.put("transport.type", getTestTransportType())
.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s")
/*
* We align the port ranges of the two as then with zen discovery these two
* nodes would find each other.
*/
.put("transport.port", port + "-" + (port + 5 - 1))
.build();
}
@Override
public Path nodeConfigPath(int nodeOrdinal) {
return null;
}
};
try (InternalTestCluster other =
new InternalTestCluster(
randomLong(),
createTempDir(),
false,
false,
1,
1,
internalCluster().getClusterName(),
configurationSource,
0,
"other",
Arrays.asList(getTestTransportPlugin(), MockHttpTransport.TestPlugin.class),
Function.identity())) {
Logger clusterLogger = LogManager.getLogger(JoinHelper.class);
Loggers.addAppender(clusterLogger, mockAppender);
try {
other.beforeTest(random(), 0);
final ClusterState first = internalCluster().getInstance(ClusterService.class).state();
assertThat(first.nodes().getSize(), equalTo(1));
assertBusy(() -> mockAppender.assertAllExpectationsMatched());
} finally {
Loggers.removeAppender(clusterLogger, mockAppender);
mockAppender.stop();
}
}
}
public void testStatePersistence() throws Exception {
createIndex("test");
internalCluster().fullRestart();

View File

@ -1,86 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.single;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.elasticsearch.test.ClusterServiceUtils.createMasterService;
import static org.hamcrest.Matchers.equalTo;
public class SingleNodeDiscoveryTests extends ESTestCase {
public void testInitialJoin() throws Exception {
final Settings settings = Settings.EMPTY;
final Version version = Version.CURRENT;
final ThreadPool threadPool = new TestThreadPool(getClass().getName());
final Stack<Closeable> stack = new Stack<>();
try {
final MockTransportService transportService =
MockTransportService.createNewService(settings, version, threadPool, null);
stack.push(transportService);
transportService.start();
final DiscoveryNode node = transportService.getLocalNode();
final MasterService masterService = createMasterService(threadPool, node);
AtomicReference<ClusterState> clusterState = new AtomicReference<>();
final SingleNodeDiscovery discovery =
new SingleNodeDiscovery(Settings.EMPTY, transportService,
masterService, new ClusterApplier() {
@Override
public void setInitialState(ClusterState initialState) {
clusterState.set(initialState);
}
@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier,
ClusterApplyListener listener) {
clusterState.set(clusterStateSupplier.get());
listener.onSuccess(source);
}
}, null);
discovery.start();
discovery.startInitialJoin();
final DiscoveryNodes nodes = clusterState.get().nodes();
assertThat(nodes.getSize(), equalTo(1));
assertThat(nodes.getMasterNode().getId(), equalTo(node.getId()));
} finally {
while (!stack.isEmpty()) {
IOUtils.closeWhileHandlingException(stack.pop());
}
terminate(threadPool);
}
}
}

View File

@ -1215,7 +1215,7 @@ public final class InternalTestCluster extends TestCluster {
nextNodeId.set(newSize);
assert size() == newSize;
if (newSize > 0) {
if (autoManageMinMasterNodes && newSize > 0) {
validateClusterFormed();
}
logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]",