From 57b5715bf786c88ed8fdc568b5859a9805ad796d Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 14 Oct 2020 12:51:42 +0200 Subject: [PATCH] Add CCR repository test for snapshot shard size (#63649) Following #61906 this commit adds two new integration tests that verify the sizes of snapshotted shards for CCR repositories. Backport of #63590 --- .../xpack/ccr/CcrRepositoryIT.java | 252 ++++++++++++++++++ 1 file changed, 252 insertions(+) diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 504a78da137..f215f5bbadb 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -13,22 +14,40 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import 
org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotShardSizeInfo; +import org.elasticsearch.snapshots.SnapshotsInfoService; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; @@ -40,18 +59,24 @@ import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicInteger; +import static java.util.Collections.singletonMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.notNullValue; public class CcrRepositoryIT extends CcrIntegTestCase { @@ -440,6 +465,233 @@ public class CcrRepositoryIT extends CcrIntegTestCase { } } + public void testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreStats() throws Exception { + final String leaderIndex = "leader"; + final int numberOfShards = randomIntBetween(1, 2); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex) + .setSource(getIndexSettings(numberOfShards, 0, singletonMap(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), + TimeValue.ZERO.getStringRep())), XContentType.JSON)); + + final int numDocs = scaledRandomIntBetween(0, 1_000); + final BulkRequestBuilder bulkRequest = leaderClient().prepareBulk(leaderIndex, "_doc"); + for (int i = 0; i < numDocs; i++) { + bulkRequest.add(new IndexRequest(leaderIndex).id(Integer.toString(i)).source("field", i)); + } + assertThat(bulkRequest.get().hasFailures(), is(false)); + + final ForceMergeResponse forceMergeResponse = leaderClient().admin().indices().prepareForceMerge(leaderIndex).setFlush(true).get(); + assertThat(forceMergeResponse.getSuccessfulShards(), equalTo(numberOfShards)); + assertThat(forceMergeResponse.getFailedShards(), equalTo(0)); + ensureLeaderGreen(leaderIndex); + + final IndexStats indexStats = leaderClient().admin().indices().prepareStats(leaderIndex) + .clear() + .setStore(true) + .get() + .getIndex(leaderIndex); + assertThat(indexStats.getIndexShards(), notNullValue()); + 
assertThat(indexStats.getIndexShards(), aMapWithSize(numberOfShards)); + + final String leaderCluster = CcrRepository.NAME_PREFIX + "leader_cluster"; + final RepositoriesService repositoriesService = getFollowerCluster().getCurrentMasterNodeInstance(RepositoriesService.class); + final Repository repository = repositoriesService.repository(leaderCluster); + assertThat(repository.getMetadata().type(), equalTo(CcrRepository.TYPE)); + assertThat(repository.getMetadata().name(), equalTo(leaderCluster)); + + for (int shardId = 0; shardId < numberOfShards; shardId++) { + IndexShardSnapshotStatus.Copy indexShardSnapshotStatus = repository.getShardSnapshotStatus( + new SnapshotId(CcrRepository.LATEST, CcrRepository.LATEST), + new IndexId(indexStats.getIndex(), indexStats.getUuid()), + new ShardId(new Index(indexStats.getIndex(), indexStats.getUuid()), shardId)).asCopy(); + + assertThat(indexShardSnapshotStatus, notNullValue()); + assertThat(indexShardSnapshotStatus.getStage(), is(IndexShardSnapshotStatus.Stage.DONE)); + assertThat(indexShardSnapshotStatus.getTotalSize(), + equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes())); + } + + final CountDownLatch blockCcrRestore = new CountDownLatch(1); + + final List transportServices = new ArrayList<>(); + for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) { + final MockTransportService mockTransportService = (MockTransportService) transportService; + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PutCcrRestoreSessionAction.NAME)) { + try { + blockCcrRestore.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + connection.sendRequest(requestId, action, request, options); + }); + transportServices.add(mockTransportService); + } + + try { + final String followerIndex = "follower"; + final RestoreService restoreService = 
getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + final PlainActionFuture waitForRestoreInProgress = PlainActionFuture.newFuture(); + final ClusterStateListener listener = event -> { + RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); + if (restoreInProgress != null + && restoreInProgress.isEmpty() == false + && event.state().routingTable().hasIndex(followerIndex)) { + waitForRestoreInProgress.onResponse(event.state().routingTable().index(followerIndex)); + } + }; + clusterService.addListener(listener); + + final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST) + .indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$") + .renameReplacement(followerIndex) + .masterNodeTimeout(TimeValue.MAX_VALUE) + .indexSettings(Settings.builder() + .put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)); + restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture()); + + final IndexRoutingTable indexRoutingTable = waitForRestoreInProgress.get(30L, TimeUnit.SECONDS); + clusterService.removeListener(listener); + + final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class); + assertBusy(() -> { + SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes(); + for (int shardId = 0; shardId < numberOfShards; shardId++) { + Long snapshotShardSize = snapshotShardSizeInfo.getShardSize(indexRoutingTable.shard(shardId).primaryShard()); + assertThat(snapshotShardSize, + equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes())); + } + }); + + blockCcrRestore.countDown(); + ensureFollowerGreen(followerIndex); + 
+ assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE)); + } finally { + transportServices.forEach(MockTransportService::clearAllRules); + } + } + + public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception { + final String leaderIndex = "leader"; + final int numberOfShards = randomIntBetween(1, 2); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex) + .setSource(getIndexSettings(numberOfShards, 0, singletonMap(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), + TimeValue.ZERO.getStringRep())), XContentType.JSON)); + + final IndexMetadata indexMetadata = leaderClient().admin().cluster().prepareState().setIndices(leaderIndex) + .clear().setMetadata(true).get().getState().metadata().index(leaderIndex); + + final AtomicInteger indicesStatsRequestsCount = new AtomicInteger(0); + final CountDownLatch blockCcrRestore = new CountDownLatch(1); + + final List transportServices = new ArrayList<>(); + for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) { + final MockTransportService mockTransportService = (MockTransportService) transportService; + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PutCcrRestoreSessionAction.NAME)) { + try { + blockCcrRestore.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + connection.sendRequest(requestId, action, request, options); + }); + transportServices.add(mockTransportService); + } + for (TransportService transportService : getLeaderCluster().getDataOrMasterNodeInstances(TransportService.class)) { + final MockTransportService mockTransportService = (MockTransportService) transportService; + mockTransportService.addRequestHandlingBehavior(IndicesStatsAction.NAME, (handler, request, channel, task) -> { + if (request instanceof IndicesStatsRequest) { + 
IndicesStatsRequest indicesStatsRequest = (IndicesStatsRequest) request; + if (Arrays.equals(indicesStatsRequest.indices(), new String[]{leaderIndex}) + && indicesStatsRequest.store() + && indicesStatsRequest.search() == false + && indicesStatsRequest.fieldData() == false + ) { + indicesStatsRequestsCount.incrementAndGet(); + channel.sendResponse(new ElasticsearchException("simulated")); + return; + } + } + handler.messageReceived(request, channel, task); + }); + transportServices.add(mockTransportService); + } + + final String followerIndex = "follower"; + try { + final String leaderCluster = CcrRepository.NAME_PREFIX + "leader_cluster"; + final RepositoriesService repositoriesService = getFollowerCluster().getCurrentMasterNodeInstance(RepositoriesService.class); + final Repository repository = repositoriesService.repository(leaderCluster); + assertThat(repository.getMetadata().type(), equalTo(CcrRepository.TYPE)); + assertThat(repository.getMetadata().name(), equalTo(leaderCluster)); + + for (int i = 0; i < numberOfShards; i++) { + final Index index = indexMetadata.getIndex(); + final int shardId = i; + ElasticsearchException exception = expectThrows(ElasticsearchException.class, + () -> repository.getShardSnapshotStatus( + new SnapshotId(CcrRepository.LATEST, CcrRepository.LATEST), + new IndexId(index.getName(), index.getUUID()), + new ShardId(index, shardId))); + assertThat(exception.getMessage(), equalTo("simulated")); + } + assertThat(indicesStatsRequestsCount.getAndSet(0), equalTo(numberOfShards)); + + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + final PlainActionFuture waitForRestoreInProgress = PlainActionFuture.newFuture(); + final ClusterStateListener listener = event -> { + RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, 
RestoreInProgress.EMPTY); + if (restoreInProgress != null + && restoreInProgress.isEmpty() == false + && event.state().routingTable().hasIndex(followerIndex)) { + waitForRestoreInProgress.onResponse(event.state().routingTable().index(followerIndex)); + } + }; + clusterService.addListener(listener); + + final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST) + .indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$") + .renameReplacement(followerIndex) + .masterNodeTimeout(TimeValue.MAX_VALUE) + .indexSettings(Settings.builder() + .put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)); + restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture()); + + final IndexRoutingTable indexRoutingTable = waitForRestoreInProgress.get(30L, TimeUnit.SECONDS); + clusterService.removeListener(listener); + + final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class); + assertBusy(() -> { + SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes(); + for (int shardId = 0; shardId < numberOfShards; shardId++) { + final ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard(); + assertThat(snapshotShardSizeInfo.getShardSize(primary), equalTo(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + final long randomSize = randomNonNegativeLong(); + assertThat(snapshotShardSizeInfo.getShardSize(primary, randomSize), equalTo(randomSize)); + } + }); + } finally { + transportServices.forEach(MockTransportService::clearAllRules); + } + + assertThat(indicesStatsRequestsCount.get(), equalTo(numberOfShards)); + blockCcrRestore.countDown(); + + followerClient().admin().cluster().prepareReroute().get(); + ensureFollowerGreen(followerIndex); + + 
assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE)); + } + private void assertExpectedDocument(String followerIndex, final int value) { final GetResponse getResponse = followerClient().prepareGet(followerIndex, "doc", Integer.toString(value)).get(); assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());