Add CCR repository test for snapshot shard size (#63649)
Following #61906 this commit adds two new integration tests that verifies the sizes of snapshotted shards for CCR repositories. Backport of #63590
This commit is contained in:
parent
424b313784
commit
57b5715bf7
|
@ -6,6 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
|
@ -13,22 +14,40 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequ
|
|||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetadata;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryMissingException;
|
||||
import org.elasticsearch.snapshots.RestoreInfo;
|
||||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotsInfoService;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportActionProxy;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -40,18 +59,24 @@ import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.aMapWithSize;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class CcrRepositoryIT extends CcrIntegTestCase {
|
||||
|
||||
|
@ -440,6 +465,233 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreStats() throws Exception {
|
||||
final String leaderIndex = "leader";
|
||||
final int numberOfShards = randomIntBetween(1, 2);
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex)
|
||||
.setSource(getIndexSettings(numberOfShards, 0, singletonMap(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(),
|
||||
TimeValue.ZERO.getStringRep())), XContentType.JSON));
|
||||
|
||||
final int numDocs = scaledRandomIntBetween(0, 1_000);
|
||||
final BulkRequestBuilder bulkRequest = leaderClient().prepareBulk(leaderIndex, "_doc");
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
bulkRequest.add(new IndexRequest(leaderIndex).id(Integer.toString(i)).source("field", i));
|
||||
}
|
||||
assertThat(bulkRequest.get().hasFailures(), is(false));
|
||||
|
||||
final ForceMergeResponse forceMergeResponse = leaderClient().admin().indices().prepareForceMerge(leaderIndex).setFlush(true).get();
|
||||
assertThat(forceMergeResponse.getSuccessfulShards(), equalTo(numberOfShards));
|
||||
assertThat(forceMergeResponse.getFailedShards(), equalTo(0));
|
||||
ensureLeaderGreen(leaderIndex);
|
||||
|
||||
final IndexStats indexStats = leaderClient().admin().indices().prepareStats(leaderIndex)
|
||||
.clear()
|
||||
.setStore(true)
|
||||
.get()
|
||||
.getIndex(leaderIndex);
|
||||
assertThat(indexStats.getIndexShards(), notNullValue());
|
||||
assertThat(indexStats.getIndexShards(), aMapWithSize(numberOfShards));
|
||||
|
||||
final String leaderCluster = CcrRepository.NAME_PREFIX + "leader_cluster";
|
||||
final RepositoriesService repositoriesService = getFollowerCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
|
||||
final Repository repository = repositoriesService.repository(leaderCluster);
|
||||
assertThat(repository.getMetadata().type(), equalTo(CcrRepository.TYPE));
|
||||
assertThat(repository.getMetadata().name(), equalTo(leaderCluster));
|
||||
|
||||
for (int shardId = 0; shardId < numberOfShards; shardId++) {
|
||||
IndexShardSnapshotStatus.Copy indexShardSnapshotStatus = repository.getShardSnapshotStatus(
|
||||
new SnapshotId(CcrRepository.LATEST, CcrRepository.LATEST),
|
||||
new IndexId(indexStats.getIndex(), indexStats.getUuid()),
|
||||
new ShardId(new Index(indexStats.getIndex(), indexStats.getUuid()), shardId)).asCopy();
|
||||
|
||||
assertThat(indexShardSnapshotStatus, notNullValue());
|
||||
assertThat(indexShardSnapshotStatus.getStage(), is(IndexShardSnapshotStatus.Stage.DONE));
|
||||
assertThat(indexShardSnapshotStatus.getTotalSize(),
|
||||
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
|
||||
}
|
||||
|
||||
final CountDownLatch blockCcrRestore = new CountDownLatch(1);
|
||||
|
||||
final List<MockTransportService> transportServices = new ArrayList<>();
|
||||
for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) {
|
||||
final MockTransportService mockTransportService = (MockTransportService) transportService;
|
||||
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
|
||||
if (action.equals(PutCcrRestoreSessionAction.NAME)) {
|
||||
try {
|
||||
blockCcrRestore.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
transportServices.add(mockTransportService);
|
||||
}
|
||||
|
||||
try {
|
||||
final String followerIndex = "follower";
|
||||
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
|
||||
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
||||
|
||||
final PlainActionFuture<IndexRoutingTable> waitForRestoreInProgress = PlainActionFuture.newFuture();
|
||||
final ClusterStateListener listener = event -> {
|
||||
RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
|
||||
if (restoreInProgress != null
|
||||
&& restoreInProgress.isEmpty() == false
|
||||
&& event.state().routingTable().hasIndex(followerIndex)) {
|
||||
waitForRestoreInProgress.onResponse(event.state().routingTable().index(followerIndex));
|
||||
}
|
||||
};
|
||||
clusterService.addListener(listener);
|
||||
|
||||
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST)
|
||||
.indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
|
||||
.renameReplacement(followerIndex)
|
||||
.masterNodeTimeout(TimeValue.MAX_VALUE)
|
||||
.indexSettings(Settings.builder()
|
||||
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex)
|
||||
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true));
|
||||
restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture());
|
||||
|
||||
final IndexRoutingTable indexRoutingTable = waitForRestoreInProgress.get(30L, TimeUnit.SECONDS);
|
||||
clusterService.removeListener(listener);
|
||||
|
||||
final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class);
|
||||
assertBusy(() -> {
|
||||
SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes();
|
||||
for (int shardId = 0; shardId < numberOfShards; shardId++) {
|
||||
Long snapshotShardSize = snapshotShardSizeInfo.getShardSize(indexRoutingTable.shard(shardId).primaryShard());
|
||||
assertThat(snapshotShardSize,
|
||||
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
|
||||
}
|
||||
});
|
||||
|
||||
blockCcrRestore.countDown();
|
||||
ensureFollowerGreen(followerIndex);
|
||||
|
||||
assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE));
|
||||
} finally {
|
||||
transportServices.forEach(MockTransportService::clearAllRules);
|
||||
}
|
||||
}
|
||||
|
||||
public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception {
|
||||
final String leaderIndex = "leader";
|
||||
final int numberOfShards = randomIntBetween(1, 2);
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex)
|
||||
.setSource(getIndexSettings(numberOfShards, 0, singletonMap(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(),
|
||||
TimeValue.ZERO.getStringRep())), XContentType.JSON));
|
||||
|
||||
final IndexMetadata indexMetadata = leaderClient().admin().cluster().prepareState().setIndices(leaderIndex)
|
||||
.clear().setMetadata(true).get().getState().metadata().index(leaderIndex);
|
||||
|
||||
final AtomicInteger indicesStatsRequestsCount = new AtomicInteger(0);
|
||||
final CountDownLatch blockCcrRestore = new CountDownLatch(1);
|
||||
|
||||
final List<MockTransportService> transportServices = new ArrayList<>();
|
||||
for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) {
|
||||
final MockTransportService mockTransportService = (MockTransportService) transportService;
|
||||
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
|
||||
if (action.equals(PutCcrRestoreSessionAction.NAME)) {
|
||||
try {
|
||||
blockCcrRestore.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
transportServices.add(mockTransportService);
|
||||
}
|
||||
for (TransportService transportService : getLeaderCluster().getDataOrMasterNodeInstances(TransportService.class)) {
|
||||
final MockTransportService mockTransportService = (MockTransportService) transportService;
|
||||
mockTransportService.addRequestHandlingBehavior(IndicesStatsAction.NAME, (handler, request, channel, task) -> {
|
||||
if (request instanceof IndicesStatsRequest) {
|
||||
IndicesStatsRequest indicesStatsRequest = (IndicesStatsRequest) request;
|
||||
if (Arrays.equals(indicesStatsRequest.indices(), new String[]{leaderIndex})
|
||||
&& indicesStatsRequest.store()
|
||||
&& indicesStatsRequest.search() == false
|
||||
&& indicesStatsRequest.fieldData() == false
|
||||
) {
|
||||
indicesStatsRequestsCount.incrementAndGet();
|
||||
channel.sendResponse(new ElasticsearchException("simulated"));
|
||||
return;
|
||||
}
|
||||
}
|
||||
handler.messageReceived(request, channel, task);
|
||||
});
|
||||
transportServices.add(mockTransportService);
|
||||
}
|
||||
|
||||
final String followerIndex = "follower";
|
||||
try {
|
||||
final String leaderCluster = CcrRepository.NAME_PREFIX + "leader_cluster";
|
||||
final RepositoriesService repositoriesService = getFollowerCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
|
||||
final Repository repository = repositoriesService.repository(leaderCluster);
|
||||
assertThat(repository.getMetadata().type(), equalTo(CcrRepository.TYPE));
|
||||
assertThat(repository.getMetadata().name(), equalTo(leaderCluster));
|
||||
|
||||
for (int i = 0; i < numberOfShards; i++) {
|
||||
final Index index = indexMetadata.getIndex();
|
||||
final int shardId = i;
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||
() -> repository.getShardSnapshotStatus(
|
||||
new SnapshotId(CcrRepository.LATEST, CcrRepository.LATEST),
|
||||
new IndexId(index.getName(), index.getUUID()),
|
||||
new ShardId(index, shardId)));
|
||||
assertThat(exception.getMessage(), equalTo("simulated"));
|
||||
}
|
||||
assertThat(indicesStatsRequestsCount.getAndSet(0), equalTo(numberOfShards));
|
||||
|
||||
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
|
||||
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
||||
|
||||
final PlainActionFuture<IndexRoutingTable> waitForRestoreInProgress = PlainActionFuture.newFuture();
|
||||
final ClusterStateListener listener = event -> {
|
||||
RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
|
||||
if (restoreInProgress != null
|
||||
&& restoreInProgress.isEmpty() == false
|
||||
&& event.state().routingTable().hasIndex(followerIndex)) {
|
||||
waitForRestoreInProgress.onResponse(event.state().routingTable().index(followerIndex));
|
||||
}
|
||||
};
|
||||
clusterService.addListener(listener);
|
||||
|
||||
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST)
|
||||
.indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
|
||||
.renameReplacement(followerIndex)
|
||||
.masterNodeTimeout(TimeValue.MAX_VALUE)
|
||||
.indexSettings(Settings.builder()
|
||||
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex)
|
||||
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true));
|
||||
restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture());
|
||||
|
||||
final IndexRoutingTable indexRoutingTable = waitForRestoreInProgress.get(30L, TimeUnit.SECONDS);
|
||||
clusterService.removeListener(listener);
|
||||
|
||||
final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class);
|
||||
assertBusy(() -> {
|
||||
SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes();
|
||||
for (int shardId = 0; shardId < numberOfShards; shardId++) {
|
||||
final ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
|
||||
assertThat(snapshotShardSizeInfo.getShardSize(primary), equalTo(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
final long randomSize = randomNonNegativeLong();
|
||||
assertThat(snapshotShardSizeInfo.getShardSize(primary, randomSize), equalTo(randomSize));
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
transportServices.forEach(MockTransportService::clearAllRules);
|
||||
}
|
||||
|
||||
assertThat(indicesStatsRequestsCount.get(), equalTo(numberOfShards));
|
||||
blockCcrRestore.countDown();
|
||||
|
||||
followerClient().admin().cluster().prepareReroute().get();
|
||||
ensureFollowerGreen(followerIndex);
|
||||
|
||||
assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE));
|
||||
}
|
||||
|
||||
private void assertExpectedDocument(String followerIndex, final int value) {
|
||||
final GetResponse getResponse = followerClient().prepareGet(followerIndex, "doc", Integer.toString(value)).get();
|
||||
assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());
|
||||
|
|
Loading…
Reference in New Issue