Zen2: Cluster state publication pipeline (#32584)

Implements the state machine on the master to publish a cluster state.

Relates to #32006
This commit is contained in:
Yannick Welsch 2018-08-07 14:51:46 +02:00 committed by GitHub
parent f44ba04aee
commit 785b6e824c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 941 additions and 88 deletions

View File

@ -0,0 +1,348 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.Discovery.AckListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;
public abstract class Publication extends AbstractComponent {
private final List<PublicationTarget> publicationTargets;
private final PublishRequest publishRequest;
private final AckListener ackListener;
private final LongSupplier currentTimeSupplier;
private final long startTime;
private Optional<ApplyCommitRequest> applyCommitRequest; // set when state is committed
private boolean isCompleted; // set when publication is completed
private boolean timedOut; // set when publication timed out
public Publication(Settings settings, PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) {
super(settings);
this.publishRequest = publishRequest;
this.ackListener = ackListener;
this.currentTimeSupplier = currentTimeSupplier;
startTime = currentTimeSupplier.getAsLong();
applyCommitRequest = Optional.empty();
publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size());
publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n)));
}
public void start(Set<DiscoveryNode> faultyNodes) {
logger.trace("publishing {} to {}", publishRequest, publicationTargets);
for (final DiscoveryNode faultyNode : faultyNodes) {
onFaultyNode(faultyNode);
}
onPossibleCommitFailure();
publicationTargets.forEach(PublicationTarget::sendPublishRequest);
}
public void onTimeout() {
assert timedOut == false;
timedOut = true;
if (applyCommitRequest.isPresent() == false) {
logger.debug("onTimeout: [{}] timed out before committing", this);
// fail all current publications
final Exception e = new ElasticsearchException("publication timed out before committing");
publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e));
}
onPossibleCompletion();
}
public void onFaultyNode(DiscoveryNode faultyNode) {
publicationTargets.forEach(t -> t.onFaultyNode(faultyNode));
onPossibleCompletion();
}
private void onPossibleCompletion() {
if (isCompleted) {
return;
}
if (timedOut == false) {
for (final PublicationTarget target : publicationTargets) {
if (target.isActive()) {
return;
}
}
}
if (applyCommitRequest.isPresent() == false) {
logger.debug("onPossibleCompletion: [{}] commit failed", this);
assert isCompleted == false;
isCompleted = true;
onCompletion(false);
return;
}
assert isCompleted == false;
isCompleted = true;
onCompletion(true);
assert applyCommitRequest.isPresent();
logger.trace("onPossibleCompletion: [{}] was successful", this);
}
// For assertions only: verify that this invariant holds
private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() {
if (timedOut == false) {
for (final PublicationTarget target : publicationTargets) {
if (target.isActive()) {
return isCompleted == false;
}
}
}
return isCompleted;
}
private void onPossibleCommitFailure() {
if (applyCommitRequest.isPresent()) {
onPossibleCompletion();
return;
}
final CoordinationState.VoteCollection possiblySuccessfulNodes = new CoordinationState.VoteCollection();
for (PublicationTarget publicationTarget : publicationTargets) {
if (publicationTarget.mayCommitInFuture()) {
possiblySuccessfulNodes.addVote(publicationTarget.discoveryNode);
} else {
assert publicationTarget.isFailed() : publicationTarget;
}
}
if (isPublishQuorum(possiblySuccessfulNodes) == false) {
logger.debug("onPossibleCommitFailure: non-failed nodes {} do not form a quorum, so {} cannot succeed",
possiblySuccessfulNodes, this);
Exception e = new Discovery.FailedToCommitClusterStateException("non-failed nodes do not form a quorum");
publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e));
onPossibleCompletion();
}
}
protected abstract void onCompletion(boolean committed);
protected abstract boolean isPublishQuorum(CoordinationState.VoteCollection votes);
protected abstract Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse);
protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response);
protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener);
protected abstract void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit,
ActionListener<TransportResponse.Empty> responseActionListener);
@Override
public String toString() {
return "Publication{term=" + publishRequest.getAcceptedState().term() +
", version=" + publishRequest.getAcceptedState().version() + '}';
}
enum PublicationTargetState {
NOT_STARTED,
FAILED,
SENT_PUBLISH_REQUEST,
WAITING_FOR_QUORUM,
SENT_APPLY_COMMIT,
APPLIED_COMMIT,
}
class PublicationTarget {
private final DiscoveryNode discoveryNode;
private boolean ackIsPending = true;
private PublicationTargetState state = PublicationTargetState.NOT_STARTED;
PublicationTarget(DiscoveryNode discoveryNode) {
this.discoveryNode = discoveryNode;
}
@Override
public String toString() {
return "PublicationTarget{" +
"discoveryNode=" + discoveryNode +
", state=" + state +
", ackIsPending=" + ackIsPending +
'}';
}
void sendPublishRequest() {
if (isFailed()) {
return;
}
assert state == PublicationTargetState.NOT_STARTED : state + " -> " + PublicationTargetState.SENT_PUBLISH_REQUEST;
state = PublicationTargetState.SENT_PUBLISH_REQUEST;
Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler());
// TODO Can this ^ fail with an exception? Target should be failed if so.
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
}
void handlePublishResponse(PublishResponse publishResponse) {
assert isWaitingForQuorum() : this;
logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode);
if (applyCommitRequest.isPresent()) {
sendApplyCommit();
} else {
Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> {
assert applyCommitRequest.isPresent() == false;
applyCommitRequest = Optional.of(applyCommit);
ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime));
publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit);
});
}
}
void sendApplyCommit() {
assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + PublicationTargetState.SENT_APPLY_COMMIT;
state = PublicationTargetState.SENT_APPLY_COMMIT;
assert applyCommitRequest.isPresent();
Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler());
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
}
void setAppliedCommit() {
assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT;
state = PublicationTargetState.APPLIED_COMMIT;
ackOnce(null);
}
void setFailed(Exception e) {
assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + PublicationTargetState.FAILED;
state = PublicationTargetState.FAILED;
ackOnce(e);
}
void onFaultyNode(DiscoveryNode faultyNode) {
if (isActive() && discoveryNode.equals(faultyNode)) {
logger.debug("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this);
setFailed(new ElasticsearchException("faulty node"));
onPossibleCommitFailure();
}
}
private void ackOnce(Exception e) {
if (ackIsPending) {
ackIsPending = false;
ackListener.onNodeAck(discoveryNode, e);
}
}
boolean isActive() {
return state != PublicationTargetState.FAILED
&& state != PublicationTargetState.APPLIED_COMMIT;
}
boolean isWaitingForQuorum() {
return state == PublicationTargetState.WAITING_FOR_QUORUM;
}
boolean mayCommitInFuture() {
return (state == PublicationTargetState.NOT_STARTED
|| state == PublicationTargetState.SENT_PUBLISH_REQUEST
|| state == PublicationTargetState.WAITING_FOR_QUORUM);
}
boolean isFailed() {
return state == PublicationTargetState.FAILED;
}
private class PublishResponseHandler implements ActionListener<PublishWithJoinResponse> {
@Override
public void onResponse(PublishWithJoinResponse response) {
if (isFailed()) {
logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode);
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
return;
}
// TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join.
onPossibleJoin(discoveryNode, response);
assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM;
state = PublicationTargetState.WAITING_FOR_QUORUM;
handlePublishResponse(response.getPublishResponse());
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
}
@Override
public void onFailure(Exception e) {
assert e instanceof TransportException;
final TransportException exp = (TransportException) e;
if (exp.getRootCause() instanceof CoordinationStateRejectedException) {
logger.debug("PublishResponseHandler: [{}] failed: {}", discoveryNode, exp.getRootCause().getMessage());
} else {
logger.debug(() -> new ParameterizedMessage("PublishResponseHandler: [{}] failed", discoveryNode), exp);
}
assert ((TransportException) e).getRootCause() instanceof Exception;
setFailed((Exception) exp.getRootCause());
onPossibleCommitFailure();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
}
}
private class ApplyCommitResponseHandler implements ActionListener<TransportResponse.Empty> {
@Override
public void onResponse(TransportResponse.Empty ignored) {
if (isFailed()) {
logger.debug("ApplyCommitResponseHandler.handleResponse: already failed, ignoring response from [{}]",
discoveryNode);
return;
}
setAppliedCommit();
onPossibleCompletion();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
}
@Override
public void onFailure(Exception e) {
assert e instanceof TransportException;
final TransportException exp = (TransportException) e;
if (exp.getRootCause() instanceof CoordinationStateRejectedException) {
logger.debug("ApplyCommitResponseHandler: [{}] failed: {}", discoveryNode, exp.getRootCause().getMessage());
} else {
logger.debug(() -> new ParameterizedMessage("ApplyCommitResponseHandler: [{}] failed", discoveryNode), exp);
}
assert ((TransportException) e).getRootCause() instanceof Exception;
setFailed((Exception) exp.getRootCause());
onPossibleCompletion();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
}
}
}
}

View File

@ -20,25 +20,43 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException; import java.io.IOException;
/** /**
* Response to a {@link PublishRequest}, carrying the term and version of the request. * Response to a {@link PublishRequest}, carrying the term and version of the request.
* Typically wrapped in a {@link PublishWithJoinResponse}.
*/ */
public class PublishResponse extends TermVersionResponse { public class PublishResponse implements Writeable {
protected final long term;
protected final long version;
public PublishResponse(long term, long version) { public PublishResponse(long term, long version) {
super(term, version); assert term >= 0;
assert version >= 0;
this.term = term;
this.version = version;
} }
public PublishResponse(StreamInput in) throws IOException { public PublishResponse(StreamInput in) throws IOException {
super(in); this(in.readLong(), in.readLong());
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); out.writeLong(term);
out.writeLong(version);
}
public long getTerm() {
return term;
}
public long getVersion() {
return version;
} }
@Override @Override
@ -48,4 +66,22 @@ public class PublishResponse extends TermVersionResponse {
", version=" + version + ", version=" + version +
'}'; '}';
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PublishResponse response = (PublishResponse) o;
if (term != response.term) return false;
return version == response.version;
}
@Override
public int hashCode() {
int result = (int) (term ^ (term >>> 32));
result = 31 * result + (int) (version ^ (version >>> 32));
return result;
}
} }

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Optional;
/**
* Response to a {@link PublishRequest}. Encapsulates both a {@link PublishResponse}
* and an optional {@link Join}.
*/
public class PublishWithJoinResponse extends TransportResponse {
private final PublishResponse publishResponse;
private final Optional<Join> optionalJoin;
public PublishWithJoinResponse(PublishResponse publishResponse, Optional<Join> optionalJoin) {
this.publishResponse = publishResponse;
this.optionalJoin = optionalJoin;
}
public PublishWithJoinResponse(StreamInput in) throws IOException {
this.publishResponse = new PublishResponse(in);
this.optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
publishResponse.writeTo(out);
out.writeOptionalWriteable(optionalJoin.orElse(null));
}
public PublishResponse getPublishResponse() {
return publishResponse;
}
public Optional<Join> getJoin() {
return optionalJoin;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof PublishWithJoinResponse)) return false;
PublishWithJoinResponse that = (PublishWithJoinResponse) o;
if (!publishResponse.equals(that.publishResponse)) return false;
return optionalJoin.equals(that.optionalJoin);
}
@Override
public int hashCode() {
int result = publishResponse.hashCode();
result = 31 * result + optionalJoin.hashCode();
return result;
}
@Override
public String toString() {
return "PublishWithJoinResponse{" +
"publishResponse=" + publishResponse +
", optionalJoin=" + optionalJoin +
'}';
}
}

View File

@ -1,82 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
abstract class TermVersionResponse extends TransportResponse {
protected final long term;
protected final long version;
TermVersionResponse(long term, long version) {
assert term >= 0;
assert version >= 0;
this.term = term;
this.version = version;
}
TermVersionResponse(StreamInput in) throws IOException {
this(in.readLong(), in.readLong());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(term);
out.writeLong(version);
}
public long getTerm() {
return term;
}
public long getVersion() {
return version;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TermVersionResponse response = (TermVersionResponse) o;
if (term != response.term) return false;
return version == response.version;
}
@Override
public int hashCode() {
int result = (int) (term ^ (term >>> 32));
result = 31 * result + (int) (version ^ (version >>> 32));
return result;
}
@Override
public String toString() {
return "TermVersionResponse{" +
"term=" + term +
", version=" + version +
'}';
}
}

View File

@ -74,7 +74,7 @@ public class CoordinationStateTests extends ESTestCase {
cs3 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode3), node3); cs3 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode3), node3);
} }
private DiscoveryNode createNode(String id) { public static DiscoveryNode createNode(String id) {
return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT); return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT);
} }

View File

@ -25,6 +25,8 @@ import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils; import org.elasticsearch.test.EqualsHashCodeTestUtils;
import java.util.Optional;
public class MessagesTests extends ESTestCase { public class MessagesTests extends ESTestCase {
private DiscoveryNode createNode(String id) { private DiscoveryNode createNode(String id) {
@ -96,6 +98,33 @@ public class MessagesTests extends ESTestCase {
}); });
} }
public void testPublishWithJoinResponseEqualsHashCodeSerialization() {
PublishResponse initialPublishResponse = new PublishResponse(randomNonNegativeLong(), randomNonNegativeLong());
Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong());
PublishWithJoinResponse initialPublishWithJoinResponse = new PublishWithJoinResponse(initialPublishResponse,
randomBoolean() ? Optional.empty() : Optional.of(initialJoin));
EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPublishWithJoinResponse,
publishWithJoinResponse -> copyWriteable(publishWithJoinResponse, writableRegistry(), PublishWithJoinResponse::new),
publishWithJoinResponse -> {
switch (randomInt(1)) {
case 0:
// change publish response
return new PublishWithJoinResponse(new PublishResponse(randomNonNegativeLong(), randomNonNegativeLong()),
publishWithJoinResponse.getJoin());
case 1:
// change optional join
Join newJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
return new PublishWithJoinResponse(publishWithJoinResponse.getPublishResponse(),
publishWithJoinResponse.getJoin().isPresent() && randomBoolean() ? Optional.empty() : Optional.of(newJoin));
default:
throw new AssertionError();
}
});
}
public void testStartJoinRequestEqualsHashCodeSerialization() { public void testStartJoinRequestEqualsHashCodeSerialization() {
StartJoinRequest initialStartJoinRequest = new StartJoinRequest(createNode(randomAlphaOfLength(10)), randomNonNegativeLong()); StartJoinRequest initialStartJoinRequest = new StartJoinRequest(createNode(randomAlphaOfLength(10)), randomNonNegativeLong());
EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialStartJoinRequest, EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialStartJoinRequest,

View File

@ -0,0 +1,430 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class PublicationTests extends ESTestCase {
class MockNode {
MockNode(Settings settings, DiscoveryNode localNode) {
this.localNode = localNode;
ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode,
VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L);
coordinationState = new CoordinationState(settings, localNode, new CoordinationStateTests.InMemoryPersistedState(0L,
initialState));
}
final DiscoveryNode localNode;
final CoordinationState coordinationState;
public MockPublication publish(ClusterState clusterState, Discovery.AckListener ackListener, Set<DiscoveryNode> faultyNodes) {
PublishRequest publishRequest = coordinationState.handleClientValue(clusterState);
MockPublication currentPublication = new MockPublication(Settings.EMPTY, publishRequest, ackListener, () -> 0L) {
@Override
protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) {
return coordinationState.isPublishQuorum(votes);
}
@Override
protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) {
return coordinationState.handlePublishResponse(sourceNode, publishResponse);
}
};
currentPublication.start(faultyNodes);
return currentPublication;
}
}
abstract class MockPublication extends Publication {
final PublishRequest publishRequest;
ApplyCommitRequest applyCommit;
boolean completed;
boolean committed;
Map<DiscoveryNode, ActionListener<PublishWithJoinResponse>> pendingPublications = new HashMap<>();
Map<DiscoveryNode, ActionListener<TransportResponse.Empty>> pendingCommits = new HashMap<>();
Map<DiscoveryNode, PublishWithJoinResponse> possibleJoins = new HashMap<>();
MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener,
LongSupplier currentTimeSupplier) {
super(settings, publishRequest, ackListener, currentTimeSupplier);
this.publishRequest = publishRequest;
}
@Override
protected void onCompletion(boolean committed) {
assertFalse(completed);
completed = true;
this.committed = committed;
}
@Override
protected void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response) {
assertNull(possibleJoins.put(sourceNode, response));
}
@Override
protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener) {
assertSame(publishRequest, this.publishRequest);
assertNull(pendingPublications.put(destination, responseActionListener));
}
@Override
protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit,
ActionListener<TransportResponse.Empty> responseActionListener) {
if (this.applyCommit == null) {
this.applyCommit = applyCommit;
} else {
assertSame(applyCommit, this.applyCommit);
}
assertNull(pendingCommits.put(destination, responseActionListener));
}
}
DiscoveryNode n1 = CoordinationStateTests.createNode("node1");
DiscoveryNode n2 = CoordinationStateTests.createNode("node2");
DiscoveryNode n3 = CoordinationStateTests.createNode("node3");
Set<DiscoveryNode> discoNodes = Sets.newHashSet(n1, n2, n3);
MockNode node1 = new MockNode(Settings.EMPTY, n1);
MockNode node2 = new MockNode(Settings.EMPTY, n2);
MockNode node3 = new MockNode(Settings.EMPTY, n3);
List<MockNode> nodes = Arrays.asList(node1, node2, node3);
Function<DiscoveryNode, MockNode> nodeResolver = dn -> nodes.stream().filter(mn -> mn.localNode.equals(dn)).findFirst().get();
private void initializeCluster(VotingConfiguration initialConfig) {
node1.coordinationState.setInitialState(CoordinationStateTests.clusterState(0L, 1L, n1, initialConfig, initialConfig, 0L));
StartJoinRequest startJoinRequest = new StartJoinRequest(n1, 1L);
node1.coordinationState.handleJoin(node1.coordinationState.handleStartJoin(startJoinRequest));
node1.coordinationState.handleJoin(node2.coordinationState.handleStartJoin(startJoinRequest));
node1.coordinationState.handleJoin(node3.coordinationState.handleStartJoin(startJoinRequest));
assertTrue(node1.coordinationState.electionWon());
}
public void testSimpleClusterStatePublishing() throws InterruptedException {
VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId()));
initializeCluster(singleNodeConfig);
AssertingAckListener ackListener = new AssertingAckListener(nodes.size());
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build();
MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L,
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet());
assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes));
assertTrue(publication.pendingCommits.isEmpty());
AtomicBoolean processedNode1PublishResponse = new AtomicBoolean();
boolean delayProcessingNode2PublishResponse = randomBoolean();
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
if (delayProcessingNode2PublishResponse && e.getKey().equals(n2)) {
return;
}
PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest(
publication.publishRequest);
assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty());
assertFalse(publication.possibleJoins.containsKey(e.getKey()));
PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse(publishResponse,
randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong())));
e.getValue().onResponse(publishWithJoinResponse);
assertTrue(publication.possibleJoins.containsKey(e.getKey()));
assertEquals(publishWithJoinResponse, publication.possibleJoins.get(e.getKey()));
if (e.getKey().equals(n1)) {
processedNode1PublishResponse.set(true);
}
assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty());
});
if (delayProcessingNode2PublishResponse) {
assertThat(publication.pendingCommits.keySet(), equalTo(Sets.newHashSet(n1, n3)));
} else {
assertThat(publication.pendingCommits.keySet(), equalTo(discoNodes));
}
assertNotNull(publication.applyCommit);
assertEquals(publication.applyCommit.getTerm(), publication.publishRequest.getAcceptedState().term());
assertEquals(publication.applyCommit.getVersion(), publication.publishRequest.getAcceptedState().version());
publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> {
assertFalse(publication.completed);
assertFalse(publication.committed);
nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit);
e.getValue().onResponse(TransportResponse.Empty.INSTANCE);
});
if (delayProcessingNode2PublishResponse) {
assertFalse(publication.completed);
assertFalse(publication.committed);
PublishResponse publishResponse = nodeResolver.apply(n2).coordinationState.handlePublishRequest(
publication.publishRequest);
publication.pendingPublications.get(n2).onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty()));
assertThat(publication.pendingCommits.keySet(), equalTo(discoNodes));
assertFalse(publication.completed);
assertFalse(publication.committed);
publication.pendingCommits.get(n2).onResponse(TransportResponse.Empty.INSTANCE);
}
assertTrue(publication.completed);
assertTrue(publication.committed);
assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3));
}
public void testClusterStatePublishingWithFaultyNodeBeforeCommit() throws InterruptedException {
VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId()));
initializeCluster(singleNodeConfig);
AssertingAckListener ackListener = new AssertingAckListener(nodes.size());
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build();
AtomicInteger remainingActions = new AtomicInteger(4); // number of publish actions + initial faulty nodes injection
int injectFaultAt = randomInt(remainingActions.get() - 1);
logger.info("Injecting fault at: {}", injectFaultAt);
Set<DiscoveryNode> initialFaultyNodes = remainingActions.decrementAndGet() == injectFaultAt ?
Collections.singleton(n2) : Collections.emptySet();
MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L,
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, initialFaultyNodes);
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
if (remainingActions.decrementAndGet() == injectFaultAt) {
publication.onFaultyNode(n2);
}
if (e.getKey().equals(n2) == false || randomBoolean()) {
PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest(
publication.publishRequest);
e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty()));
}
});
publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> {
nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit);
e.getValue().onResponse(TransportResponse.Empty.INSTANCE);
});
assertTrue(publication.completed);
assertTrue(publication.committed);
publication.onFaultyNode(randomFrom(n1, n3)); // has no influence
List<Tuple<DiscoveryNode, Throwable>> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS);
assertThat(errors.size(), equalTo(1));
assertThat(errors.get(0).v1(), equalTo(n2));
assertThat(errors.get(0).v2().getMessage(), containsString("faulty node"));
}
public void testClusterStatePublishingWithFaultyNodeAfterCommit() throws InterruptedException {
VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId()));
initializeCluster(singleNodeConfig);
AssertingAckListener ackListener = new AssertingAckListener(nodes.size());
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build();
boolean publicationDidNotMakeItToNode2 = randomBoolean();
AtomicInteger remainingActions = new AtomicInteger(publicationDidNotMakeItToNode2 ? 2 : 3);
int injectFaultAt = randomInt(remainingActions.get() - 1);
logger.info("Injecting fault at: {}, publicationDidNotMakeItToNode2: {}", injectFaultAt, publicationDidNotMakeItToNode2);
MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L,
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet());
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
if (e.getKey().equals(n2) == false || publicationDidNotMakeItToNode2 == false) {
PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest(
publication.publishRequest);
e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty()));
}
});
publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> {
if (e.getKey().equals(n2)) {
// we must fail node before committing for the node, otherwise failing the node is ignored
publication.onFaultyNode(n2);
}
if (remainingActions.decrementAndGet() == injectFaultAt) {
publication.onFaultyNode(n2);
}
if (e.getKey().equals(n2) == false || randomBoolean()) {
nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit);
e.getValue().onResponse(TransportResponse.Empty.INSTANCE);
}
});
// we need to complete publication by failing the node
if (publicationDidNotMakeItToNode2 && remainingActions.get() > injectFaultAt) {
publication.onFaultyNode(n2);
}
assertTrue(publication.completed);
assertTrue(publication.committed);
publication.onFaultyNode(randomFrom(n1, n3)); // has no influence
List<Tuple<DiscoveryNode, Throwable>> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS);
assertThat(errors.size(), equalTo(1));
assertThat(errors.get(0).v1(), equalTo(n2));
assertThat(errors.get(0).v2().getMessage(), containsString("faulty node"));
}
public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws InterruptedException {
VotingConfiguration config = new VotingConfiguration(Sets.newHashSet(n1.getId(), n2.getId()));
initializeCluster(config);
AssertingAckListener ackListener = new AssertingAckListener(nodes.size());
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build();
MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L,
discoveryNodes, config, config, 42L), ackListener, Collections.emptySet());
boolean timeOut = randomBoolean();
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
if (e.getKey().equals(n2)) {
if (timeOut) {
publication.onTimeout();
} else {
e.getValue().onFailure(new TransportException(new Exception("dummy failure")));
}
assertTrue(publication.completed);
assertFalse(publication.committed);
} else if (randomBoolean()) {
PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest(
publication.publishRequest);
e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty()));
}
});
assertThat(publication.pendingCommits.keySet(), equalTo(Collections.emptySet()));
assertNull(publication.applyCommit);
assertTrue(publication.completed);
assertFalse(publication.committed);
List<Tuple<DiscoveryNode, Throwable>> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS);
assertThat(errors.size(), equalTo(3));
assertThat(errors.stream().map(Tuple::v1).collect(Collectors.toList()), containsInAnyOrder(n1, n2, n3));
errors.stream().forEach(tuple ->
assertThat(tuple.v2().getMessage(), containsString(timeOut ? "timed out" :
tuple.v1().equals(n2) ? "dummy failure" : "non-failed nodes do not form a quorum")));
}
public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException {
VotingConfiguration config = new VotingConfiguration(randomBoolean() ?
Sets.newHashSet(n1.getId(), n2.getId()) : Sets.newHashSet(n1.getId(), n2.getId(), n3.getId()));
initializeCluster(config);
AssertingAckListener ackListener = new AssertingAckListener(nodes.size());
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build();
MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L,
discoveryNodes, config, config, 42L), ackListener, Collections.emptySet());
boolean publishedToN3 = randomBoolean();
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
if (e.getKey().equals(n3) == false || publishedToN3) {
PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest(
publication.publishRequest);
e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty()));
}
});
assertNotNull(publication.applyCommit);
Set<DiscoveryNode> committingNodes = new HashSet<>(randomSubsetOf(discoNodes));
if (publishedToN3 == false) {
committingNodes.remove(n3);
}
logger.info("Committing nodes: {}", committingNodes);
publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> {
if (committingNodes.contains(e.getKey())) {
nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit);
e.getValue().onResponse(TransportResponse.Empty.INSTANCE);
}
});
publication.onTimeout();
assertTrue(publication.completed);
assertTrue(publication.committed);
assertEquals(committingNodes, ackListener.await(0L, TimeUnit.SECONDS));
// check that acking still works after publication completed
if (publishedToN3 == false) {
publication.pendingPublications.get(n3).onResponse(
new PublishWithJoinResponse(node3.coordinationState.handlePublishRequest(publication.publishRequest), Optional.empty()));
}
assertEquals(discoNodes, publication.pendingCommits.keySet());
Set<DiscoveryNode> nonCommittedNodes = Sets.difference(discoNodes, committingNodes);
logger.info("Non-committed nodes: {}", nonCommittedNodes);
nonCommittedNodes.stream().collect(shuffle()).forEach(n ->
publication.pendingCommits.get(n).onResponse(TransportResponse.Empty.INSTANCE));
assertEquals(discoNodes, ackListener.await(0L, TimeUnit.SECONDS));
}
public static <T> Collector<T, ?, Stream<T>> shuffle() {
return Collectors.collectingAndThen(Collectors.toList(),
ts -> {
Collections.shuffle(ts, random());
return ts.stream();
});
}
}

View File

@ -65,8 +65,10 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -816,6 +818,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
public static class AssertingAckListener implements Discovery.AckListener { public static class AssertingAckListener implements Discovery.AckListener {
private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>(); private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>();
private final Set<DiscoveryNode> successfulAcks = Collections.synchronizedSet(new HashSet<>());
private final CountDownLatch countDown; private final CountDownLatch countDown;
private final CountDownLatch commitCountDown; private final CountDownLatch commitCountDown;
@ -833,13 +836,16 @@ public class PublishClusterStateActionTests extends ESTestCase {
public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
if (e != null) { if (e != null) {
errors.add(new Tuple<>(node, e)); errors.add(new Tuple<>(node, e));
} else {
successfulAcks.add(node);
} }
countDown.countDown(); countDown.countDown();
} }
public void await(long timeout, TimeUnit unit) throws InterruptedException { public Set<DiscoveryNode> await(long timeout, TimeUnit unit) throws InterruptedException {
assertThat(awaitErrors(timeout, unit), emptyIterable()); assertThat(awaitErrors(timeout, unit), emptyIterable());
assertTrue(commitCountDown.await(timeout, unit)); assertTrue(commitCountDown.await(timeout, unit));
return new HashSet<>(successfulAcks);
} }
public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException { public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException {