Zen2: Add Cluster State Applier (#34257)

Adds the cluster state applier to Coordinator, and adds tests for cluster state acking.
Yannick Welsch, 2018-10-04 20:33:28 +02:00 (committed by GitHub)
parent c6b0f08472
commit b32abcbd00
6 changed files with 694 additions and 195 deletions
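
For context, the heart of this change is that the Coordinator now hands committed cluster states to a ClusterApplier instead of tracking them in a lastCommittedState Optional. A rough sketch of the applier contract, as inferred from the call sites in this diff (the real interface lives in org.elasticsearch.cluster.service and may differ in detail):

import java.util.function.Supplier;
import org.elasticsearch.cluster.ClusterState;

public interface ClusterApplier {

    // called once on start-up with a locally-built initial state
    void setInitialState(ClusterState initialState);

    // asks the applier service to apply the latest state obtained from the supplier,
    // notifying the listener once application has succeeded or failed
    void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener);

    interface ClusterApplyListener {
        void onSuccess(String source);

        void onFailure(String source, Exception e);
    }
}

Followers apply committed states from handleApplyCommit; the master defers application to the end of the publication (see CoordinatorPublication.onCompletion below), which is why the new tests expect the leader to be the last node to ack.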

Coordinator.java

@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -30,6 +31,8 @@ import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
@ -61,6 +64,9 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
public class Coordinator extends AbstractLifecycleComponent implements Discovery {
// the timeout for the publication of each value
@ -77,7 +83,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
// These tests can be rewritten to use public methods once Coordinator is more feature-complete
final Object mutex = new Object();
final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
private volatile Optional<ClusterState> lastCommittedState = Optional.empty();
private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier
private final PeerFinder peerFinder;
private final PreVoteCollector preVoteCollector;
@ -87,6 +93,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
private final FollowersChecker followersChecker;
private final ClusterApplier clusterApplier;
@Nullable
private Releasable electionScheduler;
@Nullable
@ -99,11 +106,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<DiscoveryNode> lastKnownLeader;
private Optional<Join> lastJoin;
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<Publication> currentPublication = Optional.empty();
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService,
MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier,
UnicastHostsProvider unicastHostsProvider, Random random) {
UnicastHostsProvider unicastHostsProvider, ClusterApplier clusterApplier, Random random) {
super(settings);
this.transportService = transportService;
this.masterService = masterService;
@ -119,10 +126,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
configuredHostsResolver = new UnicastConfiguredHostsResolver(settings, transportService, unicastHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit);
this.publicationHandler = new PublicationTransportHandler(settings, transportService, this::handlePublishRequest,
this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
}
@ -168,13 +177,31 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener<Void> applyListener) {
synchronized (mutex) {
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);
coordinationState.get().handleCommit(applyCommitRequest);
lastCommittedState = Optional.of(coordinationState.get().getLastAcceptedState());
// TODO: send to applier
final ClusterState committedState = coordinationState.get().getLastAcceptedState();
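// a candidate keeps serving the committed state, but with the no-master block in place until a leader is re-established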
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(committedState) : committedState;
if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
// master node applies the committed state at the end of the publication process, not here.
applyListener.onResponse(null);
} else {
clusterApplier.onNewClusterState(applyCommitRequest.toString(), () -> applierState,
new ClusterApplyListener() {
@Override
public void onFailure(String source, Exception e) {
applyListener.onFailure(e);
}
@Override
public void onSuccess(String source) {
applyListener.onResponse(null);
}
});
}
}
}
@ -233,7 +260,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
// do check for this after the publication completes)
}
private void startElection() {
// TODO: make private again after removing term-bump workaround
void startElection() {
synchronized (mutex) {
// The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
// to check our mode again here.
@ -245,7 +273,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
// TODO: make private again after removing term-bump workaround
Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (getCurrentTerm() < targetTerm) {
return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
@ -307,6 +336,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
if (applierState.nodes().getMasterNodeId() != null) {
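// we were previously tracking a master: reapply the state with the no-master block; the empty listener deliberately ignores the apply outcome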
applierState = clusterStateWithNoMasterBlock(applierState);
clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
});
}
}
preVoteCollector.update(getPreVoteResponse(), null);
@ -393,10 +428,20 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
protected void doStart() {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
synchronized (mutex) {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
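// seed the applier with an initial state that contains just the local node and blocks writes until a master is elected and the state is recovered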
ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
.addGlobalBlock(NO_MASTER_BLOCK_WRITES)) // TODO: allow dynamically configuring NO_MASTER_BLOCK_ALL
.nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId()))
.build();
applierState = initialState;
clusterApplier.setInitialState(initialState);
}
}
@Override
@ -427,6 +472,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert peerFinder.getCurrentTerm() == getCurrentTerm();
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id());
if (mode == Mode.LEADER) {
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();
@ -438,17 +484,26 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert prevotingRound == null : prevotingRound;
assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService();
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
final Set<DiscoveryNode> knownFollowers = followersChecker.getKnownFollowers();
final Set<DiscoveryNode> lastPublishedNodes = new HashSet<>();
if (becomingMaster == false || publicationInProgress()) {
final ClusterState lastPublishedState
= currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState());
final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
if (becomingMaster && activePublication == false) {
// cluster state update task to become master is submitted to MasterService, but publication has not started yet
assert followersChecker.getKnownFollowers().isEmpty() : followersChecker.getKnownFollowers();
} else {
final ClusterState lastPublishedState;
if (activePublication) {
// active publication in progress: followersChecker is up-to-date with nodes that we're actively publishing to
lastPublishedState = currentPublication.get().publishedState();
} else {
// no active publication: followersChecker is up-to-date with the nodes of the latest publication
lastPublishedState = coordinationState.get().getLastAcceptedState();
}
final Set<DiscoveryNode> lastPublishedNodes = new HashSet<>();
lastPublishedState.nodes().forEach(lastPublishedNodes::add);
assert lastPublishedNodes.remove(getLocalNode());
assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers
+ " [becomingMaster=" + becomingMaster + ", publicationInProgress=" + publicationInProgress() + "]";
// TODO instead assert that knownFollowers is updated appropriately at the end of each publication
assert lastPublishedNodes.remove(getLocalNode()); // followersChecker excludes local node
assert lastPublishedNodes.equals(followersChecker.getKnownFollowers()) :
lastPublishedNodes + " != " + followersChecker.getKnownFollowers();
}
} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
@ -461,6 +516,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler != null;
assert followersChecker.getKnownFollowers().isEmpty();
assert currentPublication.map(Publication::isCommitted).orElse(true);
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
@ -470,6 +526,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert followersChecker.getKnownFollowers().isEmpty();
assert applierState.nodes().getMasterNodeId() == null;
assert currentPublication.map(Publication::isCommitted).orElse(true);
}
}
}
@ -503,8 +561,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
public Optional<ClusterState> getLastCommittedState() {
return lastCommittedState;
@Nullable
public ClusterState getApplierState() {
return applierState;
}
private List<DiscoveryNode> getDiscoveredNodes() {
@ -534,7 +593,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
"NO_MASTER_BLOCK should only be added by Coordinator";
// TODO: allow dynamically configuring NO_MASTER_BLOCK_ALL
final ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks()).addGlobalBlock(
DiscoverySettings.NO_MASTER_BLOCK_WRITES).build();
NO_MASTER_BLOCK_WRITES).build();
final DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
return ClusterState.builder(clusterState).blocks(clusterBlocks).nodes(discoveryNodes).build();
} else {
@ -575,105 +634,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
getLocalNode() + " should be in published " + clusterState;
final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
final ListenableFuture<Void> localNodeAckEvent = new ListenableFuture<>();
final AckListener wrappedAckListener = new AckListener() {
@Override
public void onCommit(TimeValue commitTime) {
ackListener.onCommit(commitTime);
}
@Override
public void onNodeAck(DiscoveryNode node, Exception e) {
// acking and cluster state application for local node is handled specially
if (node.equals(getLocalNode())) {
synchronized (mutex) {
if (e == null) {
localNodeAckEvent.onResponse(null);
} else {
localNodeAckEvent.onFailure(e);
}
}
} else {
ackListener.onNodeAck(node, e);
}
}
};
final Publication publication = new Publication(settings, publishRequest, wrappedAckListener,
transportService.getThreadPool()::relativeTimeInMillis) {
@Override
protected void onCompletion(boolean committed) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert currentPublication.get() == this;
currentPublication = Optional.empty();
updateMaxTermSeen(getCurrentTerm()); // triggers term bump if new term was found during publication
localNodeAckEvent.addListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignore) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert committed;
// TODO: send to applier
ackListener.onNodeAck(getLocalNode(), null);
publishListener.onResponse(null);
}
@Override
public void onFailure(Exception e) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (publishRequest.getAcceptedState().term() == coordinationState.get().getCurrentTerm() &&
publishRequest.getAcceptedState().version() == coordinationState.get().getLastPublishedVersion()) {
becomeCandidate("Publication.onCompletion(false)");
}
FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException(
"publication failed", e);
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
publishListener.onFailure(exception);
}
}, EsExecutors.newDirectExecutorService());
}
@Override
protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
return coordinationState.get().isPublishQuorum(votes);
}
@Override
protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode,
PublishResponse publishResponse) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert getCurrentTerm() >= publishResponse.getTerm();
return coordinationState.get().handlePublishResponse(sourceNode, publishResponse);
}
@Override
protected void onJoin(Join join) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (join.getTerm() == getCurrentTerm()) {
handleJoin(join);
}
// TODO: what to do on missing join?
}
@Override
protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener) {
publicationHandler.sendPublishRequest(destination, publishRequest, wrapWithMutex(responseActionListener));
}
@Override
protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit,
ActionListener<Empty> responseActionListener) {
publicationHandler.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener));
}
};
assert currentPublication.isPresent() == false
: "[" + currentPublication.get() + "] in progress, cannot start [" + publication + ']';
final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, new ListenableFuture<>(), ackListener,
publishListener);
currentPublication = Optional.of(publication);
transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
@ -722,7 +684,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (currentPublication.isPresent()) {
currentPublication.get().onTimeout();
assert currentPublication.isPresent() == false;
}
}
@ -784,4 +745,144 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
}
class CoordinatorPublication extends Publication {
private final PublishRequest publishRequest;
private final ListenableFuture<Void> localNodeAckEvent;
private final AckListener ackListener;
private final ActionListener<Void> publishListener;
CoordinatorPublication(PublishRequest publishRequest, ListenableFuture<Void> localNodeAckEvent, AckListener ackListener,
ActionListener<Void> publishListener) {
super(Coordinator.this.settings, publishRequest,
new AckListener() {
@Override
public void onCommit(TimeValue commitTime) {
ackListener.onCommit(commitTime);
}
@Override
public void onNodeAck(DiscoveryNode node, Exception e) {
// acking and cluster state application for local node is handled specially
if (node.equals(getLocalNode())) {
synchronized (mutex) {
if (e == null) {
localNodeAckEvent.onResponse(null);
} else {
localNodeAckEvent.onFailure(e);
}
}
} else {
ackListener.onNodeAck(node, e);
}
}
},
transportService.getThreadPool()::relativeTimeInMillis);
this.publishRequest = publishRequest;
this.localNodeAckEvent = localNodeAckEvent;
this.ackListener = ackListener;
this.publishListener = publishListener;
}
private void removePublicationAndPossiblyBecomeCandidate(String reason) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert currentPublication.get() == this;
currentPublication = Optional.empty();
// check if node has not already switched modes (by bumping term)
if (isActiveForCurrentLeader()) {
becomeCandidate(reason);
}
}
boolean isActiveForCurrentLeader() {
// checks if this publication can still influence the mode of the current publication
return mode == Mode.LEADER && publishRequest.getAcceptedState().term() == getCurrentTerm();
}
@Override
protected void onCompletion(boolean committed) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
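// completion is gated on the local node's own ack: only after a successful local ack is the state handed to the applier on the master, which finishes the publication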
localNodeAckEvent.addListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignore) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert committed;
clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState,
new ClusterApplyListener() {
@Override
public void onFailure(String source, Exception e) {
synchronized (mutex) {
removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
}
ackListener.onNodeAck(getLocalNode(), e);
publishListener.onFailure(e);
}
@Override
public void onSuccess(String source) {
synchronized (mutex) {
assert currentPublication.get() == CoordinatorPublication.this;
currentPublication = Optional.empty();
// trigger term bump if new term was found during publication
updateMaxTermSeen(getCurrentTerm());
}
ackListener.onNodeAck(getLocalNode(), null);
publishListener.onResponse(null);
}
});
}
@Override
public void onFailure(Exception e) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");
FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException(
"publication failed", e);
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
publishListener.onFailure(exception);
}
}, EsExecutors.newDirectExecutorService());
}
@Override
protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
return coordinationState.get().isPublishQuorum(votes);
}
@Override
protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode,
PublishResponse publishResponse) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert getCurrentTerm() >= publishResponse.getTerm();
return coordinationState.get().handlePublishResponse(sourceNode, publishResponse);
}
@Override
protected void onJoin(Join join) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (join.getTerm() == getCurrentTerm()) {
handleJoin(join);
}
// TODO: what to do on missing join?
}
@Override
protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener) {
publicationHandler.sendPublishRequest(destination, publishRequest, wrapWithMutex(responseActionListener));
}
@Override
protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit,
ActionListener<Empty> responseActionListener) {
publicationHandler.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener));
}
}
}

Publication.java

@ -91,6 +91,10 @@ public abstract class Publication extends AbstractComponent {
onPossibleCompletion();
}
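// a publication counts as committed once it has produced an apply-commit request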
public boolean isCommitted() {
return applyCommitRequest.isPresent();
}
private void onPossibleCompletion() {
if (isCompleted) {
return;

PublicationTransportHandler.java

@ -20,7 +20,9 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
@ -29,19 +31,20 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
public class PublicationTransportHandler {
public class PublicationTransportHandler extends AbstractComponent {
public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";
private final TransportService transportService;
public PublicationTransportHandler(TransportService transportService,
public PublicationTransportHandler(Settings settings, TransportService transportService,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
Consumer<ApplyCommitRequest> handleApplyCommit) {
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit) {
super(settings);
this.transportService = transportService;
transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
@ -50,10 +53,27 @@ public class PublicationTransportHandler {
transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
ApplyCommitRequest::new,
(request, channel, task) -> {
handleApplyCommit.accept(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
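// the commit response is now sent only after the handler reports that the committed state was applied (or that application failed)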
(request, channel, task) -> handleApplyCommit.accept(request, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (IOException e) {
logger.debug("failed to send response on commit", e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException ie) {
e.addSuppressed(ie);
logger.debug("failed to send response on commit", e);
}
}
}));
}
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,

CoordinatorTests.java

@ -20,21 +20,24 @@ package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState;
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -59,7 +62,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import static java.util.Collections.emptySet;
@ -84,6 +90,7 @@ import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@ -109,8 +116,8 @@ public class CoordinatorTests extends ESTestCase {
for (final ClusterNode clusterNode : cluster.clusterNodes) {
final String nodeId = clusterNode.getId();
final ClusterState committedState = clusterNode.coordinator.getLastCommittedState().get();
assertThat(nodeId + " has the committed value", value(committedState), is(finalValue));
final ClusterState appliedState = clusterNode.getLastAppliedClusterState();
assertThat(nodeId + " has the applied value", value(appliedState), is(finalValue));
}
}
@ -256,6 +263,138 @@ public class CoordinatorTests extends ESTestCase {
assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId()));
}
public void testAckListenerReceivesAcksFromAllNodes() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
for (final ClusterNode clusterNode : cluster.clusterNodes) {
assertTrue("expected ack from " + clusterNode, ackCollector.hasAckedSuccessfully(clusterNode));
}
assertThat("leader should be last to ack", ackCollector.getSuccessfulAckIndex(leader), equalTo(cluster.clusterNodes.size() - 1));
}
public void testAckListenerReceivesNackFromFollower() {
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
assertTrue("expected ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0));
assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
assertThat("leader should be last to ack", ackCollector.getSuccessfulAckIndex(leader), equalTo(1));
}
public void testAckListenerReceivesNackFromLeader() {
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
final long startingTerm = leader.coordinator.getCurrentTerm();
leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
assertTrue(leader.coordinator.getMode() != Coordinator.Mode.LEADER || leader.coordinator.getCurrentTerm() > startingTerm);
leader.setClusterStateApplyResponse(ClusterStateApplyResponse.SUCCEED);
cluster.stabilise();
assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader));
assertTrue("expected ack from " + follower0, ackCollector.hasAckedSuccessfully(follower0));
assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
assertTrue(leader.coordinator.getMode() != Coordinator.Mode.LEADER || leader.coordinator.getCurrentTerm() > startingTerm);
}
public void testAckListenerReceivesNoAckFromHangingFollower() {
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.HANG);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
cluster.stabilise();
assertTrue("expected eventual ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
assertFalse("expected no ack from " + follower0, ackCollector.hasAcked(follower0));
}
public void testAckListenerReceivesNacksIfPublicationTimesOut() {
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
follower0.blackhole();
follower1.blackhole();
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
assertFalse("expected no immediate ack from " + leader, ackCollector.hasAcked(leader));
assertFalse("expected no immediate ack from " + follower0, ackCollector.hasAcked(follower0));
assertFalse("expected no immediate ack from " + follower1, ackCollector.hasAcked(follower1));
follower0.heal();
follower1.heal();
cluster.stabilise();
assertTrue("expected eventual nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0));
assertTrue("expected eventual nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1));
assertTrue("expected eventual nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader));
}
public void testAckListenerReceivesNacksIfLeaderStandsDown() {
// TODO: needs support for handling disconnects
// final Cluster cluster = new Cluster(3);
// cluster.runRandomly();
// cluster.stabilise();
// final ClusterNode leader = cluster.getAnyLeader();
// final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
// final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
//
// leader.partition();
// follower0.coordinator.handleDisconnectedNode(leader.localNode);
// follower1.coordinator.handleDisconnectedNode(leader.localNode);
// cluster.runUntil(cluster.getCurrentTimeMillis() + cluster.DEFAULT_ELECTION_TIME);
// AckCollector ackCollector = leader.submitRandomValue();
// cluster.runUntil(cluster.currentTimeMillis + Cluster.DEFAULT_DELAY_VARIABILITY);
// leader.connectionStatus = ConnectionStatus.CONNECTED;
// cluster.stabilise(cluster.DEFAULT_STABILISATION_TIME, 0L);
// assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader));
// assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0));
// assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1));
}
public void testAckListenerReceivesNacksFromFollowerInHigherTerm() {
// TODO: needs proper term bumping
// final Cluster cluster = new Cluster(3);
// cluster.runRandomly();
// cluster.stabilise();
// final ClusterNode leader = cluster.getAnyLeader();
// final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
// final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
//
// follower0.coordinator.joinLeaderInTerm(new StartJoinRequest(follower0.localNode, follower0.coordinator.getCurrentTerm() + 1));
// AckCollector ackCollector = leader.submitValue(randomLong());
// cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
// assertTrue("expected ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
// assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0));
// assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
}
private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
@ -264,13 +403,15 @@ public class CoordinatorTests extends ESTestCase {
return setting.get(Settings.EMPTY);
}
// Updating the cluster state involves up to 5 delays:
// Updating the cluster state involves up to 7 delays:
// 1. submit the task to the master service
// 2. send PublishRequest
// 3. receive PublishResponse
// 4. send ApplyCommitRequest
// 5. receive ApplyCommitResponse and apply committed state
private static final long DEFAULT_CLUSTER_STATE_UPDATE_DELAY = 5 * DEFAULT_DELAY_VARIABILITY;
// 5. apply committed cluster state
// 6. receive ApplyCommitResponse
// 7. apply committed state on master (last one to apply cluster state)
private static final long DEFAULT_CLUSTER_STATE_UPDATE_DELAY = 7 * DEFAULT_DELAY_VARIABILITY;
private static final int ELECTION_RETRIES = 10;
@ -383,8 +524,8 @@ public class CoordinatorTests extends ESTestCase {
switch (randomInt(2)) {
case 0:
if (clusterNode.connect()) {
logger.debug("----> [runRandomly {}] connecting {}", step, clusterNode.getId());
if (clusterNode.heal()) {
logger.debug("----> [runRandomly {}] healing {}", step, clusterNode.getId());
}
break;
case 1:
@ -431,15 +572,13 @@ public class CoordinatorTests extends ESTestCase {
private void updateCommittedStates() {
for (final ClusterNode clusterNode : clusterNodes) {
Optional<ClusterState> committedState = clusterNode.coordinator.getLastCommittedState();
if (committedState.isPresent()) {
ClusterState storedState = committedStatesByVersion.get(committedState.get().getVersion());
if (storedState == null) {
committedStatesByVersion.put(committedState.get().getVersion(), committedState.get());
} else {
assertEquals("expected " + committedState.get() + " but got " + storedState,
value(committedState.get()), value(storedState));
}
ClusterState applierState = clusterNode.coordinator.getApplierState();
ClusterState storedState = committedStatesByVersion.get(applierState.getVersion());
if (storedState == null) {
committedStatesByVersion.put(applierState.getVersion(), applierState);
} else {
assertEquals("expected " + applierState + " but got " + storedState,
value(applierState), value(storedState));
}
}
}
@ -449,11 +588,34 @@ public class CoordinatorTests extends ESTestCase {
}
void stabilise(long stabilisationDurationMillis) {
final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis();
final long stabilisationEndTime = stabilisationStartTime + stabilisationDurationMillis;
logger.info("--> stabilising until [{}ms]", stabilisationEndTime);
logger.info("--> stabilising until [{}ms]", deterministicTaskQueue.getCurrentTimeMillis() + stabilisationDurationMillis);
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
runFor(stabilisationDurationMillis);
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationEndTime) {
// TODO remove when term-bumping is enabled
final long maxTerm = clusterNodes.stream().map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L);
final long maxLeaderTerm = clusterNodes.stream().filter(n -> n.coordinator.getMode() == Coordinator.Mode.LEADER)
.map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L);
if (maxLeaderTerm < maxTerm) {
logger.info("--> forcing a term bump, maxTerm={}, maxLeaderTerm={}", maxTerm, maxLeaderTerm);
final ClusterNode leader = getAnyLeader();
synchronized (leader.coordinator.mutex) {
leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1);
}
leader.coordinator.startElection();
logger.info("--> re-stabilising after term bump until [{}ms]",
deterministicTaskQueue.getCurrentTimeMillis() + DEFAULT_ELECTION_DELAY);
runFor(DEFAULT_ELECTION_DELAY);
}
assertUniqueLeaderAndExpectedModes();
}
void runFor(long runDurationMillis) {
final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis;
while (deterministicTaskQueue.getCurrentTimeMillis() < endTime) {
while (deterministicTaskQueue.hasRunnableTasks()) {
try {
@ -464,6 +626,7 @@ public class CoordinatorTests extends ESTestCase {
for (final ClusterNode clusterNode : clusterNodes) {
clusterNode.coordinator.invariant();
}
updateCommittedStates();
}
if (deterministicTaskQueue.hasDeferredTasks() == false) {
@ -474,12 +637,6 @@ public class CoordinatorTests extends ESTestCase {
deterministicTaskQueue.advanceTime();
}
for (ClusterNode clusterNode : clusterNodes) {
assert clusterNode.coordinator.publicationInProgress() == false;
}
assertUniqueLeaderAndExpectedModes();
}
private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) {
@ -491,40 +648,37 @@ public class CoordinatorTests extends ESTestCase {
private void assertUniqueLeaderAndExpectedModes() {
final ClusterNode leader = getAnyLeader();
final long leaderTerm = leader.coordinator.getCurrentTerm();
Matcher<Optional<Long>> isPresentAndEqualToLeaderVersion
= equalTo(Optional.of(leader.coordinator.getLastAcceptedState().getVersion()));
Matcher<Long> isPresentAndEqualToLeaderVersion
= equalTo(leader.coordinator.getLastAcceptedState().getVersion());
assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(leader.getId())),
equalTo(Optional.of(true)));
assertTrue(leader.getLastAppliedClusterState().getNodes().nodeExists(leader.getId()));
assertThat(leader.getLastAppliedClusterState().getVersion(), isPresentAndEqualToLeaderVersion);
for (final ClusterNode clusterNode : clusterNodes) {
final String nodeId = clusterNode.getId();
assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress());
if (clusterNode == leader) {
continue;
}
final String nodeId = clusterNode.getId();
if (isConnectedPair(leader, clusterNode)) {
assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
// TODO assert that this node has actually voted for the leader in this term
// TODO assert that this node's accepted and committed states are the same as the leader's
assertThat(nodeId + " is in the leader's committed state",
leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(true)));
assertTrue(nodeId + " is in the leader's applied state",
leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId));
} else {
assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE));
assertThat(nodeId + " is not in the leader's committed state",
leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(false)));
assertFalse(nodeId + " is not in the leader's applied state",
leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId));
}
}
int connectedNodeCount = Math.toIntExact(clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).count());
assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(DiscoveryNodes::getSize),
equalTo(Optional.of(connectedNodeCount)));
assertThat(leader.getLastAppliedClusterState().getNodes().getSize(), equalTo(connectedNodeCount));
}
ClusterNode getAnyLeader() {
@ -572,9 +726,11 @@ public class CoordinatorTests extends ESTestCase {
private Coordinator coordinator;
private DiscoveryNode localNode;
private final PersistedState persistedState;
private MasterService masterService;
private FakeClusterApplier clusterApplier;
private AckedFakeThreadPoolMasterService masterService;
private TransportService transportService;
private DisruptableMockTransport mockTransport;
private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
ClusterNode(int nodeIndex) {
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
@ -640,13 +796,15 @@ public class CoordinatorTests extends ESTestCase {
}
};
masterService = new FakeThreadPoolMasterService("test",
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterApplier = new FakeClusterApplier(settings, clusterSettings);
masterService = new AckedFakeThreadPoolMasterService("test",
runnable -> deterministicTaskQueue.scheduleNow(onNode(localNode, runnable)));
transportService = mockTransport.createTransportService(
settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR,
a -> localNode, null, emptySet());
coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get());
masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get());
masterService.setClusterStatePublisher(coordinator);
transportService.start();
@ -672,18 +830,44 @@ public class CoordinatorTests extends ESTestCase {
return coordinator.getMode() == LEADER;
}
void submitValue(final long value) {
onNode(localNode, () -> masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return setValue(currentState, value);
}
void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyResponse) {
this.clusterStateApplyResponse = clusterStateApplyResponse;
}
@Override
public void onFailure(String source, Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e);
}
})).run();
AckCollector submitValue(final long value) {
return submitUpdateTask("new value [" + value + "]", cs -> setValue(cs, value));
}
AckCollector submitUpdateTask(String source, UnaryOperator<ClusterState> clusterStateUpdate) {
final AckCollector ackCollector = new AckCollector();
onNode(localNode, () -> {
logger.trace("[{}] submitUpdateTask: enqueueing [{}]", localNode.getId(), source);
final long submittedTerm = coordinator.getCurrentTerm();
masterService.submitStateUpdateTask(source,
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
assertThat(currentState.term(), greaterThanOrEqualTo(submittedTerm));
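// direct the acks for this task's publication to the collector that is handed back to the caller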
masterService.nextAckCollector = ackCollector;
return clusterStateUpdate.apply(currentState);
}
@Override
public void onFailure(String source, Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
updateCommittedStates();
ClusterState state = committedStatesByVersion.get(newState.version());
assertNotNull("State not committed : " + newState.toString(), state);
assertEquals(value(state), value(newState));
logger.trace("successfully published: [{}]", newState);
}
});
}).run();
return ackCollector;
}
@Override
@ -691,7 +875,7 @@ public class CoordinatorTests extends ESTestCase {
return localNode.toString();
}
boolean connect() {
boolean heal() {
boolean unBlackholed = blackholedNodes.remove(localNode.getId());
boolean unDisconnected = disconnectedNodes.remove(localNode.getId());
assert unBlackholed == false || unDisconnected == false;
@ -711,6 +895,88 @@ public class CoordinatorTests extends ESTestCase {
assert blackholed || unDisconnected == false;
return blackholed;
}
ClusterState getLastAppliedClusterState() {
return clusterApplier.lastAppliedClusterState;
}
private class FakeClusterApplier implements ClusterApplier {
final ClusterName clusterName;
private final ClusterSettings clusterSettings;
ClusterState lastAppliedClusterState;
private FakeClusterApplier(Settings settings, ClusterSettings clusterSettings) {
clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.clusterSettings = clusterSettings;
}
@Override
public void setInitialState(ClusterState initialState) {
assert lastAppliedClusterState == null;
assert initialState != null;
lastAppliedClusterState = initialState;
}
@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
switch (clusterStateApplyResponse) {
case SUCCEED:
deterministicTaskQueue.scheduleNow(onNode(localNode, new Runnable() {
@Override
public void run() {
final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState;
final ClusterState newClusterState = clusterStateSupplier.get();
assert oldClusterState.version() <= newClusterState.version() : "updating cluster state from version "
+ oldClusterState.version() + " to stale version " + newClusterState.version();
clusterApplier.lastAppliedClusterState = newClusterState;
final Settings incomingSettings = newClusterState.metaData().settings();
clusterSettings.applySettings(incomingSettings); // TODO validation might throw exceptions here.
listener.onSuccess(source);
}
@Override
public String toString() {
return "apply cluster state from [" + source + "]";
}
}));
break;
case FAIL:
deterministicTaskQueue.scheduleNow(onNode(localNode, new Runnable() {
@Override
public void run() {
listener.onFailure(source, new ElasticsearchException("cluster state application failed"));
}
@Override
public String toString() {
return "fail to apply cluster state from [" + source + "]";
}
}));
break;
case HANG:
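// when hanging, sometimes apply the state anyway, but never notify the listener either way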
if (randomBoolean()) {
deterministicTaskQueue.scheduleNow(onNode(localNode, new Runnable() {
@Override
public void run() {
final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState;
final ClusterState newClusterState = clusterStateSupplier.get();
assert oldClusterState.version() <= newClusterState.version() :
"updating cluster state from version "
+ oldClusterState.version() + " to stale version " + newClusterState.version();
clusterApplier.lastAppliedClusterState = newClusterState;
}
@Override
public String toString() {
return "apply cluster state from [" + source + "] without ack";
}
}));
}
break;
}
}
}
}
private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
@ -734,4 +1000,92 @@ public class CoordinatorTests extends ESTestCase {
}
};
}
static class AckCollector implements AckListener {
private final Set<DiscoveryNode> ackedNodes = new HashSet<>();
private final List<DiscoveryNode> successfulNodes = new ArrayList<>();
private final List<DiscoveryNode> unsuccessfulNodes = new ArrayList<>();
@Override
public void onCommit(TimeValue commitTime) {
// TODO we only currently care about per-node acks
}
@Override
public void onNodeAck(DiscoveryNode node, Exception e) {
assertTrue("duplicate ack from " + node, ackedNodes.add(node));
if (e == null) {
successfulNodes.add(node);
} else {
unsuccessfulNodes.add(node);
}
}
boolean hasAckedSuccessfully(ClusterNode clusterNode) {
return successfulNodes.contains(clusterNode.localNode);
}
boolean hasAckedUnsuccessfully(ClusterNode clusterNode) {
return unsuccessfulNodes.contains(clusterNode.localNode);
}
boolean hasAcked(ClusterNode clusterNode) {
return ackedNodes.contains(clusterNode.localNode);
}
int getSuccessfulAckIndex(ClusterNode clusterNode) {
assert successfulNodes.contains(clusterNode.localNode) : "get index of " + clusterNode;
return successfulNodes.indexOf(clusterNode.localNode);
}
}
static class AckedFakeThreadPoolMasterService extends FakeThreadPoolMasterService {
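// the collector that will receive the acks of the next published task; a fresh one is swapped in per publication (see wrapAckListener)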
AckCollector nextAckCollector = new AckCollector();
AckedFakeThreadPoolMasterService(String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
super(serviceName, onTaskAvailableToRun);
}
@Override
protected AckListener wrapAckListener(AckListener ackListener) {
final AckCollector ackCollector = nextAckCollector;
nextAckCollector = new AckCollector();
return new AckListener() {
@Override
public void onCommit(TimeValue commitTime) {
ackCollector.onCommit(commitTime);
ackListener.onCommit(commitTime);
}
@Override
public void onNodeAck(DiscoveryNode node, Exception e) {
ackCollector.onNodeAck(node, e);
ackListener.onNodeAck(node, e);
}
};
}
}
/**
* How to behave with a new cluster state
*/
enum ClusterStateApplyResponse {
/**
* Apply the state (default)
*/
SUCCEED,
/**
* Reject the state with an exception.
*/
FAIL,
/**
* Never respond either way.
*/
HANG,
}
}

NodeJoinTests.java

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.service.MasterServiceTests;
import org.elasticsearch.common.Randomness;
@ -63,6 +64,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -162,7 +164,9 @@ public class NodeJoinTests extends ESTestCase {
transportService,
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService,
() -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList(), random);
() -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
random);
transportService.start();
transportService.acceptIncomingRequests();
transportRequestHandler = capturingTransport.getRequestHandler(JoinHelper.JOIN_ACTION_NAME);
@ -515,4 +519,16 @@ public class NodeJoinTests extends ESTestCase {
private boolean clusterStateHasNode(DiscoveryNode node) {
return node.equals(MasterServiceTests.discoveryState(masterService).nodes().get(node.getId()));
}
private static class NoOpClusterApplier implements ClusterApplier {
@Override
public void setInitialState(ClusterState initialState) {
}
@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
listener.onSuccess(source);
}
}
}

FakeThreadPoolMasterService.java

@ -21,6 +21,7 @@ package org.elasticsearch.indices.cluster;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
@ -28,7 +29,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
@ -120,7 +120,7 @@ public class FakeThreadPoolMasterService extends MasterService {
protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
assert waitForPublish == false;
waitForPublish = true;
final Discovery.AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state());
final AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state());
clusterStatePublisher.publish(clusterChangedEvent, new ActionListener<Void>() {
private boolean listenerCalled = false;
@ -152,6 +152,10 @@ public class FakeThreadPoolMasterService extends MasterService {
scheduleNextTaskIfNecessary();
}
}
}, ackListener);
}, wrapAckListener(ackListener));
}
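// hook that lets tests wrap the ack listener; see AckedFakeThreadPoolMasterService in CoordinatorTests above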
protected AckListener wrapAckListener(AckListener ackListener) {
return ackListener;
}
}