Move testRetentionLeasesClearedOnRestore (#45896)

Tanguy Leroux 2019-08-23 13:40:39 +02:00
parent aee92d573c
commit 8e66df9925
2 changed files with 81 additions and 62 deletions

DedicatedClusterSnapshotRestoreIT.java

@@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -41,6 +42,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -62,6 +64,9 @@ import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
@@ -95,12 +100,15 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
@@ -1229,6 +1237,79 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase
}, 60L, TimeUnit.SECONDS);
}
public void testRetentionLeasesClearedOnRestore() throws Exception {
final String repoName = "test-repo-retention-leases";
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("fs")
.setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())));
final String indexName = "index-retention-leases";
final int shardCount = randomIntBetween(1, 5);
assertAcked(client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
.get());
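// target one randomly chosen shard of the index to carry the retention lease under test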
final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));
final int snapshotDocCount = iterations(10, 1000);
logger.debug("--> indexing {} docs into {}", snapshotDocCount, indexName);
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[snapshotDocCount];
for (int i = 0; i < snapshotDocCount; i++) {
indexRequestBuilders[i] = client().prepareIndex(indexName, "_doc").setSource("field", "value");
}
indexRandom(true, indexRequestBuilders);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
final String leaseId = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
logger.debug("--> adding retention lease with id {} to {}", leaseId, shardId);
client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
shardId, leaseId, RETAIN_ALL, "test")).actionGet();
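// sanity check: the new lease must be visible in the shard stats before the snapshot is taken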
final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
.filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));
final String snapshotName = "snapshot-retention-leases";
logger.debug("--> create snapshot {}:{}", repoName, snapshotName);
CreateSnapshotResponse createResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).setIndices(indexName).get();
assertThat(createResponse.getSnapshotInfo().successfulShards(), equalTo(shardCount));
assertThat(createResponse.getSnapshotInfo().failedShards(), equalTo(0));
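// optionally index more docs after the snapshot so the restore replaces shard state that has since diverged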
if (randomBoolean()) {
final int extraDocCount = iterations(10, 1000);
logger.debug("--> indexing {} extra docs into {}", extraDocCount, indexName);
indexRequestBuilders = new IndexRequestBuilder[extraDocCount];
for (int i = 0; i < extraDocCount; i++) {
indexRequestBuilders[i] = client().prepareIndex(indexName, "_doc").setSource("field", "value");
}
indexRandom(true, indexRequestBuilders);
}
// Wait for green so the close does not fail in the edge case of coinciding with a shard recovery that hasn't fully synced yet
ensureGreen();
logger.debug("--> close index {}", indexName);
assertAcked(client().admin().indices().prepareClose(indexName));
logger.debug("--> restore index {} from snapshot", indexName);
RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).get();
assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(shardCount));
assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0));
ensureGreen();
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
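// a restored shard starts with fresh retention leases, so the lease added before the snapshot must be gone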
final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
.getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
.getRetentionLeaseStats().retentionLeases();
assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId));
}
private long calculateTotalFilesSize(List<Path> files) {
return files.stream().mapToLong(f -> {
try {
@@ -1239,7 +1320,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase
}).sum();
}
private List<Path> scanSnapshotFolder(Path repoPath) throws IOException {
List<Path> files = new ArrayList<>();
Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>() {

ESBlobStoreRepositoryIntegTestCase.java

@@ -23,15 +23,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotReq
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -49,7 +43,6 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@@ -278,60 +271,6 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
}
}
public void testRetentionLeasesClearedOnRestore() throws Exception {
final String repoName = randomAsciiName();
logger.info("--> creating repository {}", repoName);
createAndCheckTestRepository(repoName);
final String indexName = randomAsciiName();
final int shardCount = randomIntBetween(1, 5);
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)).get());
final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));
final int snapshotDocCount = iterations(10, 1000);
logger.info("--> indexing {} docs into {}", snapshotDocCount, indexName);
addRandomDocuments(indexName, snapshotDocCount);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
final String leaseId = randomAsciiName();
logger.info("--> adding retention lease with id {} to {}", leaseId, shardId);
client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
shardId, leaseId, RETAIN_ALL, "test")).actionGet();
final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
.filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));
final String snapshotName = randomAsciiName();
logger.info("--> create snapshot {}:{}", repoName, snapshotName);
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).setIndices(indexName));
if (randomBoolean()) {
final int extraDocCount = iterations(10, 1000);
logger.info("--> indexing {} extra docs into {}", extraDocCount, indexName);
addRandomDocuments(indexName, extraDocCount);
}
// Wait for green so the close does not fail in the edge case of coinciding with a shard recovery that hasn't fully synced yet
ensureGreen();
logger.info("--> close index {}", indexName);
assertAcked(client().admin().indices().prepareClose(indexName));
logger.info("--> restore index {} from snapshot", indexName);
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName).setWaitForCompletion(true));
ensureGreen();
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
.getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
.getRetentionLeaseStats().retentionLeases();
assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId));
}
protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {