From afa0ab62261d9a46c77a8bea47679ffe5e743ddc Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 31 Oct 2013 15:42:15 +0100 Subject: [PATCH] Fixed ack behaviour when no ack is expected from any node or timeout is set to 0 We now return acknowledged true when no wait is needed (mustAck always returns false). We do wait for the master node to complete its actions though. Previously it would try to timeout and hang due to a CountDown#fastForward call when the internal counter is set to 0 We now return acknowledged false without starting the timeout thread when the timeout is set 0, as starting the wait and immediately stopping the thread seems pointless. Added coverage for ack in ClusterServiceTests --- .../service/InternalClusterService.java | 43 +-- .../cluster/ClusterServiceTests.java | 283 ++++++++++++++++++ .../elasticsearch/cluster/ack/AckTests.java | 102 +++++++ 3 files changed, 411 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index b0d98c39454..72dc8db03a6 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -336,14 +336,18 @@ public class InternalClusterService extends AbstractLifecycleComponent ackTimeoutCallback; private Throwable lastFailure; - AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) { + AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, ClusterState clusterState, ThreadPool threadPool) { this.ackedUpdateTask = ackedUpdateTask; - this.version = clusterStateVersion; + this.clusterState = clusterState; int countDown = 0; - for (DiscoveryNode node : nodes) { + for (DiscoveryNode node : clusterState.nodes()) { if (ackedUpdateTask.mustAck(node)) { countDown++; } } - logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, version); + //we always wait for at least 1 node (the master) + countDown = Math.max(1, countDown); + logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterState.version()); this.countDown = new CountDown(countDown); this.ackTimeoutCallback = threadPool.schedule(ackedUpdateTask.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() { @Override @@ -651,17 +657,20 @@ public class InternalClusterService extends AbstractLifecycleComponent