Zen2: Add leader-side join handling logic (#33013)

Adds the logic for handling joins by a prospective leader. Introduces the Coordinator class with the
basic lifecycle modes (candidate, leader, follower) as well as a JoinHelper class that contains most
of the plumbing for handling joins.
This commit is contained in:
Yannick Welsch 2018-08-23 19:18:52 +02:00 committed by GitHub
parent e4ef12798e
commit a0d32f5947
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1035 additions and 3 deletions

View File

@ -227,6 +227,7 @@ public class CoordinationState extends AbstractComponent {
boolean added = joinVotes.addVote(join.getSourceNode());
boolean prevElectionWon = electionWon;
electionWon = isElectionQuorum(joinVotes);
assert !prevElectionWon || electionWon; // we cannot go from won to not won
logger.debug("handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join,
join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion());

View File

@ -0,0 +1,211 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;
import java.util.Optional;
import java.util.function.Supplier;
public class Coordinator extends AbstractLifecycleComponent {
private final TransportService transportService;
private final JoinHelper joinHelper;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
// TODO: the following two fields are package-private as some tests require access to them
// These tests can be rewritten to use public methods once Coordinator is more feature-complete
final Object mutex = new Object();
final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
private Mode mode;
private Optional<DiscoveryNode> lastKnownLeader;
private Optional<Join> lastJoin;
private JoinHelper.JoinAccumulator joinAccumulator;
public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService,
MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier) {
super(settings);
this.transportService = transportService;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::handleJoinRequest);
this.persistedStateSupplier = persistedStateSupplier;
this.lastKnownLeader = Optional.empty();
this.lastJoin = Optional.empty();
this.joinAccumulator = joinHelper.new CandidateJoinAccumulator();
}
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (getCurrentTerm() < targetTerm) {
return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
}
return Optional.empty();
}
private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
logger.debug("joinLeaderInTerm: from [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
Join join = coordinationState.get().handleStartJoin(startJoinRequest);
lastJoin = Optional.of(join);
if (mode != Mode.CANDIDATE) {
becomeCandidate("joinLeaderInTerm");
}
return join;
}
private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
assert Thread.holdsLock(mutex) == false;
logger.trace("handleJoin: as {}, handling {}", mode, joinRequest);
transportService.connectToNode(joinRequest.getSourceNode());
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
synchronized (mutex) {
final CoordinationState coordState = coordinationState.get();
final boolean prevElectionWon = coordState.electionWon();
if (optionalJoin.isPresent()) {
Join join = optionalJoin.get();
// if someone thinks we should be master, let's add our vote and try to become one
// note that the following line should never throw an exception
ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(coordState::handleJoin);
if (coordState.electionWon()) {
// if we have already won the election then the actual join does not matter for election purposes,
// so swallow any exception
try {
coordState.handleJoin(join);
} catch (CoordinationStateRejectedException e) {
logger.trace("failed to add join, ignoring", e);
}
} else {
coordState.handleJoin(join); // this might fail and bubble up the exception
}
}
joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinCallback);
if (prevElectionWon == false && coordState.electionWon()) {
becomeLeader("handleJoin");
}
}
}
void becomeCandidate(String method) {
assert Thread.holdsLock(mutex) : "Legislator mutex not held";
logger.debug("{}: becoming CANDIDATE (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader);
if (mode != Mode.CANDIDATE) {
mode = Mode.CANDIDATE;
joinAccumulator.close(mode);
joinAccumulator = joinHelper.new CandidateJoinAccumulator();
}
}
void becomeLeader(String method) {
assert Thread.holdsLock(mutex) : "Legislator mutex not held";
assert mode == Mode.CANDIDATE : "expected candidate but was " + mode;
logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader);
mode = Mode.LEADER;
lastKnownLeader = Optional.of(getLocalNode());
joinAccumulator.close(mode);
joinAccumulator = joinHelper.new LeaderJoinAccumulator();
}
void becomeFollower(String method, DiscoveryNode leaderNode) {
assert Thread.holdsLock(mutex) : "Legislator mutex not held";
logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader);
if (mode != Mode.FOLLOWER) {
mode = Mode.FOLLOWER;
joinAccumulator.close(mode);
joinAccumulator = new JoinHelper.FollowerJoinAccumulator();
}
lastKnownLeader = Optional.of(leaderNode);
}
// package-visible for testing
long getCurrentTerm() {
synchronized (mutex) {
return coordinationState.get().getCurrentTerm();
}
}
// package-visible for testing
Mode getMode() {
synchronized (mutex) {
return mode;
}
}
// package-visible for testing
DiscoveryNode getLocalNode() {
return transportService.getLocalNode();
}
@Override
protected void doStart() {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
}
public void startInitialJoin() {
synchronized (mutex) {
becomeCandidate("startInitialJoin");
}
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
public void invariant() {
synchronized (mutex) {
if (mode == Mode.LEADER) {
assert coordinationState.get().electionWon();
assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode());
assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator;
} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
}
}
}
public enum Mode {
CANDIDATE, LEADER, FOLLOWER
}
}

View File

@ -0,0 +1,217 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
public class JoinHelper extends AbstractComponent {
public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
private final MasterService masterService;
private final TransportService transportService;
private final JoinTaskExecutor joinTaskExecutor;
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler) {
super(settings);
this.masterService = masterService;
this.transportService = transportService;
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
@Override
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)
throws Exception {
// This is called when preparing the next cluster state for publication. There is no guarantee that the term we see here is
// the term under which this state will eventually be published: the current term may be increased after this check due to
// some other activity. That the term is correct is, however, checked properly during publication, so it is sufficient to
// check it here on a best-effort basis. This is fine because a concurrent change indicates the existence of another leader
// in a higher term which will cause this node to stand down.
final long currentTerm = currentTermSupplier.getAsLong();
if (currentState.term() != currentTerm) {
currentState = ClusterState.builder(currentState).term(currentTerm).build();
}
return super.execute(currentState, joiningTasks);
}
};
transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new,
(request, channel, task) -> joinHandler.accept(request, new JoinCallback() {
@Override
public void onSuccess() {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (IOException e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send back failure on join request", inner);
}
}
@Override
public String toString() {
return "JoinCallback{request=" + request + "}";
}
}));
}
public interface JoinCallback {
void onSuccess();
void onFailure(Exception e);
}
static class JoinTaskListener implements ClusterStateTaskListener {
private final JoinTaskExecutor.Task task;
private final JoinCallback joinCallback;
JoinTaskListener(JoinTaskExecutor.Task task, JoinCallback joinCallback) {
this.task = task;
this.joinCallback = joinCallback;
}
@Override
public void onFailure(String source, Exception e) {
joinCallback.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
joinCallback.onSuccess();
}
@Override
public String toString() {
return "JoinTaskListener{task=" + task + "}";
}
}
interface JoinAccumulator {
void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback);
default void close(Mode newMode) {
}
}
class LeaderJoinAccumulator implements JoinAccumulator {
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader");
masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor, new JoinTaskListener(task, joinCallback));
}
@Override
public String toString() {
return "LeaderJoinAccumulator";
}
}
static class FollowerJoinAccumulator implements JoinAccumulator {
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
joinCallback.onFailure(new CoordinationStateRejectedException("join target is a follower"));
}
@Override
public String toString() {
return "FollowerJoinAccumulator";
}
}
class CandidateJoinAccumulator implements JoinAccumulator {
private final Map<DiscoveryNode, JoinCallback> joinRequestAccumulator = new HashMap<>();
boolean closed;
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
assert closed == false : "CandidateJoinAccumulator closed";
JoinCallback prev = joinRequestAccumulator.put(sender, joinCallback);
if (prev != null) {
prev.onFailure(new CoordinationStateRejectedException("received a newer join from " + sender));
}
}
@Override
public void close(Mode newMode) {
assert closed == false : "CandidateJoinAccumulator closed";
closed = true;
if (newMode == Mode.LEADER) {
final Map<JoinTaskExecutor.Task, ClusterStateTaskListener> pendingAsTasks = new HashMap<>();
joinRequestAccumulator.forEach((key, value) -> {
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(key, "elect leader");
pendingAsTasks.put(task, new JoinTaskListener(task, value));
});
final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)";
pendingAsTasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source, e) -> {
});
pendingAsTasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, (source, e) -> {
});
masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor);
} else if (newMode == Mode.FOLLOWER) {
joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure(
new CoordinationStateRejectedException("became follower")));
} else {
assert newMode == Mode.CANDIDATE;
assert joinRequestAccumulator.isEmpty() : joinRequestAccumulator.keySet();
}
}
@Override
public String toString() {
return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() + '}';
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.Optional;
public class JoinRequest extends TransportRequest {
private final DiscoveryNode sourceNode;
private final Optional<Join> optionalJoin;
public JoinRequest(DiscoveryNode sourceNode, Optional<Join> optionalJoin) {
assert optionalJoin.isPresent() == false || optionalJoin.get().getSourceNode().equals(sourceNode);
this.sourceNode = sourceNode;
this.optionalJoin = optionalJoin;
}
public JoinRequest(StreamInput in) throws IOException {
super(in);
sourceNode = new DiscoveryNode(in);
optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
out.writeOptionalWriteable(optionalJoin.orElse(null));
}
public DiscoveryNode getSourceNode() {
return sourceNode;
}
public Optional<Join> getOptionalJoin() {
return optionalJoin;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof JoinRequest)) return false;
JoinRequest that = (JoinRequest) o;
if (!sourceNode.equals(that.sourceNode)) return false;
return optionalJoin.equals(that.optionalJoin);
}
@Override
public int hashCode() {
int result = sourceNode.hashCode();
result = 31 * result + optionalJoin.hashCode();
return result;
}
@Override
public String toString() {
return "JoinRequest{" +
"sourceNode=" + sourceNode +
", optionalJoin=" + optionalJoin +
'}';
}
}

View File

@ -168,6 +168,30 @@ public class MessagesTests extends ESTestCase {
});
}
public void testJoinRequestEqualsHashCodeSerialization() {
Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong());
JoinRequest initialJoinRequest = new JoinRequest(initialJoin.getSourceNode(),
randomBoolean() ? Optional.empty() : Optional.of(initialJoin));
EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialJoinRequest,
joinRequest -> copyWriteable(joinRequest, writableRegistry(), JoinRequest::new),
joinRequest -> {
if (randomBoolean() && joinRequest.getOptionalJoin().isPresent() == false) {
return new JoinRequest(createNode(randomAlphaOfLength(20)), joinRequest.getOptionalJoin());
} else {
// change OptionalJoin
final Optional<Join> newOptionalJoin;
if (joinRequest.getOptionalJoin().isPresent() && randomBoolean()) {
newOptionalJoin = Optional.empty();
} else {
newOptionalJoin = Optional.of(new Join(joinRequest.getSourceNode(), createNode(randomAlphaOfLength(10)),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
}
return new JoinRequest(joinRequest.getSourceNode(), newOptionalJoin);
}
});
}
public ClusterState randomClusterState() {
return CoordinationStateTests.clusterState(randomNonNegativeLong(), randomNonNegativeLong(), createNode(randomAlphaOfLength(10)),
new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false))),

View File

@ -0,0 +1,492 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.service.MasterServiceTests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@TestLogging("org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE")
public class NodeJoinTests extends ESTestCase {
private static ThreadPool threadPool;
private MasterService masterService;
private Coordinator coordinator;
private DeterministicTaskQueue deterministicTaskQueue;
private TransportRequestHandler<JoinRequest> transportRequestHandler;
@BeforeClass
public static void beforeClass() {
threadPool = new TestThreadPool(NodeJoinTests.getTestClass().getName());
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
@After
public void tearDown() throws Exception {
super.tearDown();
masterService.close();
}
private static ClusterState initialState(boolean withMaster, DiscoveryNode localNode, long term, long version,
VotingConfiguration config) {
ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
.nodes(DiscoveryNodes.builder()
.add(localNode)
.localNodeId(localNode.getId())
.masterNodeId(withMaster ? localNode.getId() : null))
.term(term)
.version(version)
.lastAcceptedConfiguration(config)
.lastCommittedConfiguration(config)
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
return initialClusterState;
}
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
deterministicTaskQueue = new DeterministicTaskQueue(Settings.EMPTY);
FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow);
AtomicReference<ClusterState> currentState = new AtomicReference<>(initialState);
fakeMasterService.setClusterStateSupplier(currentState::get);
fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
currentState.set(event.state());
publishListener.onResponse(null);
});
fakeMasterService.start();
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService);
}
private void setupRealMasterServiceAndCoordinator(long term, ClusterState initialState) {
setupMasterServiceAndCoordinator(term, initialState, ClusterServiceUtils.createMasterService(threadPool, initialState));
}
private void setupMasterServiceAndCoordinator(long term, ClusterState initialState, MasterService masterService) {
if (this.masterService != null || coordinator != null) {
throw new IllegalStateException("method setupMasterServiceAndCoordinator can only be called once");
}
this.masterService = masterService;
TransportService transportService = mock(TransportService.class);
when(transportService.getLocalNode()).thenReturn(initialState.nodes().getLocalNode());
@SuppressWarnings("unchecked")
ArgumentCaptor<TransportRequestHandler<JoinRequest>> joinRequestHandler = ArgumentCaptor.forClass(
(Class) TransportRequestHandler.class);
coordinator = new Coordinator(Settings.EMPTY,
transportService,
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService,
() -> new CoordinationStateTests.InMemoryPersistedState(term, initialState));
verify(transportService).registerRequestHandler(eq(JoinHelper.JOIN_ACTION_NAME), eq(ThreadPool.Names.GENERIC), eq(false), eq(false),
anyObject(), joinRequestHandler.capture());
transportRequestHandler = joinRequestHandler.getValue();
coordinator.start();
coordinator.startInitialJoin();
}
protected DiscoveryNode newNode(int i) {
return newNode(i, randomBoolean());
}
protected DiscoveryNode newNode(int i, boolean master) {
Set<DiscoveryNode.Role> roles = new HashSet<>();
if (master) {
roles.add(DiscoveryNode.Role.MASTER);
}
final String prefix = master ? "master_" : "data_";
return new DiscoveryNode(prefix + i, i + "", buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT);
}
static class SimpleFuture extends BaseFuture<Void> {
final String description;
SimpleFuture(String description) {
this.description = description;
}
public void markAsDone() {
set(null);
}
public void markAsFailed(Throwable t) {
setException(t);
}
@Override
public String toString() {
return "future [" + description + "]";
}
}
private SimpleFuture joinNodeAsync(final JoinRequest joinRequest) {
final SimpleFuture future = new SimpleFuture("join of " + joinRequest + "]");
logger.debug("starting {}", future);
// clone the node before submitting to simulate an incoming join, which is guaranteed to have a new
// disco node object serialized off the network
try {
transportRequestHandler.messageReceived(joinRequest, new TransportChannel() {
@Override
public String getProfileName() {
return "dummy";
}
@Override
public String getChannelType() {
return "dummy";
}
@Override
public void sendResponse(TransportResponse response) {
logger.debug("{} completed", future);
future.markAsDone();
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) {
sendResponse(response);
}
@Override
public void sendResponse(Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected error for {}", future), e);
future.markAsFailed(e);
}
}, null);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected error for {}", future), e);
future.markAsFailed(e);
}
return future;
}
private void joinNode(final JoinRequest joinRequest) {
FutureUtils.get(joinNodeAsync(joinRequest));
}
private void joinNodeAndRun(final JoinRequest joinRequest) {
SimpleFuture fut = joinNodeAsync(joinRequest);
deterministicTaskQueue.runAllTasks(random());
assertTrue(fut.isDone());
FutureUtils.get(fut);
}
public void testJoinWithHigherTermElectsLeader() {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(randomFrom(node0, node1).getId()))));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
}
public void testJoinWithHigherTermButBetterStateGetsRejected() {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node1.getId()))));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
long higherVersion = initialVersion + randomLongBetween(1, 10);
expectThrows(CoordinationStateRejectedException.class,
() -> joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion)))));
assertFalse(isLocalNodeElectedMaster());
}
public void testJoinWithHigherTermButBetterStateStillElectsMasterThroughSelfJoin() {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node0.getId()))));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
long higherVersion = initialVersion + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion))));
assertTrue(isLocalNodeElectedMaster());
}
public void testJoinElectedLeader() {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node0.getId()))));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
assertFalse(clusterStateHasNode(node1));
joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
assertTrue(clusterStateHasNode(node1));
}
public void testJoinAccumulation() {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
DiscoveryNode node2 = newNode(2, true);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node2.getId()))));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, Optional.of(
new Join(node0, node0, newTerm, initialTerm, initialVersion))));
deterministicTaskQueue.runAllTasks(random());
assertFalse(futNode0.isDone());
assertFalse(isLocalNodeElectedMaster());
SimpleFuture futNode1 = joinNodeAsync(new JoinRequest(node1, Optional.of(
new Join(node1, node0, newTerm, initialTerm, initialVersion))));
deterministicTaskQueue.runAllTasks(random());
assertFalse(futNode1.isDone());
assertFalse(isLocalNodeElectedMaster());
joinNodeAndRun(new JoinRequest(node2, Optional.of(new Join(node2, node0, newTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
assertTrue(clusterStateHasNode(node1));
assertTrue(clusterStateHasNode(node2));
FutureUtils.get(futNode0);
FutureUtils.get(futNode1);
}
public void testJoinFollowerWithHigherTerm() {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node0.getId()))));
long newTerm = initialTerm + randomLongBetween(1, 10);
coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm));
synchronized (coordinator.mutex) {
coordinator.becomeFollower("test", node1);
}
assertFalse(isLocalNodeElectedMaster());
long newerTerm = newTerm + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node1,
Optional.of(new Join(node1, node0, newerTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
}
public void testJoinFollowerFails() {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node0.getId()))));
long newTerm = initialTerm + randomLongBetween(1, 10);
coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm));
synchronized (coordinator.mutex) {
coordinator.becomeFollower("test", node1);
}
assertFalse(isLocalNodeElectedMaster());
assertThat(expectThrows(CoordinationStateRejectedException.class,
() -> joinNodeAndRun(new JoinRequest(node1, Optional.empty()))).getMessage(),
containsString("join target is a follower"));
assertFalse(isLocalNodeElectedMaster());
}
public void testBecomeFollowerFailsPendingJoin() {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node1.getId()))));
long newTerm = initialTerm + randomLongBetween(1, 10);
SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
deterministicTaskQueue.runAllTasks(random());
assertFalse(fut.isDone());
assertFalse(isLocalNodeElectedMaster());
synchronized (coordinator.mutex) {
coordinator.becomeFollower("test", node1);
}
assertFalse(isLocalNodeElectedMaster());
assertThat(expectThrows(CoordinationStateRejectedException.class,
() -> FutureUtils.get(fut)).getMessage(),
containsString("became follower"));
assertFalse(isLocalNodeElectedMaster());
}
public void testConcurrentJoining() {
List<DiscoveryNode> nodes = IntStream.rangeClosed(1, randomIntBetween(2, 5))
.mapToObj(nodeId -> newNode(nodeId, true)).collect(Collectors.toList());
VotingConfiguration votingConfiguration = new VotingConfiguration(
randomSubsetOf(randomIntBetween(1, nodes.size()), nodes).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()));
logger.info("Voting configuration: {}", votingConfiguration);
DiscoveryNode localNode = nodes.get(0);
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupRealMasterServiceAndCoordinator(initialTerm, initialState(false, localNode, initialTerm, initialVersion, votingConfiguration));
long newTerm = initialTerm + randomLongBetween(1, 10);
// we need at least a quorum of voting nodes with a correct term and worse state
List<DiscoveryNode> successfulNodes;
do {
successfulNodes = randomSubsetOf(nodes);
} while (votingConfiguration.hasQuorum(successfulNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList()))
== false);
logger.info("Successful voting nodes: {}", successfulNodes);
List<JoinRequest> correctJoinRequests = successfulNodes.stream().map(
node -> new JoinRequest(node, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion))))
.collect(Collectors.toList());
List<DiscoveryNode> possiblyUnsuccessfulNodes = new ArrayList<>(nodes);
possiblyUnsuccessfulNodes.removeAll(successfulNodes);
logger.info("Possibly unsuccessful voting nodes: {}", possiblyUnsuccessfulNodes);
List<JoinRequest> possiblyFailingJoinRequests = possiblyUnsuccessfulNodes.stream().map(node -> {
if (randomBoolean()) {
// a correct request
return new JoinRequest(node, Optional.of(new Join(node, localNode,
newTerm, initialTerm, initialVersion)));
} else if (randomBoolean()) {
// term too low
return new JoinRequest(node, Optional.of(new Join(node, localNode,
randomLongBetween(0, initialTerm), initialTerm, initialVersion)));
} else {
// better state
return new JoinRequest(node, Optional.of(new Join(node, localNode,
newTerm, initialTerm, initialVersion + randomLongBetween(1, 10))));
}
}).collect(Collectors.toList());
// duplicate some requests, which will be unsuccessful
possiblyFailingJoinRequests.addAll(randomSubsetOf(possiblyFailingJoinRequests));
CyclicBarrier barrier = new CyclicBarrier(correctJoinRequests.size() + possiblyFailingJoinRequests.size() + 1);
List<Thread> threads = new ArrayList<>();
threads.add(new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
for (int i = 0; i < 30; i++) {
coordinator.invariant();
}
}));
threads.addAll(correctJoinRequests.stream().map(joinRequest -> new Thread(
() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
joinNode(joinRequest);
})).collect(Collectors.toList()));
threads.addAll(possiblyFailingJoinRequests.stream().map(joinRequest -> new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
try {
joinNode(joinRequest);
} catch (CoordinationStateRejectedException ignore) {
// ignore
}
})).collect(Collectors.toList()));
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster());
successfulNodes.forEach(node -> assertTrue(clusterStateHasNode(node)));
}
private boolean isLocalNodeElectedMaster() {
return MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster();
}
private boolean clusterStateHasNode(DiscoveryNode node) {
return node.equals(MasterServiceTests.discoveryState(masterService).nodes().get(node.getId()));
}
}

View File

@ -50,7 +50,7 @@ public class FakeThreadPoolMasterService extends MasterService {
private boolean taskInProgress = false;
private boolean waitForPublish = false;
FakeThreadPoolMasterService(String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
public FakeThreadPoolMasterService(String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
super(Settings.EMPTY, createMockThreadPool());
this.name = serviceName;
this.onTaskAvailableToRun = onTaskAvailableToRun;

View File

@ -969,10 +969,10 @@ public abstract class ESTestCase extends LuceneTestCase {
}
/**
* Returns a random subset of values (including a potential empty list)
* Returns a random subset of values (including a potential empty list, or the full original list)
*/
public static <T> List<T> randomSubsetOf(Collection<T> collection) {
return randomSubsetOf(randomInt(Math.max(collection.size() - 1, 0)), collection);
return randomSubsetOf(randomInt(collection.size()), collection);
}
/**