Cancel GetDiscoveredNodesAction when bootstrapped (#36423)
Today the `GetDiscoveredNodesAction` waits, possibly indefinitely, to discover
enough nodes to bootstrap the cluster. However it is possible that the cluster
forms before a node has discovered the expected collection of nodes, in which
case the action will wait indefinitely despite the fact that it is no longer
required.
This commit changes the behaviour so that the action fails once a node receives
a cluster state with a nonempty configuration, indicating that the cluster has
been successfully bootstrapped and therefore the `GetDiscoveredNodesAction`
need wait no longer.
Relates #36380 and #36381; reverts 558f4ec278
.
This commit is contained in:
parent
c3dd0d393d
commit
ca3f5c1e2e
|
@ -1008,7 +1008,9 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class,
|
||||
MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0),
|
||||
COORDINATION_STATE_REJECTED_EXCEPTION(org.elasticsearch.cluster.coordination.CoordinationStateRejectedException.class,
|
||||
org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0);
|
||||
org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0),
|
||||
CLUSTER_ALREADY_BOOTSTRAPPED_EXCEPTION(org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException.class,
|
||||
org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException::new, 151, Version.V_7_0_0);
|
||||
|
||||
final Class<? extends ElasticsearchException> exceptionClass;
|
||||
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
|
|||
|
||||
public class BootstrapClusterAction extends Action<BootstrapClusterResponse> {
|
||||
public static final BootstrapClusterAction INSTANCE = new BootstrapClusterAction();
|
||||
public static final String NAME = "cluster:admin/bootstrap_cluster";
|
||||
public static final String NAME = "cluster:admin/bootstrap/set_voting_config";
|
||||
|
||||
private BootstrapClusterAction() {
|
||||
super(NAME);
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
|
|||
|
||||
public class GetDiscoveredNodesAction extends Action<GetDiscoveredNodesResponse> {
|
||||
public static final GetDiscoveredNodesAction INSTANCE = new GetDiscoveredNodesAction();
|
||||
public static final String NAME = "cluster:monitor/discovered_nodes";
|
||||
public static final String NAME = "cluster:admin/bootstrap/discover_nodes";
|
||||
|
||||
private GetDiscoveredNodesAction() {
|
||||
super(NAME);
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException;
|
||||
import org.elasticsearch.cluster.coordination.Coordinator;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -43,7 +44,6 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
|
||||
|
@ -89,18 +89,28 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction<Ge
|
|||
listenableFuture.addListener(listener, directExecutor, threadPool.getThreadContext());
|
||||
// TODO make it so that listenableFuture copes with multiple completions, and then remove listenerNotified
|
||||
|
||||
final Consumer<Iterable<DiscoveryNode>> respondIfRequestSatisfied = new Consumer<Iterable<DiscoveryNode>>() {
|
||||
final ActionListener<Iterable<DiscoveryNode>> respondIfRequestSatisfied = new ActionListener<Iterable<DiscoveryNode>>() {
|
||||
@Override
|
||||
public void accept(Iterable<DiscoveryNode> nodes) {
|
||||
public void onResponse(Iterable<DiscoveryNode> nodes) {
|
||||
final Set<DiscoveryNode> nodesSet = new LinkedHashSet<>();
|
||||
nodesSet.add(localNode);
|
||||
nodes.forEach(nodesSet::add);
|
||||
logger.trace("discovered {}", nodesSet);
|
||||
try {
|
||||
if (checkWaitRequirements(request, nodesSet) && listenerNotified.compareAndSet(false, true)) {
|
||||
listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet));
|
||||
if (checkWaitRequirements(request, nodesSet)) {
|
||||
final GetDiscoveredNodesResponse response = new GetDiscoveredNodesResponse(nodesSet);
|
||||
if (listenerNotified.compareAndSet(false, true)) {
|
||||
listenableFuture.onResponse(response);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (listenerNotified.compareAndSet(false, true)) {
|
||||
listenableFuture.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
@ -113,15 +123,18 @@ public class TransportGetDiscoveredNodesAction extends HandledTransportAction<Ge
|
|||
|
||||
final Releasable releasable = coordinator.withDiscoveryListener(respondIfRequestSatisfied);
|
||||
listenableFuture.addListener(ActionListener.wrap(releasable::close), directExecutor, threadPool.getThreadContext());
|
||||
respondIfRequestSatisfied.accept(coordinator.getFoundPeers());
|
||||
|
||||
if (coordinator.isInitialConfigurationSet()) {
|
||||
respondIfRequestSatisfied.onFailure(new ClusterAlreadyBootstrappedException());
|
||||
} else {
|
||||
respondIfRequestSatisfied.onResponse(coordinator.getFoundPeers());
|
||||
}
|
||||
|
||||
if (request.getTimeout() != null) {
|
||||
threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (listenerNotified.compareAndSet(false, true)) {
|
||||
listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
|
||||
}
|
||||
respondIfRequestSatisfied.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.cluster.coordination;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Exception thrown if trying to discovery nodes in order to perform cluster bootstrapping, but a cluster is formed before all the required
|
||||
* nodes are discovered.
|
||||
*/
|
||||
public class ClusterAlreadyBootstrappedException extends ElasticsearchException {
|
||||
public ClusterAlreadyBootstrappedException() {
|
||||
super("node has already joined a bootstrapped cluster, bootstrapping is not required");
|
||||
}
|
||||
|
||||
public ClusterAlreadyBootstrappedException(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
}
|
|
@ -117,7 +117,12 @@ public class ClusterBootstrapService {
|
|||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("discovery attempt failed", exp);
|
||||
final Throwable rootCause = exp.getRootCause();
|
||||
if (rootCause instanceof ClusterAlreadyBootstrappedException) {
|
||||
logger.debug(rootCause.getMessage(), rootCause);
|
||||
} else {
|
||||
logger.warn("discovery attempt failed", exp);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -76,7 +76,6 @@ import java.util.Optional;
|
|||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
@ -135,7 +134,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
private JoinHelper.JoinAccumulator joinAccumulator;
|
||||
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
|
||||
|
||||
private final Set<Consumer<Iterable<DiscoveryNode>>> discoveredNodesListeners = newConcurrentSet();
|
||||
private final Set<ActionListener<Iterable<DiscoveryNode>>> discoveredNodesListeners = newConcurrentSet();
|
||||
|
||||
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
|
||||
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
|
||||
|
@ -166,8 +165,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
masterService.setClusterStateSupplier(this::getStateForMasterService);
|
||||
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
|
||||
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
|
||||
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, this::isBootstrapped,
|
||||
joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade);
|
||||
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService,
|
||||
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade);
|
||||
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
|
||||
transportService::getLocalNode);
|
||||
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
|
||||
|
@ -280,6 +279,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector
|
||||
}
|
||||
|
||||
if (isInitialConfigurationSet()) {
|
||||
for (final ActionListener<Iterable<DiscoveryNode>> discoveredNodesListener : discoveredNodesListeners) {
|
||||
discoveredNodesListener.onFailure(new ClusterAlreadyBootstrappedException());
|
||||
}
|
||||
}
|
||||
|
||||
return new PublishWithJoinResponse(publishResponse,
|
||||
joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term()));
|
||||
}
|
||||
|
@ -704,10 +709,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isBootstrapped() {
|
||||
return getLastAcceptedState().getLastAcceptedConfiguration().isEmpty() == false;
|
||||
}
|
||||
|
||||
private void unsafelySetConfigurationForUpgrade(VotingConfiguration votingConfiguration) {
|
||||
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this method once unsafe upgrades are no longer needed";
|
||||
synchronized (mutex) {
|
||||
|
@ -715,7 +716,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
throw new IllegalStateException("Cannot overwrite configuration in mode " + mode);
|
||||
}
|
||||
|
||||
if (isBootstrapped()) {
|
||||
if (isInitialConfigurationSet()) {
|
||||
throw new IllegalStateException("Cannot overwrite configuration: configuration is already set to "
|
||||
+ getLastAcceptedState().getLastAcceptedConfiguration());
|
||||
}
|
||||
|
@ -1014,8 +1015,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
}
|
||||
}
|
||||
|
||||
for (Consumer<Iterable<DiscoveryNode>> discoveredNodesListener : discoveredNodesListeners) {
|
||||
discoveredNodesListener.accept(foundPeers);
|
||||
for (final ActionListener<Iterable<DiscoveryNode>> discoveredNodesListener : discoveredNodesListeners) {
|
||||
discoveredNodesListener.onResponse(foundPeers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1051,7 +1052,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
});
|
||||
}
|
||||
|
||||
public Releasable withDiscoveryListener(Consumer<Iterable<DiscoveryNode>> listener) {
|
||||
public Releasable withDiscoveryListener(ActionListener<Iterable<DiscoveryNode>> listener) {
|
||||
discoveredNodesListeners.add(listener);
|
||||
return () -> {
|
||||
boolean removed = discoveredNodesListeners.remove(listener);
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.action.support.replication.ReplicationOperation;
|
|||
import org.elasticsearch.client.AbstractClientHeadersTestCase;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException;
|
||||
|
@ -807,6 +808,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(148, UnknownNamedObjectException.class);
|
||||
ids.put(149, MultiBucketConsumerService.TooManyBucketsException.class);
|
||||
ids.put(150, CoordinationStateRejectedException.class);
|
||||
ids.put(151, ClusterAlreadyBootstrappedException.class);
|
||||
|
||||
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
|
||||
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
|
||||
|
|
|
@ -24,11 +24,19 @@ import org.elasticsearch.action.support.ActionFilters;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ESAllocationTestCase;
|
||||
import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException;
|
||||
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
|
||||
import org.elasticsearch.cluster.coordination.Coordinator;
|
||||
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
|
||||
import org.elasticsearch.cluster.coordination.NoOpClusterApplier;
|
||||
import org.elasticsearch.cluster.coordination.PeersResponse;
|
||||
import org.elasticsearch.cluster.coordination.PublicationTransportHandler;
|
||||
import org.elasticsearch.cluster.coordination.PublishWithJoinResponse;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
|
@ -42,6 +50,7 @@ import org.elasticsearch.test.transport.MockTransport;
|
|||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.BytesTransportRequest;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
|
@ -62,6 +71,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.singleton;
|
||||
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
|
||||
import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME;
|
||||
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
|
||||
|
@ -113,10 +123,14 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
|||
Settings.builder().put(CLUSTER_NAME_SETTING.getKey(), clusterName).build(), threadPool,
|
||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
|
||||
|
||||
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService, writableRegistry(),
|
||||
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
|
||||
new MasterService("local", Settings.EMPTY, threadPool),
|
||||
final Settings settings = Settings.builder()
|
||||
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
|
||||
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
|
||||
|
||||
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
coordinator = new Coordinator("local", settings, clusterSettings, transportService, writableRegistry(),
|
||||
ESAllocationTestCase.createAllocationService(settings),
|
||||
new MasterService("local", settings, threadPool),
|
||||
() -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName(clusterName)).build()), r -> emptyList(),
|
||||
new NoOpClusterApplier(), new Random(random().nextLong()));
|
||||
}
|
||||
|
@ -152,7 +166,7 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
|||
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public void testFailsOnNonMasterEligibleNodes() throws InterruptedException {
|
||||
public void testFailsOnMasterIneligibleNodes() throws InterruptedException {
|
||||
localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
// transport service only picks up local node when started, so we can change it here ^
|
||||
|
||||
|
@ -230,6 +244,101 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testFailsIfAlreadyBootstrapped() throws InterruptedException {
|
||||
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
coordinator.start();
|
||||
coordinator.startInitialJoin();
|
||||
coordinator.setInitialConfiguration(new VotingConfiguration(singleton(localNode.getId())));
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
getDiscoveredNodesRequest.setWaitForNodes(2);
|
||||
getDiscoveredNodesRequest.setTimeout(null);
|
||||
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
|
||||
@Override
|
||||
public void handleResponse(GetDiscoveredNodesResponse response) {
|
||||
throw new AssertionError("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
if (exp.getRootCause() instanceof ClusterAlreadyBootstrappedException) {
|
||||
countDownLatch.countDown();
|
||||
} else {
|
||||
throw new AssertionError("should not be called", exp);
|
||||
}
|
||||
}
|
||||
});
|
||||
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public void testFailsIfAcceptsClusterStateWithNonemptyConfiguration() throws InterruptedException, IOException {
|
||||
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
coordinator.start();
|
||||
coordinator.startInitialJoin();
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
getDiscoveredNodesRequest.setWaitForNodes(3);
|
||||
getDiscoveredNodesRequest.setTimeout(null);
|
||||
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
|
||||
@Override
|
||||
public void handleResponse(GetDiscoveredNodesResponse response) {
|
||||
throw new AssertionError("should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
if (exp.getRootCause() instanceof ClusterAlreadyBootstrappedException) {
|
||||
countDownLatch.countDown();
|
||||
} else {
|
||||
throw new AssertionError("should not be called", exp);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
ClusterState.Builder publishedClusterState = ClusterState.builder(ClusterName.DEFAULT);
|
||||
publishedClusterState.incrementVersion();
|
||||
publishedClusterState.nodes(DiscoveryNodes.builder()
|
||||
.add(localNode).add(otherNode).localNodeId(localNode.getId()).masterNodeId(otherNode.getId()));
|
||||
publishedClusterState.metaData(MetaData.builder().coordinationMetaData(CoordinationMetaData.builder()
|
||||
.term(1)
|
||||
.lastAcceptedConfiguration(new VotingConfiguration(singleton(otherNode.getId())))
|
||||
.lastCommittedConfiguration(new VotingConfiguration(singleton(otherNode.getId())))
|
||||
.build()));
|
||||
|
||||
transportService.sendRequest(localNode, PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
|
||||
new BytesTransportRequest(PublicationTransportHandler.serializeFullClusterState(publishedClusterState.build(), Version.CURRENT),
|
||||
Version.CURRENT),
|
||||
new TransportResponseHandler<PublishWithJoinResponse>() {
|
||||
@Override
|
||||
public void handleResponse(PublishWithJoinResponse response) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
throw new AssertionError("should not be called", exp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PublishWithJoinResponse read(StreamInput in) throws IOException {
|
||||
return new PublishWithJoinResponse(in);
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public void testGetsDiscoveredNodesWithZeroTimeout() throws InterruptedException {
|
||||
setupGetDiscoveredNodesAction();
|
||||
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
|
@ -682,9 +683,9 @@ public class CoordinatorTests extends ESTestCase {
|
|||
final Cluster cluster = new Cluster(randomIntBetween(2, 5));
|
||||
|
||||
// register a listener and then deregister it again to show that it is not called after deregistration
|
||||
try (Releasable ignored = cluster.getAnyNode().coordinator.withDiscoveryListener(ns -> {
|
||||
try (Releasable ignored = cluster.getAnyNode().coordinator.withDiscoveryListener(ActionListener.wrap(() -> {
|
||||
throw new AssertionError("should not be called");
|
||||
})) {
|
||||
}))) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
|
@ -692,23 +693,54 @@ public class CoordinatorTests extends ESTestCase {
|
|||
final ClusterNode bootstrapNode = cluster.getAnyNode();
|
||||
final AtomicBoolean hasDiscoveredAllPeers = new AtomicBoolean();
|
||||
assertFalse(bootstrapNode.coordinator.getFoundPeers().iterator().hasNext());
|
||||
try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener(discoveryNodes -> {
|
||||
int peerCount = 0;
|
||||
for (final DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
peerCount++;
|
||||
}
|
||||
assertThat(peerCount, lessThan(cluster.size()));
|
||||
if (peerCount == cluster.size() - 1 && hasDiscoveredAllPeers.get() == false) {
|
||||
hasDiscoveredAllPeers.set(true);
|
||||
final long elapsedTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis;
|
||||
logger.info("--> {} discovered {} peers in {}ms", bootstrapNode.getId(), cluster.size() - 1, elapsedTimeMillis);
|
||||
assertThat(elapsedTimeMillis, lessThanOrEqualTo(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2));
|
||||
}
|
||||
})) {
|
||||
try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener(
|
||||
new ActionListener<Iterable<DiscoveryNode>>() {
|
||||
@Override
|
||||
public void onResponse(Iterable<DiscoveryNode> discoveryNodes) {
|
||||
int peerCount = 0;
|
||||
for (final DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
peerCount++;
|
||||
}
|
||||
assertThat(peerCount, lessThan(cluster.size()));
|
||||
if (peerCount == cluster.size() - 1 && hasDiscoveredAllPeers.get() == false) {
|
||||
hasDiscoveredAllPeers.set(true);
|
||||
final long elapsedTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis;
|
||||
logger.info("--> {} discovered {} peers in {}ms", bootstrapNode.getId(), cluster.size() - 1, elapsedTimeMillis);
|
||||
assertThat(elapsedTimeMillis, lessThanOrEqualTo(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("unexpected", e);
|
||||
}
|
||||
})) {
|
||||
cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "discovery phase");
|
||||
}
|
||||
|
||||
assertTrue(hasDiscoveredAllPeers.get());
|
||||
|
||||
final AtomicBoolean receivedAlreadyBootstrappedException = new AtomicBoolean();
|
||||
try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener(
|
||||
new ActionListener<Iterable<DiscoveryNode>>() {
|
||||
@Override
|
||||
public void onResponse(Iterable<DiscoveryNode> discoveryNodes) {
|
||||
// ignore
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (e instanceof ClusterAlreadyBootstrappedException) {
|
||||
receivedAlreadyBootstrappedException.set(true);
|
||||
} else {
|
||||
throw new AssertionError("unexpected", e);
|
||||
}
|
||||
}
|
||||
})) {
|
||||
|
||||
cluster.stabilise();
|
||||
}
|
||||
assertTrue(receivedAlreadyBootstrappedException.get());
|
||||
}
|
||||
|
||||
public void testSettingInitialConfigurationTriggersElection() {
|
||||
|
@ -1358,7 +1390,10 @@ public class CoordinatorTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
|
||||
final Settings settings = Settings.EMPTY;
|
||||
final Settings settings = Settings.builder()
|
||||
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
|
||||
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
|
||||
|
||||
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
clusterApplier = new FakeClusterApplier(settings, clusterSettings);
|
||||
masterService = new AckedFakeThreadPoolMasterService("test_node", "test",
|
||||
|
|
|
@ -19,7 +19,7 @@ public final class SystemPrivilege extends Privilege {
|
|||
"internal:*",
|
||||
"indices:monitor/*", // added for monitoring
|
||||
"cluster:monitor/*", // added for monitoring
|
||||
"cluster:admin/bootstrap_cluster", // for the bootstrap service
|
||||
"cluster:admin/bootstrap/*", // for the bootstrap service
|
||||
"cluster:admin/reroute", // added for DiskThresholdDecider.DiskListener
|
||||
"indices:admin/mapping/put", // needed for recovery and shrink api
|
||||
"indices:admin/template/put", // needed for the TemplateUpgradeService
|
||||
|
|
|
@ -9,7 +9,6 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
|||
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
|
@ -256,9 +255,7 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
|
|||
// it could be waiting for pending tasks while monitoring is still running).
|
||||
ESRestTestCase.waitForPendingTasks(adminClient(), task -> {
|
||||
// Don't check rollup jobs because we clear them in the superclass.
|
||||
return task.contains(RollupJob.NAME)
|
||||
// Also ignore the zen2 discovery task
|
||||
|| task.contains(GetDiscoveredNodesAction.NAME);
|
||||
return task.contains(RollupJob.NAME);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue