diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index 6cda2a04bd7..27544405d3c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -99,7 +99,12 @@ public class CoordinationState extends AbstractComponent { } public boolean isElectionQuorum(VoteCollection votes) { - return votes.isQuorum(getLastCommittedConfiguration()) && votes.isQuorum(getLastAcceptedConfiguration()); + return isElectionQuorum(votes, getLastAcceptedState()); + } + + static boolean isElectionQuorum(VoteCollection votes, ClusterState lastAcceptedState) { + return votes.isQuorum(lastAcceptedState.getLastCommittedConfiguration()) + && votes.isQuorum(lastAcceptedState.getLastAcceptedConfiguration()); } public boolean isPublishQuorum(VoteCollection votes) { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java new file mode 100644 index 00000000000..84dfa70752f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -0,0 +1,202 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum; +import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; + +public class PreVoteCollector extends AbstractComponent { + + public static final String REQUEST_PRE_VOTE_ACTION_NAME = "internal:cluster/request_pre_vote"; + + private final TransportService transportService; + private final Runnable startElection; + + // Tuple for simple atomic updates + private volatile Tuple state; // DiscoveryNode component is null if there is currently no known leader + + PreVoteCollector(final Settings settings, final PreVoteResponse preVoteResponse, + final TransportService transportService, final Runnable startElection) { + super(settings); + state = new Tuple<>(null, preVoteResponse); + this.transportService = transportService; + this.startElection = startElection; + + // TODO does this need to be on the generic threadpool or can it use SAME? + transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false, + PreVoteRequest::new, + (request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request))); + } + + /** + * Start a new pre-voting round. + * + * @param clusterState the last-accepted cluster state + * @param broadcastNodes the nodes from whom to request pre-votes + * @return the pre-voting round, which can be closed to end the round early. + */ + public Releasable start(final ClusterState clusterState, final Iterable broadcastNodes) { + PreVotingRound preVotingRound = new PreVotingRound(clusterState, state.v2().getCurrentTerm()); + preVotingRound.start(broadcastNodes); + return preVotingRound; + } + + public void update(final PreVoteResponse preVoteResponse, final DiscoveryNode leader) { + logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader); + state = new Tuple<>(leader, preVoteResponse); + } + + private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) { + // TODO if we are a leader and the max term seen exceeds our term then we need to bump our term + + Tuple state = this.state; + final DiscoveryNode leader = state.v1(); + + if (leader == null) { + return state.v2(); + } + + if (leader.equals(request.getSourceNode())) { + // This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible + // that the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no + // major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the + // leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers + // to also detect its failure. + return state.v2(); + } + + throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader"); + } + + @Override + public String toString() { + return "PreVoteCollector{" + + "state=" + state + + '}'; + } + + private class PreVotingRound implements Releasable { + private final Set preVotesReceived = newConcurrentSet(); + private final AtomicBoolean electionStarted = new AtomicBoolean(); + private final PreVoteRequest preVoteRequest; + private final ClusterState clusterState; + private final AtomicBoolean isClosed = new AtomicBoolean(); + + PreVotingRound(final ClusterState clusterState, final long currentTerm) { + this.clusterState = clusterState; + preVoteRequest = new PreVoteRequest(transportService.getLocalNode(), currentTerm); + } + + void start(final Iterable broadcastNodes) { + logger.debug("{} requesting pre-votes from {}", this, broadcastNodes); + broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest, + new TransportResponseHandler() { + @Override + public void handleResponse(PreVoteResponse response) { + handlePreVoteResponse(response, n); + } + + @Override + public void handleException(TransportException exp) { + if (exp.getRootCause() instanceof CoordinationStateRejectedException) { + logger.debug("{} failed: {}", this, exp.getRootCause().getMessage()); + } else { + logger.debug(new ParameterizedMessage("{} failed", this), exp); + } + } + + @Override + public String executor() { + return Names.GENERIC; + } + + @Override + public String toString() { + return "TransportResponseHandler{" + PreVoteCollector.this + ", node=" + n + '}'; + } + })); + } + + private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) { + if (isClosed.get()) { + logger.debug("{} is closed, ignoring {} from {}", this, response, sender); + return; + } + + // TODO the response carries the sender's current term. If an election starts then it should be in a higher term. + + if (response.getLastAcceptedTerm() > clusterState.term() + || (response.getLastAcceptedTerm() == clusterState.term() + && response.getLastAcceptedVersion() > clusterState.version())) { + logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender); + return; + } + + preVotesReceived.add(sender); + final VoteCollection voteCollection = new VoteCollection(); + preVotesReceived.forEach(voteCollection::addVote); + + if (isElectionQuorum(voteCollection, clusterState) == false) { + logger.debug("{} added {} from {}, no quorum yet", this, response, sender); + return; + } + + if (electionStarted.compareAndSet(false, true) == false) { + logger.debug("{} added {} from {} but election has already started", this, response, sender); + return; + } + + logger.debug("{} added {} from {}, starting election", this, response, sender); + startElection.run(); + } + + @Override + public String toString() { + return "PreVotingRound{" + + "preVotesReceived=" + preVotesReceived + + ", electionStarted=" + electionStarted + + ", preVoteRequest=" + preVoteRequest + + ", isClosed=" + isClosed + + '}'; + } + + @Override + public void close() { + final boolean isNotAlreadyClosed = isClosed.compareAndSet(false, true); + assert isNotAlreadyClosed; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteRequest.java new file mode 100644 index 00000000000..330a7228567 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteRequest.java @@ -0,0 +1,82 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Objects; + +public class PreVoteRequest extends TransportRequest { + + private final DiscoveryNode sourceNode; + private final long currentTerm; + + public PreVoteRequest(DiscoveryNode sourceNode, long currentTerm) { + this.sourceNode = sourceNode; + this.currentTerm = currentTerm; + } + + public PreVoteRequest(StreamInput in) throws IOException { + super(in); + sourceNode = new DiscoveryNode(in); + currentTerm = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sourceNode.writeTo(out); + out.writeLong(currentTerm); + } + + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + public long getCurrentTerm() { + return currentTerm; + } + + @Override + public String toString() { + return "PreVoteRequest{" + + "sourceNode=" + sourceNode + + ", currentTerm=" + currentTerm + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PreVoteRequest that = (PreVoteRequest) o; + return currentTerm == that.currentTerm && + Objects.equals(sourceNode, that.sourceNode); + } + + @Override + public int hashCode() { + return Objects.hash(sourceNode, currentTerm); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteResponse.java new file mode 100644 index 00000000000..379c1cfcb83 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteResponse.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Objects; + +public class PreVoteResponse extends TransportResponse { + private final long currentTerm; + private final long lastAcceptedTerm; + private final long lastAcceptedVersion; + + public PreVoteResponse(long currentTerm, long lastAcceptedTerm, long lastAcceptedVersion) { + this.currentTerm = currentTerm; + this.lastAcceptedTerm = lastAcceptedTerm; + this.lastAcceptedVersion = lastAcceptedVersion; + assert lastAcceptedTerm <= currentTerm : currentTerm + " < " + lastAcceptedTerm; + } + + public PreVoteResponse(StreamInput in) throws IOException { + currentTerm = in.readLong(); + lastAcceptedTerm = in.readLong(); + lastAcceptedVersion = in.readLong(); + assert lastAcceptedTerm <= currentTerm : currentTerm + " < " + lastAcceptedTerm; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(currentTerm); + out.writeLong(lastAcceptedTerm); + out.writeLong(lastAcceptedVersion); + } + + public long getCurrentTerm() { + return currentTerm; + } + + public long getLastAcceptedTerm() { + return lastAcceptedTerm; + } + + public long getLastAcceptedVersion() { + return lastAcceptedVersion; + } + + @Override + public String toString() { + return "PreVoteResponse{" + + "currentTerm=" + currentTerm + + ", lastAcceptedTerm=" + lastAcceptedTerm + + ", lastAcceptedVersion=" + lastAcceptedVersion + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PreVoteResponse that = (PreVoteResponse) o; + return currentTerm == that.currentTerm && + lastAcceptedTerm == that.lastAcceptedTerm && + lastAcceptedVersion == that.lastAcceptedVersion; + } + + @Override + public int hashCode() { + return Objects.hash(currentTerm, lastAcceptedTerm, lastAcceptedVersion); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java index 1770770e02f..926287503d8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -174,4 +174,50 @@ public class MessagesTests extends ESTestCase { new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false))), randomLong()); } + + public void testPreVoteRequestEqualsHashCodeSerialization() { + PreVoteRequest initialPreVoteRequest = new PreVoteRequest(createNode(randomAlphaOfLength(10)), randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPreVoteRequest, + preVoteRequest -> copyWriteable(preVoteRequest, writableRegistry(), PreVoteRequest::new), + preVoteRequest -> { + if (randomBoolean()) { + return new PreVoteRequest(createNode(randomAlphaOfLength(10)), preVoteRequest.getCurrentTerm()); + } else { + return new PreVoteRequest(preVoteRequest.getSourceNode(), randomNonNegativeLong()); + } + }); + } + + public void testPreVoteResponseEqualsHashCodeSerialization() { + long currentTerm = randomNonNegativeLong(); + PreVoteResponse initialPreVoteResponse + = new PreVoteResponse(currentTerm, randomLongBetween(1, currentTerm), randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPreVoteResponse, + preVoteResponse -> copyWriteable(preVoteResponse, writableRegistry(), PreVoteResponse::new), + preVoteResponse -> { + switch (randomInt(2)) { + case 0: + assumeTrue("last-accepted term is Long.MAX_VALUE", preVoteResponse.getLastAcceptedTerm() < Long.MAX_VALUE); + return new PreVoteResponse( + randomValueOtherThan(preVoteResponse.getCurrentTerm(), + () -> randomLongBetween(preVoteResponse.getLastAcceptedTerm(), Long.MAX_VALUE)), + preVoteResponse.getLastAcceptedTerm(), + preVoteResponse.getLastAcceptedVersion()); + case 1: + assumeTrue("current term is 1", 1 < preVoteResponse.getCurrentTerm()); + return new PreVoteResponse( + preVoteResponse.getCurrentTerm(), + randomValueOtherThan(preVoteResponse.getLastAcceptedTerm(), + () -> randomLongBetween(1, preVoteResponse.getCurrentTerm())), + preVoteResponse.getLastAcceptedVersion()); + case 2: + return new PreVoteResponse( + preVoteResponse.getCurrentTerm(), + preVoteResponse.getLastAcceptedTerm(), + randomValueOtherThan(preVoteResponse.getLastAcceptedVersion(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError(); + } + }); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java new file mode 100644 index 00000000000..2f1b73a90c2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java @@ -0,0 +1,329 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.coordination.PreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.elasticsearch.threadpool.ThreadPool.Names.SAME; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +public class PreVoteCollectorTests extends ESTestCase { + + private DeterministicTaskQueue deterministicTaskQueue; + private PreVoteCollector preVoteCollector; + private boolean electionOccurred = false; + private DiscoveryNode localNode; + private Map responsesByNode = new HashMap<>(); + private long currentTerm, lastAcceptedTerm, lastAcceptedVersion; + private TransportService transportService; + + @Before + public void createObjects() { + Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); + deterministicTaskQueue = new DeterministicTaskQueue(settings); + final CapturingTransport capturingTransport = new CapturingTransport() { + @Override + protected void onSendRequest(final long requestId, final String action, final TransportRequest request, + final DiscoveryNode node) { + super.onSendRequest(requestId, action, request, node); + assertThat(action, is(REQUEST_PRE_VOTE_ACTION_NAME)); + assertThat(request, instanceOf(PreVoteRequest.class)); + assertThat(node, not(equalTo(localNode))); + PreVoteRequest preVoteRequest = (PreVoteRequest) request; + assertThat(preVoteRequest.getSourceNode(), equalTo(localNode)); + deterministicTaskQueue.scheduleNow(new Runnable() { + @Override + public void run() { + final PreVoteResponse response = responsesByNode.get(node); + if (response == null) { + handleRemoteError(requestId, new ConnectTransportException(node, "no response")); + } else { + handleResponse(requestId, response); + } + } + + @Override + public String toString() { + return "response to " + request + " from " + node; + } + }); + } + }; + lastAcceptedTerm = randomNonNegativeLong(); + currentTerm = randomLongBetween(lastAcceptedTerm, Long.MAX_VALUE); + lastAcceptedVersion = randomNonNegativeLong(); + + localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); + responsesByNode.put(localNode, new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion)); + transportService = capturingTransport.createCapturingTransportService(settings, + deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundTransportAddress -> localNode, null, emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + preVoteCollector = new PreVoteCollector(settings, getLocalPreVoteResponse(), transportService, () -> { + assert electionOccurred == false; + electionOccurred = true; + }); + } + + private PreVoteResponse getLocalPreVoteResponse() { + return Objects.requireNonNull(responsesByNode.get(localNode)); + } + + private void startAndRunCollector(DiscoveryNode... votingNodes) { + try (Releasable ignored = startCollector(votingNodes)) { + runCollector(); + } + } + + private void runCollector() { + deterministicTaskQueue.runAllRunnableTasks(random()); + assertFalse(deterministicTaskQueue.hasDeferredTasks()); + assertFalse(deterministicTaskQueue.hasRunnableTasks()); + } + + private ClusterState makeClusterState(DiscoveryNode[] votingNodes) { + final VotingConfiguration votingConfiguration + = new VotingConfiguration(Arrays.stream(votingNodes).map(DiscoveryNode::getId).collect(Collectors.toSet())); + return CoordinationStateTests.clusterState(lastAcceptedTerm, lastAcceptedVersion, localNode, + votingConfiguration, votingConfiguration, 0); + } + + private Releasable startCollector(DiscoveryNode... votingNodes) { + return preVoteCollector.start(makeClusterState(votingNodes), responsesByNode.keySet()); + } + + public void testStartsElectionIfLocalNodeIsOnlyNode() { + startAndRunCollector(localNode); + assertTrue(electionOccurred); + } + + public void testStartsElectionIfLocalNodeIsQuorum() { + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + responsesByNode.put(otherNode, getLocalPreVoteResponse()); + startAndRunCollector(otherNode); + assertTrue(electionOccurred); + } + + + public void testStartsElectionIfOtherNodeIsQuorum() { + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + responsesByNode.put(otherNode, getLocalPreVoteResponse()); + startAndRunCollector(otherNode); + assertTrue(electionOccurred); + } + + public void testDoesNotStartsElectionIfOtherNodeIsQuorumAndDoesNotRespond() { + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + responsesByNode.put(otherNode, null); + startAndRunCollector(otherNode); + assertFalse(electionOccurred); + } + + public void testDoesNotStartElectionIfStopped() { + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + responsesByNode.put(otherNode, getLocalPreVoteResponse()); + startCollector(otherNode).close(); + runCollector(); + assertFalse(electionOccurred); + } + + public void testIgnoresPreVotesFromLaterTerms() { + assumeTrue("unluckily chose lastAcceptedTerm too close to currentTerm, no later terms", lastAcceptedTerm < currentTerm - 1); + + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + responsesByNode.put(otherNode, + new PreVoteResponse(currentTerm, randomLongBetween(lastAcceptedTerm + 1, currentTerm - 1), randomNonNegativeLong())); + startAndRunCollector(otherNode); + assertFalse(electionOccurred); + } + + public void testIgnoresPreVotesFromLaterVersionInSameTerm() { + assumeTrue("unluckily hit Long.MAX_VALUE for lastAcceptedVersion, cannot increment", lastAcceptedVersion < Long.MAX_VALUE); + + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + responsesByNode.put(otherNode, + new PreVoteResponse(currentTerm, lastAcceptedTerm, randomLongBetween(lastAcceptedVersion + 1, Long.MAX_VALUE))); + startAndRunCollector(otherNode); + assertFalse(electionOccurred); + } + + public void testAcceptsPreVotesFromAnyVersionInEarlierTerms() { + assumeTrue("unluckily hit 0 for lastAcceptedTerm, cannot decrement", 0 < lastAcceptedTerm); + + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + responsesByNode.put(otherNode, + new PreVoteResponse(currentTerm, randomLongBetween(0, lastAcceptedTerm - 1), randomNonNegativeLong())); + startAndRunCollector(otherNode); + assertTrue(electionOccurred); + } + + private PreVoteResponse randomPreVoteResponse() { + final long currentTerm = randomNonNegativeLong(); + return new PreVoteResponse(currentTerm, randomLongBetween(0, currentTerm), randomNonNegativeLong()); + } + + public void testPrevotingIndicatesElectionSuccess() { + assumeTrue("unluckily hit currentTerm = Long.MAX_VALUE, cannot increment", currentTerm < Long.MAX_VALUE); + + final Set votingNodesSet = new HashSet<>(); + final int nodeCount = randomIntBetween(0, 5); + for (int i = 0; i < nodeCount; i++) { + final DiscoveryNode otherNode = new DiscoveryNode("other-node-" + i, buildNewFakeTransportAddress(), Version.CURRENT); + responsesByNode.put(otherNode, randomBoolean() ? null : randomPreVoteResponse()); + PreVoteResponse newPreVoteResponse = new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion); + preVoteCollector.update(newPreVoteResponse, null); + if (randomBoolean()) { + votingNodesSet.add(otherNode); + } + } + + DiscoveryNode[] votingNodes = votingNodesSet.toArray(new DiscoveryNode[0]); + startAndRunCollector(votingNodes); + + final CoordinationState coordinationState = new CoordinationState(Settings.EMPTY, localNode, + new InMemoryPersistedState(currentTerm, makeClusterState(votingNodes))); + + final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE); + + coordinationState.handleStartJoin(new StartJoinRequest(localNode, newTerm)); + + responsesByNode.forEach((otherNode, preVoteResponse) -> { + if (preVoteResponse != null) { + try { + coordinationState.handleJoin(new Join(otherNode, localNode, newTerm, + preVoteResponse.getLastAcceptedTerm(), preVoteResponse.getLastAcceptedVersion())); + } catch (CoordinationStateRejectedException ignored) { + // ok to reject some joins. + } + } + }); + + assertThat(coordinationState.electionWon(), equalTo(electionOccurred)); + } + + private PreVoteResponse handlePreVoteRequestViaTransportService(PreVoteRequest preVoteRequest) { + final AtomicReference responseRef = new AtomicReference<>(); + final AtomicReference exceptionRef = new AtomicReference<>(); + + transportService.sendRequest(localNode, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest, + new TransportResponseHandler() { + @Override + public void handleResponse(PreVoteResponse response) { + responseRef.set(response); + } + + @Override + public void handleException(TransportException exp) { + exceptionRef.set(exp); + } + + @Override + public String executor() { + return SAME; + } + }); + + deterministicTaskQueue.runAllRunnableTasks(random()); + assertFalse(deterministicTaskQueue.hasDeferredTasks()); + + final PreVoteResponse response = responseRef.get(); + final TransportException transportException = exceptionRef.get(); + + if (transportException != null) { + assertThat(response, nullValue()); + throw transportException; + } + + assertThat(response, not(nullValue())); + return response; + } + + public void testResponseIfCandidate() { + final long term = randomNonNegativeLong(); + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + + PreVoteResponse newPreVoteResponse = new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion); + preVoteCollector.update(newPreVoteResponse, null); + + assertThat(handlePreVoteRequestViaTransportService(new PreVoteRequest(otherNode, term)), equalTo(newPreVoteResponse)); + } + + public void testResponseToNonLeaderIfNotCandidate() { + final long term = randomNonNegativeLong(); + final DiscoveryNode leaderNode = new DiscoveryNode("leader-node", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + + PreVoteResponse newPreVoteResponse = new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion); + preVoteCollector.update(newPreVoteResponse, leaderNode); + + RemoteTransportException remoteTransportException = expectThrows(RemoteTransportException.class, () -> + handlePreVoteRequestViaTransportService(new PreVoteRequest(otherNode, term))); + assertThat(remoteTransportException.getCause(), instanceOf(CoordinationStateRejectedException.class)); + } + + public void testResponseToRequestFromLeader() { + // This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible that + // the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no major + // drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the leader + // won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers to also + // detect its failure. + + final long term = randomNonNegativeLong(); + final DiscoveryNode leaderNode = new DiscoveryNode("leader-node", buildNewFakeTransportAddress(), Version.CURRENT); + + PreVoteResponse newPreVoteResponse = new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion); + preVoteCollector.update(newPreVoteResponse, leaderNode); + + assertThat(handlePreVoteRequestViaTransportService(new PreVoteRequest(leaderNode, term)), equalTo(newPreVoteResponse)); + } +} diff --git a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java index 5e71a6c31e5..7cfa13da19c 100644 --- a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java @@ -75,14 +75,13 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase { super.onSendRequest(requestId, action, request, node); assertThat(action, equalTo(TransportService.HANDSHAKE_ACTION_NAME)); assertEquals(remoteNode.getAddress(), node.getAddress()); - assertNotEquals(remoteNode, node); if (dropHandshake == false) { handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT)); } } }; - transportService = new TransportService(settings, capturingTransport, threadPool, + transportService = capturingTransport.createCapturingTransportService(settings, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> localNode, null, emptySet()); transportService.start(); diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 4c7d3b0f8d0..b924344e868 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -33,7 +33,9 @@ import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; +import org.elasticsearch.test.transport.StubbableConnectionManager; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -191,18 +193,20 @@ public class PeerFinderTests extends ESTestCase { deterministicTaskQueue = new DeterministicTaskQueue(settings); localNode = newDiscoveryNode("local-node"); - transportService = new TransportService(settings, capturingTransport, - deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundTransportAddress -> localNode, null, emptySet()) { - @Override - public boolean nodeConnected(DiscoveryNode node) { - final boolean isConnected = connectedNodes.contains(node); - final boolean isDisconnected = disconnectedNodes.contains(node); - assert isConnected != isDisconnected : node + ": isConnected=" + isConnected + ", isDisconnected=" + isDisconnected; - return isConnected; - } - }; + ConnectionManager innerConnectionManager + = new ConnectionManager(settings, capturingTransport, deterministicTaskQueue.getThreadPool()); + StubbableConnectionManager connectionManager + = new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport, deterministicTaskQueue.getThreadPool()); + connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> { + final boolean isConnected = connectedNodes.contains(discoveryNode); + final boolean isDisconnected = disconnectedNodes.contains(discoveryNode); + assert isConnected != isDisconnected : discoveryNode + ": isConnected=" + isConnected + ", isDisconnected=" + isDisconnected; + return isConnected; + }); + connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> capturingTransport.openConnection(discoveryNode, null)); + transportService = new TransportService(settings, capturingTransport, deterministicTaskQueue.getThreadPool(), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet(), connectionManager); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index ebec6917446..94a6e1b82ee 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -97,33 +97,7 @@ public class CapturingTransport implements Transport { StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool), settings, this, threadPool); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> true); - connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> new Connection() { - @Override - public DiscoveryNode getNode() { - return discoveryNode; - } - - @Override - public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) - throws TransportException { - onSendRequest(requestId, action, request, discoveryNode); - } - - @Override - public void addCloseListener(ActionListener listener) { - - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void close() { - - } - }); + connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> openConnection(discoveryNode, null)); return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, connectionManager);