From ca3f5c1e2e7454ed663d57c4b90aa6db5103c985 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 10 Dec 2018 17:23:03 +0000 Subject: [PATCH] Cancel GetDiscoveredNodesAction when bootstrapped (#36423) Today the `GetDiscoveredNodesAction` waits, possibly indefinitely, to discover enough nodes to bootstrap the cluster. However it is possible that the cluster forms before a node has discovered the expected collection of nodes, in which case the action will wait indefinitely despite the fact that it is no longer required. This commit changes the behaviour so that the action fails once a node receives a cluster state with a nonempty configuration, indicating that the cluster has been successfully bootstrapped and therefore the `GetDiscoveredNodesAction` need wait no longer. Relates #36380 and #36381; reverts 558f4ec27820e1a50660dc1f3437422150339af0. --- .../elasticsearch/ElasticsearchException.java | 4 +- .../bootstrap/BootstrapClusterAction.java | 2 +- .../bootstrap/GetDiscoveredNodesAction.java | 2 +- .../TransportGetDiscoveredNodesAction.java | 31 +++-- .../ClusterAlreadyBootstrappedException.java | 38 ++++++ .../coordination/ClusterBootstrapService.java | 7 +- .../cluster/coordination/Coordinator.java | 25 ++-- .../ExceptionSerializationTests.java | 2 + ...ransportGetDiscoveredNodesActionTests.java | 119 +++++++++++++++++- .../coordination/CoordinatorTests.java | 67 +++++++--- .../authz/privilege/SystemPrivilege.java | 2 +- .../xpack/test/rest/XPackRestIT.java | 5 +- 12 files changed, 253 insertions(+), 51 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/ClusterAlreadyBootstrappedException.java diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index efa1ccaf333..d18d4d4820f 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1008,7 +1008,9 @@ public class ElasticsearchException extends RuntimeException implements ToXConte TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class, MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0), COORDINATION_STATE_REJECTED_EXCEPTION(org.elasticsearch.cluster.coordination.CoordinationStateRejectedException.class, - org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0); + org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0), + CLUSTER_ALREADY_BOOTSTRAPPED_EXCEPTION(org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException.class, + org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException::new, 151, Version.V_7_0_0); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java index d060efcc5a1..28a8e580ced 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/BootstrapClusterAction.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.io.stream.Writeable.Reader; public class BootstrapClusterAction extends Action { public static final BootstrapClusterAction INSTANCE = new BootstrapClusterAction(); - public static final String NAME = "cluster:admin/bootstrap_cluster"; + public static final String NAME = "cluster:admin/bootstrap/set_voting_config"; private BootstrapClusterAction() { super(NAME); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java index 0a3ab72e115..acaef284a54 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesAction.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.io.stream.Writeable.Reader; public class GetDiscoveredNodesAction extends Action { public static final GetDiscoveredNodesAction INSTANCE = new GetDiscoveredNodesAction(); - public static final String NAME = "cluster:monitor/discovered_nodes"; + public static final String NAME = "cluster:admin/bootstrap/discover_nodes"; private GetDiscoveredNodesAction() { super(NAME); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java index 7951a926c10..c88454b6355 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; @@ -43,7 +44,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING; @@ -89,18 +89,28 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction> respondIfRequestSatisfied = new Consumer>() { + final ActionListener> respondIfRequestSatisfied = new ActionListener>() { @Override - public void accept(Iterable nodes) { + public void onResponse(Iterable nodes) { final Set nodesSet = new LinkedHashSet<>(); nodesSet.add(localNode); nodes.forEach(nodesSet::add); logger.trace("discovered {}", nodesSet); try { - if (checkWaitRequirements(request, nodesSet) && listenerNotified.compareAndSet(false, true)) { - listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet)); + if (checkWaitRequirements(request, nodesSet)) { + final GetDiscoveredNodesResponse response = new GetDiscoveredNodesResponse(nodesSet); + if (listenerNotified.compareAndSet(false, true)) { + listenableFuture.onResponse(response); + } } } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + if (listenerNotified.compareAndSet(false, true)) { listenableFuture.onFailure(e); } } @@ -113,15 +123,18 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction currentPublication = Optional.empty(); - private final Set>> discoveredNodesListeners = newConcurrentSet(); + private final Set>> discoveredNodesListeners = newConcurrentSet(); public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService, @@ -166,8 +165,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery masterService.setClusterStateSupplier(this::getStateForMasterService); this.reconfigurator = new Reconfigurator(settings, clusterSettings); this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService); - this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, this::isBootstrapped, - joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade); + this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, + this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade); this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState, @@ -280,6 +279,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector } + if (isInitialConfigurationSet()) { + for (final ActionListener> discoveredNodesListener : discoveredNodesListeners) { + discoveredNodesListener.onFailure(new ClusterAlreadyBootstrappedException()); + } + } + return new PublishWithJoinResponse(publishResponse, joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())); } @@ -704,10 +709,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } } - private boolean isBootstrapped() { - return getLastAcceptedState().getLastAcceptedConfiguration().isEmpty() == false; - } - private void unsafelySetConfigurationForUpgrade(VotingConfiguration votingConfiguration) { assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this method once unsafe upgrades are no longer needed"; synchronized (mutex) { @@ -715,7 +716,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery throw new IllegalStateException("Cannot overwrite configuration in mode " + mode); } - if (isBootstrapped()) { + if (isInitialConfigurationSet()) { throw new IllegalStateException("Cannot overwrite configuration: configuration is already set to " + getLastAcceptedState().getLastAcceptedConfiguration()); } @@ -1014,8 +1015,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } } - for (Consumer> discoveredNodesListener : discoveredNodesListeners) { - discoveredNodesListener.accept(foundPeers); + for (final ActionListener> discoveredNodesListener : discoveredNodesListeners) { + discoveredNodesListener.onResponse(foundPeers); } } } @@ -1051,7 +1052,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery }); } - public Releasable withDiscoveryListener(Consumer> listener) { + public Releasable withDiscoveryListener(ActionListener> listener) { discoveredNodesListeners.add(listener); return () -> { boolean removed = discoveredNodesListeners.remove(listener); diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 4bb180c3986..cee57c9f50c 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.client.AbstractClientHeadersTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException; import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException; @@ -807,6 +808,7 @@ public class ExceptionSerializationTests extends ESTestCase { ids.put(148, UnknownNamedObjectException.class); ids.put(149, MultiBucketConsumerService.TooManyBucketsException.class); ids.put(150, CoordinationStateRejectedException.class); + ids.put(151, ClusterAlreadyBootstrappedException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java index b9b55d32be4..add52a1eedc 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java @@ -24,11 +24,19 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException; +import org.elasticsearch.cluster.coordination.ClusterBootstrapService; +import org.elasticsearch.cluster.coordination.CoordinationMetaData; +import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.InMemoryPersistedState; import org.elasticsearch.cluster.coordination.NoOpClusterApplier; import org.elasticsearch.cluster.coordination.PeersResponse; +import org.elasticsearch.cluster.coordination.PublicationTransportHandler; +import org.elasticsearch.cluster.coordination.PublishWithJoinResponse; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.ClusterSettings; @@ -42,6 +50,7 @@ import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.BytesTransportRequest; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponseHandler; @@ -62,6 +71,7 @@ import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; @@ -113,10 +123,14 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { Settings.builder().put(CLUSTER_NAME_SETTING.getKey(), clusterName).build(), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); - final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService, writableRegistry(), - ESAllocationTestCase.createAllocationService(Settings.EMPTY), - new MasterService("local", Settings.EMPTY, threadPool), + final Settings settings = Settings.builder() + .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), + ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap + + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + coordinator = new Coordinator("local", settings, clusterSettings, transportService, writableRegistry(), + ESAllocationTestCase.createAllocationService(settings), + new MasterService("local", settings, threadPool), () -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName(clusterName)).build()), r -> emptyList(), new NoOpClusterApplier(), new Random(random().nextLong())); } @@ -152,7 +166,7 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); } - public void testFailsOnNonMasterEligibleNodes() throws InterruptedException { + public void testFailsOnMasterIneligibleNodes() throws InterruptedException { localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); // transport service only picks up local node when started, so we can change it here ^ @@ -230,6 +244,101 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { } } + public void testFailsIfAlreadyBootstrapped() throws InterruptedException { + new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + coordinator.start(); + coordinator.startInitialJoin(); + coordinator.setInitialConfiguration(new VotingConfiguration(singleton(localNode.getId()))); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + getDiscoveredNodesRequest.setTimeout(null); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + if (exp.getRootCause() instanceof ClusterAlreadyBootstrappedException) { + countDownLatch.countDown(); + } else { + throw new AssertionError("should not be called", exp); + } + } + }); + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + public void testFailsIfAcceptsClusterStateWithNonemptyConfiguration() throws InterruptedException, IOException { + new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + coordinator.start(); + coordinator.startInitialJoin(); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(3); + getDiscoveredNodesRequest.setTimeout(null); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + if (exp.getRootCause() instanceof ClusterAlreadyBootstrappedException) { + countDownLatch.countDown(); + } else { + throw new AssertionError("should not be called", exp); + } + } + }); + + ClusterState.Builder publishedClusterState = ClusterState.builder(ClusterName.DEFAULT); + publishedClusterState.incrementVersion(); + publishedClusterState.nodes(DiscoveryNodes.builder() + .add(localNode).add(otherNode).localNodeId(localNode.getId()).masterNodeId(otherNode.getId())); + publishedClusterState.metaData(MetaData.builder().coordinationMetaData(CoordinationMetaData.builder() + .term(1) + .lastAcceptedConfiguration(new VotingConfiguration(singleton(otherNode.getId()))) + .lastCommittedConfiguration(new VotingConfiguration(singleton(otherNode.getId()))) + .build())); + + transportService.sendRequest(localNode, PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME, + new BytesTransportRequest(PublicationTransportHandler.serializeFullClusterState(publishedClusterState.build(), Version.CURRENT), + Version.CURRENT), + new TransportResponseHandler() { + @Override + public void handleResponse(PublishWithJoinResponse response) { + // do nothing + } + + @Override + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); + } + + @Override + public String executor() { + return Names.SAME; + } + + @Override + public PublishWithJoinResponse read(StreamInput in) throws IOException { + return new PublishWithJoinResponse(in); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + public void testGetsDiscoveredNodesWithZeroTimeout() throws InterruptedException { setupGetDiscoveredNodesAction(); final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 39be04db108..a9df470a9d6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -682,9 +683,9 @@ public class CoordinatorTests extends ESTestCase { final Cluster cluster = new Cluster(randomIntBetween(2, 5)); // register a listener and then deregister it again to show that it is not called after deregistration - try (Releasable ignored = cluster.getAnyNode().coordinator.withDiscoveryListener(ns -> { + try (Releasable ignored = cluster.getAnyNode().coordinator.withDiscoveryListener(ActionListener.wrap(() -> { throw new AssertionError("should not be called"); - })) { + }))) { // do nothing } @@ -692,23 +693,54 @@ public class CoordinatorTests extends ESTestCase { final ClusterNode bootstrapNode = cluster.getAnyNode(); final AtomicBoolean hasDiscoveredAllPeers = new AtomicBoolean(); assertFalse(bootstrapNode.coordinator.getFoundPeers().iterator().hasNext()); - try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener(discoveryNodes -> { - int peerCount = 0; - for (final DiscoveryNode discoveryNode : discoveryNodes) { - peerCount++; - } - assertThat(peerCount, lessThan(cluster.size())); - if (peerCount == cluster.size() - 1 && hasDiscoveredAllPeers.get() == false) { - hasDiscoveredAllPeers.set(true); - final long elapsedTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis; - logger.info("--> {} discovered {} peers in {}ms", bootstrapNode.getId(), cluster.size() - 1, elapsedTimeMillis); - assertThat(elapsedTimeMillis, lessThanOrEqualTo(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2)); - } - })) { + try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener( + new ActionListener>() { + @Override + public void onResponse(Iterable discoveryNodes) { + int peerCount = 0; + for (final DiscoveryNode discoveryNode : discoveryNodes) { + peerCount++; + } + assertThat(peerCount, lessThan(cluster.size())); + if (peerCount == cluster.size() - 1 && hasDiscoveredAllPeers.get() == false) { + hasDiscoveredAllPeers.set(true); + final long elapsedTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis; + logger.info("--> {} discovered {} peers in {}ms", bootstrapNode.getId(), cluster.size() - 1, elapsedTimeMillis); + assertThat(elapsedTimeMillis, lessThanOrEqualTo(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2)); + } + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("unexpected", e); + } + })) { cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "discovery phase"); } assertTrue(hasDiscoveredAllPeers.get()); + + final AtomicBoolean receivedAlreadyBootstrappedException = new AtomicBoolean(); + try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener( + new ActionListener>() { + @Override + public void onResponse(Iterable discoveryNodes) { + // ignore + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ClusterAlreadyBootstrappedException) { + receivedAlreadyBootstrappedException.set(true); + } else { + throw new AssertionError("unexpected", e); + } + } + })) { + + cluster.stabilise(); + } + assertTrue(receivedAlreadyBootstrappedException.get()); } public void testSettingInitialConfigurationTriggersElection() { @@ -1358,7 +1390,10 @@ public class CoordinatorTests extends ESTestCase { } }; - final Settings settings = Settings.EMPTY; + final Settings settings = Settings.builder() + .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), + ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterApplier = new FakeClusterApplier(settings, clusterSettings); masterService = new AckedFakeThreadPoolMasterService("test_node", "test", diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java index b8f42cf2875..c673b8ee327 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java @@ -19,7 +19,7 @@ public final class SystemPrivilege extends Privilege { "internal:*", "indices:monitor/*", // added for monitoring "cluster:monitor/*", // added for monitoring - "cluster:admin/bootstrap_cluster", // for the bootstrap service + "cluster:admin/bootstrap/*", // for the bootstrap service "cluster:admin/reroute", // added for DiskThresholdDecider.DiskListener "indices:admin/mapping/put", // needed for recovery and shrink api "indices:admin/template/put", // needed for the TemplateUpgradeService diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index f4c96d979f1..ab6289c410b 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -9,7 +9,6 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.common.CheckedFunction; @@ -256,9 +255,7 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase { // it could be waiting for pending tasks while monitoring is still running). ESRestTestCase.waitForPendingTasks(adminClient(), task -> { // Don't check rollup jobs because we clear them in the superclass. - return task.contains(RollupJob.NAME) - // Also ignore the zen2 discovery task - || task.contains(GetDiscoveredNodesAction.NAME); + return task.contains(RollupJob.NAME); }); } }