[Zen2] Introduce ClusterBootstrapService (#35488)

Today, the bootstrapping of a Zen2 cluster is driven externally, requiring
something else to wait for discovery to converge and then to inject the initial
configuration. This is hard to use in some situations, such as REST tests.

This change introduces the `ClusterBootstrapService` which brings the bootstrap
retry logic within each node and allows it to be controlled via an (unsafe)
node setting.
This commit is contained in:
David Turner 2018-11-15 20:09:22 +00:00 committed by GitHub
parent 135c3f0f07
commit 86ef041539
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 466 additions and 146 deletions

View File

@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
@ -33,6 +34,8 @@ import java.io.IOException;
public class GetDiscoveredNodesRequest extends ActionRequest {
private int waitForNodes = 1;
@Nullable // if the request should wait indefinitely
private TimeValue timeout = TimeValue.timeValueSeconds(30);
public GetDiscoveredNodesRequest() {
@ -41,7 +44,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
public GetDiscoveredNodesRequest(StreamInput in) throws IOException {
super(in);
waitForNodes = in.readInt();
timeout = in.readTimeValue();
timeout = in.readOptionalTimeValue();
}
/**
@ -74,8 +77,8 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
*
* @param timeout how long to wait to discover sufficiently many nodes to respond successfully.
*/
public void setTimeout(TimeValue timeout) {
if (timeout.compareTo(TimeValue.ZERO) < 0) {
public void setTimeout(@Nullable TimeValue timeout) {
if (timeout != null && timeout.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("negative timeout of [" + timeout + "] is not allowed");
}
this.timeout = timeout;
@ -87,6 +90,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
*
* @return how long to wait to discover sufficiently many nodes to respond successfully.
*/
@Nullable
public TimeValue getTimeout() {
return timeout;
}
@ -105,7 +109,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(waitForNodes);
out.writeTimeValue(timeout);
out.writeOptionalTimeValue(timeout);
}
@Override

View File

@ -108,6 +108,7 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction<Ge
listenableFuture.addListener(ActionListener.wrap(releasable::close), directExecutor, threadPool.getThreadContext());
respondIfRequestSatisfied.accept(coordinator.getFoundPeers());
if (request.getTimeout() != null) {
threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() {
@Override
public void run() {
@ -123,3 +124,4 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction<Ge
});
}
}
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterResponse;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
public class ClusterBootstrapService {
private static final Logger logger = LogManager.getLogger(ClusterBootstrapService.class);
// The number of master-eligible nodes which, if discovered, can be used to bootstrap the cluster. This setting is unsafe in the event
// that more master nodes are started than expected.
public static final Setting<Integer> INITIAL_MASTER_NODE_COUNT_SETTING =
Setting.intSetting("cluster.unsafe_initial_master_node_count", 0, 0, Property.NodeScope);
private final int initialMasterNodeCount;
private final TransportService transportService;
private volatile boolean running;
public ClusterBootstrapService(Settings settings, TransportService transportService) {
initialMasterNodeCount = INITIAL_MASTER_NODE_COUNT_SETTING.get(settings);
this.transportService = transportService;
}
public void start() {
assert running == false;
running = true;
if (initialMasterNodeCount > 0 && transportService.getLocalNode().isMasterNode()) {
logger.debug("unsafely waiting for discovery of [{}] master-eligible nodes", initialMasterNodeCount);
final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();
final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
request.setWaitForNodes(initialMasterNodeCount);
request.setTimeout(null);
logger.trace("sending {}", request);
transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request,
new TransportResponseHandler<GetDiscoveredNodesResponse>() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
assert response.getNodes().size() >= initialMasterNodeCount;
assert response.getNodes().stream().allMatch(DiscoveryNode::isMasterNode);
logger.debug("discovered {}, starting to bootstrap", response.getNodes());
awaitBootstrap(response.getBootstrapConfiguration());
}
@Override
public void handleException(TransportException exp) {
logger.warn("discovery attempt failed", exp);
}
@Override
public String executor() {
return Names.SAME;
}
@Override
public GetDiscoveredNodesResponse read(StreamInput in) throws IOException {
return new GetDiscoveredNodesResponse(in);
}
});
}
}
}
public void stop() {
assert running == true;
running = false;
}
private void awaitBootstrap(final BootstrapConfiguration bootstrapConfiguration) {
if (running == false) {
logger.debug("awaitBootstrap: not running");
return;
}
BootstrapClusterRequest request = new BootstrapClusterRequest(bootstrapConfiguration);
logger.trace("sending {}", request);
transportService.sendRequest(transportService.getLocalNode(), BootstrapClusterAction.NAME, request,
new TransportResponseHandler<BootstrapClusterResponse>() {
@Override
public void handleResponse(BootstrapClusterResponse response) {
logger.debug("automatic cluster bootstrapping successful: received {}", response);
}
@Override
public void handleException(TransportException exp) {
// log a warning since a failure here indicates a bad problem, such as:
// - bootstrap configuration resolution failed (e.g. discovered nodes no longer match those in the bootstrap config)
// - discovered nodes no longer form a quorum in the bootstrap config
logger.warn(new ParameterizedMessage("automatic cluster bootstrapping failed, retrying [{}]",
bootstrapConfiguration.getNodeDescriptions()), exp);
// There's not really much else we can do apart from retry and hope that the problem goes away. The retry is delayed
// since a tight loop here is unlikely to help.
transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.SAME, new Runnable() {
@Override
public void run() {
awaitBootstrap(bootstrapConfiguration);
}
@Override
public String toString() {
return "retry bootstrapping with " + bootstrapConfiguration.getNodeDescriptions();
}
});
}
@Override
public String executor() {
return Names.SAME;
}
@Override
public BootstrapClusterResponse read(StreamInput in) throws IOException {
return new BootstrapClusterResponse(in);
}
});
}
}

View File

@ -114,6 +114,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Releasable prevotingRound;
private long maxTermSeen;
private final Reconfigurator reconfigurator;
private final ClusterBootstrapService clusterBootstrapService;
private Mode mode;
private Optional<DiscoveryNode> lastKnownLeader;
@ -151,6 +152,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
}
private Runnable getOnLeaderFailure() {
@ -483,11 +485,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
synchronized (mutex) {
becomeCandidate("startInitialJoin");
}
clusterBootstrapService.start();
}
@Override
protected void doStop() {
configuredHostsResolver.stop();
clusterBootstrapService.stop();
}
@Override

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
import org.elasticsearch.cluster.coordination.JoinHelper;
@ -459,7 +460,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
Coordinator.PUBLISH_TIMEOUT_SETTING,
JoinHelper.JOIN_TIMEOUT_SETTING,
Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION,
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING,
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING
)));
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.Is.is;
@ -56,6 +57,9 @@ public class GetDiscoveredNodesRequestTests extends ESTestCase {
() -> getDiscoveredNodesRequest.setTimeout(TimeValue.timeValueNanos(randomLongBetween(-10, -1))));
assertThat(exception.getMessage(), startsWith("negative timeout of "));
assertThat(exception.getMessage(), endsWith(" is not allowed"));
getDiscoveredNodesRequest.setTimeout(null);
assertThat("value updated", getDiscoveredNodesRequest.getTimeout(), nullValue());
}
public void testSerialization() throws IOException {
@ -67,6 +71,8 @@ public class GetDiscoveredNodesRequestTests extends ESTestCase {
if (randomBoolean()) {
originalRequest.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "timeout"));
} else if (randomBoolean()) {
originalRequest.setTimeout(null);
}
final GetDiscoveredNodesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), GetDiscoveredNodesRequest::new);

View File

@ -171,13 +171,31 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}
public void testFailsQuicklyWithZeroTimeout() throws InterruptedException {
public void testFailsQuicklyWithZeroTimeoutAndAcceptsNullTimeout() throws InterruptedException {
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
transportService.start();
transportService.acceptIncomingRequests();
coordinator.start();
coordinator.startInitialJoin();
{
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
getDiscoveredNodesRequest.setWaitForNodes(2);
getDiscoveredNodesRequest.setTimeout(null);
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
throw new AssertionError("should not be called");
}
@Override
public void handleException(TransportException exp) {
throw new AssertionError("should not be called", exp);
}
});
}
{
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
getDiscoveredNodesRequest.setWaitForNodes(2);
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
@ -200,6 +218,7 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}
}
public void testGetsDiscoveredNodes() throws InterruptedException {
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action

View File

@ -0,0 +1,200 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterResponse;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
public class ClusterBootstrapServiceTests extends ESTestCase {
private DiscoveryNode localNode, otherNode1, otherNode2;
private DeterministicTaskQueue deterministicTaskQueue;
private TransportService transportService;
private ClusterBootstrapService clusterBootstrapService;
@Before
public void createServices() {
localNode = newDiscoveryNode("local");
otherNode1 = newDiscoveryNode("other1");
otherNode2 = newDiscoveryNode("other2");
deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
final MockTransport transport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
throw new AssertionError("unexpected " + action);
}
};
transportService = transport.createTransportService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
clusterBootstrapService = new ClusterBootstrapService(Settings.builder().put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 3).build(),
transportService);
}
private DiscoveryNode newDiscoveryNode(String nodeName) {
return new DiscoveryNode(nodeName, randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER),
Version.CURRENT);
}
private void startServices() {
transportService.start();
transportService.acceptIncomingRequests();
clusterBootstrapService.start();
}
public void testDoesNothingOnNonMasterNodes() {
localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> {
throw new AssertionError("should not make a discovery request");
});
startServices();
deterministicTaskQueue.runAllTasks();
}
public void testDoesNothingIfSettingIsUnset() {
clusterBootstrapService = new ClusterBootstrapService(Settings.EMPTY, transportService);
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> {
throw new AssertionError("should not make a discovery request");
});
startServices();
deterministicTaskQueue.runAllTasks();
}
public void testDoesNotRetryOnDiscoveryFailure() {
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
new TransportRequestHandler<GetDiscoveredNodesRequest>() {
private boolean called = false;
@Override
public void messageReceived(GetDiscoveredNodesRequest request, TransportChannel channel, Task task) {
assert called == false;
called = true;
throw new IllegalArgumentException("simulate failure of discovery request");
}
});
startServices();
deterministicTaskQueue.runAllTasks();
}
public void testBootstrapsOnDiscoverySuccess() {
final AtomicBoolean discoveryAttempted = new AtomicBoolean();
final Set<DiscoveryNode> discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet());
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> {
assertTrue(discoveryAttempted.compareAndSet(false, true));
channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes));
});
final AtomicBoolean bootstrapAttempted = new AtomicBoolean();
transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new,
(request, channel, task) -> {
assertTrue(bootstrapAttempted.compareAndSet(false, true));
channel.sendResponse(new BootstrapClusterResponse(false));
});
startServices();
deterministicTaskQueue.runAllTasks();
assertTrue(discoveryAttempted.get());
assertTrue(bootstrapAttempted.get());
}
public void testRetriesOnBootstrapFailure() {
final Set<DiscoveryNode> discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet());
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes)));
AtomicLong callCount = new AtomicLong();
transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new,
(request, channel, task) -> {
callCount.incrementAndGet();
channel.sendResponse(new ElasticsearchException("simulated exception"));
});
startServices();
while (callCount.get() < 5) {
if (deterministicTaskQueue.hasDeferredTasks()) {
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks();
}
}
public void testStopsRetryingBootstrapWhenStopped() {
final Set<DiscoveryNode> discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet());
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes)));
transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new,
(request, channel, task) -> channel.sendResponse(new ElasticsearchException("simulated exception")));
deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + 200000, new Runnable() {
@Override
public void run() {
clusterBootstrapService.stop();
}
@Override
public String toString() {
return "stop cluster bootstrap service";
}
});
startServices();
deterministicTaskQueue.runAllTasks();
// termination means success
}
}

