From 234a3794e5e6cb85ef8d43619884209bb93c6399 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 23 Aug 2015 22:41:21 +0200 Subject: [PATCH] improved timeout handling --- .../discovery/DiscoverySettings.java | 11 +++++++- .../publish/PublishClusterStateAction.java | 27 ++++++++++--------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java b/core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java index 8807bfcbf3d..cbf4cf10477 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java @@ -35,13 +35,22 @@ import java.util.EnumSet; */ public class DiscoverySettings extends AbstractComponent { + /** + * sets the timeout for a complete publishing cycle, including both sending and committing. the master + * will continue to process the next cluster state update after this time has elapsed + **/ public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout"; + + /** + * sets the timeout for receiving enough acks for a specific cluster state and committing it. failing + * to receive responses within this window will cause the cluster state change to be rejected. 
+ */ public static final String COMMIT_TIMEOUT = "discovery.zen.commit_timeout"; public static final String NO_MASTER_BLOCK = "discovery.zen.no_master_block"; public static final String PUBLISH_DIFF_ENABLE = "discovery.zen.publish_diff.enable"; public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30); - public static final TimeValue DEFAULT_COMMIT_TIMEOUT = TimeValue.timeValueSeconds(1); + public static final TimeValue DEFAULT_COMMIT_TIMEOUT = TimeValue.timeValueSeconds(30); public static final String DEFAULT_NO_MASTER_BLOCK = "write"; public final static int NO_MASTER_BLOCK_ID = 2; public final static boolean DEFAULT_PUBLISH_DIFF_ENABLE = true; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 5083dc98af3..7722511b6ab 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -121,6 +121,8 @@ public class PublishClusterStateAction extends AbstractComponent { final boolean sendFullVersion = !discoverySettings.getPublishDiff() || previousState == null; final SendingController sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler); + final long publishingStartInNanos = System.nanoTime(); + // we build these early as a best effort not to commit in the case of error. 
// sadly this is not water tight as it may that a failed diff based publishing to a node // will cause a full serialization based on an older version, which may fail after the @@ -140,21 +142,19 @@ public class PublishClusterStateAction extends AbstractComponent { sendingController.waitForCommit(discoverySettings.getCommitTimeout()); - if (publishTimeout.millis() > 0) { - // only wait if the publish timeout is configured... - try { - sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(publishTimeout)); - if (sendingController.getPublishingTimedOut()) { - DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); - // everyone may have just responded - if (pendingNodes.length > 0) { - logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", clusterState.version(), publishTimeout, pendingNodes); - } + try { + long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos)); + sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos))); + if (sendingController.getPublishingTimedOut()) { + DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); + // everyone may have just responded + if (pendingNodes.length > 0) { + logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", clusterState.version(), publishTimeout, pendingNodes); } - } catch (InterruptedException e) { - // ignore & restore interrupt - Thread.currentThread().interrupt(); } + } catch (InterruptedException e) { + // ignore & restore interrupt + Thread.currentThread().interrupt(); } } @@ -487,6 +487,7 @@ public class PublishClusterStateAction extends AbstractComponent { } catch (InterruptedException e) { } + //nocommit: make sure we prevent publishing successfully! 
if (committed.get() == false) { throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left", timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit);