diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java index f81061e8547..ffd031785cf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.bootstrap; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; @@ -33,6 +34,8 @@ import java.io.IOException; public class GetDiscoveredNodesRequest extends ActionRequest { private int waitForNodes = 1; + + @Nullable // if the request should wait indefinitely private TimeValue timeout = TimeValue.timeValueSeconds(30); public GetDiscoveredNodesRequest() { @@ -41,7 +44,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest { public GetDiscoveredNodesRequest(StreamInput in) throws IOException { super(in); waitForNodes = in.readInt(); - timeout = in.readTimeValue(); + timeout = in.readOptionalTimeValue(); } /** @@ -74,8 +77,8 @@ public class GetDiscoveredNodesRequest extends ActionRequest { * * @param timeout how long to wait to discover sufficiently many nodes to respond successfully. */ - public void setTimeout(TimeValue timeout) { - if (timeout.compareTo(TimeValue.ZERO) < 0) { + public void setTimeout(@Nullable TimeValue timeout) { + if (timeout != null && timeout.compareTo(TimeValue.ZERO) < 0) { throw new IllegalArgumentException("negative timeout of [" + timeout + "] is not allowed"); } this.timeout = timeout; @@ -87,6 +90,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest { * * @return how long to wait to discover sufficiently many nodes to respond successfully. */ + @Nullable public TimeValue getTimeout() { return timeout; } @@ -105,7 +109,7 @@ public class GetDiscoveredNodesRequest extends ActionRequest { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeInt(waitForNodes); - out.writeTimeValue(timeout); + out.writeOptionalTimeValue(timeout); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java index a45d7c3246f..c4d979300fa 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java @@ -108,18 +108,20 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction INITIAL_MASTER_NODE_COUNT_SETTING = + Setting.intSetting("cluster.unsafe_initial_master_node_count", 0, 0, Property.NodeScope); + + private final int initialMasterNodeCount; + private final TransportService transportService; + private volatile boolean running; + + public ClusterBootstrapService(Settings settings, TransportService transportService) { + initialMasterNodeCount = INITIAL_MASTER_NODE_COUNT_SETTING.get(settings); + this.transportService = transportService; + } + + public void start() { + assert running == false; + running = true; + + if (initialMasterNodeCount > 0 && transportService.getLocalNode().isMasterNode()) { + logger.debug("unsafely waiting for discovery of [{}] master-eligible nodes", initialMasterNodeCount); + + final ThreadContext threadContext = transportService.getThreadPool().getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + + final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest(); + request.setWaitForNodes(initialMasterNodeCount); + request.setTimeout(null); + logger.trace("sending {}", request); + transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request, + new TransportResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + assert response.getNodes().size() >= initialMasterNodeCount; + assert response.getNodes().stream().allMatch(DiscoveryNode::isMasterNode); + logger.debug("discovered {}, starting to bootstrap", response.getNodes()); + awaitBootstrap(response.getBootstrapConfiguration()); + } + + @Override + public void handleException(TransportException exp) { + logger.warn("discovery attempt failed", exp); + } + + @Override + public String executor() { + return Names.SAME; + } + + @Override + public GetDiscoveredNodesResponse read(StreamInput in) throws IOException { + return new GetDiscoveredNodesResponse(in); + } + }); + } + } + } + + public void stop() { + assert running == true; + running = false; + } + + private void awaitBootstrap(final BootstrapConfiguration bootstrapConfiguration) { + if (running == false) { + logger.debug("awaitBootstrap: not running"); + return; + } + + BootstrapClusterRequest request = new BootstrapClusterRequest(bootstrapConfiguration); + logger.trace("sending {}", request); + transportService.sendRequest(transportService.getLocalNode(), BootstrapClusterAction.NAME, request, + new TransportResponseHandler() { + @Override + public void handleResponse(BootstrapClusterResponse response) { + logger.debug("automatic cluster bootstrapping successful: received {}", response); + } + + @Override + public void handleException(TransportException exp) { + // log a warning since a failure here indicates a bad problem, such as: + // - bootstrap configuration resolution failed (e.g. discovered nodes no longer match those in the bootstrap config) + // - discovered nodes no longer form a quorum in the bootstrap config + logger.warn(new ParameterizedMessage("automatic cluster bootstrapping failed, retrying [{}]", + bootstrapConfiguration.getNodeDescriptions()), exp); + + // There's not really much else we can do apart from retry and hope that the problem goes away. The retry is delayed + // since a tight loop here is unlikely to help. + transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.SAME, new Runnable() { + @Override + public void run() { + awaitBootstrap(bootstrapConfiguration); + } + + @Override + public String toString() { + return "retry bootstrapping with " + bootstrapConfiguration.getNodeDescriptions(); + } + }); + } + + @Override + public String executor() { + return Names.SAME; + } + + @Override + public BootstrapClusterResponse read(StreamInput in) throws IOException { + return new BootstrapClusterResponse(in); + } + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index ae69ff27718..28505966585 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -114,6 +114,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private Releasable prevotingRound; private long maxTermSeen; private final Reconfigurator reconfigurator; + private final ClusterBootstrapService clusterBootstrapService; private Mode mode; private Optional lastKnownLeader; @@ -151,6 +152,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.clusterApplier = clusterApplier; masterService.setClusterStateSupplier(this::getStateForMasterService); this.reconfigurator = new Reconfigurator(settings, clusterSettings); + this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService); } private Runnable getOnLeaderFailure() { @@ -483,11 +485,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery synchronized (mutex) { becomeCandidate("startInitialJoin"); } + + clusterBootstrapService.start(); } @Override protected void doStop() { configuredHostsResolver.stop(); + clusterBootstrapService.stop(); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 7f159ddb42a..de8d8f7db85 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.coordination.ClusterBootstrapService; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; import org.elasticsearch.cluster.coordination.JoinHelper; @@ -459,7 +460,8 @@ public final class ClusterSettings extends AbstractScopedSettings { Coordinator.PUBLISH_TIMEOUT_SETTING, JoinHelper.JOIN_TIMEOUT_SETTING, Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION, - TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING + TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING, + ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING ))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java index 9ce53a93efa..f15e0af1740 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java @@ -25,6 +25,7 @@ import java.io.IOException; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.Is.is; @@ -56,6 +57,9 @@ public class GetDiscoveredNodesRequestTests extends ESTestCase { () -> getDiscoveredNodesRequest.setTimeout(TimeValue.timeValueNanos(randomLongBetween(-10, -1)))); assertThat(exception.getMessage(), startsWith("negative timeout of ")); assertThat(exception.getMessage(), endsWith(" is not allowed")); + + getDiscoveredNodesRequest.setTimeout(null); + assertThat("value updated", getDiscoveredNodesRequest.getTimeout(), nullValue()); } public void testSerialization() throws IOException { @@ -67,6 +71,8 @@ public class GetDiscoveredNodesRequestTests extends ESTestCase { if (randomBoolean()) { originalRequest.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "timeout")); + } else if (randomBoolean()) { + originalRequest.setTimeout(null); } final GetDiscoveredNodesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), GetDiscoveredNodesRequest::new); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java index 1a8ef39fdf8..ba663e76b46 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java @@ -171,34 +171,53 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); } - public void testFailsQuicklyWithZeroTimeout() throws InterruptedException { + public void testFailsQuicklyWithZeroTimeoutAndAcceptsNullTimeout() throws InterruptedException { new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action transportService.start(); transportService.acceptIncomingRequests(); coordinator.start(); coordinator.startInitialJoin(); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setWaitForNodes(2); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + { + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + getDiscoveredNodesRequest.setTimeout(null); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } + @Override + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); + } + }); + } - @Override - public void handleException(TransportException exp) { - final Throwable rootCause = exp.getRootCause(); - assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); - assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{")); - countDownLatch.countDown(); - } - }); + { + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + final Throwable rootCause = exp.getRootCause(); + assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); + assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{")); + countDownLatch.countDown(); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } } public void testGetsDiscoveredNodes() throws InterruptedException { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java new file mode 100644 index 00000000000..952d7c0e752 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java @@ -0,0 +1,200 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest; +import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterResponse; +import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; +import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest; +import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; + +public class ClusterBootstrapServiceTests extends ESTestCase { + + private DiscoveryNode localNode, otherNode1, otherNode2; + private DeterministicTaskQueue deterministicTaskQueue; + private TransportService transportService; + private ClusterBootstrapService clusterBootstrapService; + + @Before + public void createServices() { + localNode = newDiscoveryNode("local"); + otherNode1 = newDiscoveryNode("other1"); + otherNode2 = newDiscoveryNode("other2"); + + deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); + + final MockTransport transport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + throw new AssertionError("unexpected " + action); + } + }; + + transportService = transport.createTransportService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + + clusterBootstrapService = new ClusterBootstrapService(Settings.builder().put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 3).build(), + transportService); + } + + private DiscoveryNode newDiscoveryNode(String nodeName) { + return new DiscoveryNode(nodeName, randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), + Version.CURRENT); + } + + private void startServices() { + transportService.start(); + transportService.acceptIncomingRequests(); + clusterBootstrapService.start(); + } + + public void testDoesNothingOnNonMasterNodes() { + localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, + (request, channel, task) -> { + throw new AssertionError("should not make a discovery request"); + }); + + startServices(); + deterministicTaskQueue.runAllTasks(); + } + + public void testDoesNothingIfSettingIsUnset() { + clusterBootstrapService = new ClusterBootstrapService(Settings.EMPTY, transportService); + transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, + (request, channel, task) -> { + throw new AssertionError("should not make a discovery request"); + }); + startServices(); + deterministicTaskQueue.runAllTasks(); + } + + public void testDoesNotRetryOnDiscoveryFailure() { + transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, + new TransportRequestHandler() { + private boolean called = false; + + @Override + public void messageReceived(GetDiscoveredNodesRequest request, TransportChannel channel, Task task) { + assert called == false; + called = true; + throw new IllegalArgumentException("simulate failure of discovery request"); + } + }); + + startServices(); + deterministicTaskQueue.runAllTasks(); + } + + public void testBootstrapsOnDiscoverySuccess() { + final AtomicBoolean discoveryAttempted = new AtomicBoolean(); + final Set discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet()); + transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, + (request, channel, task) -> { + assertTrue(discoveryAttempted.compareAndSet(false, true)); + channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes)); + }); + + final AtomicBoolean bootstrapAttempted = new AtomicBoolean(); + transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new, + (request, channel, task) -> { + assertTrue(bootstrapAttempted.compareAndSet(false, true)); + channel.sendResponse(new BootstrapClusterResponse(false)); + }); + + startServices(); + deterministicTaskQueue.runAllTasks(); + + assertTrue(discoveryAttempted.get()); + assertTrue(bootstrapAttempted.get()); + } + + public void testRetriesOnBootstrapFailure() { + final Set discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet()); + transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, + (request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes))); + + AtomicLong callCount = new AtomicLong(); + transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new, + (request, channel, task) -> { + callCount.incrementAndGet(); + channel.sendResponse(new ElasticsearchException("simulated exception")); + }); + + startServices(); + while (callCount.get() < 5) { + if (deterministicTaskQueue.hasDeferredTasks()) { + deterministicTaskQueue.advanceTime(); + } + deterministicTaskQueue.runAllRunnableTasks(); + } + } + + public void testStopsRetryingBootstrapWhenStopped() { + final Set discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet()); + transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, + (request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes))); + + transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new, + (request, channel, task) -> channel.sendResponse(new ElasticsearchException("simulated exception"))); + + deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + 200000, new Runnable() { + @Override + public void run() { + clusterBootstrapService.stop(); + } + + @Override + public String toString() { + return "stop cluster bootstrap service"; + } + }); + + startServices(); + deterministicTaskQueue.runAllTasks(); + // termination means success + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 13a8eae467c..ef76597ef18 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -62,6 +62,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; @@ -202,6 +203,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { // turn it off for these tests. .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes + .put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1) .put(nodeSettings()) // allow test cases to provide their own settings or override these .build(); Collection> plugins = getPlugins(); @@ -217,14 +219,11 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { plugins.add(MockHttpTransport.TestPlugin.class); } Node node = new MockNode(settings, plugins, forbidPrivateIndexSettings()); - bootstrapNodes(true, - () -> { - try { - node.start(); - } catch (NodeValidationException e) { - throw new RuntimeException(e); - } - }, Collections.singletonList(node), logger, this::wrapClient); + try { + node.start(); + } catch (NodeValidationException e) { + throw new RuntimeException(e); + } return node; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index c3515bbe95f..043f206b6a0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -47,18 +47,10 @@ import org.apache.lucene.util.TestRuleMarkFailure; import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TimeUnits; import org.elasticsearch.Version; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction; -import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest; import org.elasticsearch.bootstrap.BootstrapForTesting; -import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterModule; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.SuppressForbidden; @@ -93,7 +85,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.discovery.Discovery; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; @@ -109,7 +100,6 @@ import org.elasticsearch.index.mapper.MetadataFieldMapper; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.analysis.AnalysisModule; -import org.elasticsearch.node.Node; import org.elasticsearch.plugins.AnalysisPlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.Plugin; @@ -125,7 +115,6 @@ import org.elasticsearch.test.junit.listeners.LoggingListener; import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransportPlugin; -import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.nio.MockNioTransportPlugin; import org.joda.time.DateTimeZone; import org.junit.After; @@ -159,11 +148,9 @@ import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BooleanSupplier; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.IntFunction; import java.util.function.Predicate; import java.util.function.Supplier; @@ -303,81 +290,6 @@ public abstract class ESTestCase extends LuceneTestCase { return new TransportAddress(TransportAddress.META_ADDRESS, portGenerator.incrementAndGet()); } - public static void bootstrapNodes(boolean condition, Runnable startAction, List nodes, Logger logger, - Function clientWrapper) { - final AtomicBoolean stopBootstrapThread = new AtomicBoolean(); - Thread bootstrapThread = null; - - if (condition) { - int zen2MasterNodeCount = 0; - for (Node node : nodes) { - if (DiscoveryNode.isMasterNode(node.settings())) { - Discovery discovery = node.injector().getInstance(Discovery.class); - if (discovery instanceof Coordinator) { - zen2MasterNodeCount++; - } - } - } - - if (zen2MasterNodeCount > 0) { - final int minimumConfigurationSize = randomIntBetween(1, zen2MasterNodeCount); - final Random bootstrapRandom = new Random(randomLong()); - - bootstrapThread = new Thread(() -> { - BootstrapClusterRequest bootstrapClusterRequest = null; - while (stopBootstrapThread.get() == false) { - final Node node = randomFrom(bootstrapRandom, nodes); - final TransportService transportService = node.injector().getInstance(TransportService.class); - if (transportService.getLocalNode() != null) { - final Client client = clientWrapper.apply(node.client()); - if (bootstrapClusterRequest == null) { - try { - final GetDiscoveredNodesRequest discoveredNodesRequest = new GetDiscoveredNodesRequest(); - discoveredNodesRequest.setWaitForNodes(minimumConfigurationSize); - bootstrapClusterRequest = new BootstrapClusterRequest( - client.execute(GetDiscoveredNodesAction.INSTANCE, discoveredNodesRequest).get() - .getBootstrapConfiguration()); - } catch (Exception e) { - logger.trace("exception getting bootstrap configuration", e); - } - } else { - try { - client.execute(BootstrapClusterAction.INSTANCE, bootstrapClusterRequest).get(); - if (usually(bootstrapRandom)) { - // occasionally carry on trying to bootstrap even after one request succeeded. - return; - } - } catch (Exception e) { - logger.trace("exception bootstrapping cluster", e); - } - } - } - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new AssertionError("interrupted while sleeping", e); - } - } - }, "Bootstrap-Thread for " + ClusterName.CLUSTER_NAME_SETTING.get(nodes.get(0).settings())); - bootstrapThread.start(); - } - } - - try { - startAction.run(); - } finally { - if (bootstrapThread != null) { - stopBootstrapThread.set(true); - try { - bootstrapThread.join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - } - /** * Called when a test fails, supplying the errors it generated. Not called when the test fails because assumptions are violated. */ diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 587404fff35..fba872b6fe2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -147,6 +147,7 @@ import java.util.stream.Stream; import static java.util.Collections.emptyList; import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.rarely; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING; @@ -154,7 +155,6 @@ import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_M import static org.elasticsearch.discovery.zen.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy; -import static org.elasticsearch.test.ESTestCase.bootstrapNodes; import static org.elasticsearch.test.ESTestCase.getTestTransportType; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -525,9 +525,9 @@ public final class InternalTestCluster extends TestCluster { } final int ord = nextNodeId.getAndIncrement(); final Runnable onTransportServiceStarted = () -> {}; // do not create unicast host file for this one node. - final NodeAndClient buildNode = buildNode(ord, random.nextLong(), null, false, 1, onTransportServiceStarted); + final Settings settings = Settings.builder().put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1).build(); + final NodeAndClient buildNode = buildNode(ord, random.nextLong(), settings, false, 1, onTransportServiceStarted); assert nodes.isEmpty(); - bootstrapNodes(true, buildNode::startNode, Collections.singletonList(buildNode.node()), logger, clientWrapper); buildNode.startNode(); publishNode(buildNode); return buildNode; @@ -1081,6 +1081,9 @@ public final class InternalTestCluster extends TestCluster { final Settings.Builder settings = Settings.builder(); settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(Node.NODE_DATA_SETTING.getKey(), false); + if (prevNodeCount == 0 && autoManageMinMasterNodes) { + settings.put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), numSharedDedicatedMasterNodes); + } NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); @@ -1091,6 +1094,8 @@ public final class InternalTestCluster extends TestCluster { // if we don't have dedicated master nodes, keep things default settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build(); settings.put(Node.NODE_DATA_SETTING.getKey(), true).build(); + } else if (prevNodeCount == 0 && autoManageMinMasterNodes) { + settings.put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), numSharedDataNodes); } NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, onTransportServiceStarted); @@ -1105,9 +1110,6 @@ public final class InternalTestCluster extends TestCluster { toStartAndPublish.add(nodeAndClient); } - bootstrapNodes(prevNodeCount == 0, () -> startAndPublishNodesAndClients(toStartAndPublish), - toStartAndPublish.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger, clientWrapper); - startAndPublishNodesAndClients(toStartAndPublish); nextNodeId.set(newSize); @@ -1915,16 +1917,23 @@ public final class InternalTestCluster extends TestCluster { final List nodes = new ArrayList<>(); final int prevMasterCount = getMasterNodesCount(); for (Settings nodeSettings : settings) { - nodes.add(buildNode(nodeSettings, defaultMinMasterNodes, () -> rebuildUnicastHostFiles(nodes))); + final Settings nodeSettingsIncludingBootstrap; + if (prevMasterCount == 0 && autoManageMinMasterNodes) { + nodeSettingsIncludingBootstrap = Settings.builder() + .put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), + (int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count()) + .put(nodeSettings) + .build(); + } else { + nodeSettingsIncludingBootstrap = nodeSettings; + } + + nodes.add(buildNode(nodeSettingsIncludingBootstrap, defaultMinMasterNodes, () -> rebuildUnicastHostFiles(nodes))); + } + startAndPublishNodesAndClients(nodes); + if (autoManageMinMasterNodes) { + validateClusterFormed(); } - bootstrapNodes(prevMasterCount == 0, - () -> { - startAndPublishNodesAndClients(nodes); - if (autoManageMinMasterNodes) { - validateClusterFormed(); - } - }, - nodes.stream().map(NodeAndClient::node).collect(Collectors.toList()), logger, clientWrapper); return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java index d5a5d04dded..b8f42cf2875 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java @@ -19,6 +19,7 @@ public final class SystemPrivilege extends Privilege { "internal:*", "indices:monitor/*", // added for monitoring "cluster:monitor/*", // added for monitoring + "cluster:admin/bootstrap_cluster", // for the bootstrap service "cluster:admin/reroute", // added for DiskThresholdDecider.DiskListener "indices:admin/mapping/put", // needed for recovery and shrink api "indices:admin/template/put", // needed for the TemplateUpgradeService