improved timeout handling

This commit is contained in:
Boaz Leskes 2015-08-23 22:41:21 +02:00
parent 4d31681057
commit 234a3794e5
2 changed files with 24 additions and 14 deletions

View File

@ -35,13 +35,22 @@ import java.util.EnumSet;
*/ */
public class DiscoverySettings extends AbstractComponent { public class DiscoverySettings extends AbstractComponent {
/**
* sets the timeout for a complete publishing cycle, including both sending and committing. the master
* will continue to process the next cluster state update after this time has elapsed
**/
public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout"; public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout";
/**
* sets the timeout for receiving enough acks for a specific cluster state and committing it. failing
* to receive responses within this window will cause the cluster state change to be rejected.
*/
public static final String COMMIT_TIMEOUT = "discovery.zen.commit_timeout"; public static final String COMMIT_TIMEOUT = "discovery.zen.commit_timeout";
public static final String NO_MASTER_BLOCK = "discovery.zen.no_master_block"; public static final String NO_MASTER_BLOCK = "discovery.zen.no_master_block";
public static final String PUBLISH_DIFF_ENABLE = "discovery.zen.publish_diff.enable"; public static final String PUBLISH_DIFF_ENABLE = "discovery.zen.publish_diff.enable";
public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30); public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);
public static final TimeValue DEFAULT_COMMIT_TIMEOUT = TimeValue.timeValueSeconds(1); public static final TimeValue DEFAULT_COMMIT_TIMEOUT = TimeValue.timeValueSeconds(30);
public static final String DEFAULT_NO_MASTER_BLOCK = "write"; public static final String DEFAULT_NO_MASTER_BLOCK = "write";
public final static int NO_MASTER_BLOCK_ID = 2; public final static int NO_MASTER_BLOCK_ID = 2;
public final static boolean DEFAULT_PUBLISH_DIFF_ENABLE = true; public final static boolean DEFAULT_PUBLISH_DIFF_ENABLE = true;

View File

@ -121,6 +121,8 @@ public class PublishClusterStateAction extends AbstractComponent {
final boolean sendFullVersion = !discoverySettings.getPublishDiff() || previousState == null; final boolean sendFullVersion = !discoverySettings.getPublishDiff() || previousState == null;
final SendingController sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler); final SendingController sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler);
final long publishingStartInNanos = System.nanoTime();
// we build these early as a best effort not to commit in the case of error. // we build these early as a best effort not to commit in the case of error.
// sadly this is not water tight as it may be that a failed diff based publishing to a node // sadly this is not water tight as it may be that a failed diff based publishing to a node
// will cause a full serialization based on an older version, which may fail after the // will cause a full serialization based on an older version, which may fail after the
@ -140,21 +142,19 @@ public class PublishClusterStateAction extends AbstractComponent {
sendingController.waitForCommit(discoverySettings.getCommitTimeout()); sendingController.waitForCommit(discoverySettings.getCommitTimeout());
if (publishTimeout.millis() > 0) { try {
// only wait if the publish timeout is configured... long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos));
try { sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(publishTimeout)); if (sendingController.getPublishingTimedOut()) {
if (sendingController.getPublishingTimedOut()) { DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); // everyone may have just responded
// everyone may have just responded if (pendingNodes.length > 0) {
if (pendingNodes.length > 0) { logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", clusterState.version(), publishTimeout, pendingNodes);
logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", clusterState.version(), publishTimeout, pendingNodes);
}
} }
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
} }
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
} }
} }
@ -487,6 +487,7 @@ public class PublishClusterStateAction extends AbstractComponent {
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
//nocommit: make sure we prevent publishing successfully!
if (committed.get() == false) { if (committed.get() == false) {
throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left", throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left",
timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit); timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit);