Fixed ack behaviour when no ack is expected from any node or timeout is set to 0

We now return acknowledged true when no wait is needed (mustAck always returns false). We do wait for the master node to complete its actions though. Previously it would try to time out and hang due to a CountDown#fastForward call when the internal counter is set to 0.

We now return acknowledged false without starting the timeout thread when the timeout is set to 0, as starting the wait and immediately stopping the thread seems pointless.

Added coverage for ack in ClusterServiceTests
This commit is contained in:
Luca Cavanna 2013-10-31 15:42:15 +01:00
parent c63741db04
commit afa0ab6226
3 changed files with 411 additions and 17 deletions

View File

@ -336,14 +336,18 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (updateTask instanceof AckedClusterStateUpdateTask) {
final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask;
try {
ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool);
} catch(EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
}
//timeout straightaway, otherwise we could wait forever as the timeout thread has not started
if (ackedUpdateTask.ackTimeout() == null || ackedUpdateTask.ackTimeout().millis() == 0) {
ackedUpdateTask.onAckTimeout();
} else {
try {
ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState, threadPool);
} catch(EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
}
//timeout straightaway, otherwise we could wait forever as the timeout thread has not started
ackedUpdateTask.onAckTimeout();
}
}
}
} else {
@ -624,21 +628,23 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private class AckCountDownListener implements Discovery.AckListener {
private final AckedClusterStateUpdateTask ackedUpdateTask;
private final long version;
private final CountDown countDown;
private final ClusterState clusterState;
private final Future<?> ackTimeoutCallback;
private Throwable lastFailure;
AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) {
AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, ClusterState clusterState, ThreadPool threadPool) {
this.ackedUpdateTask = ackedUpdateTask;
this.version = clusterStateVersion;
this.clusterState = clusterState;
int countDown = 0;
for (DiscoveryNode node : nodes) {
for (DiscoveryNode node : clusterState.nodes()) {
if (ackedUpdateTask.mustAck(node)) {
countDown++;
}
}
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, version);
//we always wait for at least 1 node (the master)
countDown = Math.max(1, countDown);
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterState.version());
this.countDown = new CountDown(countDown);
this.ackTimeoutCallback = threadPool.schedule(ackedUpdateTask.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() {
@Override
@ -651,17 +657,20 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override
public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) {
if (!ackedUpdateTask.mustAck(node)) {
return;
//we always wait for the master ack anyway
if (!node.equals(clusterState.nodes().masterNode())) {
return;
}
}
if (t == null) {
logger.trace("ack received from node [{}], cluster_state update (version: {})", node, version);
logger.trace("ack received from node [{}], cluster_state update (version: {})", node, clusterState.version());
} else {
this.lastFailure = t;
logger.debug("ack received from node [{}], cluster_state update (version: {})", t, node, version);
logger.debug("ack received from node [{}], cluster_state update (version: {})", t, node, clusterState.version());
}
if (countDown.countDown()) {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", version);
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterState.version());
ackTimeoutCallback.cancel(true);
ackedUpdateTask.onAllNodesAcked(lastFailure);
}
@ -670,7 +679,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override
public void onTimeout() {
if (countDown.fastForward()) {
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", version);
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterState.version());
ackedUpdateTask.onAckTimeout();
}
}

View File

@ -22,7 +22,9 @@ package org.elasticsearch.cluster;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.LifecycleComponent;
@ -109,6 +111,287 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
assertThat(executeCalled.get(), equalTo(false));
}
/**
 * Submits an acked cluster state update task that requires an ack from every node
 * ({@code mustAck} always returns true) with a 10s ack timeout, and builds a new
 * cluster state in {@code execute}.
 * Expects the task to be executed and processed, the all-nodes-acked callback to fire,
 * and neither the ack timeout nor the failure callback to be invoked.
 */