View File

@ -62,6 +62,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
@ -202,6 +203,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
// turn it off for these tests.
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes
.put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1)
.put(nodeSettings()) // allow test cases to provide their own settings or override these
.build();
Collection<Class<? extends Plugin>> plugins = getPlugins();
@ -217,14 +219,11 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
plugins.add(MockHttpTransport.TestPlugin.class);
}
Node node = new MockNode(settings, plugins, forbidPrivateIndexSettings());
bootstrapNodes(true,
() -> {
try {
node.start();
} catch (NodeValidationException e) {
throw new RuntimeException(e);
}
}, Collections.singletonList(node), logger, this::wrapClient);
return node;
}

View File

@ -47,18 +47,10 @@ import org.apache.lucene.util.TestRuleMarkFailure;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest;
import org.elasticsearch.bootstrap.BootstrapForTesting;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.SuppressForbidden;
@ -93,7 +85,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
@ -109,7 +100,6 @@ import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
@ -125,7 +115,6 @@ import org.elasticsearch.test.junit.listeners.LoggingListener;
import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
import org.joda.time.DateTimeZone;
import org.junit.After;
@ -159,11 +148,9 @@ import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -303,81 +290,6 @@ public abstract class ESTestCase extends LuceneTestCase {
return new TransportAddress(TransportAddress.META_ADDRESS, portGenerator.incrementAndGet());
}
public static void bootstrapNodes(boolean condition, Runnable startAction, List<Node> nodes, Logger logger,
Function<Client, Client> clientWrapper) {
final AtomicBoolean stopBootstrapThread = new AtomicBoolean();
Thread bootstrapThread = null;
if (condition) {
int zen2MasterNodeCount = 0;
for (Node node : nodes) {
if (DiscoveryNode.isMasterNode(node.settings())) {
Discovery discovery = node.injector().getInstance(Discovery.class);
if (discovery instanceof Coordinator) {
zen2MasterNodeCount++;
}
}
}
if (zen2MasterNodeCount > 0) {
final int minimumConfigurationSize = randomIntBetween(1, zen2MasterNodeCount);
final Random bootstrapRandom = new Random(randomLong());
bootstrapThread = new Thread(() -> {
BootstrapClusterRequest bootstrapClusterRequest = null;
while (stopBootstrapThread.get() == false) {
final Node node = randomFrom(bootstrapRandom, nodes);
final TransportService transportService = node.injector().getInstance(TransportService.class);
if (transportService.getLocalNode() != null) {
final Client client = clientWrapper.apply(node.client());
if (bootstrapClusterRequest == null) {
try {
final GetDiscoveredNodesRequest discoveredNodesRequest = new GetDiscoveredNodesRequest();
discoveredNodesRequest.setWaitForNodes(minimumConfigurationSize);
bootstrapClusterRequest = new BootstrapClusterRequest(
client.execute(GetDiscoveredNodesAction.INSTANCE, discoveredNodesRequest).get()
.getBootstrapConfiguration());
} catch (Exception e) {
logger.trace("exception getting bootstrap configuration", e);
}
} else {
try {
client.execute(BootstrapClusterAction.INSTANCE, bootstrapClusterRequest).get();
if (usually(bootstrapRandom)) {
// occasionally carry on trying to bootstrap even after one request succeeded.
return;
}
} catch (Exception e) {
logger.trace("exception bootstrapping cluster", e);
}
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new AssertionError("interrupted while sleeping", e);
}
}
}, "Bootstrap-Thread for " + ClusterName.CLUSTER_NAME_SETTING.get(nodes.get(0).settings()));
bootstrapThread.start();
}
}
try {
startAction.run();
} finally {
if (bootstrapThread != null) {
stopBootstrapThread.set(true);
try {
bootstrapThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* Called when a test fails, supplying the errors it generated. Not called when the test fails because assumptions are violated.
*/

View File

@ -147,6 +147,7 @@ import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
import static org.apache.lucene.util.LuceneTestCase.rarely;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING;
@ -154,7 +155,6 @@ import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_M
import static org.elasticsearch.discovery.zen.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.bootstrapNodes;
import static org.elasticsearch.test.ESTestCase.getTestTransportType;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -525,9 +525,9 @@ public final class InternalTestCluster extends TestCluster {
}
final int ord = nextNodeId.getAndIncrement();
final Runnable onTransportServiceStarted = () -> {}; // do not create unicast host file for this one node.
final NodeAndClient buildNode = buildNode(ord, random.nextLong(), null, false, 1, onTransportServiceStarted);
final Settings settings = Settings.builder().put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1).build();
final NodeAndClient buildNode = buildNode(ord, random.nextLong(), settings, false, 1, onTransportServiceStarted);
assert nodes.isEmpty();
bootstrapNodes(true, buildNode::startNode, Collections.singletonList(buildNode.node()), logger, clientWrapper);
buildNode.startNode();
publishNode(buildNode);
return buildNode;
@ -1081,6 +1081,9 @@ public final class InternalTestCluster extends TestCluster {
final Settings.Builder settings = Settings.builder();
settings.put(Node.NODE_MASTER_SETTING.getKey(), true);
settings.put(Node.NODE_DATA_SETTING.getKey(), false);
if (prevNodeCount == 0 && autoManageMinMasterNodes) {
settings.put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), numSharedDedicatedMasterNodes);
}
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes,
onTransportServiceStarted);
toStartAndPublish.add(nodeAndClient);
@ -1091,6 +1094,8 @@ public final class InternalTestCluster extends TestCluster {
// if we don't have dedicated master nodes, keep things default
settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build();
settings.put(Node.NODE_DATA_SETTING.getKey(), true).build();
} else if (prevNodeCount == 0 && autoManageMinMasterNodes) {
settings.put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), numSharedDataNodes);
}
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes,
onTransportServiceStarted);
@ -1105,9 +1110,6 @@ public final class InternalTestCluster extends TestCluster {
toStartAndPublish.add(nodeAndClient);
}
bootstrapNodes(prevNodeCount == 0, () -> startAndPublishNodesAndClients(toStartAndPublish),
toStartAndPublish.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger, clientWrapper);
startAndPublishNodesAndClients(toStartAndPublish);
nextNodeId.set(newSize);
@ -1915,16 +1917,23 @@ public final class InternalTestCluster extends TestCluster {
final List<NodeAndClient> nodes = new ArrayList<>();
final int prevMasterCount = getMasterNodesCount();
for (Settings nodeSettings : settings) {
nodes.add(buildNode(nodeSettings, defaultMinMasterNodes, () -> rebuildUnicastHostFiles(nodes)));
final Settings nodeSettingsIncludingBootstrap;
if (prevMasterCount == 0 && autoManageMinMasterNodes) {
nodeSettingsIncludingBootstrap = Settings.builder()
.put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(),
(int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count())
.put(nodeSettings)
.build();
} else {
nodeSettingsIncludingBootstrap = nodeSettings;
}
nodes.add(buildNode(nodeSettingsIncludingBootstrap, defaultMinMasterNodes, () -> rebuildUnicastHostFiles(nodes)));
}
bootstrapNodes(prevMasterCount == 0,
() -> {
startAndPublishNodesAndClients(nodes);
if (autoManageMinMasterNodes) {
validateClusterFormed();
}
},
nodes.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger, clientWrapper);
return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
}

View File

@ -19,6 +19,7 @@ public final class SystemPrivilege extends Privilege {
"internal:*",
"indices:monitor/*", // added for monitoring
"cluster:monitor/*", // added for monitoring
"cluster:admin/bootstrap_cluster", // for the bootstrap service
"cluster:admin/reroute", // added for DiskThresholdDecider.DiskListener
"indices:admin/mapping/put", // needed for recovery and shrink api
"indices:admin/template/put", // needed for the TemplateUpgradeService