tighten up FailedToCommitClusterStateException semantics and other feedback

This commit is contained in:
Boaz Leskes 2015-08-26 15:55:31 +02:00
parent 98ed133dd7
commit c9ee8dbd16
7 changed files with 99 additions and 55 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -597,7 +598,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
IndexNotFoundException.class, IndexNotFoundException.class,
ShardNotFoundException.class, ShardNotFoundException.class,
NotSerializableExceptionWrapper.class, NotSerializableExceptionWrapper.class,
org.elasticsearch.discovery.zen.publish.PublishClusterStateAction.FailedToCommitException.class Discovery.FailedToCommitClusterStateException.class
}; };
Map<String, Constructor<? extends ElasticsearchException>> mapping = new HashMap<>(exceptions.length); Map<String, Constructor<? extends ElasticsearchException>> mapping = new HashMap<>(exceptions.length);
for (Class<? extends ElasticsearchException> e : exceptions) { for (Class<? extends ElasticsearchException> e : exceptions) {

View File

@ -40,7 +40,6 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.*; import org.elasticsearch.common.util.concurrent.*;
@ -485,8 +484,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
logger.debug("publishing cluster state version [{}]", newClusterState.version()); logger.debug("publishing cluster state version [{}]", newClusterState.version());
try { try {
discoveryService.publish(clusterChangedEvent, ackListener); discoveryService.publish(clusterChangedEvent, ackListener);
} catch (Throwable t) { } catch (Discovery.FailedToCommitClusterStateException t) {
logger.warn("failing [{}]: failed to publish cluster state version [{}]", t, source, newClusterState.version()); logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version());
updateTask.onFailure(source, t); updateTask.onFailure(source, t);
return; return;
} }

View File

@ -19,15 +19,17 @@
package org.elasticsearch.discovery; package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.service.NodeService;
import java.io.IOException;
/** /**
* A pluggable module allowing to implement discovery of other nodes, publishing of the cluster * A pluggable module allowing to implement discovery of other nodes, publishing of the cluster
* state to all nodes, electing a master of the cluster that raises cluster state change * state to all nodes, electing a master of the cluster that raises cluster state change
@ -60,11 +62,29 @@ public interface Discovery extends LifecycleComponent<Discovery> {
* *
* The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether
* they updated their own cluster state or not. * they updated their own cluster state or not.
*
* The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not committed and should be rejected.
* Any other exception signals that something went wrong but the change is committed.
*/ */
void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener); void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener);
public static interface AckListener { interface AckListener {
void onNodeAck(DiscoveryNode node, @Nullable Throwable t); void onNodeAck(DiscoveryNode node, @Nullable Throwable t);
void onTimeout(); void onTimeout();
} }
/**
* Signals that a published cluster state change was not committed and should be rejected.
* <p>
* As a member class of the {@code Discovery} interface it is implicitly {@code public} and
* {@code static}, so callers refer to it as {@code Discovery.FailedToCommitClusterStateException}.
* Every constructor simply delegates to the matching {@link ElasticsearchException} constructor.
*/
class FailedToCommitClusterStateException extends ElasticsearchException {
/**
* Creates the exception from a stream by delegating to the superclass stream constructor.
* NOTE(review): presumably used to read the exception back after it was sent over the wire - confirm.
*
* @param in the stream handed to the superclass
* @throws IOException declared because the superclass constructor may fail while reading
*/
public FailedToCommitClusterStateException(StreamInput in) throws IOException {
super(in);
}
/**
* Creates the exception with a message and no cause.
*
* @param msg  the message; callers pass templates containing {@code {}} placeholders
* @param args values passed along with the message (presumably substituted for the
*             placeholders by the superclass - confirm)
*/
public FailedToCommitClusterStateException(String msg, Object... args) {
super(msg, args);
}
/**
* Creates the exception with a message and the underlying failure that prevented the commit.
*
* @param msg   the message; callers pass templates containing {@code {}} placeholders
* @param cause the underlying failure, handed to the superclass as the cause
* @param args  values passed along with the message (presumably substituted for the
*              placeholders by the superclass - confirm)
*/
public FailedToCommitClusterStateException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
} }

View File

@ -331,7 +331,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
nodesFD.updateNodesAndPing(clusterChangedEvent.state()); nodesFD.updateNodesAndPing(clusterChangedEvent.state());
try { try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (PublishClusterStateAction.FailedToCommitException t) { } catch (FailedToCommitClusterStateException t) {
// cluster service logs a WARN message // cluster service logs a WARN message
logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes()); logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes());
clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() {

View File

@ -93,39 +93,73 @@ public class PublishClusterStateAction extends AbstractComponent {
transportService.removeHandler(COMMIT_ACTION_NAME); transportService.removeHandler(COMMIT_ACTION_NAME);
} }
public void publish(ClusterChangedEvent clusterChangedEvent, int minMasterNodes, final Discovery.AckListener ackListener) { /**
final DiscoveryNodes nodes = clusterChangedEvent.state().nodes(); * publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will
Set<DiscoveryNode> nodesToPublishTo = new HashSet<>(nodes.size()); * be processed by the master and the other nodes.
DiscoveryNode localNode = nodes.localNode(); * <p/>
final int totalMasterNodes = nodes.masterNodes().size(); * The method is guaranteed to throw a {@link Discovery.FailedToCommitClusterStateException} if the change is not committed and should be rejected.
for (final DiscoveryNode node : nodes) { * Any other exception signals that something went wrong but the change is committed.
if (node.equals(localNode) == false) { */
nodesToPublishTo.add(node); public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes, final Discovery.AckListener ackListener) throws Discovery.FailedToCommitClusterStateException {
final DiscoveryNodes nodes;
final SendingController sendingController;
final Set<DiscoveryNode> nodesToPublishTo;
final Map<Version, BytesReference> serializedStates;
final Map<Version, BytesReference> serializedDiffs;
final boolean sendFullVersion;
try {
nodes = clusterChangedEvent.state().nodes();
nodesToPublishTo = new HashSet<>(nodes.size());
DiscoveryNode localNode = nodes.localNode();
final int totalMasterNodes = nodes.masterNodes().size();
for (final DiscoveryNode node : nodes) {
if (node.equals(localNode) == false) {
nodesToPublishTo.add(node);
}
}
sendFullVersion = !discoverySettings.getPublishDiff() || clusterChangedEvent.previousState() == null;
serializedStates = Maps.newHashMap();
serializedDiffs = Maps.newHashMap();
// we build these early as a best effort not to commit in the case of error.
// sadly this is not watertight as it may be that a failed diff based publishing to a node
// will cause a full serialization based on an older version, which may fail after the
// change has been committed.
buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);
final BlockingClusterStatePublishResponseHandler publishResponseHandler = new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener);
sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler);
} catch (Throwable t) {
throw new Discovery.FailedToCommitClusterStateException("unexpected error while preparing to publish", t);
}
try {
innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, sendFullVersion, serializedStates, serializedDiffs);
} catch (Discovery.FailedToCommitClusterStateException t) {
throw t;
} catch (Throwable t) {
// try to fail committing, in case it's still ongoing
sendingController.markAsFailed("unexpected error [" + t.getMessage() + "]");
if (sendingController.isCommitted() == false) {
// signal the change should be rejected
throw new Discovery.FailedToCommitClusterStateException("unexpected error [{}]", t, t.getMessage());
} else {
throw t;
} }
} }
publish(clusterChangedEvent, minMasterNodes, totalMasterNodes, nodesToPublishTo, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener));
} }
private void publish(final ClusterChangedEvent clusterChangedEvent, int minMasterNodes, int totalMasterNodes, final Set<DiscoveryNode> nodesToPublishTo, private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
final BlockingClusterStatePublishResponseHandler publishResponseHandler) { final SendingController sendingController, final boolean sendFullVersion,
final Map<Version, BytesReference> serializedStates, final Map<Version, BytesReference> serializedDiffs) {
Map<Version, BytesReference> serializedStates = Maps.newHashMap();
Map<Version, BytesReference> serializedDiffs = Maps.newHashMap();
final ClusterState clusterState = clusterChangedEvent.state(); final ClusterState clusterState = clusterChangedEvent.state();
final ClusterState previousState = clusterChangedEvent.previousState(); final ClusterState previousState = clusterChangedEvent.previousState();
final TimeValue publishTimeout = discoverySettings.getPublishTimeout(); final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
final boolean sendFullVersion = !discoverySettings.getPublishDiff() || previousState == null;
final SendingController sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler);
final long publishingStartInNanos = System.nanoTime(); final long publishingStartInNanos = System.nanoTime();
// we build these early as a best effort not to commit in the case of error.
// sadly this is not water tight as it may that a failed diff based publishing to a node
// will cause a full serialization based on an older version, which may fail after the
// change has been committed.
buildDiffAndSerializeStates(clusterState, previousState, nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);
for (final DiscoveryNode node : nodesToPublishTo) { for (final DiscoveryNode node : nodesToPublishTo) {
// try and serialize the cluster state once (or per version), so we don't serialize it // try and serialize the cluster state once (or per version), so we don't serialize it
// per node when we send it over the wire, compress it while we are at it... // per node when we send it over the wire, compress it while we are at it...
@ -141,6 +175,7 @@ public class PublishClusterStateAction extends AbstractComponent {
try { try {
long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos)); long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos));
final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos))); sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
if (sendingController.getPublishingTimedOut()) { if (sendingController.getPublishingTimedOut()) {
DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
@ -335,12 +370,13 @@ public class PublishClusterStateAction extends AbstractComponent {
} }
// package private for testing // package private for testing
/** /**
* does simple sanity check of the incoming cluster state. Throws an exception on rejections. * does simple sanity check of the incoming cluster state. Throws an exception on rejections.
*/ */
void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) { void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) {
final ClusterName incomingClusterName = incomingState.getClusterName(); final ClusterName incomingClusterName = incomingState.getClusterName();
if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) { if (!incomingClusterName.equals(this.clusterName)) {
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().masterNode(), incomingClusterName); logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().masterNode(), incomingClusterName);
throw new IllegalStateException("received state from a node that is not part of the cluster"); throw new IllegalStateException("received state from a node that is not part of the cluster");
} }
@ -443,17 +479,6 @@ public class PublishClusterStateAction extends AbstractComponent {
} }
public static class FailedToCommitException extends ElasticsearchException {
public FailedToCommitException(StreamInput in) throws IOException {
super(in);
}
public FailedToCommitException(String msg, Object... args) {
super(msg, args);
}
}
class SendingController { class SendingController {
private final ClusterState clusterState; private final ClusterState clusterState;
@ -481,7 +506,7 @@ public class PublishClusterStateAction extends AbstractComponent {
this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes
this.pendingMasterNodes = totalMasterNodes - 1; this.pendingMasterNodes = totalMasterNodes - 1;
if (this.neededMastersToCommit > this.pendingMasterNodes) { if (this.neededMastersToCommit > this.pendingMasterNodes) {
throw new FailedToCommitException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes); throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes);
} }
this.committed = neededMastersToCommit == 0; this.committed = neededMastersToCommit == 0;
this.committedOrFailed = committed; this.committedOrFailed = committed;
@ -493,14 +518,14 @@ public class PublishClusterStateAction extends AbstractComponent {
try { try {
timedout = committedOrFailedLatch.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false; timedout = committedOrFailedLatch.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false;
} catch (InterruptedException e) { } catch (InterruptedException e) {
// the commit check below will either translate to an exception or we are committed and we can safely continue
} }
if (timedout) { if (timedout) {
markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]"); markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]");
} }
if (isCommitted() == false) { if (isCommitted() == false) {
throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left", throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left",
timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit); timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit);
} }
} }
@ -542,7 +567,7 @@ public class PublishClusterStateAction extends AbstractComponent {
synchronized private void onMasterNodeDone(DiscoveryNode node) { synchronized private void onMasterNodeDone(DiscoveryNode node) {
pendingMasterNodes--; pendingMasterNodes--;
if (pendingMasterNodes == 0 && neededMastersToCommit > 0) { if (pendingMasterNodes == 0 && neededMastersToCommit > 0) {
markAsFailed("All master nodes acked or failed but [" + neededMastersToCommit + "] acks are still needed"); markAsFailed("no more pending master nodes, but [" + neededMastersToCommit + "] acks are still needed");
} }
} }

View File

@ -27,16 +27,15 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.disruption.NetworkDelaysPartition; import org.elasticsearch.test.disruption.NetworkDelaysPartition;
import org.elasticsearch.test.disruption.NetworkUnresponsivePartition;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.junit.Test; import org.junit.Test;
@ -393,7 +392,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
logger.debug("--> waiting for cluster state to be processed/rejected"); logger.debug("--> waiting for cluster state to be processed/rejected");
latch.await(); latch.await();
assertThat(failure.get(), instanceOf(PublishClusterStateAction.FailedToCommitException.class)); assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class));
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override
public void run() { public void run() {

View File

@ -354,7 +354,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
/** /**
* Test concurrent publishing works correctly (although not strictly required, it's a good testamne * Test not waiting publishing works correctly (i.e., publishing times out)
*/ */
@Test @Test
public void testSimultaneousClusterStatePublishing() throws Exception { public void testSimultaneousClusterStatePublishing() throws Exception {
@ -447,9 +447,9 @@ public class PublishClusterStateActionTests extends ESTestCase {
try { try {
publishStateAndWait(nodeA.action, unserializableClusterState, previousClusterState); publishStateAndWait(nodeA.action, unserializableClusterState, previousClusterState);
fail("cluster state published despite of diff errors"); fail("cluster state published despite of diff errors");
} catch (ElasticsearchException e) { } catch (Discovery.FailedToCommitClusterStateException e) {
assertThat(e.getCause(), notNullValue()); assertThat(e.getCause(), notNullValue());
assertThat(e.getCause().getMessage(), containsString("Simulated")); assertThat(e.getCause().getMessage(), containsString("failed to serialize"));
} }
} }
@ -475,7 +475,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
try { try {
publishState(master.action, clusterState, previousState, masterNodes + randomIntBetween(1, 5)); publishState(master.action, clusterState, previousState, masterNodes + randomIntBetween(1, 5));
fail("cluster state publishing didn't fail despite of not having enough nodes"); fail("cluster state publishing didn't fail despite of not having enough nodes");
} catch (PublishClusterStateAction.FailedToCommitException expected) { } catch (Discovery.FailedToCommitClusterStateException expected) {
logger.debug("failed to publish as expected", expected); logger.debug("failed to publish as expected", expected);
} }
} }
@ -554,7 +554,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
if (expectingToCommit == false) { if (expectingToCommit == false) {
fail("cluster state publishing didn't fail despite of not have enough nodes"); fail("cluster state publishing didn't fail despite of not have enough nodes");
} }
} catch (PublishClusterStateAction.FailedToCommitException exception) { } catch (Discovery.FailedToCommitClusterStateException exception) {
logger.debug("failed to publish as expected", exception); logger.debug("failed to publish as expected", exception);
if (expectingToCommit) { if (expectingToCommit) {
throw exception; throw exception;
@ -697,7 +697,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
try { try {
publishState(master.action, state, master.clusterState, 2).await(1, TimeUnit.HOURS); publishState(master.action, state, master.clusterState, 2).await(1, TimeUnit.HOURS);
success = true; success = true;
} catch (PublishClusterStateAction.FailedToCommitException OK) { } catch (Discovery.FailedToCommitClusterStateException OK) {
success = false; success = false;
} }
logger.debug("--> publishing [{}], verifying...", success ? "succeeded" : "failed"); logger.debug("--> publishing [{}], verifying...", success ? "succeeded" : "failed");