Resolve some coordination-layer TODOs (#54511)
This commit removes a handful of TODO comments in the cluster coordination layer that no longer apply. Relates #32006
commit 6d976e1468
parent 1fe2705826
@@ -160,7 +160,6 @@ public class ClusterFormationFailureHelper {
         if (INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) {
             bootstrappingDescription = "[" + INITIAL_MASTER_NODES_SETTING.getKey() + "] is empty on this node";
         } else {
-            // TODO update this when we can bootstrap on only a quorum of the initial nodes
             bootstrappingDescription = String.format(Locale.ROOT,
                 "this node must discover master-eligible nodes %s to bootstrap a cluster",
                 INITIAL_MASTER_NODES_SETTING.get(settings));
@@ -121,9 +121,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
     private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
     private final NoMasterBlockService noMasterBlockService;
-    // TODO: the following field is package-private as some tests require access to it
-    // These tests can be rewritten to use public methods once Coordinator is more feature-complete
-    final Object mutex = new Object();
+    final Object mutex = new Object(); // package-private to allow tests to call methods that assert that the mutex is held
     private final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
     private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier
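For context on the comment added above: a package-private mutex lets tests in the same package call helpers that assert the lock is held. A minimal standalone sketch of that pattern (illustration only, not the actual Coordinator code; class and method names are made up):

public class MutexHolderSketch {
    final Object mutex = new Object(); // package-private so same-package tests can reference it

    void mutateStateUnderMutex() {
        synchronized (mutex) {
            assert assertMutexHeld();
            // ... changes that must only happen while holding the mutex ...
        }
    }

    // Tests (or internal assertions) call this to verify the calling thread holds the mutex.
    boolean assertMutexHeld() {
        assert Thread.holdsLock(mutex) : "mutex not held";
        return true;
    }

    public static void main(String[] args) {
        // run with -ea so the assertions are enabled
        new MutexHolderSketch().mutateStateUnderMutex();
        System.out.println("mutex assertion passed");
    }
}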
@@ -885,7 +883,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery

                 Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
                 // automatically generate a UID for the metadata if we need to
-                metadataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool?
+                metadataBuilder.generateClusterUuidIfNeeded();
                 metadataBuilder.coordinationMetadata(coordinationMetadata);

                 coordinationState.get().setInitialState(ClusterState.builder(currentState).metadata(metadataBuilder).build());
@@ -1192,7 +1190,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                 return;
             }

-            final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
+            final TimeValue gracePeriod = TimeValue.ZERO;
             electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
                 @Override
                 public void run() {
@@ -1225,7 +1223,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
     }

     public Iterable<DiscoveryNode> getFoundPeers() {
-        // TODO everyone takes this and adds the local node. Maybe just add the local node here?
         return peerFinder.getFoundPeers();
     }

@@ -170,7 +170,6 @@ public class FollowersChecker {
         FastResponseState responder = this.fastResponseState;

         if (responder.mode == Mode.FOLLOWER && responder.term == request.term) {
-            // TODO trigger a term bump if we voted for a different leader in this term
             logger.trace("responding to {} on fast path", request);
             transportChannel.sendResponse(Empty.INSTANCE);
             return;
@@ -205,15 +204,6 @@ public class FollowersChecker {
         });
     }

-    // TODO in the PoC a faulty node was considered non-faulty again if it sent us a PeersRequest:
-    // - node disconnects, detected faulty, removal is enqueued
-    // - node reconnects, pings us, finds we are master, requests to join, all before removal is applied
-    // - join is processed before removal, but we do not publish to known-faulty nodes so the joining node does not receive this publication
-    // - it doesn't start its leader checker since it receives nothing to cause it to become a follower
-    // Apparently this meant that it remained a candidate for too long, leading to a test failure. At the time this logic was added, we did
-    // not have gossip-based discovery which would (I think) have retried this joining process a short time later. It's therefore possible
-    // that this is no longer required, so it's omitted here until we can be sure if it's necessary or not.
-
     /**
      * @return nodes in the current cluster state which have failed their follower checks.
      */
@@ -21,7 +21,6 @@ package org.elasticsearch.cluster.coordination;
 import org.elasticsearch.cluster.ClusterState;

 public class InMemoryPersistedState implements CoordinationState.PersistedState {
-    // TODO add support and tests for behaviour with persistence-layer failures

     private long currentTerm;
     private ClusterState acceptedState;
@@ -63,7 +63,6 @@ public class PreVoteCollector {
         this.updateMaxTermSeen = updateMaxTermSeen;
         this.electionStrategy = electionStrategy;

-        // TODO does this need to be on the generic threadpool or can it use SAME?
         transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
             PreVoteRequest::new,
             (request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)));
@@ -244,7 +244,6 @@ public abstract class Publication {
             assert state == PublicationTargetState.NOT_STARTED : state + " -> " + PublicationTargetState.SENT_PUBLISH_REQUEST;
             state = PublicationTargetState.SENT_PUBLISH_REQUEST;
             Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler());
-            // TODO Can this ^ fail with an exception? Target should be failed if so.
             assert publicationCompletedIffAllTargetsInactiveOrCancelled();
         }

@@ -449,7 +449,6 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
         logger.info("--> blackholing leader {}", originalLeader);
         originalLeader.blackhole();

-        // This stabilisation time bound is undesirably long. TODO try and reduce it.
         cluster.stabilise(Math.max(
             // first wait for all the followers to notice the leader has gone
             (defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING))
@@ -686,7 +685,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
         // cluster has two nodes in mode LEADER, in different terms ofc, and the one in the lower term won’t be able to publish anything
         leader.heal();
         AckCollector ackCollector = leader.submitValue(randomLong());
-        cluster.stabilise(); // TODO: check if can find a better bound here
+        cluster.stabilise();
         assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader));
         assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0));
         assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1));
@@ -739,14 +738,13 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
         cluster.stabilise(
             // the first election should succeed, because only one node knows of the initial configuration and therefore can win a
             // pre-voting round and proceed to an election, so there cannot be any collisions
-            defaultMillis(ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately
+            defaultMillis(ELECTION_INITIAL_TIMEOUT_SETTING)
             // Allow two round-trip for pre-voting and voting
             + 4 * DEFAULT_DELAY_VARIABILITY
             // Then a commit of the new leader's first cluster state
             + DEFAULT_CLUSTER_STATE_UPDATE_DELAY
             // Then allow time for all the other nodes to join, each of which might cause a reconfiguration
             + (cluster.size() - 1) * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY
-            // TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great.
         );
     }
 }
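The stabilisation bound in the hunk above is a sum of an election timeout, message-delay variability, and per-node cluster-state-update delays. A hedged worked example of that arithmetic, using made-up stand-in values rather than the real setting defaults (which are not shown in this diff):

public class StabilisationBoundSketch {
    public static void main(String[] args) {
        // Illustrative stand-ins only; not the real values of ELECTION_INITIAL_TIMEOUT_SETTING,
        // DEFAULT_DELAY_VARIABILITY, or DEFAULT_CLUSTER_STATE_UPDATE_DELAY.
        long electionInitialTimeoutMillis = 100;
        long delayVariabilityMillis = 100;
        long clusterStateUpdateDelayMillis = 1000;
        int clusterSize = 3;

        long boundMillis = electionInitialTimeoutMillis               // wait for the first election to be triggered
            + 4 * delayVariabilityMillis                              // two round-trips for pre-voting and voting
            + clusterStateUpdateDelayMillis                           // commit of the new leader's first cluster state
            + (clusterSize - 1) * 2 * clusterStateUpdateDelayMillis;  // each joining node may cause a reconfiguration

        System.out.println("stabilisation bound = " + boundMillis + "ms"); // 5500ms with these stand-in values
    }
}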
@@ -112,7 +112,7 @@ public class PreVoteCollectorTests extends ESTestCase {
             assert electionOccurred == false;
             electionOccurred = true;
         }, l -> {
-        }, ElectionStrategy.DEFAULT_INSTANCE); // TODO need tests that check that the max term seen is updated
+        }, ElectionStrategy.DEFAULT_INSTANCE);
         preVoteCollector.update(getLocalPreVoteResponse(), null);
     }

@@ -244,7 +244,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {

     final List<ClusterNode> clusterNodes;
     final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
-        // TODO does ThreadPool need a node name any more?
         Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random());
     private boolean disruptStorage;

@@ -289,8 +288,13 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             initialNodeCount, masterEligibleNodeIds, initialConfiguration);
     }

-    List<ClusterNode> addNodesAndStabilise(int newNodesCount) {
-        final List<ClusterNode> addedNodes = addNodes(newNodesCount);
+    void addNodesAndStabilise(int newNodesCount) {
+
+        // The stabilisation time bound is O(#new nodes) which isn't ideal; it's possible that the real bound is O(1) since node-join
+        // events are batched together, but in practice we have not seen problems in this area so have not invested the time needed to
+        // investigate this more closely.
+
+        addNodes(newNodesCount);
         stabilise(
             // The first pinging discovers the master
             defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING)
@@ -299,8 +303,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             // Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a
             // followup reconfiguration
             + newNodesCount * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
-        // TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great.
-        return addedNodes;
     }

     List<ClusterNode> addNodes(int newNodesCount) {
@@ -331,7 +333,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
      */
     void runRandomly(boolean allowReboots, boolean coolDown, long delayVariability) {

-        // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it
         assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty());
         assertThat("may reconnect blackholed nodes, probably unexpected", blackholedNodes, empty());

@@ -449,11 +450,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                         deterministicTaskQueue.runRandomTask();
                     }
                 }

-                // TODO other random steps:
-                // - reboot a node
-                // - abdicate leadership
-
             } catch (CoordinationStateRejectedException | UncheckedIOException ignored) {
                 // This is ok: it just means a message couldn't currently be handled.
             }
@@ -1254,7 +1250,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {

             @Override
             public void onCommit(TimeValue commitTime) {
-                // TODO we only currently care about per-node acks
             }

             @Override