Makes testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreStats more robust (#64976) (#64989)
Today this test fails because the sizes of the snapshot shards are kept only for a very short period of time in the InternalSnapshotsInfoService and are not guaranteed to still exist once the shards are correctly assigned. Closes #64167
This commit is contained in:
parent
5c5fd50f8d
commit
e40d7e02ea
|
@ -29,6 +29,8 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|||
import org.elasticsearch.cluster.metadata.MappingMetadata;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
@ -63,6 +65,7 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -70,6 +73,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.aMapWithSize;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
@ -467,12 +471,12 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
|
||||
public void testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreStats() throws Exception {
|
||||
final String leaderIndex = "leader";
|
||||
final int numberOfShards = randomIntBetween(1, 2);
|
||||
final int numberOfShards = randomIntBetween(1, 5);
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex)
|
||||
.setSource(getIndexSettings(numberOfShards, 0, singletonMap(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(),
|
||||
TimeValue.ZERO.getStringRep())), XContentType.JSON));
|
||||
|
||||
final int numDocs = scaledRandomIntBetween(0, 1_000);
|
||||
final int numDocs = scaledRandomIntBetween(0, 500);
|
||||
if (numDocs > 0) {
|
||||
final BulkRequestBuilder bulkRequest = leaderClient().prepareBulk(leaderIndex, "_doc");
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
|
@ -515,69 +519,62 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
|
||||
}
|
||||
|
||||
final CountDownLatch blockCcrRestore = new CountDownLatch(1);
|
||||
final String followerIndex = "follower";
|
||||
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
|
||||
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
||||
final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class);
|
||||
|
||||
final List<MockTransportService> transportServices = new ArrayList<>();
|
||||
for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) {
|
||||
final MockTransportService mockTransportService = (MockTransportService) transportService;
|
||||
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
|
||||
if (action.equals(PutCcrRestoreSessionAction.NAME)) {
|
||||
try {
|
||||
blockCcrRestore.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
final Map<Integer, Long> fetchedSnapshotShardSizes = new ConcurrentHashMap<>();
|
||||
|
||||
final PlainActionFuture<Void> waitForRestoreInProgress = PlainActionFuture.newFuture();
|
||||
final ClusterStateListener listener = event -> {
|
||||
RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
|
||||
if (restoreInProgress != null
|
||||
&& restoreInProgress.isEmpty() == false
|
||||
&& event.state().routingTable().hasIndex(followerIndex)) {
|
||||
final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(followerIndex);
|
||||
for (ShardRouting shardRouting : indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED)) {
|
||||
if (shardRouting.unassignedInfo().getLastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) {
|
||||
try {
|
||||
assertBusy(() -> {
|
||||
final Long snapshotShardSize = snapshotsInfoService.snapshotShardSizes().getShardSize(shardRouting);
|
||||
assertThat(snapshotShardSize, notNullValue());
|
||||
fetchedSnapshotShardSizes.put(shardRouting.getId(), snapshotShardSize);
|
||||
}, 30L, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError("Failed to retrieve snapshot shard size for shard " + shardRouting, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
transportServices.add(mockTransportService);
|
||||
logger.info("--> [{}/{}] snapshot shard sizes fetched", fetchedSnapshotShardSizes.size(), numberOfShards);
|
||||
if (fetchedSnapshotShardSizes.size() == numberOfShards) {
|
||||
waitForRestoreInProgress.onResponse(null);
|
||||
}
|
||||
}
|
||||
};
|
||||
clusterService.addListener(listener);
|
||||
|
||||
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST)
|
||||
.indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
|
||||
.renameReplacement(followerIndex)
|
||||
.masterNodeTimeout(TimeValue.MAX_VALUE)
|
||||
.indexSettings(Settings.builder()
|
||||
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex)
|
||||
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true));
|
||||
restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture());
|
||||
|
||||
waitForRestoreInProgress.get(30L, TimeUnit.SECONDS);
|
||||
clusterService.removeListener(listener);
|
||||
ensureFollowerGreen(followerIndex);
|
||||
|
||||
for (int shardId = 0; shardId < numberOfShards; shardId++) {
|
||||
assertThat("Snapshot shard size fetched for follower shard [" + shardId + "] does not match leader store size",
|
||||
fetchedSnapshotShardSizes.get(shardId),
|
||||
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
|
||||
}
|
||||
|
||||
try {
|
||||
final String followerIndex = "follower";
|
||||
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
|
||||
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
||||
|
||||
final PlainActionFuture<IndexRoutingTable> waitForRestoreInProgress = PlainActionFuture.newFuture();
|
||||
final ClusterStateListener listener = event -> {
|
||||
RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
|
||||
if (restoreInProgress != null
|
||||
&& restoreInProgress.isEmpty() == false
|
||||
&& event.state().routingTable().hasIndex(followerIndex)) {
|
||||
waitForRestoreInProgress.onResponse(event.state().routingTable().index(followerIndex));
|
||||
}
|
||||
};
|
||||
clusterService.addListener(listener);
|
||||
|
||||
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST)
|
||||
.indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
|
||||
.renameReplacement(followerIndex)
|
||||
.masterNodeTimeout(TimeValue.MAX_VALUE)
|
||||
.indexSettings(Settings.builder()
|
||||
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex)
|
||||
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true));
|
||||
restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture());
|
||||
|
||||
final IndexRoutingTable indexRoutingTable = waitForRestoreInProgress.get(30L, TimeUnit.SECONDS);
|
||||
clusterService.removeListener(listener);
|
||||
|
||||
final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class);
|
||||
assertBusy(() -> {
|
||||
SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes();
|
||||
for (int shardId = 0; shardId < numberOfShards; shardId++) {
|
||||
Long snapshotShardSize = snapshotShardSizeInfo.getShardSize(indexRoutingTable.shard(shardId).primaryShard());
|
||||
assertThat(snapshotShardSize,
|
||||
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
|
||||
}
|
||||
}, 60L, TimeUnit.SECONDS);
|
||||
|
||||
blockCcrRestore.countDown();
|
||||
ensureFollowerGreen(followerIndex);
|
||||
|
||||
assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE));
|
||||
} finally {
|
||||
transportServices.forEach(MockTransportService::clearAllRules);
|
||||
}
|
||||
assertHitCount(followerClient().prepareSearch(followerIndex).setSize(0).get(), numDocs);
|
||||
assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE));
|
||||
}
|
||||
|
||||
public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue