Introduce single-node discovery

This commit adds a single node discovery type. With this discovery type,
a node will elect itself as master and never form a cluster with another
node.

Relates #23595
This commit is contained in:
Jason Tedor 2017-04-04 03:02:58 -04:00 committed by GitHub
parent 3bd2efa177
commit 71293a89bf
6 changed files with 460 additions and 18 deletions

View File

@ -19,6 +19,20 @@
package org.elasticsearch.discovery;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -28,21 +42,6 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
* A module for loading classes for node discovery.
*/
@ -83,6 +82,7 @@ public class DiscoveryModule {
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, clusterService, hostsProvider));
discoveryTypes.put("none", () -> new NoneDiscovery(settings, clusterService, clusterService.getClusterSettings()));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, clusterService));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
clusterService, hostsProvider).entrySet().forEach(entry -> {
@ -96,10 +96,12 @@ public class DiscoveryModule {
if (discoverySupplier == null) {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}
Loggers.getLogger(getClass(), settings).info("using discovery type [{}]", discoveryType);
discovery = Objects.requireNonNull(discoverySupplier.get());
}
public Discovery getDiscovery() {
return discovery;
}
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.single;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PendingClusterStatesQueue;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* A discovery implementation where the only member of the cluster is the local node.
*/
public class SingleNodeDiscovery extends AbstractLifecycleComponent implements Discovery {
private final ClusterService clusterService;
private final DiscoverySettings discoverySettings;
public SingleNodeDiscovery(final Settings settings, final ClusterService clusterService) {
super(Objects.requireNonNull(settings));
this.clusterService = Objects.requireNonNull(clusterService);
final ClusterSettings clusterSettings =
Objects.requireNonNull(clusterService.getClusterSettings());
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
}
@Override
public DiscoveryNode localNode() {
return clusterService.localNode();
}
@Override
public String nodeDescription() {
return clusterService.getClusterName().value() + "/" + clusterService.localNode().getId();
}
@Override
public void setAllocationService(final AllocationService allocationService) {
}
@Override
public void publish(final ClusterChangedEvent event, final AckListener listener) {
}
@Override
public DiscoveryStats stats() {
return new DiscoveryStats((PendingClusterStateStats) null);
}
@Override
public DiscoverySettings getDiscoverySettings() {
return discoverySettings;
}
@Override
public void startInitialJoin() {
final ClusterStateTaskExecutor<DiscoveryNode> executor =
new ClusterStateTaskExecutor<DiscoveryNode>() {
@Override
public ClusterTasksResult<DiscoveryNode> execute(
final ClusterState current,
final List<DiscoveryNode> tasks) throws Exception {
assert tasks.size() == 1;
final DiscoveryNodes.Builder nodes =
DiscoveryNodes.builder(current.nodes());
// always set the local node as master, there will not be other nodes
nodes.masterNodeId(localNode().getId());
final ClusterState next =
ClusterState.builder(current).nodes(nodes).build();
final ClusterTasksResult.Builder<DiscoveryNode> result =
ClusterTasksResult.builder();
return result.successes(tasks).build(next);
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
};
final ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.URGENT);
clusterService.submitStateUpdateTasks(
"single-node-start-initial-join",
Collections.singletonMap(localNode(), (s, e) -> {}), config, executor);
}
@Override
public int getMinimumMasterNodes() {
return 1;
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() throws IOException {
}
}

View File

@ -0,0 +1,178 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.single;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.PingContextProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
@ESIntegTestCase.ClusterScope(
scope = ESIntegTestCase.Scope.TEST,
numDataNodes = 1,
numClientNodes = 0,
supportsDedicatedMasters = false,
autoMinMasterNodes = false)
public class SingleNodeDiscoveryIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings
.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("discovery.type", "single-node")
// TODO: do not use such a restrictive ephemeral port range
.put("transport.tcp.port", "49152-49156")
.build();
}
public void testDoesNotRespondToZenPings() throws Exception {
final Settings settings =
Settings.builder().put("cluster.name", internalCluster().getClusterName()).build();
final Version version = Version.CURRENT;
final Stack<Closeable> closeables = new Stack<>();
final TestThreadPool threadPool = new TestThreadPool(getClass().getName());
try {
final MockTransportService pingTransport =
MockTransportService.createNewService(settings, version, threadPool, null);
pingTransport.start();
closeables.push(pingTransport);
final TransportService nodeTransport =
internalCluster().getInstance(TransportService.class);
// try to ping the single node directly
final UnicastHostsProvider provider =
() -> Collections.singletonList(nodeTransport.getLocalNode());
final CountDownLatch latch = new CountDownLatch(1);
final UnicastZenPing unicastZenPing =
new UnicastZenPing(settings, threadPool, pingTransport, provider) {
@Override
protected void finishPingingRound(PingingRound pingingRound) {
latch.countDown();
super.finishPingingRound(pingingRound);
}
};
final DiscoveryNodes nodes =
DiscoveryNodes.builder().add(pingTransport.getLocalNode()).build();
final ClusterName clusterName = new ClusterName(internalCluster().getClusterName());
final ClusterState state = ClusterState.builder(clusterName).nodes(nodes).build();
unicastZenPing.start(new PingContextProvider() {
@Override
public ClusterState clusterState() {
return state;
}
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes
.builder()
.add(nodeTransport.getLocalNode())
.add(pingTransport.getLocalNode())
.localNodeId(pingTransport.getLocalNode().getId())
.build();
}
});
closeables.push(unicastZenPing);
final CompletableFuture<ZenPing.PingCollection> responses = new CompletableFuture<>();
unicastZenPing.ping(responses::complete, TimeValue.timeValueSeconds(3));
latch.await();
responses.get();
assertThat(responses.get().size(), equalTo(0));
} finally {
while (!closeables.isEmpty()) {
IOUtils.closeWhileHandlingException(closeables.pop());
}
terminate(threadPool);
}
}
public void testSingleNodesDoNotDiscoverEachOther() throws IOException, InterruptedException {
final NodeConfigurationSource configurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings
.builder()
.put("discovery.type", "single-node")
.put("http.enabled", false)
.put("transport.type", "mock-socket-network")
/*
* We align the port ranges of the two as then with zen discovery these two
* nodes would find each other.
*/
// TODO: do not use such a restrictive ephemeral port range
.put("transport.tcp.port", "49152-49156")
.build();
}
};
try (InternalTestCluster other =
new InternalTestCluster(
randomLong(),
createTempDir(),
false,
false,
1,
1,
internalCluster().getClusterName(),
configurationSource,
0,
false,
"other",
Collections.singletonList(MockTcpTransportPlugin.class),
Function.identity())) {
other.beforeTest(random(), 0);
final ClusterState first = internalCluster().getInstance(ClusterService.class).state();
final ClusterState second = other.getInstance(ClusterService.class).state();
assertThat(first.nodes().getSize(), equalTo(1));
assertThat(second.nodes().getSize(), equalTo(1));
assertThat(
first.nodes().getMasterNodeId(),
not(equalTo(second.nodes().getMasterNodeId())));
assertThat(
first.metaData().clusterUUID(),
not(equalTo(second.metaData().clusterUUID())));
}
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.single;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.equalTo;
public class SingleNodeDiscoveryTests extends ESTestCase {
public void testInitialJoin() throws Exception {
final Settings settings = Settings.EMPTY;
final Version version = Version.CURRENT;
final ThreadPool threadPool = new TestThreadPool(getClass().getName());
final Stack<Closeable> stack = new Stack<>();
try {
final MockTransportService transportService =
MockTransportService.createNewService(settings, version, threadPool, null);
stack.push(transportService);
transportService.start();
final DiscoveryNode node = transportService.getLocalNode();
final ClusterService clusterService = createClusterService(threadPool, node);
stack.push(clusterService);
final SingleNodeDiscovery discovery =
new SingleNodeDiscovery(Settings.EMPTY, clusterService);
discovery.startInitialJoin();
// we are racing against the initial join which is asynchronous so we use an observer
final ClusterState state = clusterService.state();
final ThreadContext threadContext = threadPool.getThreadContext();
final ClusterStateObserver observer =
new ClusterStateObserver(state, clusterService, null, logger, threadContext);
if (state.nodes().getMasterNodeId() == null) {
final CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
latch.countDown();
}
@Override
public void onClusterServiceClose() {
latch.countDown();
}
@Override
public void onTimeout(TimeValue timeout) {
assert false;
}
}, s -> s.nodes().getMasterNodeId() != null);
latch.await();
}
final DiscoveryNodes nodes = clusterService.state().nodes();
assertThat(nodes.getSize(), equalTo(1));
assertThat(nodes.getMasterNode().getId(), equalTo(node.getId()));
} finally {
while (!stack.isEmpty()) {
IOUtils.closeWhileHandlingException(stack.pop());
}
terminate(threadPool);
}
}
}

View File

@ -1,3 +1,5 @@
import org.elasticsearch.gradle.test.RestIntegTestTask
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
@ -25,3 +27,16 @@ apply plugin: 'elasticsearch.rest-test'
dependencies {
testCompile project(path: ':client:transport', configuration: 'runtime') // randomly swapped in as a transport
}
task singleNodeIntegTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}
singleNodeIntegTestCluster {
numNodes = 1
setting 'discovery.type', 'single-node'
}
integTest.dependsOn(singleNodeIntegTestRunner, 'singleNodeIntegTestCluster#stop')
check.dependsOn(integTest)

View File

@ -64,6 +64,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
@ -590,7 +591,8 @@ public final class InternalTestCluster extends TestCluster {
.put("node.name", name)
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed);
if (autoManageMinMasterNodes) {
final boolean usingSingleNodeDiscovery = DiscoveryModule.DISCOVERY_TYPE_SETTING.get(finalSettings.build()).equals("single-node");
if (!usingSingleNodeDiscovery && autoManageMinMasterNodes) {
assert finalSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null :
"min master nodes may not be set when auto managed";
assert finalSettings.get(INITIAL_STATE_TIMEOUT_SETTING.getKey()) == null :
@ -600,7 +602,7 @@ public final class InternalTestCluster extends TestCluster {
// don't wait too long not to slow down tests
.put(ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.getKey(), "5s")
.put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), defaultMinMasterNodes);
} else if (finalSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null) {
} else if (!usingSingleNodeDiscovery && finalSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null) {
throw new IllegalArgumentException(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey() + " must be configured");
}
MockNode node = new MockNode(finalSettings.build(), plugins);
@ -1083,7 +1085,7 @@ public final class InternalTestCluster extends TestCluster {
}
return true;
}, 30, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("cluster failed to from with expected nodes " + expectedNodes + " and actual nodes " +
throw new IllegalStateException("cluster failed to form with expected nodes " + expectedNodes + " and actual nodes " +
client.admin().cluster().prepareState().get().getState().nodes());
}
} catch (InterruptedException e) {