From 8e66df99257786c989d547672fbccc530a9035e6 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 23 Aug 2019 13:40:39 +0200 Subject: [PATCH] Move testRetentionLeasesClearedOnRestore (#45896) --- .../DedicatedClusterSnapshotRestoreIT.java | 82 ++++++++++++++++++- .../ESBlobStoreRepositoryIntegTestCase.java | 61 -------------- 2 files changed, 81 insertions(+), 62 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 8dbb2cbae42..b924d1eccce 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -41,6 +42,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ -62,6 +64,9 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; @@ -95,12 +100,15 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -1229,6 +1237,79 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest }, 60L, TimeUnit.SECONDS); } + public void testRetentionLeasesClearedOnRestore() throws Exception { + final String repoName = "test-repo-retention-leases"; + assertAcked(client().admin().cluster().preparePutRepository(repoName) + .setType("fs") + .setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()))); + + final String indexName = "index-retention-leases"; + final int shardCount = randomIntBetween(1, 5); + assertAcked(client().admin().indices().prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) + .get()); + final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1)); + + final int snapshotDocCount = iterations(10, 1000); + logger.debug("--> indexing {} docs into {}", snapshotDocCount, indexName); + IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[snapshotDocCount]; + for (int i = 0; i < snapshotDocCount; i++) { + indexRequestBuilders[i] = client().prepareIndex(indexName, "_doc").setSource("field", "value"); + } + indexRandom(true, indexRequestBuilders); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount); + + final String leaseId = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT); + logger.debug("--> adding retention lease with id {} to {}", leaseId, shardId); + client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest( + shardId, leaseId, RETAIN_ALL, "test")).actionGet(); + + final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards()) + .filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get(); + final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases(); + assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId)); + + final String snapshotName = "snapshot-retention-leases"; + logger.debug("--> create snapshot {}:{}", repoName, snapshotName); + CreateSnapshotResponse createResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).setIndices(indexName).get(); + assertThat(createResponse.getSnapshotInfo().successfulShards(), equalTo(shardCount)); + assertThat(createResponse.getSnapshotInfo().failedShards(), equalTo(0)); + + if (randomBoolean()) { + final int extraDocCount = iterations(10, 1000); + logger.debug("--> indexing {} extra docs into {}", extraDocCount, indexName); + indexRequestBuilders = new IndexRequestBuilder[extraDocCount]; + for (int i = 0; i < extraDocCount; i++) { + indexRequestBuilders[i] = client().prepareIndex(indexName, "_doc").setSource("field", "value"); + } + indexRandom(true, indexRequestBuilders); + } + + // Wait for green so the close does not fail in the edge case of coinciding with a shard recovery that hasn't fully synced yet + ensureGreen(); + logger.debug("--> close index {}", indexName); + assertAcked(client().admin().indices().prepareClose(indexName)); + + logger.debug("--> restore index {} from snapshot", indexName); + RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).get(); + assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(shardCount)); + assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0)); + + ensureGreen(); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount); + + final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get() + .getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get() + .getRetentionLeaseStats().retentionLeases(); + assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId)); + } + private long calculateTotalFilesSize(List files) { return files.stream().mapToLong(f -> { try { @@ -1239,7 +1320,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest }).sum(); } - private List scanSnapshotFolder(Path repoPath) throws IOException { List files = new ArrayList<>(); Files.walkFileTree(repoPath, new SimpleFileVisitor(){ diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index afdf3f0bb2a..cc835b24535 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -23,15 +23,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotReq import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.seqno.RetentionLeaseActions; -import org.elasticsearch.index.seqno.RetentionLeases; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -49,7 +43,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -278,60 +271,6 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase } } - public void testRetentionLeasesClearedOnRestore() throws Exception { - final String repoName = randomAsciiName(); - logger.info("--> creating repository {}", repoName); - createAndCheckTestRepository(repoName); - - final String indexName = randomAsciiName(); - final int shardCount = randomIntBetween(1, 5); - assertAcked(client().admin().indices().prepareCreate(indexName).setSettings( - Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)).get()); - final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1)); - - final int snapshotDocCount = iterations(10, 1000); - logger.info("--> indexing {} docs into {}", snapshotDocCount, indexName); - addRandomDocuments(indexName, snapshotDocCount); - assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount); - - final String leaseId = randomAsciiName(); - logger.info("--> adding retention lease with id {} to {}", leaseId, shardId); - client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest( - shardId, leaseId, RETAIN_ALL, "test")).actionGet(); - - final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards()) - .filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get(); - final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases(); - assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId)); - - final String snapshotName = randomAsciiName(); - logger.info("--> create snapshot {}:{}", repoName, snapshotName); - assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .setWaitForCompletion(true).setIndices(indexName)); - - if (randomBoolean()) { - final int extraDocCount = iterations(10, 1000); - logger.info("--> indexing {} extra docs into {}", extraDocCount, indexName); - addRandomDocuments(indexName, extraDocCount); - } - - // Wait for green so the close does not fail in the edge case of coinciding with a shard recovery that hasn't fully synced yet - ensureGreen(); - logger.info("--> close index {}", indexName); - assertAcked(client().admin().indices().prepareClose(indexName)); - - logger.info("--> restore index {} from snapshot", indexName); - assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName).setWaitForCompletion(true)); - - ensureGreen(); - assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount); - - final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get() - .getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get() - .getRetentionLeaseStats().retentionLeases(); - assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId)); - } - protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException { IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs]; for (int i = 0; i < numDocs; i++) {