[TEST] Use a high shard delete timeout when clusterstates are delayed

`IndiceStore#indexCleanup` uses a disruption scheme to delay cluster state
processing. Yet, the delay is [1..2] seconds but tests are setting the shard
deletion timeout to 1 second to speed up tests. This can cause random not
reproducible failures in this test since the timeouts and delays are bascially
overlapping. This commit adds a longer timeout for this test to prevent these
problems.
This commit is contained in:
Simon Willnauer 2015-04-29 17:51:21 +02:00
parent 02c0cdff0a
commit d4463602f6
2 changed files with 60 additions and 52 deletions

View File

@ -332,56 +332,57 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
// make sure shard is really there before register cluster state observer
if (indexShard == null) {
channel.sendResponse(new ShardActiveResponse(false, clusterService.localNode()));
}
// create observer here. we need to register it here because we need to capture the current cluster state
// which will then be compared to the one that is applied when we call waitForNextChange(). if we create it
// later we might miss an update and wait forever in case no new cluster state comes in.
// in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly.
// instead we wait for the cluster state changes because we know any shard state change will trigger or be
// triggered by a cluster state change.
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger);
// check if shard is active. if so, all is good
boolean shardActive = shardActive(indexShard);
if (shardActive) {
channel.sendResponse(new ShardActiveResponse(true, clusterService.localNode()));
} else {
// shard is not active, might be POST_RECOVERY so check if cluster state changed inbetween or wait for next change
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
sendResult(shardActive(getShard(request)));
}
@Override
public void onClusterServiceClose() {
sendResult(false);
}
@Override
public void onTimeout(TimeValue timeout) {
sendResult(shardActive(getShard(request)));
}
public void sendResult(boolean shardActive) {
try {
channel.sendResponse(new ShardActiveResponse(shardActive, clusterService.localNode()));
} catch (IOException e) {
logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
} catch (EsRejectedExecutionException e) {
logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
// create observer here. we need to register it here because we need to capture the current cluster state
// which will then be compared to the one that is applied when we call waitForNextChange(). if we create it
// later we might miss an update and wait forever in case no new cluster state comes in.
// in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly.
// instead we wait for the cluster state changes because we know any shard state change will trigger or be
// triggered by a cluster state change.
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger);
// check if shard is active. if so, all is good
boolean shardActive = shardActive(indexShard);
if (shardActive) {
channel.sendResponse(new ShardActiveResponse(true, clusterService.localNode()));
} else {
// shard is not active, might be POST_RECOVERY so check if cluster state changed inbetween or wait for next change
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
sendResult(shardActive(getShard(request)));
}
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
// the shard is not there in which case we want to send back a false (shard is not active), so the cluster state listener must be notified
// or the shard is active in which case we want to send back that the shard is active
// here we could also evaluate the cluster state and get the information from there. we
// don't do it because we would have to write another method for this that would have the same effect
IndexShard indexShard = getShard(request);
return indexShard == null || shardActive(indexShard);
}
});
@Override
public void onClusterServiceClose() {
sendResult(false);
}
@Override
public void onTimeout(TimeValue timeout) {
sendResult(shardActive(getShard(request)));
}
public void sendResult(boolean shardActive) {
try {
channel.sendResponse(new ShardActiveResponse(shardActive, clusterService.localNode()));
} catch (IOException e) {
logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
} catch (EsRejectedExecutionException e) {
logger.error("failed send response for shard active while trying to delete shard {} - shard will probably not be removed", e, request.shardId);
}
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
// the shard is not there in which case we want to send back a false (shard is not active), so the cluster state listener must be notified
// or the shard is active in which case we want to send back that the shard is active
// here we could also evaluate the cluster state and get the information from there. we
// don't do it because we would have to write another method for this that would have the same effect
IndexShard indexShard = getShard(request);
return indexShard == null || shardActive(indexShard);
}
});
}
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
@ -45,6 +46,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -58,7 +60,12 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) { // simplify this and only use a single data path
return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("path.data", "").build();
return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("path.data", "")
// by default this value is 1 sec in tests (30 sec in practice) but we adding disruption here
// which is between 1 and 2 sec can cause each of the shard deletion requests to timeout.
// to prevent this we are setting the timeout here to something highish ie. the default in practice
.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS))
.build();
}
@Test
@ -97,9 +104,8 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false));
logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish");
SlowClusterStateProcessing disruption = null;
if (randomBoolean()) {
disruption = new SlowClusterStateProcessing(node_3, getRandom(), 0, 0, 1000, 2000);
if (randomBoolean()) { // sometimes add cluster-state delay to trigger observers in IndicesStore.ShardActiveRequestHandler
final SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(node_3, getRandom(), 0, 0, 1000, 2000);
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
}
@ -116,6 +122,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true));
assertThat(Files.exists(shardDirectory(node_3, "test", 0)), equalTo(true));
assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(true));
}
@Test