[Zen2] Implement basic cluster formation (#33668)

This PR integrates the following pieces of machinery in the Coordinator:

- discovery
- pre-voting
- randomised election scheduling
- joining (of a new master)
- publication of cluster state updates

Together, these things are everything needed to form a cluster. We therefore
also add the start of a test suite that allows us to assert higher-level
properties of the interactions between all these pieces of machinery, with as
little fake behaviour as possible. We assert one such property: "a cluster
successfully forms".
This commit is contained in:
David Turner 2018-09-17 15:00:30 +02:00 committed by GitHub
parent 01b3be917a
commit c79fbea923
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 886 additions and 88 deletions

View File

@ -49,6 +49,7 @@ public class ApplyCommitRequest extends TermVersionRequest {
return "ApplyCommitRequest{" +
"term=" + term +
", version=" + version +
", sourceNode=" + sourceNode +
'}';
}
}

View File

@ -19,18 +19,35 @@
package org.elasticsearch.cluster.coordination;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.HandshakingTransportAddressConnector;
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.UnicastConfiguredHostsResolver;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public class Coordinator extends AbstractLifecycleComponent {
public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";
private final TransportService transportService;
private final JoinHelper joinHelper;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
@ -38,6 +55,17 @@ public class Coordinator extends AbstractLifecycleComponent {
// These tests can be rewritten to use public methods once Coordinator is more feature-complete
final Object mutex = new Object();
final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
private volatile Optional<ClusterState> lastCommittedState = Optional.empty();
private final PeerFinder peerFinder;
private final PreVoteCollector preVoteCollector;
private final ElectionSchedulerFactory electionSchedulerFactory;
private final UnicastConfiguredHostsResolver configuredHostsResolver;
@Nullable
private Releasable electionScheduler;
@Nullable
private Releasable prevotingRound;
private AtomicLong maxTermSeen = new AtomicLong();
private Mode mode;
private Optional<DiscoveryNode> lastKnownLeader;
@ -45,15 +73,99 @@ public class Coordinator extends AbstractLifecycleComponent {
private JoinHelper.JoinAccumulator joinAccumulator;
public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService,
MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier) {
MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier,
UnicastHostsProvider unicastHostsProvider) {
super(settings);
this.transportService = transportService;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::handleJoinRequest);
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm);
this.persistedStateSupplier = persistedStateSupplier;
this.lastKnownLeader = Optional.empty();
this.lastJoin = Optional.empty();
this.joinAccumulator = joinHelper.new CandidateJoinAccumulator();
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, Randomness.get(), transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(settings, transportService, this::startElection, this::updateMaxTermSeen);
configuredHostsResolver = new UnicastConfiguredHostsResolver(settings, transportService, unicastHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, Names.GENERIC, false, false,
in -> new PublishRequest(in, transportService.getLocalNode()),
(request, channel, task) -> channel.sendResponse(handlePublishRequest(request)));
transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, Names.GENERIC, false, false,
ApplyCommitRequest::new,
(request, channel, task) -> {
handleApplyCommit(request);
channel.sendResponse(Empty.INSTANCE);
});
}
private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
synchronized (mutex) {
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);
coordinationState.get().handleCommit(applyCommitRequest);
lastCommittedState = Optional.of(coordinationState.get().getLastAcceptedState());
}
}
private PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
synchronized (mutex) {
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode();
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);
if (sourceNode.equals(getLocalNode()) == false) {
becomeFollower("handlePublishRequest", sourceNode);
}
return new PublishWithJoinResponse(publishResponse,
joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term()));
}
}
private static Optional<Join> joinWithDestination(Optional<Join> lastJoin, DiscoveryNode leader, long term) {
if (lastJoin.isPresent()
&& lastJoin.get().getTargetNode().getId().equals(leader.getId())
&& lastJoin.get().getTerm() == term) {
return lastJoin;
}
return Optional.empty();
}
private void closePrevotingAndElectionScheduler() {
if (prevotingRound != null) {
prevotingRound.close();
prevotingRound = null;
}
if (electionScheduler != null) {
electionScheduler.close();
electionScheduler = null;
}
}
private void updateMaxTermSeen(final long term) {
maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term));
// TODO if we are leader here, and there is no publication in flight, then we should bump our term
// (if we are leader and there _is_ a publication in flight then doing so would cancel the publication, so don't do that, but
// do check for this after the publication completes)
}
private void startElection() {
synchronized (mutex) {
// The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
// to check our mode again here.
if (mode == Mode.CANDIDATE) {
final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen.get()) + 1);
getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
}
}
}
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
@ -65,19 +177,20 @@ public class Coordinator extends AbstractLifecycleComponent {
}
private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
logger.debug("joinLeaderInTerm: from [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
Join join = coordinationState.get().handleStartJoin(startJoinRequest);
synchronized (mutex) {
logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
final Join join = coordinationState.get().handleStartJoin(startJoinRequest);
lastJoin = Optional.of(join);
if (mode != Mode.CANDIDATE) {
becomeCandidate("joinLeaderInTerm");
}
return join;
}
}
private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
assert Thread.holdsLock(mutex) == false;
logger.trace("handleJoin: as {}, handling {}", mode, joinRequest);
logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest);
transportService.connectToNode(joinRequest.getSourceNode());
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
@ -85,29 +198,11 @@ public class Coordinator extends AbstractLifecycleComponent {
final CoordinationState coordState = coordinationState.get();
final boolean prevElectionWon = coordState.electionWon();
if (optionalJoin.isPresent()) {
Join join = optionalJoin.get();
// if someone thinks we should be master, let's add our vote and try to become one
// note that the following line should never throw an exception
ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(coordState::handleJoin);
if (coordState.electionWon()) {
// if we have already won the election then the actual join does not matter for election purposes,
// so swallow any exception
try {
coordState.handleJoin(join);
} catch (CoordinationStateRejectedException e) {
logger.trace("failed to add join, ignoring", e);
}
} else {
coordState.handleJoin(join); // this might fail and bubble up the exception
}
}
optionalJoin.ifPresent(this::handleJoin);
joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinCallback);
if (prevElectionWon == false && coordState.electionWon()) {
becomeLeader("handleJoin");
becomeLeader("handleJoinRequest");
}
}
}
@ -120,7 +215,11 @@ public class Coordinator extends AbstractLifecycleComponent {
mode = Mode.CANDIDATE;
joinAccumulator.close(mode);
joinAccumulator = joinHelper.new CandidateJoinAccumulator();
peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
}
preVoteCollector.update(getPreVoteResponse(), null);
}
void becomeLeader(String method) {
@ -129,9 +228,13 @@ public class Coordinator extends AbstractLifecycleComponent {
logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader);
mode = Mode.LEADER;
lastKnownLeader = Optional.of(getLocalNode());
joinAccumulator.close(mode);
joinAccumulator = joinHelper.new LeaderJoinAccumulator();
lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
}
void becomeFollower(String method, DiscoveryNode leaderNode) {
@ -145,6 +248,14 @@ public class Coordinator extends AbstractLifecycleComponent {
}
lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), leaderNode);
}
private PreVoteResponse getPreVoteResponse() {
return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(),
coordinationState.get().getLastAcceptedVersion());
}
// package-visible for testing
@ -170,6 +281,7 @@ public class Coordinator extends AbstractLifecycleComponent {
protected void doStart() {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
configuredHostsResolver.start();
}
public void startInitialJoin() {
@ -180,7 +292,7 @@ public class Coordinator extends AbstractLifecycleComponent {
@Override
protected void doStop() {
configuredHostsResolver.stop();
}
@Override
@ -190,22 +302,115 @@ public class Coordinator extends AbstractLifecycleComponent {
public void invariant() {
synchronized (mutex) {
final Optional<DiscoveryNode> peerFinderLeader = peerFinder.getLeader();
if (mode == Mode.LEADER) {
assert coordinationState.get().electionWon();
assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode());
assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator;
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator;
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
assert peerFinderLeader.isPresent() == false : peerFinderLeader;
assert prevotingRound == null || electionScheduler != null;
}
}
}
// for tests
boolean hasJoinVoteFrom(DiscoveryNode localNode) {
return coordinationState.get().containsJoinVoteFor(localNode);
}
void handleJoin(Join join) {
synchronized (mutex) {
ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);
if (coordinationState.get().electionWon()) {
// if we have already won the election then the actual join does not matter for election purposes,
// so swallow any exception
try {
coordinationState.get().handleJoin(join);
} catch (CoordinationStateRejectedException e) {
logger.debug("failed to add join, ignoring", e);
}
} else {
coordinationState.get().handleJoin(join); // this might fail and bubble up the exception
}
}
}
public ClusterState getLastAcceptedState() {
synchronized (mutex) {
return coordinationState.get().getLastAcceptedState();
}
}
public Optional<ClusterState> getLastCommittedState() {
return lastCommittedState;
}
private List<DiscoveryNode> getDiscoveredNodes() {
final List<DiscoveryNode> nodes = new ArrayList<>();
nodes.add(getLocalNode());
peerFinder.getFoundPeers().forEach(nodes::add);
return nodes;
}
public enum Mode {
CANDIDATE, LEADER, FOLLOWER
}
private class CoordinatorPeerFinder extends PeerFinder {
CoordinatorPeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver) {
super(settings, transportService, transportAddressConnector, configuredHostsResolver);
}
@Override
protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
// TODO
}
@Override
protected void onFoundPeersUpdated() {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
final CoordinationState.VoteCollection expectedVotes = new CoordinationState.VoteCollection();
getFoundPeers().forEach(expectedVotes::addVote);
expectedVotes.addVote(Coordinator.this.getLocalNode());
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
final boolean foundQuorum = CoordinationState.isElectionQuorum(expectedVotes, lastAcceptedState);
if (foundQuorum) {
if (electionScheduler == null) {
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, () -> {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
if (prevotingRound != null) {
prevotingRound.close();
}
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
}
}
});
}
} else {
closePrevotingAndElectionScheduler();
}
}
}
}
}
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
@ -27,21 +28,29 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
public class JoinHelper extends AbstractComponent {
public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";
private final MasterService masterService;
private final TransportService transportService;
@ -49,7 +58,7 @@ public class JoinHelper extends AbstractComponent {
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler) {
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm) {
super(settings);
this.masterService = masterService;
this.transportService = transportService;
@ -100,6 +109,62 @@ public class JoinHelper extends AbstractComponent {
return "JoinCallback{request=" + request + "}";
}
}));
transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
StartJoinRequest::new,
(request, channel, task) -> {
final DiscoveryNode destination = request.getSourceNode();
final JoinRequest joinRequest
= new JoinRequest(transportService.getLocalNode(), Optional.of(joinLeaderInTerm.apply(request)));
logger.debug("attempting to join {} with {}", destination, joinRequest);
this.transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler<Empty>() {
@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}
@Override
public void handleResponse(Empty response) {
logger.debug("successfully joined {} with {}", destination, joinRequest);
}
@Override
public void handleException(TransportException exp) {
logger.debug(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
}
@Override
public String executor() {
return Names.SAME;
}
});
channel.sendResponse(Empty.INSTANCE);
});
}
public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
startJoinRequest, new TransportResponseHandler<Empty>() {
@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}
@Override
public void handleResponse(Empty response) {
logger.debug("successful response to {} from {}", startJoinRequest, destination);
}
@Override
public void handleException(TransportException exp) {
logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
public interface JoinCallback {
@ -211,7 +276,8 @@ public class JoinHelper extends AbstractComponent {
@Override
public String toString() {
return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() + '}';
return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() +
", closed=" + closed + '}';
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.lease.Releasable;
@ -34,6 +35,7 @@ import org.elasticsearch.transport.TransportService;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
@ -44,16 +46,17 @@ public class PreVoteCollector extends AbstractComponent {
private final TransportService transportService;
private final Runnable startElection;
private final LongConsumer updateMaxTermSeen;
// Tuple for simple atomic updates
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader
// Tuple for simple atomic updates. null until the first call to `update()`.
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader.
PreVoteCollector(final Settings settings, final PreVoteResponse preVoteResponse,
final TransportService transportService, final Runnable startElection) {
PreVoteCollector(final Settings settings, final TransportService transportService, final Runnable startElection,
final LongConsumer updateMaxTermSeen) {
super(settings);
state = new Tuple<>(null, preVoteResponse);
this.transportService = transportService;
this.startElection = startElection;
this.updateMaxTermSeen = updateMaxTermSeen;
// TODO does this need to be on the generic threadpool or can it use SAME?
transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
@ -74,7 +77,7 @@ public class PreVoteCollector extends AbstractComponent {
return preVotingRound;
}
public void update(final PreVoteResponse preVoteResponse, final DiscoveryNode leader) {
public void update(final PreVoteResponse preVoteResponse, @Nullable final DiscoveryNode leader) {
logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
state = new Tuple<>(leader, preVoteResponse);
}
@ -156,7 +159,7 @@ public class PreVoteCollector extends AbstractComponent {
return;
}
// TODO the response carries the sender's current term. If an election starts then it should be in a higher term.
updateMaxTermSeen.accept(response.getCurrentTerm());
if (response.getLastAcceptedTerm() > clusterState.term()
|| (response.getLastAcceptedTerm() == clusterState.term()

View File

@ -156,7 +156,7 @@ public abstract class Publication extends AbstractComponent {
protected abstract Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse);
protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response);
protected abstract void onJoin(Join join);
protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener);
@ -287,8 +287,10 @@ public abstract class Publication extends AbstractComponent {
return;
}
// TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join.
onPossibleJoin(discoveryNode, response);
response.getJoin().ifPresent(join -> {
assert discoveryNode.equals(join.getSourceNode());
onJoin(join);
});
assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM;
state = PublicationTargetState.WAITING_FOR_QUORUM;

View File

@ -72,8 +72,8 @@ public abstract class PeerFinder extends AbstractComponent {
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
private Optional<DiscoveryNode> leader = Optional.empty();
PeerFinder(Settings settings, TransportService transportService,
TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) {
public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver) {
super(settings);
findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
this.transportService = transportService;
@ -95,6 +95,8 @@ public abstract class PeerFinder extends AbstractComponent {
leader = Optional.empty();
handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
}
onFoundPeersUpdated(); // trigger a check for a quorum already
}
public void deactivate(DiscoveryNode leader) {
@ -116,7 +118,7 @@ public abstract class PeerFinder extends AbstractComponent {
return Thread.holdsLock(mutex);
}
boolean assertInactiveWithNoKnownPeers() {
private boolean assertInactiveWithNoKnownPeers() {
assert active == false;
assert peersByAddress.isEmpty() : peersByAddress.keySet();
return true;
@ -125,13 +127,24 @@ public abstract class PeerFinder extends AbstractComponent {
PeersResponse handlePeersRequest(PeersRequest peersRequest) {
synchronized (mutex) {
assert peersRequest.getSourceNode().equals(getLocalNode()) == false;
final List<DiscoveryNode> knownPeers;
if (active) {
assert leader.isPresent() == false : leader;
startProbe(peersRequest.getSourceNode().getAddress());
peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe);
return new PeersResponse(Optional.empty(), getFoundPeersUnderLock(), currentTerm);
knownPeers = getFoundPeersUnderLock();
} else {
return new PeersResponse(leader, Collections.emptyList(), currentTerm);
assert leader.isPresent();
knownPeers = Collections.emptyList();
}
return new PeersResponse(leader, knownPeers, currentTerm);
}
}
// exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package)
public Optional<DiscoveryNode> getLeader() {
synchronized (mutex) {
return leader;
}
}
@ -247,7 +260,7 @@ public abstract class PeerFinder extends AbstractComponent {
@Override
public String toString() {
return "PeerFinder::handleWakeUp";
return "PeerFinder handling wakeup";
}
});

View File

@ -733,6 +733,11 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
"failed to notify channel of error message for action [{}]", action), inner);
}
}
@Override
public String toString() {
return "processing of [" + action + "][" + requestId + "]: " + request;
}
});
}
@ -1049,6 +1054,11 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
"cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
FutureUtils.cancel(future);
}
@Override
public String toString() {
return "TimeoutHandler for [" + action + "][" + requestId + "]";
}
}
static class TimeoutInfoHolder {
@ -1176,7 +1186,17 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
if (ThreadPool.Names.SAME.equals(executor)) {
processResponse(handler, response);
} else {
threadPool.executor(executor).execute(() -> processResponse(handler, response));
threadPool.executor(executor).execute(new Runnable() {
@Override
public String toString() {
return "delivery of response to [" + action + "][" + requestId + "]: " + response;
}
@Override
public void run() {
DirectResponseChannel.this.processResponse(handler, response);
}
});
}
}
}

View File

@ -162,7 +162,8 @@ public class ShrinkIndexIT extends ESIntegTestCase {
final List<Integer> factors = Arrays.asList(2, 3, 5, 7);
final List<Integer> numberOfShardsFactors = randomSubsetOf(scaledRandomIntBetween(1, factors.size() - 1), factors);
final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y);
final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y);
final int numberOfTargetShards = randomSubsetOf(randomInt(numberOfShardsFactors.size() - 1), numberOfShardsFactors)
.stream().reduce(1, (x, y) -> x * y);
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get();

