Allow joining node to trigger term bump (#53338)

In rare circumstances it is possible for an isolated node to have a greater
term than the currently-elected leader. Today such a node will attempt to join
the cluster but will not offer a vote to the leader and will reject its cluster
state publications due to their stale term. This situation persists since there
is no mechanism for the joining node to inform the leader that its term is
stale and a new election is required.

This commit adds the current term of the joining node to the join request. Once
the join has been validated, the leader will perform another election to
increase its term far enough to allow the isolated node to join properly.

Fixes #53271
This commit is contained in:
David Turner 2020-03-11 09:03:54 +00:00
parent a9dd7773d2
commit ac721938c2
8 changed files with 104 additions and 36 deletions

View File

@ -519,6 +519,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
synchronized (mutex) {
updateMaxTermSeen(joinRequest.getTerm());
final CoordinationState coordState = coordinationState.get();
final boolean prevElectionWon = coordState.electionWon();
@ -1148,7 +1150,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
synchronized (mutex) {
ensureTermAtLeast(masterNode, term);
joinHelper.sendJoinRequest(masterNode, joinWithDestination(lastJoin, masterNode, term));
joinHelper.sendJoinRequest(masterNode, getCurrentTerm(), joinWithDestination(lastJoin, masterNode, term));
}
}

View File

@ -263,7 +263,7 @@ public class DiscoveryUpgradeService {
if (isRunning()) {
final MasterCandidate electedMaster = electMasterService.electMaster(masterCandidates);
logger.debug("elected {}, sending join", electedMaster);
joinHelper.sendJoinRequest(electedMaster.getNode(), Optional.empty(),
joinHelper.sendJoinRequest(electedMaster.getNode(), 0L, Optional.empty(),
JoiningRound.this::scheduleNextAttempt);
}
}

View File

@ -130,14 +130,15 @@ public class JoinHelper {
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_ACTION_NAME,
ThreadPool.Names.GENERIC, false, false, MembershipAction.JoinRequest::new,
(request, channel, task) -> joinHandler.accept(new JoinRequest(request.getNode(), Optional.empty()), // treat as non-voting join
(request, channel, task) ->
joinHandler.accept(new JoinRequest(request.getNode(), 0L, Optional.empty()), // treat as non-voting join
transportJoinCallback(request, channel)));
transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
StartJoinRequest::new,
(request, channel, task) -> {
final DiscoveryNode destination = request.getSourceNode();
sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request)));
sendJoinRequest(destination, currentTermSupplier.getAsLong(), Optional.of(joinLeaderInTerm.apply(request)));
channel.sendResponse(Empty.INSTANCE);
});
@ -212,8 +213,8 @@ public class JoinHelper {
return pendingOutgoingJoins.isEmpty() == false;
}
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
sendJoinRequest(destination, optionalJoin, () -> {
public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin) {
sendJoinRequest(destination, term, optionalJoin, () -> {
});
}
@ -265,9 +266,9 @@ public class JoinHelper {
}
}
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin, Runnable onCompletion) {
public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin, Runnable onCompletion) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
logger.debug("attempting to join {} with {}", destination, joinRequest);

View File

