Gather votes from all nodes (#34335)

Today we accept that some nodes may vote for the wrong master in an election.
This is mostly fine because they do end up joining the correct master in the
end, but the lack of a vote from every follower may prevent a future desirable
reconfiguration from taking place.

The solution is to hold another election in a yet-higher term in order to
collect a complete set of votes. Elections are somewhat disruptive so we should
think carefully about when this election should take place. One option is to
wait as late as possible (on the grounds that it might not ever be necessary).
This unfortunately makes it harder to predict how an
apparently-smoothly-running cluster will react to nodes leaving and joining.
Instead we prefer to perform the election as soon as possible in the leader's
term, adding "votes from all followers" to the invariants that we expect to
hold in a stable cluster. The start of a leader's term is already a somewhat
disrupted time for the cluster, so performing another election at this point
does not materially change the cluster's behaviour.

This change implements the logic needed to trigger a new election in order to
satisfy this extra stabilisation condition.
This commit is contained in:
David Turner 2018-10-06 07:22:04 +01:00 committed by GitHub
parent 29d7d1d503
commit 03da4f6c51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 145 additions and 61 deletions

View File

@ -222,8 +222,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
if (sourceNode.equals(getLocalNode()) == false) {
becomeFollower("handlePublishRequest", sourceNode);
if (sourceNode.equals(getLocalNode())) {
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
} else {
becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector
}
return new PublishWithJoinResponse(publishResponse,
@ -254,27 +256,31 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
private void updateMaxTermSeen(final long term) {
maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term));
// TODO if we are leader here, and there is no publication in flight, then we should bump our term
// (if we are leader and there _is_ a publication in flight then doing so would cancel the publication, so don't do that, but
// do check for this after the publication completes)
final long updatedMaxTermSeen = maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term));
synchronized (mutex) {
if (mode == Mode.LEADER && publicationInProgress() == false && updatedMaxTermSeen > getCurrentTerm()) {
// Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that
// since we check whether a term bump is needed at the end of the publication too.
ensureTermAtLeast(getLocalNode(), updatedMaxTermSeen);
startElection();
}
}
}
// TODO: make private again after removing term-bump workaround
void startElection() {
private void startElection() {
synchronized (mutex) {
// The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
// to check our mode again here.
if (mode == Mode.CANDIDATE) {
final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen.get()) + 1);
logger.debug("starting election with {}", startJoinRequest);
getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
}
}
}
// TODO: make private again after removing term-bump workaround
Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (getCurrentTerm() < targetTerm) {
return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
@ -289,9 +295,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
lastJoin = Optional.of(join);
peerFinder.setCurrentTerm(getCurrentTerm());
if (mode != Mode.CANDIDATE) {
becomeCandidate("joinLeaderInTerm"); // updates followersChecker
becomeCandidate("joinLeaderInTerm"); // updates followersChecker and preVoteCollector
} else {
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
preVoteCollector.update(getPreVoteResponse(), null);
}
return join;
}
@ -485,6 +492,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService();
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
if (becomingMaster && activePublication == false) {
@ -517,6 +526,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert leaderCheckScheduler != null;
assert followersChecker.getKnownFollowers().isEmpty();
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
@ -528,6 +539,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert followersChecker.getKnownFollowers().isEmpty();
assert applierState.nodes().getMasterNodeId() == null;
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader() == null : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
}
}
}
@ -537,7 +550,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return coordinationState.get().containsJoinVoteFor(localNode);
}
void handleJoin(Join join) {
private void handleJoin(Join join) {
synchronized (mutex) {
ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);
@ -547,7 +560,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
try {
coordinationState.get().handleJoin(join);
} catch (CoordinationStateRejectedException e) {
logger.debug("failed to add join, ignoring", e);
logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e);
}
} else {
coordinationState.get().handleJoin(join); // this might fail and bubble up the exception
@ -753,6 +766,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final AckListener ackListener;
private final ActionListener<Void> publishListener;
// We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot
// safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end.
private final List<Join> receivedJoins = new ArrayList<>();
private boolean receivedJoinsProcessed;
CoordinatorPublication(PublishRequest publishRequest, ListenableFuture<Void> localNodeAckEvent, AckListener ackListener,
ActionListener<Void> publishListener) {
super(Coordinator.this.settings, publishRequest,
@ -790,6 +808,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert currentPublication.get() == this;
currentPublication = Optional.empty();
logger.debug("publication ended unsuccessfully: {}", this);
// check if node has not already switched modes (by bumping term)
if (isActiveForCurrentLeader()) {
@ -812,6 +831,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert committed;
receivedJoins.forEach(CoordinatorPublication.this::handleAssociatedJoin);
assert receivedJoinsProcessed == false;
receivedJoinsProcessed = true;
clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState,
new ClusterApplyListener() {
@Override
@ -828,6 +851,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
synchronized (mutex) {
assert currentPublication.get() == CoordinatorPublication.this;
currentPublication = Optional.empty();
logger.debug("publication ended successfully: {}", CoordinatorPublication.this);
// trigger term bump if new term was found during publication
updateMaxTermSeen(getCurrentTerm());
}
@ -850,6 +874,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}, EsExecutors.newDirectExecutorService());
}
private void handleAssociatedJoin(Join join) {
if (join.getTerm() == getCurrentTerm() && hasJoinVoteFrom(join.getSourceNode()) == false) {
logger.trace("handling {}", join);
handleJoin(join);
}
}
@Override
protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
@ -867,10 +898,26 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
protected void onJoin(Join join) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (join.getTerm() == getCurrentTerm()) {
handleJoin(join);
if (receivedJoinsProcessed) {
// a late response may arrive after the state has been locally applied, meaning that receivedJoins has already been
// processed, so we have to handle this late response here.
handleAssociatedJoin(join);
} else {
receivedJoins.add(join);
}
}
@Override
protected void onMissingJoin(DiscoveryNode discoveryNode) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
// The remote node did not include a join vote in its publish response. We do not persist joins, so it could be that the remote
// node voted for us and then rebooted, or it could be that it voted for a different node in this term. If we don't have a copy
// of a join from this node then we assume the latter and bump our term to obtain a vote from this node.
if (hasJoinVoteFrom(discoveryNode) == false) {
final long term = publishRequest.getAcceptedState().term();
logger.debug("onMissingJoin: no join vote from {}, bumping term to exceed {}", discoveryNode, term);
updateMaxTermSeen(term + 1);
}
// TODO: what to do on missing join?
}
@Override

View File

@ -79,19 +79,33 @@ public class PreVoteCollector extends AbstractComponent {
return preVotingRound;
}
// only for testing
PreVoteResponse getPreVoteResponse() {
return state.v2();
}
// only for testing
@Nullable
DiscoveryNode getLeader() {
return state.v1();
}
public void update(final PreVoteResponse preVoteResponse, @Nullable final DiscoveryNode leader) {
logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
state = new Tuple<>(leader, preVoteResponse);
}
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
// TODO if we are a leader and the max term seen exceeds our term then we need to bump our term
updateMaxTermSeen.accept(request.getCurrentTerm());
Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
assert state != null : "received pre-vote request before fully initialised";
final DiscoveryNode leader = state.v1();
final PreVoteResponse response = state.v2();
if (leader == null) {
return state.v2();
return response;
}
if (leader.equals(request.getSourceNode())) {
@ -100,7 +114,7 @@ public class PreVoteCollector extends AbstractComponent {
// major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the
// leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers
// to also detect its failure.
return state.v2();
return response;
}
throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
@ -141,11 +155,7 @@ public class PreVoteCollector extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
if (exp.getRootCause() instanceof CoordinationStateRejectedException) {
logger.debug("{} failed: {}", this, exp.getRootCause().getMessage());
} else {
logger.debug(new ParameterizedMessage("{} failed", this), exp);
}
logger.debug(new ParameterizedMessage("{} failed", this), exp);
}
@Override

View File

@ -172,6 +172,8 @@ public abstract class Publication extends AbstractComponent {
protected abstract void onJoin(Join join);
protected abstract void onMissingJoin(DiscoveryNode discoveryNode);
protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener);
@ -301,10 +303,16 @@ public abstract class Publication extends AbstractComponent {
return;
}
response.getJoin().ifPresent(join -> {
if (response.getJoin().isPresent()) {
final Join join = response.getJoin().get();
assert discoveryNode.equals(join.getSourceNode());
assert join.getTerm() == response.getPublishResponse().getTerm() : response;
logger.trace("handling join within publish response: {}", join);
onJoin(join);
});
} else {
logger.trace("publish response from {} contained no join", discoveryNode);
onMissingJoin(discoveryNode);
}
assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM;
state = PublicationTargetState.WAITING_FOR_QUORUM;

View File

@ -30,8 +30,8 @@ import java.io.IOException;
*/
public class PublishResponse implements Writeable {
protected final long term;
protected final long version;
private final long term;
private final long version;
public PublishResponse(long term, long version) {
assert term >= 0;

View File

@ -25,10 +25,12 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
@ -37,6 +39,7 @@ import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@ -82,7 +85,11 @@ public class CoordinationStateTests extends ESTestCase {
}
public static DiscoveryNode createNode(String id) {
return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT);
final TransportAddress address = buildNewFakeTransportAddress();
return new DiscoveryNode("", id,
UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
EnumSet.allOf(Role.class), Version.CURRENT);
}
public void testSetInitialState() {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
@ -32,10 +31,8 @@ import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState;
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -54,8 +51,6 @@ import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -75,6 +70,7 @@ import static org.elasticsearch.cluster.coordination.CoordinationStateTests.valu
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER;
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING;
@ -187,15 +183,26 @@ public class CoordinatorTests extends ESTestCase {
logger.info("--> blackholing leader {}", originalLeader);
originalLeader.blackhole();
// This stabilisation time bound is undesirably long. TODO try and reduce it.
cluster.stabilise(Math.max(
// first wait for all the followers to notice the leader has gone
(defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING))
* defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING)
// then wait for a follower to be promoted to leader
+ DEFAULT_ELECTION_DELAY
// then wait for the new leader to notice that the old leader is unresponsive
+ (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
// and the first publication times out because of the unresponsive node
+ defaultMillis(PUBLISH_TIMEOUT_SETTING)
// there might be a term bump causing another election
+ DEFAULT_ELECTION_DELAY
// then wait for both of:
+ Math.max(
// 1. the term bumping publication to time out
defaultMillis(PUBLISH_TIMEOUT_SETTING),
// 2. the new leader to notice that the old leader is unresponsive
(defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING))
// then wait for the new leader to commit a state without the old leader
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
@ -205,6 +212,7 @@ public class CoordinatorTests extends ESTestCase {
// then wait for the leader to try and commit a state removing them, causing it to stand down
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY
));
assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
}
@ -599,31 +607,33 @@ public class CoordinatorTests extends ESTestCase {
assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)",
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
runFor(stabilisationDurationMillis, "stabilising");
fixLag();
assertUniqueLeaderAndExpectedModes();
}
// TODO remove when term-bumping is enabled
final long maxTerm = clusterNodes.stream().map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L);
final long maxLeaderTerm = clusterNodes.stream().filter(n -> n.coordinator.getMode() == Coordinator.Mode.LEADER)
.map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L);
// TODO remove this when lag detection is implemented
void fixLag() {
final ClusterNode leader = getAnyLeader();
final long leaderVersion = leader.coordinator.getLastAcceptedState().version();
final long minVersion = clusterNodes.stream()
.filter(n -> isConnectedPair(n, leader))
.map(n -> n.coordinator.getLastAcceptedState().version()).min(Long::compare).orElse(Long.MIN_VALUE);
if (maxLeaderTerm < maxTerm) {
logger.info("--> forcing a term bump, maxTerm={}, maxLeaderTerm={}", maxTerm, maxLeaderTerm);
final ClusterNode leader = getAnyLeader();
assert minVersion >= 0;
if (minVersion < leaderVersion) {
logger.info("--> publishing a value to fix lag, leaderVersion={}, minVersion={}", leaderVersion, minVersion);
onNode(leader.getLocalNode(), () -> {
synchronized (leader.coordinator.mutex) {
leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1);
leader.submitValue(randomLong());
}
leader.coordinator.startElection();
}).run();
runFor(DEFAULT_ELECTION_DELAY, "re-stabilising after term bump");
}
logger.info("--> end of stabilisation");
assertUniqueLeaderAndExpectedModes();
runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "re-stabilising after lag-fixing publication");
}
void runFor(long runDurationMillis, String description) {
final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis;
logger.info("----> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description);
logger.info("--> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description);
while (deterministicTaskQueue.getCurrentTimeMillis() < endTime) {
@ -648,7 +658,7 @@ public class CoordinatorTests extends ESTestCase {
deterministicTaskQueue.advanceTime();
}
logger.info("----> runFor({}ms) completed run until [{}ms]: {}", runDurationMillis, endTime, description);
logger.info("--> runFor({}ms) completed run until [{}ms]: {}", runDurationMillis, endTime, description);
}
private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) {
@ -677,7 +687,7 @@ public class CoordinatorTests extends ESTestCase {
if (isConnectedPair(leader, clusterNode)) {
assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
// TODO assert that this node has actually voted for the leader in this term
assertTrue(nodeId + " has voted for the leader", leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode()));
// TODO assert that this node's accepted and committed states are the same as the leader's
assertTrue(nodeId + " is in the leader's applied state",
@ -754,13 +764,7 @@ public class CoordinatorTests extends ESTestCase {
}
private DiscoveryNode createDiscoveryNode() {
final TransportAddress transportAddress = buildNewFakeTransportAddress();
// Generate the ephemeral ID deterministically, for repeatable tests. This means we have to pass everything else into the
// constructor explicitly too.
return new DiscoveryNode("", nodeIdFromIndex(nodeIndex), UUIDs.randomBase64UUID(random()),
transportAddress.address().getHostString(),
transportAddress.getAddress(), transportAddress, Collections.emptyMap(),
EnumSet.allOf(Role.class), Version.CURRENT);
return CoordinationStateTests.createNode(nodeIdFromIndex(nodeIndex));
}
private void setUp() {

View File

@ -101,6 +101,7 @@ public class PublicationTests extends ESTestCase {
Map<DiscoveryNode, ActionListener<PublishWithJoinResponse>> pendingPublications = new HashMap<>();
Map<DiscoveryNode, ActionListener<TransportResponse.Empty>> pendingCommits = new HashMap<>();
Map<DiscoveryNode, Join> joins = new HashMap<>();
Set<DiscoveryNode> missingJoins = new HashSet<>();
MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener,
LongSupplier currentTimeSupplier) {
@ -120,6 +121,11 @@ public class PublicationTests extends ESTestCase {
assertNull(joins.put(join.getSourceNode(), join));
}
@Override
protected void onMissingJoin(DiscoveryNode discoveryNode) {
assertTrue(missingJoins.add(discoveryNode));
}
@Override
protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener) {
@ -182,14 +188,16 @@ public class PublicationTests extends ESTestCase {
assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty());
assertFalse(publication.joins.containsKey(e.getKey()));
PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse(publishResponse,
randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), randomNonNegativeLong(),
randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), publishResponse.getTerm(),
randomNonNegativeLong(), randomNonNegativeLong())));
e.getValue().onResponse(publishWithJoinResponse);
if (publishWithJoinResponse.getJoin().isPresent()) {
assertTrue(publication.joins.containsKey(e.getKey()));
assertFalse(publication.missingJoins.contains(e.getKey()));
assertEquals(publishWithJoinResponse.getJoin().get(), publication.joins.get(e.getKey()));
} else {
assertFalse(publication.joins.containsKey(e.getKey()));
assertTrue(publication.missingJoins.contains(e.getKey()));
}
if (e.getKey().equals(n1)) {
processedNode1PublishResponse.set(true);