@Test
public void testAckedUpdateTask() throws Exception {
    Settings nodeSettings = settingsBuilder()
            .put("discovery.type", "zen")
            .put("discovery.zen.minimum_master_nodes", 1)
            .put("discovery.zen.ping_timeout", "200ms")
            .put("discovery.initial_state_timeout", "500ms")
            .build();
    cluster().startNode(nodeSettings);
    ClusterService service = cluster().getInstance(ClusterService.class);

    // released once the task is either processed or failed
    final CountDownLatch finished = new CountDownLatch(1);
    final AtomicBoolean ackedByAll = new AtomicBoolean(false);
    final AtomicBoolean ackTimedOut = new AtomicBoolean(false);
    final AtomicBoolean failed = new AtomicBoolean(false);
    final AtomicBoolean executeCalled = new AtomicBoolean(false);
    final AtomicBoolean processedCalled = new AtomicBoolean(false);

    AckedClusterStateUpdateTask ackedTask = new AckedClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            executeCalled.set(true);
            return ClusterState.newClusterStateBuilder().state(currentState).build();
        }

        @Override
        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
            processedCalled.set(true);
            finished.countDown();
        }

        @Override
        public void onFailure(String source, Throwable t) {
            failed.set(true);
            finished.countDown();
        }

        @Override
        public boolean mustAck(DiscoveryNode discoveryNode) {
            return true;
        }

        @Override
        public void onAllNodesAcked(@Nullable Throwable t) {
            ackedByAll.set(true);
        }

        @Override
        public void onAckTimeout() {
            ackTimedOut.set(true);
        }

        @Override
        public TimeValue ackTimeout() {
            return TimeValue.timeValueSeconds(10);
        }

        @Override
        public TimeValue timeout() {
            return TimeValue.timeValueSeconds(10);
        }
    };
    service.submitStateUpdateTask("test", ackedTask);

    boolean finishedInTime = finished.await(1, TimeUnit.SECONDS);
    assertThat(finishedInTime, equalTo(true));
    assertThat(ackedByAll.get(), equalTo(true));
    assertThat(ackTimedOut.get(), equalTo(false));
    assertThat(executeCalled.get(), equalTo(true));
    assertThat(processedCalled.get(), equalTo(true));
    assertThat(failed.get(), equalTo(false));
}
/**
 * Same as the plain acked update task scenario, except that {@code execute} returns the
 * cluster state instance it was given, i.e. the task produces no new cluster state.
 * Expects the task to be executed and processed, the all-nodes-acked callback to fire,
 * and neither the ack timeout nor the failure callback to be invoked.
 */
@Test
public void testAckedUpdateTaskSameClusterState() throws Exception {
    Settings nodeSettings = settingsBuilder()
            .put("discovery.type", "zen")
            .put("discovery.zen.minimum_master_nodes", 1)
            .put("discovery.zen.ping_timeout", "200ms")
            .put("discovery.initial_state_timeout", "500ms")
            .build();
    cluster().startNode(nodeSettings);
    ClusterService service = cluster().getInstance(ClusterService.class);

    // released once the task is either processed or failed
    final CountDownLatch finished = new CountDownLatch(1);
    final AtomicBoolean ackedByAll = new AtomicBoolean(false);
    final AtomicBoolean ackTimedOut = new AtomicBoolean(false);
    final AtomicBoolean failed = new AtomicBoolean(false);
    final AtomicBoolean executeCalled = new AtomicBoolean(false);
    final AtomicBoolean processedCalled = new AtomicBoolean(false);

    AckedClusterStateUpdateTask ackedTask = new AckedClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            executeCalled.set(true);
            // hand back the very same instance: nothing changes
            return currentState;
        }

        @Override
        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
            processedCalled.set(true);
            finished.countDown();
        }

        @Override
        public void onFailure(String source, Throwable t) {
            failed.set(true);
            finished.countDown();
        }

        @Override
        public boolean mustAck(DiscoveryNode discoveryNode) {
            return true;
        }

        @Override
        public void onAllNodesAcked(@Nullable Throwable t) {
            ackedByAll.set(true);
        }

        @Override
        public void onAckTimeout() {
            ackTimedOut.set(true);
        }

        @Override
        public TimeValue ackTimeout() {
            return TimeValue.timeValueSeconds(10);
        }

        @Override
        public TimeValue timeout() {
            return TimeValue.timeValueSeconds(10);
        }
    };
    service.submitStateUpdateTask("test", ackedTask);

    boolean finishedInTime = finished.await(1, TimeUnit.SECONDS);
    assertThat(finishedInTime, equalTo(true));
    assertThat(ackedByAll.get(), equalTo(true));
    assertThat(ackTimedOut.get(), equalTo(false));
    assertThat(executeCalled.get(), equalTo(true));
    assertThat(processedCalled.get(), equalTo(true));
    assertThat(failed.get(), equalTo(false));
}
/**
 * Submits an acked cluster state update task for which no node has to ack
 * ({@code mustAck} always returns false) with a 10s ack timeout.
 * Expects the task to be executed and the all-nodes-acked callback to fire, with
 * neither the ack timeout nor the failure callback being invoked.
 */
@Test
public void testAckedUpdateTaskNoAckExpected() throws Exception {
    Settings nodeSettings = settingsBuilder()
            .put("discovery.type", "zen")
            .put("discovery.zen.minimum_master_nodes", 1)
            .put("discovery.zen.ping_timeout", "200ms")
            .put("discovery.initial_state_timeout", "500ms")
            .build();
    cluster().startNode(nodeSettings);
    ClusterService service = cluster().getInstance(ClusterService.class);

    // released by whichever of the ack / ack-timeout / failure callbacks fires first
    final CountDownLatch finished = new CountDownLatch(1);
    final AtomicBoolean ackedByAll = new AtomicBoolean(false);
    final AtomicBoolean ackTimedOut = new AtomicBoolean(false);
    final AtomicBoolean failed = new AtomicBoolean(false);
    final AtomicBoolean executeCalled = new AtomicBoolean(false);

    AckedClusterStateUpdateTask ackedTask = new AckedClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            executeCalled.set(true);
            return ClusterState.newClusterStateBuilder().state(currentState).build();
        }

        @Override
        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
        }

        @Override
        public void onFailure(String source, Throwable t) {
            failed.set(true);
            finished.countDown();
        }

        @Override
        public boolean mustAck(DiscoveryNode discoveryNode) {
            return false;
        }

        @Override
        public void onAllNodesAcked(@Nullable Throwable t) {
            ackedByAll.set(true);
            finished.countDown();
        }

        @Override
        public void onAckTimeout() {
            ackTimedOut.set(true);
            finished.countDown();
        }

        @Override
        public TimeValue ackTimeout() {
            return TimeValue.timeValueSeconds(10);
        }

        @Override
        public TimeValue timeout() {
            return TimeValue.timeValueSeconds(10);
        }
    };
    service.submitStateUpdateTask("test", ackedTask);

    boolean finishedInTime = finished.await(1, TimeUnit.SECONDS);
    assertThat(finishedInTime, equalTo(true));
    assertThat(ackedByAll.get(), equalTo(true));
    assertThat(ackTimedOut.get(), equalTo(false));
    assertThat(executeCalled.get(), equalTo(true));
    assertThat(failed.get(), equalTo(false));
}
/**
 * Submits an acked cluster state update task whose ack timeout is zero seconds
 * ({@code mustAck} always returns false).
 * Expects the task to be executed and the ack-timeout callback to fire, with neither
 * the all-nodes-acked nor the failure callback being invoked.
 */
@Test
public void testAckedUpdateTaskTimeoutZero() throws Exception {
    Settings nodeSettings = settingsBuilder()
            .put("discovery.type", "zen")
            .put("discovery.zen.minimum_master_nodes", 1)
            .put("discovery.zen.ping_timeout", "200ms")
            .put("discovery.initial_state_timeout", "500ms")
            .build();
    cluster().startNode(nodeSettings);
    ClusterService service = cluster().getInstance(ClusterService.class);

    // released by whichever of the ack / ack-timeout / failure callbacks fires first
    final CountDownLatch finished = new CountDownLatch(1);
    final AtomicBoolean ackedByAll = new AtomicBoolean(false);
    final AtomicBoolean ackTimedOut = new AtomicBoolean(false);
    final AtomicBoolean failed = new AtomicBoolean(false);
    final AtomicBoolean executeCalled = new AtomicBoolean(false);

    AckedClusterStateUpdateTask ackedTask = new AckedClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            executeCalled.set(true);
            return ClusterState.newClusterStateBuilder().state(currentState).build();
        }

        @Override
        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
        }

        @Override
        public void onFailure(String source, Throwable t) {
            failed.set(true);
            finished.countDown();
        }

        @Override
        public boolean mustAck(DiscoveryNode discoveryNode) {
            return false;
        }

        @Override
        public void onAllNodesAcked(@Nullable Throwable t) {
            ackedByAll.set(true);
            finished.countDown();
        }

        @Override
        public void onAckTimeout() {
            ackTimedOut.set(true);
            finished.countDown();
        }

        @Override
        public TimeValue ackTimeout() {
            // zero ack timeout is the scenario under test
            return TimeValue.timeValueSeconds(0);
        }

        @Override
        public TimeValue timeout() {
            return TimeValue.timeValueSeconds(10);
        }
    };
    service.submitStateUpdateTask("test", ackedTask);

    boolean finishedInTime = finished.await(1, TimeUnit.SECONDS);
    assertThat(finishedInTime, equalTo(true));
    assertThat(ackedByAll.get(), equalTo(false));
    assertThat(ackTimedOut.get(), equalTo(true));
    assertThat(executeCalled.get(), equalTo(true));
    assertThat(failed.get(), equalTo(false));
}
@Test
public void testPendingUpdateTask() throws Exception {
Settings zenSettings = settingsBuilder()

View File

@ -74,6 +74,15 @@ public class AckTests extends AbstractIntegrationTest {
}
}
/**
 * Updates the {@code refresh_interval} setting of index "test" with a "0s" timeout
 * and expects the response not to be acknowledged.
 */
@Test
public void testUpdateSettingsNoAcknowledgement() {
    createIndex("test");

    boolean acknowledged = client().admin().indices()
            .prepareUpdateSettings("test")
            .setTimeout("0s")
            .setSettings(ImmutableSettings.builder().put("refresh_interval", 9999))
            .get()
            .isAcknowledged();

    assertThat(acknowledged, equalTo(false));
}
@Test
public void testPutWarmerAcknowledgement() {
createIndex("test");
@ -94,6 +103,17 @@ public class AckTests extends AbstractIntegrationTest {
}
}
/**
 * Registers the warmer "custom_warmer" (a match-all search on index "test") with a
 * "0s" timeout and expects the response not to be acknowledged.
 */
@Test
public void testPutWarmerNoAcknowledgement() {
    createIndex("test");
    ensureGreen();

    boolean acknowledged = client().admin().indices()
            .preparePutWarmer("custom_warmer")
            .setTimeout("0s")
            .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
            .get()
            .isAcknowledged();

    assertThat(acknowledged, equalTo(false));
}
@Test
public void testDeleteWarmerAcknowledgement() {
createIndex("test");
@ -113,6 +133,17 @@ public class AckTests extends AbstractIntegrationTest {
}
}
/**
 * Registers the warmer "custom_warmer" on index "test" with the default timeout, then
 * deletes it with a "0s" timeout and expects the delete response not to be acknowledged.
 *
 * The previous body was a copy of the put-warmer test: it only put a warmer with a
 * "0s" timeout and never issued a delete, so the delete-warmer path was not covered.
 */
@Test
public void testDeleteWarmerNoAcknowledgement() {
    createIndex("test");
    ensureGreen();

    // the warmer must exist before it can be deleted; default timeout, so this one is acked
    boolean putAcknowledged = client().admin().indices().preparePutWarmer("custom_warmer")
            .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
            .get()
            .isAcknowledged();
    assertThat(putAcknowledged, equalTo(true));

    // the actual scenario under test: delete with a zero timeout
    boolean deleteAcknowledged = client().admin().indices().prepareDeleteWarmer()
            .setIndices("test")
            .setName("custom_warmer")
            .setTimeout("0s")
            .get()
            .isAcknowledged();
    assertThat(deleteAcknowledged, equalTo(false));
}
@Test
public void testDeleteMappingAcknowledgement() {
client().admin().indices().prepareCreate("test")
@ -133,6 +164,18 @@ public class AckTests extends AbstractIntegrationTest {
}
}
/**
 * Creates index "test" with a mapping for "type1", indexes one document of that type,
 * then deletes the mapping with a "0s" timeout and expects the response not to be
 * acknowledged.
 */
