diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index 3e54d499db3..19376c48eb5 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; @@ -597,7 +598,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte IndexNotFoundException.class, ShardNotFoundException.class, NotSerializableExceptionWrapper.class, - org.elasticsearch.discovery.zen.publish.PublishClusterStateAction.FailedToCommitException.class + Discovery.FailedToCommitClusterStateException.class }; Map> mapping = new HashMap<>(exceptions.length); for (Class e : exceptions) { diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 76262a381b4..08cdbbb863b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -40,7 +40,6 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.StringText; -import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.*; @@ -485,8 +484,8 @@ public class InternalClusterService extends AbstractLifecycleComponent { * * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether * they updated their own cluster state or not. + * + * The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not committed and should be rejected. + * Any other exception signals the something wrong happened but the change is committed. */ void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener); - public static interface AckListener { + interface AckListener { void onNodeAck(DiscoveryNode node, @Nullable Throwable t); void onTimeout(); } + + class FailedToCommitClusterStateException extends ElasticsearchException { + + public FailedToCommitClusterStateException(StreamInput in) throws IOException { + super(in); + } + + public FailedToCommitClusterStateException(String msg, Object... args) { + super(msg, args); + } + + public FailedToCommitClusterStateException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 1ad2bd96514..a38fef4cc71 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -331,7 +331,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen nodesFD.updateNodesAndPing(clusterChangedEvent.state()); try { publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); - } catch (PublishClusterStateAction.FailedToCommitException t) { + } catch (FailedToCommitClusterStateException t) { // cluster service logs a WARN message logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes()); clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index b224c777109..32867fae78d 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -93,39 +93,73 @@ public class PublishClusterStateAction extends AbstractComponent { transportService.removeHandler(COMMIT_ACTION_NAME); } - public void publish(ClusterChangedEvent clusterChangedEvent, int minMasterNodes, final Discovery.AckListener ackListener) { - final DiscoveryNodes nodes = clusterChangedEvent.state().nodes(); - Set nodesToPublishTo = new HashSet<>(nodes.size()); - DiscoveryNode localNode = nodes.localNode(); - final int totalMasterNodes = nodes.masterNodes().size(); - for (final DiscoveryNode node : nodes) { - if (node.equals(localNode) == false) { - nodesToPublishTo.add(node); + /** + * publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will + * be processed by the master and the other nodes. + *

+ * The method is guaranteed to throw a {@link Discovery.FailedToCommitClusterStateException} if the change is not committed and should be rejected. + * Any other exception signals the something wrong happened but the change is committed. + */ + public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes, final Discovery.AckListener ackListener) throws Discovery.FailedToCommitClusterStateException { + final DiscoveryNodes nodes; + final SendingController sendingController; + final Set nodesToPublishTo; + final Map serializedStates; + final Map serializedDiffs; + final boolean sendFullVersion; + try { + nodes = clusterChangedEvent.state().nodes(); + nodesToPublishTo = new HashSet<>(nodes.size()); + DiscoveryNode localNode = nodes.localNode(); + final int totalMasterNodes = nodes.masterNodes().size(); + for (final DiscoveryNode node : nodes) { + if (node.equals(localNode) == false) { + nodesToPublishTo.add(node); + } + } + sendFullVersion = !discoverySettings.getPublishDiff() || clusterChangedEvent.previousState() == null; + serializedStates = Maps.newHashMap(); + serializedDiffs = Maps.newHashMap(); + + // we build these early as a best effort not to commit in the case of error. + // sadly this is not water tight as it may that a failed diff based publishing to a node + // will cause a full serialization based on an older version, which may fail after the + // change has been committed. + buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(), + nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs); + + final BlockingClusterStatePublishResponseHandler publishResponseHandler = new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener); + sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler); + } catch (Throwable t) { + throw new Discovery.FailedToCommitClusterStateException("unexpected error while preparing to publish", t); + } + + try { + innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, sendFullVersion, serializedStates, serializedDiffs); + } catch (Discovery.FailedToCommitClusterStateException t) { + throw t; + } catch (Throwable t) { + // try to fail committing, in cause it's still on going + sendingController.markAsFailed("unexpected error [" + t.getMessage() + "]"); + if (sendingController.isCommitted() == false) { + // signal the change should be rejected + throw new Discovery.FailedToCommitClusterStateException("unexpected error [{}]", t, t.getMessage()); + } else { + throw t; } } - publish(clusterChangedEvent, minMasterNodes, totalMasterNodes, nodesToPublishTo, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener)); } - private void publish(final ClusterChangedEvent clusterChangedEvent, int minMasterNodes, int totalMasterNodes, final Set nodesToPublishTo, - final BlockingClusterStatePublishResponseHandler publishResponseHandler) { - - Map serializedStates = Maps.newHashMap(); - Map serializedDiffs = Maps.newHashMap(); + private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set nodesToPublishTo, + final SendingController sendingController, final boolean sendFullVersion, + final Map serializedStates, final Map serializedDiffs) { final ClusterState clusterState = clusterChangedEvent.state(); final ClusterState previousState = clusterChangedEvent.previousState(); final TimeValue publishTimeout = discoverySettings.getPublishTimeout(); - final boolean sendFullVersion = !discoverySettings.getPublishDiff() || previousState == null; - final SendingController sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler); final long publishingStartInNanos = System.nanoTime(); - // we build these early as a best effort not to commit in the case of error. - // sadly this is not water tight as it may that a failed diff based publishing to a node - // will cause a full serialization based on an older version, which may fail after the - // change has been committed. - buildDiffAndSerializeStates(clusterState, previousState, nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs); - for (final DiscoveryNode node : nodesToPublishTo) { // try and serialize the cluster state once (or per version), so we don't serialize it // per node when we send it over the wire, compress it while we are at it... @@ -141,6 +175,7 @@ public class PublishClusterStateAction extends AbstractComponent { try { long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos)); + final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler(); sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos))); if (sendingController.getPublishingTimedOut()) { DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); @@ -335,12 +370,13 @@ public class PublishClusterStateAction extends AbstractComponent { } // package private for testing + /** * does simple sanity check of the incoming cluster state. Throws an exception on rejections. */ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) { final ClusterName incomingClusterName = incomingState.getClusterName(); - if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) { + if (!incomingClusterName.equals(this.clusterName)) { logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().masterNode(), incomingClusterName); throw new IllegalStateException("received state from a node that is not part of the cluster"); } @@ -443,17 +479,6 @@ public class PublishClusterStateAction extends AbstractComponent { } - public static class FailedToCommitException extends ElasticsearchException { - - public FailedToCommitException(StreamInput in) throws IOException { - super(in); - } - - public FailedToCommitException(String msg, Object... args) { - super(msg, args); - } - } - class SendingController { private final ClusterState clusterState; @@ -481,7 +506,7 @@ public class PublishClusterStateAction extends AbstractComponent { this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes this.pendingMasterNodes = totalMasterNodes - 1; if (this.neededMastersToCommit > this.pendingMasterNodes) { - throw new FailedToCommitException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes); + throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes); } this.committed = neededMastersToCommit == 0; this.committedOrFailed = committed; @@ -493,14 +518,14 @@ public class PublishClusterStateAction extends AbstractComponent { try { timedout = committedOrFailedLatch.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false; } catch (InterruptedException e) { - + // the commit check bellow will either translate to an exception or we are committed and we can safely continue } if (timedout) { markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]"); } if (isCommitted() == false) { - throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left", + throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left", timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit); } } @@ -542,7 +567,7 @@ public class PublishClusterStateAction extends AbstractComponent { synchronized private void onMasterNodeDone(DiscoveryNode node) { pendingMasterNodes--; if (pendingMasterNodes == 0 && neededMastersToCommit > 0) { - markAsFailed("All master nodes acked or failed but [" + neededMastersToCommit + "] acks are still needed"); + markAsFailed("no more pending master nodes, but [" + neededMastersToCommit + "] acks are still needed"); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 00bf606d061..6817aa1b38e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -27,16 +27,15 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; -import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.disruption.NetworkDelaysPartition; -import org.elasticsearch.test.disruption.NetworkUnresponsivePartition; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.junit.Test; @@ -393,7 +392,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { logger.debug("--> waiting for cluster state to be processed/rejected"); latch.await(); - assertThat(failure.get(), instanceOf(PublishClusterStateAction.FailedToCommitException.class)); + assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class)); assertBusy(new Runnable() { @Override public void run() { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index d2b69327bd5..1a078171fdd 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -354,7 +354,7 @@ public class PublishClusterStateActionTests extends ESTestCase { /** - * Test concurrent publishing works correctly (although not strictly required, it's a good testamne + * Test not waiting publishing works correctly (i.e., publishing times out) */ @Test public void testSimultaneousClusterStatePublishing() throws Exception { @@ -447,9 +447,9 @@ public class PublishClusterStateActionTests extends ESTestCase { try { publishStateAndWait(nodeA.action, unserializableClusterState, previousClusterState); fail("cluster state published despite of diff errors"); - } catch (ElasticsearchException e) { + } catch (Discovery.FailedToCommitClusterStateException e) { assertThat(e.getCause(), notNullValue()); - assertThat(e.getCause().getMessage(), containsString("Simulated")); + assertThat(e.getCause().getMessage(), containsString("failed to serialize")); } } @@ -475,7 +475,7 @@ public class PublishClusterStateActionTests extends ESTestCase { try { publishState(master.action, clusterState, previousState, masterNodes + randomIntBetween(1, 5)); fail("cluster state publishing didn't fail despite of not having enough nodes"); - } catch (PublishClusterStateAction.FailedToCommitException expected) { + } catch (Discovery.FailedToCommitClusterStateException expected) { logger.debug("failed to publish as expected", expected); } } @@ -554,7 +554,7 @@ public class PublishClusterStateActionTests extends ESTestCase { if (expectingToCommit == false) { fail("cluster state publishing didn't fail despite of not have enough nodes"); } - } catch (PublishClusterStateAction.FailedToCommitException exception) { + } catch (Discovery.FailedToCommitClusterStateException exception) { logger.debug("failed to publish as expected", exception); if (expectingToCommit) { throw exception; @@ -697,7 +697,7 @@ public class PublishClusterStateActionTests extends ESTestCase { try { publishState(master.action, state, master.clusterState, 2).await(1, TimeUnit.HOURS); success = true; - } catch (PublishClusterStateAction.FailedToCommitException OK) { + } catch (Discovery.FailedToCommitClusterStateException OK) { success = false; } logger.debug("--> publishing [{}], verifying...", success ? "succeeded" : "failed");