View File

@ -0,0 +1,466 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState;
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.COMMIT_STATE_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_STATE_ACTION_NAME;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.cluster.discovery:TRACE")
public class CoordinatorTests extends ESTestCase {
public void testCanUpdateClusterStateAfterStabilisation() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
long finalValue = randomLong();
leader.submitValue(finalValue);
cluster.stabilise(); // TODO this should only need a short stabilisation
for (final ClusterNode clusterNode : cluster.clusterNodes) {
final String nodeId = clusterNode.getId();
final ClusterState committedState = clusterNode.coordinator.getLastCommittedState().get();
assertThat(nodeId + " has the committed value", value(committedState), is(finalValue));
}
}
private static String nodeIdFromIndex(int nodeIndex) {
return "node" + nodeIndex;
}
class Cluster {
static final long DEFAULT_STABILISATION_TIME = 3000L; // TODO use a real stabilisation time - needs fault detection and disruption
final List<ClusterNode> clusterNodes;
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build());
private final VotingConfiguration initialConfiguration;
Cluster(int initialNodeCount) {
logger.info("--> creating cluster of {} nodes", initialNodeCount);
Set<String> initialNodeIds = new HashSet<>(initialNodeCount);
for (int i = 0; i < initialNodeCount; i++) {
initialNodeIds.add(nodeIdFromIndex(i));
}
initialConfiguration = new VotingConfiguration(initialNodeIds);
clusterNodes = new ArrayList<>(initialNodeCount);
for (int i = 0; i < initialNodeCount; i++) {
final ClusterNode clusterNode = new ClusterNode(i);
clusterNodes.add(clusterNode);
}
}
void stabilise() {
final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis();
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + DEFAULT_STABILISATION_TIME) {
while (deterministicTaskQueue.hasRunnableTasks()) {
try {
deterministicTaskQueue.runRandomTask(random());
} catch (CoordinationStateRejectedException e) {
logger.debug("ignoring benign exception thrown when stabilising", e);
}
for (final ClusterNode clusterNode : clusterNodes) {
clusterNode.coordinator.invariant();
}
}
if (deterministicTaskQueue.hasDeferredTasks() == false) {
break; // TODO when fault detection is enabled this should be removed, as there should _always_ be deferred tasks
}
deterministicTaskQueue.advanceTime();
}
assertUniqueLeaderAndExpectedModes();
}
private void assertUniqueLeaderAndExpectedModes() {
final ClusterNode leader = getAnyLeader();
final long leaderTerm = leader.coordinator.getCurrentTerm();
Matcher<Optional<Long>> isPresentAndEqualToLeaderVersion
= equalTo(Optional.of(leader.coordinator.getLastAcceptedState().getVersion()));
assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
for (final ClusterNode clusterNode : clusterNodes) {
if (clusterNode == leader) {
continue;
}
final String nodeId = clusterNode.getId();
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
assertTrue("leader should have received a vote from " + nodeId,
leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode()));
assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " is at the same accepted version as the leader",
Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion);
assertThat(nodeId + " is at the same committed version as the leader",
clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
}
}
ClusterNode getAnyLeader() {
List<ClusterNode> allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList());
assertThat(allLeaders, not(empty()));
return randomFrom(allLeaders);
}
class ClusterNode extends AbstractComponent {
private final int nodeIndex;
private Coordinator coordinator;
private DiscoveryNode localNode;
private final PersistedState persistedState;
private MasterService masterService;
private TransportService transportService;
private CapturingTransport capturingTransport;
ClusterNode(int nodeIndex) {
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
this.nodeIndex = nodeIndex;
localNode = createDiscoveryNode();
persistedState = new InMemoryPersistedState(1L,
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L));
setUp();
}
private DiscoveryNode createDiscoveryNode() {
final TransportAddress transportAddress = buildNewFakeTransportAddress();
// Generate the ephemeral ID deterministically, for repeatable tests. This means we have to pass everything else into the
// constructor explicitly too.
return new DiscoveryNode("", nodeIdFromIndex(nodeIndex), UUIDs.randomBase64UUID(random()),
transportAddress.address().getHostString(),
transportAddress.getAddress(), transportAddress, Collections.emptyMap(),
EnumSet.allOf(Role.class), Version.CURRENT);
}
private void setUp() {
capturingTransport = new CapturingTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
assert destination.equals(localNode) == false : "non-local message from " + localNode + " to itself";
super.onSendRequest(requestId, action, request, destination);
// connecting and handshaking with a new node happens synchronously, so we cannot enqueue these tasks for later
final Consumer<Runnable> scheduler;
final Predicate<ClusterNode> matchesDestination;
if (action.equals(HANDSHAKE_ACTION_NAME)) {
scheduler = Runnable::run;
matchesDestination = n -> n.getLocalNode().getAddress().equals(destination.getAddress());
} else {
scheduler = deterministicTaskQueue::scheduleNow;
matchesDestination = n -> n.getLocalNode().equals(destination);
}
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of [" + action + "][" + requestId + "]: " + request;
}
@Override
public void run() {
clusterNodes.stream().filter(matchesDestination).findAny().ifPresent(
destinationNode -> {
final RequestHandlerRegistry requestHandler
= destinationNode.capturingTransport.getRequestHandler(action);
final TransportChannel transportChannel = new TransportChannel() {
@Override
public String getProfileName() {
return "default";
}
@Override
public String getChannelType() {
return "coordinator-test-channel";
}
@Override
public void sendResponse(final TransportResponse response) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of response " + response
+ " to [" + action + "][" + requestId + "]: " + request;
}
@Override
public void run() {
handleResponse(requestId, response);
}
});
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) {
sendResponse(response);
}
@Override
public void sendResponse(Exception exception) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of error response " + exception.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}
@Override
public void run() {
handleRemoteError(requestId, exception);
}
});
}
};
try {
processMessageReceived(request, requestHandler, transportChannel);
} catch (Exception e) {
scheduler.accept(new Runnable() {
@Override
public String toString() {
return "delivery of processing error response " + e.getMessage()
+ " to [" + action + "][" + requestId + "]: " + request;
}
@Override
public void run() {
handleRemoteError(requestId, e);
}
});
}
}
);
}
});
}
};
masterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow);
AtomicReference<ClusterState> currentState = new AtomicReference<>(getPersistedState().getLastAcceptedState());
masterService.setClusterStateSupplier(currentState::get);
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
final PublishRequest publishRequest;
try {
publishRequest = coordinator.coordinationState.get().handleClientValue(event.state());
} catch (CoordinationStateRejectedException e) {
publishListener.onFailure(new FailedToCommitClusterStateException("rejected client value", e));
return;
}
final Publication publication = new Publication(settings, publishRequest, ackListener,
deterministicTaskQueue::getCurrentTimeMillis) {
@Override
protected void onCompletion(boolean committed) {
if (committed) {
currentState.set(event.state());
publishListener.onResponse(null);
} else {
publishListener.onFailure(new FailedToCommitClusterStateException("not committed"));
}
}
@Override
protected boolean isPublishQuorum(VoteCollection votes) {
return coordinator.coordinationState.get().isPublishQuorum(votes);
}
@Override
protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode,
PublishResponse publishResponse) {
return coordinator.coordinationState.get().handlePublishResponse(sourceNode, publishResponse);
}
@Override
protected void onJoin(Join join) {
coordinator.handleJoin(join);
}
@Override
protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener) {
transportService.sendRequest(destination, PUBLISH_STATE_ACTION_NAME, publishRequest,
new TransportResponseHandler<PublishWithJoinResponse>() {
@Override
public void handleResponse(PublishWithJoinResponse response) {
responseActionListener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
responseActionListener.onFailure(exp);
}
@Override
public String executor() {
return Names.GENERIC;
}
});
}
@Override
protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit,
ActionListener<Empty> responseActionListener) {
transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommit,
new TransportResponseHandler<Empty>() {
@Override
public void handleResponse(Empty response) {
responseActionListener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
responseActionListener.onFailure(exp);
}
@Override
public String executor() {
return Names.GENERIC;
}
});
}
};
publication.start(emptySet());
});
masterService.start();
transportService = capturingTransport.createCapturingTransportService(
settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService, this::getPersistedState, Cluster.this::provideUnicastHosts);
coordinator.start();
coordinator.startInitialJoin();
}
private PersistedState getPersistedState() {
return persistedState;
}
String getId() {
return localNode.getId();
}
public DiscoveryNode getLocalNode() {
return localNode;
}
boolean isLeader() {
return coordinator.getMode() == Coordinator.Mode.LEADER;
}
void submitValue(final long value) {
masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return setValue(currentState, value);
}
@Override
public void onFailure(String source, Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e);
}
});
}
}
private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
return clusterNodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getAddress).collect(Collectors.toList());
}
}
@SuppressWarnings("unchecked")
private static void processMessageReceived(TransportRequest request, RequestHandlerRegistry requestHandler,
TransportChannel transportChannel) throws Exception {
requestHandler.processMessageReceived(request, transportChannel);
}
}

View File

@ -326,7 +326,7 @@ public class DeterministicTaskQueue extends AbstractComponent {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
return false;
}
@Override

View File

@ -144,37 +144,39 @@ public class ElectionSchedulerFactoryTests extends ESTestCase {
{
final Settings settings = Settings.builder().put(ELECTION_INITIAL_TIMEOUT_SETTING.getKey(), "0s").build();
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_INITIAL_TIMEOUT_SETTING.get(settings));
assertThat(e.getMessage(), is("Failed to parse value [0s] for setting [cluster.election.initial_timeout] must be >= 1ms"));
assertThat(e.getMessage(), is("failed to parse value [0s] for setting [cluster.election.initial_timeout], must be >= [1ms]"));
}
{
final Settings settings = Settings.builder().put(ELECTION_INITIAL_TIMEOUT_SETTING.getKey(), "10001ms").build();
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_INITIAL_TIMEOUT_SETTING.get(settings));
assertThat(e.getMessage(), is("Failed to parse value [10001ms] for setting [cluster.election.initial_timeout] must be <= 10s"));
assertThat(e.getMessage(),
is("failed to parse value [10001ms] for setting [cluster.election.initial_timeout], must be <= [10s]"));
}
{
final Settings settings = Settings.builder().put(ELECTION_BACK_OFF_TIME_SETTING.getKey(), "0s").build();
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_BACK_OFF_TIME_SETTING.get(settings));
assertThat(e.getMessage(), is("Failed to parse value [0s] for setting [cluster.election.back_off_time] must be >= 1ms"));
assertThat(e.getMessage(), is("failed to parse value [0s] for setting [cluster.election.back_off_time], must be >= [1ms]"));
}
{
final Settings settings = Settings.builder().put(ELECTION_BACK_OFF_TIME_SETTING.getKey(), "60001ms").build();
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_BACK_OFF_TIME_SETTING.get(settings));
assertThat(e.getMessage(), is("Failed to parse value [60001ms] for setting [cluster.election.back_off_time] must be <= 1m"));
assertThat(e.getMessage(),
is("failed to parse value [60001ms] for setting [cluster.election.back_off_time], must be <= [60s]"));
}
{
final Settings settings = Settings.builder().put(ELECTION_MAX_TIMEOUT_SETTING.getKey(), "199ms").build();
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_MAX_TIMEOUT_SETTING.get(settings));
assertThat(e.getMessage(), is("Failed to parse value [199ms] for setting [cluster.election.max_timeout] must be >= 200ms"));
assertThat(e.getMessage(), is("failed to parse value [199ms] for setting [cluster.election.max_timeout], must be >= [200ms]"));
}
{
final Settings settings = Settings.builder().put(ELECTION_MAX_TIMEOUT_SETTING.getKey(), "301s").build();
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_MAX_TIMEOUT_SETTING.get(settings));
assertThat(e.getMessage(), is("Failed to parse value [301s] for setting [cluster.election.max_timeout] must be <= 5m"));
assertThat(e.getMessage(), is("failed to parse value [301s] for setting [cluster.election.max_timeout], must be <= [300s]"));
}
{

View File

@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.Matchers.anyObject;
@ -135,6 +136,7 @@ public class NodeJoinTests extends ESTestCase {
this.masterService = masterService;
TransportService transportService = mock(TransportService.class);
when(transportService.getLocalNode()).thenReturn(initialState.nodes().getLocalNode());
when(transportService.getThreadPool()).thenReturn(threadPool);
@SuppressWarnings("unchecked")
ArgumentCaptor<TransportRequestHandler<JoinRequest>> joinRequestHandler = ArgumentCaptor.forClass(
(Class) TransportRequestHandler.class);
@ -142,7 +144,7 @@ public class NodeJoinTests extends ESTestCase {
transportService,
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService,
() -> new CoordinationStateTests.InMemoryPersistedState(term, initialState));
() -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList());
verify(transportService).registerRequestHandler(eq(JoinHelper.JOIN_ACTION_NAME), eq(ThreadPool.Names.GENERIC), eq(false), eq(false),
anyObject(), joinRequestHandler.capture());
transportRequestHandler = joinRequestHandler.getValue();

View File

@ -109,10 +109,12 @@ public class PreVoteCollectorTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
preVoteCollector = new PreVoteCollector(settings, getLocalPreVoteResponse(), transportService, () -> {
preVoteCollector = new PreVoteCollector(settings, transportService, () -> {
assert electionOccurred == false;
electionOccurred = true;
});
}, l -> {
}); // TODO need tests that check that the max term seen is updated
preVoteCollector.update(getLocalPreVoteResponse(), null);
}
private PreVoteResponse getLocalPreVoteResponse() {

View File

@ -100,7 +100,7 @@ public class PublicationTests extends ESTestCase {
Map<DiscoveryNode, ActionListener<PublishWithJoinResponse>> pendingPublications = new HashMap<>();
Map<DiscoveryNode, ActionListener<TransportResponse.Empty>> pendingCommits = new HashMap<>();
Map<DiscoveryNode, PublishWithJoinResponse> possibleJoins = new HashMap<>();
Map<DiscoveryNode, Join> joins = new HashMap<>();
MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener,
LongSupplier currentTimeSupplier) {
@ -116,8 +116,8 @@ public class PublicationTests extends ESTestCase {
}
@Override
protected void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response) {
assertNull(possibleJoins.put(sourceNode, response));
protected void onJoin(Join join) {
assertNull(joins.put(join.getSourceNode(), join));
}
@Override
@ -180,13 +180,17 @@ public class PublicationTests extends ESTestCase {
PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest(
publication.publishRequest);
assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty());
assertFalse(publication.possibleJoins.containsKey(e.getKey()));
assertFalse(publication.joins.containsKey(e.getKey()));
PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse(publishResponse,
randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong())));
e.getValue().onResponse(publishWithJoinResponse);
assertTrue(publication.possibleJoins.containsKey(e.getKey()));
assertEquals(publishWithJoinResponse, publication.possibleJoins.get(e.getKey()));
if (publishWithJoinResponse.getJoin().isPresent()) {
assertTrue(publication.joins.containsKey(e.getKey()));
assertEquals(publishWithJoinResponse.getJoin().get(), publication.joins.get(e.getKey()));
} else {
assertFalse(publication.joins.containsKey(e.getKey()));
}
if (e.getKey().equals(n1)) {
processedNode1PublishResponse.set(true);
}