@Test
public void testDeleteMappingNoAcknowledgement() {
    client().admin().indices().prepareCreate("test")
            .addMapping("type1", "field1", "type=string").get();
    ensureGreen();

    // execute the index request; previously the builder was created but never run,
    // so no document was actually indexed
    client().prepareIndex("test", "type1").setSource("field1", "value1").get();

    DeleteMappingResponse deleteMappingResponse = client().admin().indices().prepareDeleteMapping("test").setTimeout("0s").setType("type1").get();
    assertThat(deleteMappingResponse.isAcknowledged(), equalTo(false));
}
@Test
public void testClusterRerouteAcknowledgement() throws InterruptedException {
client().admin().indices().prepareCreate("test")
@ -173,6 +216,20 @@ public class AckTests extends AbstractIntegrationTest {
waitForRelocation();
}
/**
 * Creates index "test" without replicas, then issues a cluster reroute carrying a move
 * allocation command with a "0s" timeout and expects the response not to be acknowledged.
 */
@Test
public void testClusterRerouteNoAcknowledgement() throws InterruptedException {
    client().admin().indices()
            .prepareCreate("test")
            .setSettings(settingsBuilder()
                    .put("number_of_shards", atLeast(cluster().numNodes()))
                    .put("number_of_replicas", 0))
            .get();
    ensureGreen();

    MoveAllocationCommand moveCommand = getAllocationCommand();

    boolean acknowledged = client().admin().cluster()
            .prepareReroute()
            .setTimeout("0s")
            .add(moveCommand)
            .get()
            .isAcknowledged();

    assertThat(acknowledged, equalTo(false));
}
@Test
public void testClusterRerouteAcknowledgementDryRun() throws InterruptedException {
client().admin().indices().prepareCreate("test")
@ -209,6 +266,21 @@ public class AckTests extends AbstractIntegrationTest {
}
}
/**
 * Creates index "test" without replicas, then issues a dry-run cluster reroute carrying
 * a move allocation command with a "0s" timeout and expects the response to be
 * acknowledged.
 */
@Test
public void testClusterRerouteNoAcknowledgementDryRun() throws InterruptedException {
    client().admin().indices()
            .prepareCreate("test")
            .setSettings(settingsBuilder()
                    .put("number_of_shards", atLeast(cluster().numNodes()))
                    .put("number_of_replicas", 0))
            .get();
    ensureGreen();

    MoveAllocationCommand moveCommand = getAllocationCommand();

    boolean acknowledged = client().admin().cluster()
            .prepareReroute()
            .setTimeout("0s")
            .setDryRun(true)
            .add(moveCommand)
            .get()
            .isAcknowledged();

    //acknowledged anyway as no changes were made
    assertThat(acknowledged, equalTo(true));
}
private MoveAllocationCommand getAllocationCommand() {
String fromNodeId = null;
String toNodeId = null;
@ -281,4 +353,34 @@ public class AckTests extends AbstractIntegrationTest {
//removes the allocation exclude settings
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("cluster.routing.allocation.exclude._id", "")).get();
}
/**
 * Creates index "test" without replicas, picks a data node and excludes it from
 * allocation through a transient cluster setting sent with a "0s" timeout.
 * Expects the response not to be acknowledged while still echoing the transient
 * setting, then waits for relocation and removes the exclusion again.
 */
@Test
public void testClusterUpdateSettingsNoAcknowledgement() {
    client().admin().indices().prepareCreate("test")
            .setSettings(settingsBuilder()
                    .put("number_of_shards", atLeast(cluster().numNodes()))
                    .put("number_of_replicas", 0)).get();
    ensureGreen();

    NodesInfoResponse nodesInfo = client().admin().cluster().prepareNodesInfo().get();
    String excludedNodeId = null;
    for (NodeInfo nodeInfo : nodesInfo) {
        if (nodeInfo.getNode().isDataNode()) {
            // take the id of the node that was just checked; the previous code read
            // nodesInfo.getAt(0), which is not necessarily the data node found here
            excludedNodeId = nodeInfo.getNode().id();
            break;
        }
    }
    assert excludedNodeId != null;

    ClusterUpdateSettingsResponse clusterUpdateSettingsResponse = client().admin().cluster().prepareUpdateSettings().setTimeout("0s")
            .setTransientSettings(settingsBuilder().put("cluster.routing.allocation.exclude._id", excludedNodeId)).get();
    assertThat(clusterUpdateSettingsResponse.isAcknowledged(), equalTo(false));
    assertThat(clusterUpdateSettingsResponse.getTransientSettings().get("cluster.routing.allocation.exclude._id"), equalTo(excludedNodeId));

    //let's wait for the relocation to be completed, otherwise there can be issues with after test checks (mock directory wrapper etc.)
    waitForRelocation();

    //removes the allocation exclude settings
    client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("cluster.routing.allocation.exclude._id", "")).get();
}
}