From d4463602f68f039069d9fe8ceaf16a639fc52c9f Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Wed, 29 Apr 2015 17:51:21 +0200
Subject: [PATCH] [TEST] Use a high shard delete timeout when cluster states
 are delayed

`IndicesStoreIntegrationTests#indexCleanup` uses a disruption scheme to
delay cluster state processing. Yet, the delay is [1..2] seconds while the
test sets the shard deletion timeout to 1 second to speed up test runs.
This can cause random, non-reproducible failures in this test since the
timeouts and delays basically overlap. This commit adds a longer timeout
for this test to prevent these problems.
---
 .../indices/store/IndicesStore.java               | 97 ++++++++++---------
 .../store/IndicesStoreIntegrationTests.java       | 15 ++-
 2 files changed, 60 insertions(+), 52 deletions(-)

diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
index 32e695a828c..36c9be862ee 100644
--- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
+++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
@@ -332,56 +332,57 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
             // make sure shard is really there before register cluster state observer
             if (indexShard == null) {
                 channel.sendResponse(new ShardActiveResponse(false, clusterService.localNode()));
-            }
-            // create observer here. we need to register it here because we need to capture the current cluster state
-            // which will then be compared to the one that is applied when we call waitForNextChange(). if we create it
-            // later we might miss an update and wait forever in case no new cluster state comes in.
-            // in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly.
-            // instead we wait for the cluster state changes because we know any shard state change will trigger or be
-            // triggered by a cluster state change.
-            ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger);
-            // check if shard is active. if so, all is good
-            boolean shardActive = shardActive(indexShard);
-            if (shardActive) {
-                channel.sendResponse(new ShardActiveResponse(true, clusterService.localNode()));
             } else {
-                // shard is not active, might be POST_RECOVERY so check if cluster state changed inbetween or wait for next change
-                observer.waitForNextChange(new ClusterStateObserver.Listener() {
-                    @Override
-                    public void onNewClusterState(ClusterState state) {
-                        sendResult(shardActive(getShard(request)));
-                    }
-
-                    @Override
-                    public void onClusterServiceClose() {
-                        sendResult(false);
-                    }
-
-                    @Override
-                    public void onTimeout(TimeValue timeout) {
-                        sendResult(shardActive(getShard(request)));
-                    }
-
-                    public void sendResult(boolean shardActive) {
-                        try {
-                            channel.sendResponse(new ShardActiveResponse(shardActive, clusterService.localNode()));
-                        } catch (IOException e) {
-                            logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
-                        } catch (EsRejectedExecutionException e) {
-                            logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
+                // create observer here. we need to register it here because we need to capture the current cluster state
+                // which will then be compared to the one that is applied when we call waitForNextChange(). if we create it
+                // later we might miss an update and wait forever in case no new cluster state comes in.
+                // in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly.
+                // instead we wait for the cluster state changes because we know any shard state change will trigger or be
+                // triggered by a cluster state change.
+                ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger);
+                // check if shard is active. if so, all is good
+                boolean shardActive = shardActive(indexShard);
+                if (shardActive) {
+                    channel.sendResponse(new ShardActiveResponse(true, clusterService.localNode()));
+                } else {
+                    // shard is not active, might be POST_RECOVERY so check if cluster state changed inbetween or wait for next change
+                    observer.waitForNextChange(new ClusterStateObserver.Listener() {
+                        @Override
+                        public void onNewClusterState(ClusterState state) {
+                            sendResult(shardActive(getShard(request)));
                         }
-                    }
-                }, new ClusterStateObserver.ValidationPredicate() {
-                    @Override
-                    protected boolean validate(ClusterState newState) {
-                        // the shard is not there in which case we want to send back a false (shard is not active), so the cluster state listener must be notified
-                        // or the shard is active in which case we want to send back that the shard is active
-                        // here we could also evaluate the cluster state and get the information from there. we
-                        // don't do it because we would have to write another method for this that would have the same effect
-                        IndexShard indexShard = getShard(request);
-                        return indexShard == null || shardActive(indexShard);
-                    }
-                });
+
+                        @Override
+                        public void onClusterServiceClose() {
+                            sendResult(false);
+                        }
+
+                        @Override
+                        public void onTimeout(TimeValue timeout) {
+                            sendResult(shardActive(getShard(request)));
+                        }
+
+                        public void sendResult(boolean shardActive) {
+                            try {
+                                channel.sendResponse(new ShardActiveResponse(shardActive, clusterService.localNode()));
+                            } catch (IOException e) {
+                                logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
+                            } catch (EsRejectedExecutionException e) {
+                                logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
+                            }
+                        }
+                    }, new ClusterStateObserver.ValidationPredicate() {
+                        @Override
+                        protected boolean validate(ClusterState newState) {
+                            // the shard is not there in which case we want to send back a false (shard is not active), so the cluster state listener must be notified
+                            // or the shard is active in which case we want to send back that the shard is active
+                            // here we could also evaluate the cluster state and get the information from there. we
+                            // don't do it because we would have to write another method for this that would have the same effect
+                            IndexShard indexShard = getShard(request);
+                            return indexShard == null || shardActive(indexShard);
+                        }
+                    });
+                }
             }
         }
 
diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
index e1efe59776d..386c778b07e 100644
--- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
+++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
@@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.DiscoveryService;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
@@ -45,6 +46,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -58,7 +60,12 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
         // simplify this and only use a single data path
-        return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("path.data", "").build();
+        return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("path.data", "")
+                // by default this value is 1 sec in tests (30 sec in practice), but the disruption we add here
+                // delays cluster state processing by 1 to 2 sec, which can cause each shard deletion request to time out.
+                // to prevent this we set the timeout to something high, i.e. the default used in practice
+                .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS))
+                .build();
     }
 
     @Test
@@ -97,9 +104,8 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
         assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false));
 
         logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish");
-        SlowClusterStateProcessing disruption = null;
-        if (randomBoolean()) {
-            disruption = new SlowClusterStateProcessing(node_3, getRandom(), 0, 0, 1000, 2000);
+        if (randomBoolean()) { // sometimes add cluster-state delay to trigger observers in IndicesStore.ShardActiveRequestHandler
+            final SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(node_3, getRandom(), 0, 0, 1000, 2000);
             internalCluster().setDisruptionScheme(disruption);
             disruption.startDisrupting();
         }
@@ -116,6 +122,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
         assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true));
         assertThat(Files.exists(shardDirectory(node_3, "test", 0)), equalTo(true));
         assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(true));
+
     }
 
     @Test
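Note (not part of the patch itself): below is a minimal, hypothetical Java sketch of the timing argument made in the commit message. It only restates the numbers used above (the [1..2] second delay window injected by SlowClusterStateProcessing, the 1 second test default, and the 30 second production default chosen here); the class and constant names are illustrative and do not exist in the Elasticsearch code base.

    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch: the shard-delete timeout only avoids spurious timeouts in this
    // test when it clearly exceeds the maximum cluster-state delay injected by the disruption.
    public class ShardDeleteTimeoutSketch {

        // upper bound of the delay window used by SlowClusterStateProcessing, in milliseconds
        static final long MAX_DISRUPTION_DELAY_MILLIS = 2000;

        // previous test-level timeout, which overlaps the [1..2] second delay window
        static final long OLD_TEST_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);

        // timeout used by this patch, matching the 30 second production default
        static final long NEW_TEST_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30);

        public static void main(String[] args) {
            // prints false: a 1 second timeout races a 1 to 2 second delay and loses at random
            System.out.println(OLD_TEST_TIMEOUT_MILLIS > MAX_DISRUPTION_DELAY_MILLIS);
            // prints true: a 30 second timeout leaves ample headroom over the injected delay
            System.out.println(NEW_TEST_TIMEOUT_MILLIS > MAX_DISRUPTION_DELAY_MILLIS);
        }
    }

The same headroom reasoning is why reusing the production default, rather than picking another ad hoc test value, is a safe choice once the disruption scheme is active.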