Introduce PreVoteCollector (#32847)

An election requires a node to select a term that is higher than all
previously-seen terms.  If nodes are too enthusiastic about starting elections
then they can effectively excludes itself from the cluster until the leader can
bump to a still-higher term, and if this process repeats then a single faulty
node can prevent the cluster from making useful progress.

The solution is to start the election with a pre-voting round to ensure that
there is at least a quorum of nodes who believe there to be no leader.

This also fixes up some merge issues.
This commit is contained in:
David Turner 2018-08-20 17:48:05 +01:00 committed by GitHub
parent f6891cd222
commit cd6326b391
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 773 additions and 41 deletions

View File

@ -99,7 +99,12 @@ public class CoordinationState extends AbstractComponent {
}
public boolean isElectionQuorum(VoteCollection votes) {
return votes.isQuorum(getLastCommittedConfiguration()) && votes.isQuorum(getLastAcceptedConfiguration());
return isElectionQuorum(votes, getLastAcceptedState());
}
static boolean isElectionQuorum(VoteCollection votes, ClusterState lastAcceptedState) {
return votes.isQuorum(lastAcceptedState.getLastCommittedConfiguration())
&& votes.isQuorum(lastAcceptedState.getLastAcceptedConfiguration());
}
public boolean isPublishQuorum(VoteCollection votes) {

View File

@ -0,0 +1,202 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
public class PreVoteCollector extends AbstractComponent {
public static final String REQUEST_PRE_VOTE_ACTION_NAME = "internal:cluster/request_pre_vote";
private final TransportService transportService;
private final Runnable startElection;
// Tuple for simple atomic updates
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader
PreVoteCollector(final Settings settings, final PreVoteResponse preVoteResponse,
final TransportService transportService, final Runnable startElection) {
super(settings);
state = new Tuple<>(null, preVoteResponse);
this.transportService = transportService;
this.startElection = startElection;
// TODO does this need to be on the generic threadpool or can it use SAME?
transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
PreVoteRequest::new,
(request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)));
}
/**
* Start a new pre-voting round.
*
* @param clusterState the last-accepted cluster state
* @param broadcastNodes the nodes from whom to request pre-votes
* @return the pre-voting round, which can be closed to end the round early.
*/
public Releasable start(final ClusterState clusterState, final Iterable<DiscoveryNode> broadcastNodes) {
PreVotingRound preVotingRound = new PreVotingRound(clusterState, state.v2().getCurrentTerm());
preVotingRound.start(broadcastNodes);
return preVotingRound;
}
public void update(final PreVoteResponse preVoteResponse, final DiscoveryNode leader) {
logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
state = new Tuple<>(leader, preVoteResponse);
}
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
// TODO if we are a leader and the max term seen exceeds our term then we need to bump our term
Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
final DiscoveryNode leader = state.v1();
if (leader == null) {
return state.v2();
}
if (leader.equals(request.getSourceNode())) {
// This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible
// that the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no
// major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the
// leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers
// to also detect its failure.
return state.v2();
}
throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
}
@Override
public String toString() {
return "PreVoteCollector{" +
"state=" + state +
'}';
}
private class PreVotingRound implements Releasable {
private final Set<DiscoveryNode> preVotesReceived = newConcurrentSet();
private final AtomicBoolean electionStarted = new AtomicBoolean();
private final PreVoteRequest preVoteRequest;
private final ClusterState clusterState;
private final AtomicBoolean isClosed = new AtomicBoolean();
PreVotingRound(final ClusterState clusterState, final long currentTerm) {
this.clusterState = clusterState;
preVoteRequest = new PreVoteRequest(transportService.getLocalNode(), currentTerm);
}
void start(final Iterable<DiscoveryNode> broadcastNodes) {
logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
new TransportResponseHandler<PreVoteResponse>() {
@Override
public void handleResponse(PreVoteResponse response) {
handlePreVoteResponse(response, n);
}
@Override
public void handleException(TransportException exp) {
if (exp.getRootCause() instanceof CoordinationStateRejectedException) {
logger.debug("{} failed: {}", this, exp.getRootCause().getMessage());
} else {
logger.debug(new ParameterizedMessage("{} failed", this), exp);
}
}
@Override
public String executor() {
return Names.GENERIC;
}
@Override
public String toString() {
return "TransportResponseHandler{" + PreVoteCollector.this + ", node=" + n + '}';
}
}));
}
private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
if (isClosed.get()) {
logger.debug("{} is closed, ignoring {} from {}", this, response, sender);
return;
}
// TODO the response carries the sender's current term. If an election starts then it should be in a higher term.
if (response.getLastAcceptedTerm() > clusterState.term()
|| (response.getLastAcceptedTerm() == clusterState.term()
&& response.getLastAcceptedVersion() > clusterState.version())) {
logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
return;
}
preVotesReceived.add(sender);
final VoteCollection voteCollection = new VoteCollection();
preVotesReceived.forEach(voteCollection::addVote);
if (isElectionQuorum(voteCollection, clusterState) == false) {
logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
return;
}
if (electionStarted.compareAndSet(false, true) == false) {
logger.debug("{} added {} from {} but election has already started", this, response, sender);
return;
}
logger.debug("{} added {} from {}, starting election", this, response, sender);
startElection.run();
}
@Override
public String toString() {
return "PreVotingRound{" +
"preVotesReceived=" + preVotesReceived +
", electionStarted=" + electionStarted +
", preVoteRequest=" + preVoteRequest +
", isClosed=" + isClosed +
'}';
}
@Override
public void close() {
final boolean isNotAlreadyClosed = isClosed.compareAndSet(false, true);
assert isNotAlreadyClosed;
}
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.Objects;
public class PreVoteRequest extends TransportRequest {
private final DiscoveryNode sourceNode;
private final long currentTerm;
public PreVoteRequest(DiscoveryNode sourceNode, long currentTerm) {
this.sourceNode = sourceNode;
this.currentTerm = currentTerm;
}
public PreVoteRequest(StreamInput in) throws IOException {
super(in);
sourceNode = new DiscoveryNode(in);
currentTerm = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
out.writeLong(currentTerm);
}
public DiscoveryNode getSourceNode() {
return sourceNode;
}
public long getCurrentTerm() {
return currentTerm;
}
@Override
public String toString() {
return "PreVoteRequest{" +
"sourceNode=" + sourceNode +
", currentTerm=" + currentTerm +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PreVoteRequest that = (PreVoteRequest) o;
return currentTerm == that.currentTerm &&
Objects.equals(sourceNode, that.sourceNode);
}
@Override
public int hashCode() {
return Objects.hash(sourceNode, currentTerm);
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Objects;
public class PreVoteResponse extends TransportResponse {
private final long currentTerm;
private final long lastAcceptedTerm;
private final long lastAcceptedVersion;
public PreVoteResponse(long currentTerm, long lastAcceptedTerm, long lastAcceptedVersion) {
this.currentTerm = currentTerm;
this.lastAcceptedTerm = lastAcceptedTerm;
this.lastAcceptedVersion = lastAcceptedVersion;
assert lastAcceptedTerm <= currentTerm : currentTerm + " < " + lastAcceptedTerm;
}
public PreVoteResponse(StreamInput in) throws IOException {
currentTerm = in.readLong();
lastAcceptedTerm = in.readLong();
lastAcceptedVersion = in.readLong();
assert lastAcceptedTerm <= currentTerm : currentTerm + " < " + lastAcceptedTerm;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(currentTerm);
out.writeLong(lastAcceptedTerm);
out.writeLong(lastAcceptedVersion);
}
public long getCurrentTerm() {
return currentTerm;
}
public long getLastAcceptedTerm() {
return lastAcceptedTerm;
}
public long getLastAcceptedVersion() {
return lastAcceptedVersion;
}
@Override
public String toString() {
return "PreVoteResponse{" +
"currentTerm=" + currentTerm +
", lastAcceptedTerm=" + lastAcceptedTerm +
", lastAcceptedVersion=" + lastAcceptedVersion +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PreVoteResponse that = (PreVoteResponse) o;
return currentTerm == that.currentTerm &&
lastAcceptedTerm == that.lastAcceptedTerm &&
lastAcceptedVersion == that.lastAcceptedVersion;
}
@Override
public int hashCode() {
return Objects.hash(currentTerm, lastAcceptedTerm, lastAcceptedVersion);
}
}

View File

@ -174,4 +174,50 @@ public class MessagesTests extends ESTestCase {
new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false))),
randomLong());
}
public void testPreVoteRequestEqualsHashCodeSerialization() {
PreVoteRequest initialPreVoteRequest = new PreVoteRequest(createNode(randomAlphaOfLength(10)), randomNonNegativeLong());
EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPreVoteRequest,
preVoteRequest -> copyWriteable(preVoteRequest, writableRegistry(), PreVoteRequest::new),
preVoteRequest -> {
if (randomBoolean()) {
return new PreVoteRequest(createNode(randomAlphaOfLength(10)), preVoteRequest.getCurrentTerm());
} else {
return new PreVoteRequest(preVoteRequest.getSourceNode(), randomNonNegativeLong());
}
});
}
public void testPreVoteResponseEqualsHashCodeSerialization() {
long currentTerm = randomNonNegativeLong();
PreVoteResponse initialPreVoteResponse
= new PreVoteResponse(currentTerm, randomLongBetween(1, currentTerm), randomNonNegativeLong());
EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPreVoteResponse,
preVoteResponse -> copyWriteable(preVoteResponse, writableRegistry(), PreVoteResponse::new),
preVoteResponse -> {
switch (randomInt(2)) {
case 0:
assumeTrue("last-accepted term is Long.MAX_VALUE", preVoteResponse.getLastAcceptedTerm() < Long.MAX_VALUE);
return new PreVoteResponse(
randomValueOtherThan(preVoteResponse.getCurrentTerm(),
() -> randomLongBetween(preVoteResponse.getLastAcceptedTerm(), Long.MAX_VALUE)),
preVoteResponse.getLastAcceptedTerm(),
preVoteResponse.getLastAcceptedVersion());
case 1:
assumeTrue("current term is 1", 1 < preVoteResponse.getCurrentTerm());
return new PreVoteResponse(
preVoteResponse.getCurrentTerm(),
randomValueOtherThan(preVoteResponse.getLastAcceptedTerm(),
() -> randomLongBetween(1, preVoteResponse.getCurrentTerm())),
preVoteResponse.getLastAcceptedVersion());
case 2:
return new PreVoteResponse(
preVoteResponse.getCurrentTerm(),
preVoteResponse.getLastAcceptedTerm(),
randomValueOtherThan(preVoteResponse.getLastAcceptedVersion(), ESTestCase::randomNonNegativeLong));
default:
throw new AssertionError();
}
});
}
}

View File

@ -0,0 +1,329 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.PreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.Names.SAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
public class PreVoteCollectorTests extends ESTestCase {
private DeterministicTaskQueue deterministicTaskQueue;
private PreVoteCollector preVoteCollector;
private boolean electionOccurred = false;
private DiscoveryNode localNode;
private Map<DiscoveryNode, PreVoteResponse> responsesByNode = new HashMap<>();
private long currentTerm, lastAcceptedTerm, lastAcceptedVersion;
private TransportService transportService;
@Before
public void createObjects() {
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
deterministicTaskQueue = new DeterministicTaskQueue(settings);
final CapturingTransport capturingTransport = new CapturingTransport() {
@Override
protected void onSendRequest(final long requestId, final String action, final TransportRequest request,
final DiscoveryNode node) {
super.onSendRequest(requestId, action, request, node);
assertThat(action, is(REQUEST_PRE_VOTE_ACTION_NAME));
assertThat(request, instanceOf(PreVoteRequest.class));
assertThat(node, not(equalTo(localNode)));
PreVoteRequest preVoteRequest = (PreVoteRequest) request;
assertThat(preVoteRequest.getSourceNode(), equalTo(localNode));
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
final PreVoteResponse response = responsesByNode.get(node);
if (response == null) {
handleRemoteError(requestId, new ConnectTransportException(node, "no response"));
} else {
handleResponse(requestId, response);
}
}
@Override
public String toString() {
return "response to " + request + " from " + node;
}
});
}
};
lastAcceptedTerm = randomNonNegativeLong();
currentTerm = randomLongBetween(lastAcceptedTerm, Long.MAX_VALUE);
lastAcceptedVersion = randomNonNegativeLong();
localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(localNode, new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion));
transportService = capturingTransport.createCapturingTransportService(settings,
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundTransportAddress -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
preVoteCollector = new PreVoteCollector(settings, getLocalPreVoteResponse(), transportService, () -> {
assert electionOccurred == false;
electionOccurred = true;
});
}
private PreVoteResponse getLocalPreVoteResponse() {
return Objects.requireNonNull(responsesByNode.get(localNode));
}
private void startAndRunCollector(DiscoveryNode... votingNodes) {
try (Releasable ignored = startCollector(votingNodes)) {
runCollector();
}
}
private void runCollector() {
deterministicTaskQueue.runAllRunnableTasks(random());
assertFalse(deterministicTaskQueue.hasDeferredTasks());
assertFalse(deterministicTaskQueue.hasRunnableTasks());
}
private ClusterState makeClusterState(DiscoveryNode[] votingNodes) {
final VotingConfiguration votingConfiguration
= new VotingConfiguration(Arrays.stream(votingNodes).map(DiscoveryNode::getId).collect(Collectors.toSet()));
return CoordinationStateTests.clusterState(lastAcceptedTerm, lastAcceptedVersion, localNode,
votingConfiguration, votingConfiguration, 0);
}
private Releasable startCollector(DiscoveryNode... votingNodes) {
return preVoteCollector.start(makeClusterState(votingNodes), responsesByNode.keySet());
}
public void testStartsElectionIfLocalNodeIsOnlyNode() {
startAndRunCollector(localNode);
assertTrue(electionOccurred);
}
public void testStartsElectionIfLocalNodeIsQuorum() {
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode, getLocalPreVoteResponse());
startAndRunCollector(otherNode);
assertTrue(electionOccurred);
}
public void testStartsElectionIfOtherNodeIsQuorum() {
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode, getLocalPreVoteResponse());
startAndRunCollector(otherNode);
assertTrue(electionOccurred);
}
public void testDoesNotStartsElectionIfOtherNodeIsQuorumAndDoesNotRespond() {
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode, null);
startAndRunCollector(otherNode);
assertFalse(electionOccurred);
}
public void testDoesNotStartElectionIfStopped() {
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode, getLocalPreVoteResponse());
startCollector(otherNode).close();
runCollector();
assertFalse(electionOccurred);
}
public void testIgnoresPreVotesFromLaterTerms() {
assumeTrue("unluckily chose lastAcceptedTerm too close to currentTerm, no later terms", lastAcceptedTerm < currentTerm - 1);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode,
new PreVoteResponse(currentTerm, randomLongBetween(lastAcceptedTerm + 1, currentTerm - 1), randomNonNegativeLong()));
startAndRunCollector(otherNode);
assertFalse(electionOccurred);
}
public void testIgnoresPreVotesFromLaterVersionInSameTerm() {
assumeTrue("unluckily hit Long.MAX_VALUE for lastAcceptedVersion, cannot increment", lastAcceptedVersion < Long.MAX_VALUE);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode,
new PreVoteResponse(currentTerm, lastAcceptedTerm, randomLongBetween(lastAcceptedVersion + 1, Long.MAX_VALUE)));
startAndRunCollector(otherNode);
assertFalse(electionOccurred);
}
public void testAcceptsPreVotesFromAnyVersionInEarlierTerms() {
assumeTrue("unluckily hit 0 for lastAcceptedTerm, cannot decrement", 0 < lastAcceptedTerm);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode,
new PreVoteResponse(currentTerm, randomLongBetween(0, lastAcceptedTerm - 1), randomNonNegativeLong()));
startAndRunCollector(otherNode);
assertTrue(electionOccurred);
}
private PreVoteResponse randomPreVoteResponse() {
final long currentTerm = randomNonNegativeLong();
return new PreVoteResponse(currentTerm, randomLongBetween(0, currentTerm), randomNonNegativeLong());
}
public void testPrevotingIndicatesElectionSuccess() {
assumeTrue("unluckily hit currentTerm = Long.MAX_VALUE, cannot increment", currentTerm < Long.MAX_VALUE);
final Set<DiscoveryNode> votingNodesSet = new HashSet<>();
final int nodeCount = randomIntBetween(0, 5);
for (int i = 0; i < nodeCount; i++) {
final DiscoveryNode otherNode = new DiscoveryNode("other-node-" + i, buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode, randomBoolean() ? null : randomPreVoteResponse());
PreVoteResponse newPreVoteResponse = new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion);
preVoteCollector.update(newPreVoteResponse, null);
if (randomBoolean()) {
votingNodesSet.add(otherNode);
}
}
DiscoveryNode[] votingNodes = votingNodesSet.toArray(new DiscoveryNode[0]);
startAndRunCollector(votingNodes);
final CoordinationState coordinationState = new CoordinationState(Settings.EMPTY, localNode,
new InMemoryPersistedState(currentTerm, makeClusterState(votingNodes)));
final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE);
coordinationState.handleStartJoin(new StartJoinRequest(localNode, newTerm));
responsesByNode.forEach((otherNode, preVoteResponse) -> {
if (preVoteResponse != null) {
try {
coordinationState.handleJoin(new Join(otherNode, localNode, newTerm,
preVoteResponse.getLastAcceptedTerm(), preVoteResponse.getLastAcceptedVersion()));
} catch (CoordinationStateRejectedException ignored) {
// ok to reject some joins.
}
}
});
assertThat(coordinationState.electionWon(), equalTo(electionOccurred));
}
private PreVoteResponse handlePreVoteRequestViaTransportService(PreVoteRequest preVoteRequest) {
final AtomicReference<PreVoteResponse> responseRef = new AtomicReference<>();
final AtomicReference<TransportException> exceptionRef = new AtomicReference<>();
transportService.sendRequest(localNode, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
new TransportResponseHandler<PreVoteResponse>() {
@Override
public void handleResponse(PreVoteResponse response) {
responseRef.set(response);
}
@Override
public void handleException(TransportException exp) {
exceptionRef.set(exp);
}
@Override
public String executor() {
return SAME;
}
});
deterministicTaskQueue.runAllRunnableTasks(random());
assertFalse(deterministicTaskQueue.hasDeferredTasks());
final PreVoteResponse response = responseRef.get();
final TransportException transportException = exceptionRef.get();
if (transportException != null) {
assertThat(response, nullValue());
throw transportException;
}
assertThat(response, not(nullValue()));
return response;
}
public void testResponseIfCandidate() {
final long term = randomNonNegativeLong();
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
PreVoteResponse newPreVoteResponse = new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion);
preVoteCollector.update(newPreVoteResponse, null);
assertThat(handlePreVoteRequestViaTransportService(new PreVoteRequest(otherNode, term)), equalTo(newPreVoteResponse));
}
public void testResponseToNonLeaderIfNotCandidate() {
final long term = randomNonNegativeLong();
final DiscoveryNode leaderNode = new DiscoveryNode("leader-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
PreVoteResponse newPreVoteResponse = new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion);
preVoteCollector.update(newPreVoteResponse, leaderNode);
RemoteTransportException remoteTransportException = expectThrows(RemoteTransportException.class, () ->
handlePreVoteRequestViaTransportService(new PreVoteRequest(otherNode, term)));
assertThat(remoteTransportException.getCause(), instanceOf(CoordinationStateRejectedException.class));
}
public void testResponseToRequestFromLeader() {
// This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible that
// the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no major
// drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the leader
// won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers to also
// detect its failure.
final long term = randomNonNegativeLong();
final DiscoveryNode leaderNode = new DiscoveryNode("leader-node", buildNewFakeTransportAddress(), Version.CURRENT);
PreVoteResponse newPreVoteResponse = new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion);
preVoteCollector.update(newPreVoteResponse, leaderNode);
assertThat(handlePreVoteRequestViaTransportService(new PreVoteRequest(leaderNode, term)), equalTo(newPreVoteResponse));
}
}

View File

@ -75,14 +75,13 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase {
super.onSendRequest(requestId, action, request, node);
assertThat(action, equalTo(TransportService.HANDSHAKE_ACTION_NAME));
assertEquals(remoteNode.getAddress(), node.getAddress());
assertNotEquals(remoteNode, node);
if (dropHandshake == false) {
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
}
}
};
transportService = new TransportService(settings, capturingTransport, threadPool,
transportService = capturingTransport.createCapturingTransportService(settings, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> localNode, null, emptySet());
transportService.start();

View File

@ -33,7 +33,9 @@ import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
import org.elasticsearch.test.transport.StubbableConnectionManager;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
@ -191,18 +193,20 @@ public class PeerFinderTests extends ESTestCase {
deterministicTaskQueue = new DeterministicTaskQueue(settings);
localNode = newDiscoveryNode("local-node");
transportService = new TransportService(settings, capturingTransport,
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundTransportAddress -> localNode, null, emptySet()) {
@Override
public boolean nodeConnected(DiscoveryNode node) {
final boolean isConnected = connectedNodes.contains(node);
final boolean isDisconnected = disconnectedNodes.contains(node);
assert isConnected != isDisconnected : node + ": isConnected=" + isConnected + ", isDisconnected=" + isDisconnected;
return isConnected;
}
};
ConnectionManager innerConnectionManager
= new ConnectionManager(settings, capturingTransport, deterministicTaskQueue.getThreadPool());
StubbableConnectionManager connectionManager
= new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport, deterministicTaskQueue.getThreadPool());
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> {
final boolean isConnected = connectedNodes.contains(discoveryNode);
final boolean isDisconnected = disconnectedNodes.contains(discoveryNode);
assert isConnected != isDisconnected : discoveryNode + ": isConnected=" + isConnected + ", isDisconnected=" + isDisconnected;
return isConnected;
});
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> capturingTransport.openConnection(discoveryNode, null));
transportService = new TransportService(settings, capturingTransport, deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet(), connectionManager);
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -97,33 +97,7 @@ public class CapturingTransport implements Transport {
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool),
settings, this, threadPool);
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> true);
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> new Connection() {
@Override
public DiscoveryNode getNode() {
return discoveryNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
onSendRequest(requestId, action, request, discoveryNode);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
});
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> openConnection(discoveryNode, null));
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
connectionManager);