@ -18,29 +18,53 @@
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
public class JoinRequest extends TransportRequest {
/**
* The sending (i.e. joining) node.
*/
private final DiscoveryNode sourceNode;
/**
* The minimum term for which the joining node will accept any cluster state publications. If the joining node is in a strictly greater
* term than the master it wants to join then the master must enter a new term and hold another election. Doesn't necessarily match
* the term of {@link JoinRequest#optionalJoin}, and may be zero in join requests sent prior to {@link Version#V_7_7_0}.
*/
private final long minimumTerm;
/**
* A vote for the receiving node. This vote is optional since the sending node may have voted for a different master in this term.
* That's ok: the sender has likely discovered that the master it voted for lost the election and is now trying to join the winner.
* Once the sender has successfully joined the master, the lack of a vote in its term causes another election (see
* {@link Publication#onMissingJoin(DiscoveryNode)}).
*/
private final Optional<Join> optionalJoin;
public JoinRequest(DiscoveryNode sourceNode, Optional<Join> optionalJoin) {
public JoinRequest(DiscoveryNode sourceNode, long minimumTerm, Optional<Join> optionalJoin) {
assert optionalJoin.isPresent() == false || optionalJoin.get().getSourceNode().equals(sourceNode);
this.sourceNode = sourceNode;
this.minimumTerm = minimumTerm;
this.optionalJoin = optionalJoin;
}
/**
 * Reads a join request from the wire.
 *
 * @param in the stream to read from; its version decides whether a minimum term is present
 * @throws IOException if reading from the stream fails
 */
public JoinRequest(StreamInput in) throws IOException {
    super(in);
    sourceNode = new DiscoveryNode(in);
    // Streams older than 7.7.0 carry no minimum term, so fall back to zero without consuming any bytes.
    minimumTerm = in.getVersion().onOrAfter(Version.V_7_7_0) ? in.readLong() : 0L;
    optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new));
}
@ -48,6 +72,9 @@ public class JoinRequest extends TransportRequest {
public void writeTo(StreamOutput out) throws IOException {
// Fields are written in the same order in which the StreamInput constructor reads them: source node, minimum term, optional join.
super.writeTo(out);
sourceNode.writeTo(out);
// The minimum term is only written to streams on or after 7.7.0; the reading side substitutes zero for older streams.
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeLong(minimumTerm);
}
// An absent join is passed as null to writeOptionalWriteable.
out.writeOptionalWriteable(optionalJoin.orElse(null));
}
@ -55,6 +82,17 @@ public class JoinRequest extends TransportRequest {
return sourceNode;
}
/**
 * Returns the minimum term exactly as carried by this request, without taking the optional join into account.
 * This is zero when the request was read from a stream older than {@link Version#V_7_7_0}.
 */
public long getMinimumTerm() {
    return this.minimumTerm;
}
/**
 * Returns the effective term of this request: the larger of the minimum term and the term of the join, if a join is present.
 */
public long getTerm() {
    // The join's term normally equals the minimum term, but callers are not required to obtain the two in a synchronized
    // fashion so they may disagree. Older nodes do not send the minimum term at all (it is read as zero), so for BWC the
    // join's term is used whenever it is the larger of the two.
    final long joinTerm = optionalJoin.isPresent() ? optionalJoin.get().getTerm() : 0L;
    return Math.max(minimumTerm, joinTerm);
}
/**
 * Returns the vote for the receiving node carried by this request, or an empty {@link Optional} if the request carries no vote.
 */
public Optional<Join> getOptionalJoin() {
    return this.optionalJoin;
}
@ -66,21 +104,21 @@ public class JoinRequest extends TransportRequest {
JoinRequest that = (JoinRequest) o;
if (minimumTerm != that.minimumTerm) return false;
if (!sourceNode.equals(that.sourceNode)) return false;
return optionalJoin.equals(that.optionalJoin);
}
@Override
public int hashCode() {
int result = sourceNode.hashCode();
result = 31 * result + optionalJoin.hashCode();
return result;
return Objects.hash(sourceNode, minimumTerm, optionalJoin);
}
/**
 * Renders the source node, minimum term and optional join of this request, in that order.
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("JoinRequest{");
    description.append("sourceNode=").append(sourceNode);
    description.append(", minimumTerm=").append(minimumTerm);
    description.append(", optionalJoin=").append(optionalJoin);
    description.append('}');
    return description.toString();
}

View File

@ -69,7 +69,7 @@ public class JoinHelperTests extends ESTestCase {
// check that sending a join to node1 works
Optional<Join> optionalJoin1 = randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
joinHelper.sendJoinRequest(node1, optionalJoin1);
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
CapturedRequest[] capturedRequests1 = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests1.length, equalTo(1));
CapturedRequest capturedRequest1 = capturedRequests1[0];
@ -80,14 +80,14 @@ public class JoinHelperTests extends ESTestCase {
// check that sending a join to node2 works
Optional<Join> optionalJoin2 = randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
joinHelper.sendJoinRequest(node2, optionalJoin2);
joinHelper.sendJoinRequest(node2, 0L, optionalJoin2);
CapturedRequest[] capturedRequests2 = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests2.length, equalTo(1));
CapturedRequest capturedRequest2 = capturedRequests2[0];
assertEquals(node2, capturedRequest2.node);
// check that sending another join to node1 is a noop as the previous join is still in progress
joinHelper.sendJoinRequest(node1, optionalJoin1);
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
assertThat(capturingTransport.getCapturedRequestsAndClear().length, equalTo(0));
// complete the previous join to node1
@ -98,7 +98,7 @@ public class JoinHelperTests extends ESTestCase {
}
// check that sending another join to node1 now works again
joinHelper.sendJoinRequest(node1, optionalJoin1);
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
CapturedRequest[] capturedRequests1a = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests1a.length, equalTo(1));
CapturedRequest capturedRequest1a = capturedRequests1a[0];
@ -107,7 +107,7 @@ public class JoinHelperTests extends ESTestCase {
// check that sending another join to node2 works if the optionalJoin is different
Optional<Join> optionalJoin2a = optionalJoin2.isPresent() && randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
joinHelper.sendJoinRequest(node2, optionalJoin2a);
joinHelper.sendJoinRequest(node2, 0L, optionalJoin2a);
CapturedRequest[] capturedRequests2a = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests2a.length, equalTo(1));
CapturedRequest capturedRequest2a = capturedRequests2a[0];

View File

@ -179,13 +179,18 @@ public class MessagesTests extends ESTestCase {
Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong());
JoinRequest initialJoinRequest = new JoinRequest(initialJoin.getSourceNode(),
randomBoolean() ? Optional.empty() : Optional.of(initialJoin));
randomNonNegativeLong(), randomBoolean() ? Optional.empty() : Optional.of(initialJoin));
// Note: the explicit cast of the CopyFunction is needed for some IDE (specifically Eclipse 4.8.0) to infer the right type
EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialJoinRequest,
(CopyFunction<JoinRequest>) joinRequest -> copyWriteable(joinRequest, writableRegistry(), JoinRequest::new),
joinRequest -> {
if (randomBoolean() && joinRequest.getOptionalJoin().isPresent() == false) {
return new JoinRequest(createNode(randomAlphaOfLength(20)), joinRequest.getOptionalJoin());
return new JoinRequest(createNode(randomAlphaOfLength(10)),
joinRequest.getMinimumTerm(), joinRequest.getOptionalJoin());
} else if (randomBoolean()) {
return new JoinRequest(joinRequest.getSourceNode(),
randomValueOtherThan(joinRequest.getMinimumTerm(), ESTestCase::randomNonNegativeLong),
joinRequest.getOptionalJoin());
} else {
// change OptionalJoin
final Optional<Join> newOptionalJoin;
@ -195,7 +200,7 @@ public class MessagesTests extends ESTestCase {
newOptionalJoin = Optional.of(new Join(joinRequest.getSourceNode(), createNode(randomAlphaOfLength(10)),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
}
return new JoinRequest(joinRequest.getSourceNode(), newOptionalJoin);
return new JoinRequest(joinRequest.getSourceNode(), joinRequest.getMinimumTerm(), newOptionalJoin);
}
});
}

View File

@ -74,6 +74,7 @@ import static java.util.Collections.singletonList;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class NodeJoinTests extends ESTestCase {
@ -278,7 +279,8 @@ public class NodeJoinTests extends ESTestCase {
assertFalse(isLocalNodeElectedMaster());
assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId());
long newTerm = initialTerm + randomLongBetween(1, 10);
SimpleFuture fut = joinNodeAsync(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion))));
SimpleFuture fut = joinNodeAsync(new JoinRequest(node1, newTerm,
Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion))));
assertEquals(Coordinator.Mode.LEADER, coordinator.getMode());
assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId());
deterministicTaskQueue.runAllRunnableTasks();
@ -298,7 +300,8 @@ public class NodeJoinTests extends ESTestCase {
long newTerm = initialTerm + randomLongBetween(1, 10);
long higherVersion = initialVersion + randomLongBetween(1, 10);
expectThrows(CoordinationStateRejectedException.class,
() -> joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion)))));
() -> joinNodeAndRun(new JoinRequest(node1, newTerm,
Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion)))));
assertFalse(isLocalNodeElectedMaster());
}
@ -312,7 +315,7 @@ public class NodeJoinTests extends ESTestCase {
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
long higherVersion = initialVersion + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion))));
joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion))));
assertTrue(isLocalNodeElectedMaster());
}
@ -325,14 +328,32 @@ public class NodeJoinTests extends ESTestCase {
new VotingConfiguration(Collections.singleton(node0.getId()))));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
assertFalse(clusterStateHasNode(node1));
joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion))));
joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
assertTrue(clusterStateHasNode(node1));
}
public void testJoinElectedLeaderWithHigherTerm() {
// Checks that a join request carrying a term above the elected master's term raises the coordinator's term to at least
// that value, and that the local node is still the elected master afterwards.
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
// node0 is the only member of the voting configuration, so its own vote is enough to win an election.
// NOTE(review): node0 is presumably the local node of the coordinator under test - confirm against the setup helper.
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node0.getId()))));
long newTerm = initialTerm + randomLongBetween(1, 10);
// node0 votes for itself in newTerm, which makes the local node the elected master.
joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
// node1 now asks to join with a strictly greater minimum term and without offering a vote.
long newerTerm = newTerm + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node1, newerTerm, Optional.empty()));
// The coordinator's term must have caught up with node1's term, and the local node must still be the elected master.
assertThat(coordinator.getCurrentTerm(), greaterThanOrEqualTo(newerTerm));
assertTrue(isLocalNodeElectedMaster());
}
public void testJoinAccumulation() {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
@ -343,17 +364,17 @@ public class NodeJoinTests extends ESTestCase {
new VotingConfiguration(Collections.singleton(node2.getId()))));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, Optional.of(
SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, newTerm, Optional.of(
new Join(node0, node0, newTerm, initialTerm, initialVersion))));
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(futNode0.isDone());
assertFalse(isLocalNodeElectedMaster());
SimpleFuture futNode1 = joinNodeAsync(new JoinRequest(node1, Optional.of(
SimpleFuture futNode1 = joinNodeAsync(new JoinRequest(node1, newTerm, Optional.of(
new Join(node1, node0, newTerm, initialTerm, initialVersion))));
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(futNode1.isDone());
assertFalse(isLocalNodeElectedMaster());
joinNodeAndRun(new JoinRequest(node2, Optional.of(new Join(node2, node0, newTerm, initialTerm, initialVersion))));
joinNodeAndRun(new JoinRequest(node2, newTerm, Optional.of(new Join(node2, node0, newTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
assertTrue(clusterStateHasNode(node1));
assertTrue(clusterStateHasNode(node2));
@ -372,7 +393,7 @@ public class NodeJoinTests extends ESTestCase {
handleStartJoinFrom(node1, newTerm);
handleFollowerCheckFrom(node1, newTerm);
long newerTerm = newTerm + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node1,
joinNodeAndRun(new JoinRequest(node1, newerTerm,
Optional.of(new Join(node1, node0, newerTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
}
@ -447,7 +468,7 @@ public class NodeJoinTests extends ESTestCase {
handleStartJoinFrom(node1, newTerm);
handleFollowerCheckFrom(node1, newTerm);
assertThat(expectThrows(CoordinationStateRejectedException.class,
() -> joinNodeAndRun(new JoinRequest(node1, Optional.empty()))).getMessage(),
() -> joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.empty()))).getMessage(),
containsString("join target is a follower"));
assertFalse(isLocalNodeElectedMaster());
}
@ -460,7 +481,8 @@ public class NodeJoinTests extends ESTestCase {
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node1.getId()))));
long newTerm = initialTerm + randomLongBetween(1, 10);
SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, newTerm,
Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(fut.isDone());
assertFalse(isLocalNodeElectedMaster());
@ -501,7 +523,7 @@ public class NodeJoinTests extends ESTestCase {
logger.info("Successful voting nodes: {}", successfulNodes);
List<JoinRequest> correctJoinRequests = successfulNodes.stream().map(
node -> new JoinRequest(node, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion))))
node -> new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion))))
.collect(Collectors.toList());
List<DiscoveryNode> possiblyUnsuccessfulNodes = new ArrayList<>(allNodes);
@ -512,15 +534,15 @@ public class NodeJoinTests extends ESTestCase {
List<JoinRequest> possiblyFailingJoinRequests = possiblyUnsuccessfulNodes.stream().map(node -> {
if (randomBoolean()) {
// a correct request
return new JoinRequest(node, Optional.of(new Join(node, localNode,
return new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode,
newTerm, initialTerm, initialVersion)));
} else if (randomBoolean()) {
// term too low
return new JoinRequest(node, Optional.of(new Join(node, localNode,
return new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode,
randomLongBetween(0, initialTerm), initialTerm, initialVersion)));
} else {
// better state
return new JoinRequest(node, Optional.of(new Join(node, localNode,
return new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode,
newTerm, initialTerm, initialVersion + randomLongBetween(1, 10))));
}
}).collect(Collectors.toList());

View File

@ -113,7 +113,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
final CompletableFuture<Throwable> future = new CompletableFuture<>();
DiscoveryNode node = state.nodes().getLocalNode();
coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, Optional.empty()),
coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, 0L, Optional.empty()),
new JoinHelper.JoinCallback() {
@Override
public void onSuccess() {