Add snapshot shard size based test in DiskThresholdDeciderTests (#63913)

This commit adds a test in DiskThresholdDeciderTests that verifies
 the allocation of a snapshot recovery source based shard in the 
situation where the snapshot shard size was successfully provided 
by the SnapshotInfoService introduced in #61906 and when the 
service failed to provide the size.

Relates #61906
This commit is contained in:
Tanguy Leroux 2020-10-20 14:59:00 +02:00 committed by GitHub
parent eff7f06ca6
commit b2e07076a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 117 additions and 0 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import com.carrotsearch.hppc.IntHashSet;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
@ -26,11 +27,13 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
@ -40,6 +43,8 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.UnassignedInfo.Reason;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -47,11 +52,18 @@ import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllo
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
import org.elasticsearch.snapshots.InternalSnapshotsInfoService.SnapshotShard;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.Arrays;
@ -63,6 +75,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
@ -70,6 +83,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
@ -1130,6 +1144,109 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
" actual free: [20.0%]"));
}
public void testDiskThresholdWithSnapshotShardSizes() {
final long shardSizeInBytes = randomBoolean() ? 10L : 50L;
logger.info("--> using shard size [{}]", shardSizeInBytes);
final Settings diskSettings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "95%")
.build();
final ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 21)); // 79% used
usagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 1)); // 99% used
final ImmutableOpenMap<String, DiskUsage> usages = usagesBuilder.build();
final ClusterInfoService clusterInfoService = () -> new DevNullClusterInfo(usages, usages, ImmutableOpenMap.of());
final AllocationDeciders deciders = new AllocationDeciders(
new HashSet<>(Arrays.asList(
new RestoreInProgressAllocationDecider(),
new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
),
makeDecider(diskSettings))));
final Snapshot snapshot = new Snapshot("_repository", new SnapshotId("_snapshot_name", UUIDs.randomBase64UUID(random())));
final IndexId indexId = new IndexId("_indexid_name", UUIDs.randomBase64UUID(random()));
final ShardId shardId = new ShardId(new Index("test", IndexMetadata.INDEX_UUID_NA_VALUE), 0);
final Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.putInSyncAllocationIds(0, singleton(AllocationId.newInitializing().getId()))
).build();
final RoutingTable routingTable = RoutingTable.builder()
.addAsNewRestore(
metadata.index("test"),
new RecoverySource.SnapshotRecoverySource("_restore_uuid", snapshot, Version.CURRENT, indexId),
new IntHashSet()
).build();
final ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shards = ImmutableOpenMap.builder();
shards.put(shardId, new RestoreInProgress.ShardRestoreStatus("node1"));
final RestoreInProgress.Builder restores = new RestoreInProgress.Builder()
.add(new RestoreInProgress.Entry("_restore_uuid", snapshot, RestoreInProgress.State.INIT, singletonList("test"),
shards.build()));
ClusterState clusterState = ClusterState.builder(new ClusterName(getTestName()))
.metadata(metadata)
.routingTable(routingTable)
.putCustom(RestoreInProgress.TYPE, restores.build())
.nodes(DiscoveryNodes.builder()
.add(newNode("node1"))
.add(newNode("node2")) // node2 is added because DiskThresholdDecider automatically ignore single-node clusters
).build();
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).stream().map(ShardRouting::unassignedInfo)
.allMatch(unassignedInfo -> Reason.NEW_INDEX_RESTORED.equals(unassignedInfo.getReason())), is(true));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).stream().map(ShardRouting::unassignedInfo)
.allMatch(unassignedInfo -> AllocationStatus.NO_ATTEMPT.equals(unassignedInfo.getLastAllocationStatus())), is(true));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
final AtomicReference<SnapshotShardSizeInfo> snapshotShardSizeInfoRef = new AtomicReference<>(SnapshotShardSizeInfo.EMPTY);
final AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY), clusterInfoService, snapshotShardSizeInfoRef::get);
// reroute triggers snapshot shard size fetching
clusterState = strategy.reroute(clusterState, "reroute");
logShardStates(clusterState);
// shard cannot be assigned yet as the snapshot shard size is unknown
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).stream().map(ShardRouting::unassignedInfo)
.allMatch(unassignedInfo -> AllocationStatus.FETCHING_SHARD_DATA.equals(unassignedInfo.getLastAllocationStatus())), is(true));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
final SnapshotShard snapshotShard = new SnapshotShard(snapshot, indexId, shardId);
final ImmutableOpenMap.Builder<SnapshotShard, Long> snapshotShardSizes = ImmutableOpenMap.builder();
final boolean shouldAllocate;
if (randomBoolean()) {
logger.info("--> simulating snapshot shards size retrieval success");
snapshotShardSizes.put(snapshotShard, shardSizeInBytes);
logger.info("--> shard allocation depends on its size");
shouldAllocate = shardSizeInBytes < usages.get("node1").getFreeBytes();
} else {
logger.info("--> simulating snapshot shards size retrieval failure");
snapshotShardSizes.put(snapshotShard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
logger.info("--> shard is always allocated when its size could not be retrieved");
shouldAllocate = true;
}
snapshotShardSizeInfoRef.set(new SnapshotShardSizeInfo(snapshotShardSizes.build()));
// reroute uses the previous snapshot shard size
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
logShardStates(clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(shouldAllocate ? 0 : 1));
assertThat(clusterState.getRoutingNodes().shardsWithState("test", INITIALIZING, STARTED).size(), equalTo(shouldAllocate ? 1 : 0));
}
public void logShardStates(ClusterState state) {
RoutingNodes rn = state.getRoutingNodes();
logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",