View File

@ -799,7 +799,7 @@ public class SettingTests extends ESTestCase {
= expectThrows(IllegalArgumentException.class,
() -> settingWithLowerBound.get(Settings.builder().put("foo", "4999ms").build()));
assertThat(illegalArgumentException.getMessage(), equalTo("Failed to parse value [4999ms] for setting [foo] must be >= 5s"));
assertThat(illegalArgumentException.getMessage(), equalTo("failed to parse value [4999ms] for setting [foo], must be >= [5s]"));
Setting<TimeValue> settingWithBothBounds = Setting.timeSetting("bar",
TimeValue.timeValueSeconds(10), TimeValue.timeValueSeconds(5), TimeValue.timeValueSeconds(20));
@ -810,12 +810,12 @@ public class SettingTests extends ESTestCase {
illegalArgumentException
= expectThrows(IllegalArgumentException.class,
() -> settingWithBothBounds.get(Settings.builder().put("bar", "4999ms").build()));
assertThat(illegalArgumentException.getMessage(), equalTo("Failed to parse value [4999ms] for setting [bar] must be >= 5s"));
assertThat(illegalArgumentException.getMessage(), equalTo("failed to parse value [4999ms] for setting [bar], must be >= [5s]"));
illegalArgumentException
= expectThrows(IllegalArgumentException.class,
() -> settingWithBothBounds.get(Settings.builder().put("bar", "20001ms").build()));
assertThat(illegalArgumentException.getMessage(), equalTo("Failed to parse value [20001ms] for setting [bar] must be <= 20s"));
assertThat(illegalArgumentException.getMessage(), equalTo("failed to parse value [20001ms] for setting [bar], must be <= [20s]"));
}
public void testSettingsGroupUpdater() {

View File

@ -84,7 +84,14 @@ public class FakeThreadPoolMasterService extends MasterService {
private void scheduleNextTaskIfNecessary() {
if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) {
scheduledNextTask = true;
onTaskAvailableToRun.accept(() -> {
onTaskAvailableToRun.accept(new Runnable() {
@Override
public String toString() {
return "master service scheduling next task";
}
@Override
public void run() {
assert taskInProgress == false;
assert waitForPublish == false;
assert scheduledNextTask;
@ -97,7 +104,8 @@ public class FakeThreadPoolMasterService extends MasterService {
if (waitForPublish == false) {
taskInProgress = false;
}
scheduleNextTaskIfNecessary();
FakeThreadPoolMasterService.this.scheduleNextTaskIfNecessary();
}
});
}
}

View File

@ -373,6 +373,9 @@ public class MonitoringIT extends ESSingleNodeTestCase {
assertThat(clusterState.remove("cluster_uuid"), notNullValue());
assertThat(clusterState.remove("master_node"), notNullValue());
assertThat(clusterState.remove("nodes"), notNullValue());
assertThat(clusterState.remove("term"), notNullValue());
assertThat(clusterState.remove("last_committed_config"), notNullValue());
assertThat(clusterState.remove("last_accepted_config"), notNullValue());
assertThat(clusterState.keySet(), empty());
}