diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 965f38d87ed..414e6774ab7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -490,9 +490,7 @@ public class DiskThresholdDecider extends AllocationDecider { return targetShardSize == 0 ? defaultValue : targetShardSize; } else { if (shard.unassigned() && shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) { - final Long shardSize = snapshotShardSizeInfo.getShardSize(shard); - assert shardSize != null : "no shard size provided for " + shard; - return shardSize; + return snapshotShardSizeInfo.getShardSize(shard, defaultValue); } return clusterInfo.getShardSize(shard, defaultValue); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardSizeInfo.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardSizeInfo.java index 0534b62ea07..578f8f25334 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardSizeInfo.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardSizeInfo.java @@ -48,6 +48,9 @@ public class SnapshotShardSizeInfo { public long getShardSize(ShardRouting shardRouting, long fallback) { final Long shardSize = getShardSize(shardRouting); - return shardSize == null ? 
fallback : shardSize; + if (shardSize == null || shardSize == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE) { + return fallback; + } + return shardSize; } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java index 268f3054693..2a9bd2c106d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java @@ -64,6 +64,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Predicate; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; @@ -170,9 +171,11 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase { verify(rerouteService, times(numberOfShards)).reroute(anyString(), any(Priority.class), any()); assertThat(getShardSnapshotStatusCount.get(), equalTo(numberOfShards)); + final SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes(); for (int i = 0; i < numberOfShards; i++) { final ShardRouting shardRouting = clusterService.state().routingTable().index(indexName).shard(i).primaryShard(); - assertThat(snapshotsInfoService.snapshotShardSizes().getShardSize(shardRouting), equalTo(expectedShardSizes[i])); + assertThat(snapshotShardSizeInfo.getShardSize(shardRouting), equalTo(expectedShardSizes[i])); + assertThat(snapshotShardSizeInfo.getShardSize(shardRouting, Long.MIN_VALUE), equalTo(expectedShardSizes[i])); } } @@ -182,18 +185,19 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase { .put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10)) 
.build(), clusterService, () -> repositoriesService, () -> rerouteService); - final Map<InternalSnapshotsInfoService.SnapshotShard, Boolean> results = new ConcurrentHashMap<>(); + final Map<InternalSnapshotsInfoService.SnapshotShard, Long> results = new ConcurrentHashMap<>(); final Repository mockRepository = new FilterRepository(mock(Repository.class)) { @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { final InternalSnapshotsInfoService.SnapshotShard snapshotShard = new InternalSnapshotsInfoService.SnapshotShard(new Snapshot("_repo", snapshotId), indexId, shardId); if (randomBoolean()) { - results.put(snapshotShard, Boolean.FALSE); + results.put(snapshotShard, Long.MIN_VALUE); throw new SnapshotException(snapshotShard.snapshot(), "simulated"); } else { - results.put(snapshotShard, Boolean.TRUE); - return IndexShardSnapshotStatus.newDone(0L, 0L, 0, 0, 0L, randomNonNegativeLong(), null); + final long shardSize = randomNonNegativeLong(); + results.put(snapshotShard, shardSize); + return IndexShardSnapshotStatus.newDone(0L, 0L, 0, 0, 0L, shardSize, null); } } }; @@ -218,13 +222,29 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase { addSnapshotRestoreIndicesThread.start(); addSnapshotRestoreIndicesThread.join(); + final Predicate<Long> failedSnapshotShardSizeRetrieval = shardSize -> shardSize == Long.MIN_VALUE; assertBusy(() -> { assertThat(snapshotsInfoService.numberOfKnownSnapshotShardSizes(), - equalTo((int) results.values().stream().filter(result -> result.equals(Boolean.TRUE)).count())); + equalTo((int) results.values().stream().filter(size -> failedSnapshotShardSizeRetrieval.test(size) == false).count())); assertThat(snapshotsInfoService.numberOfFailedSnapshotShardSizes(), - equalTo((int) results.values().stream().filter(result -> result.equals(Boolean.FALSE)).count())); + equalTo((int) results.values().stream().filter(failedSnapshotShardSizeRetrieval).count())); assertThat(snapshotsInfoService.numberOfUnknownSnapshotShardSizes(), equalTo(0)); }); + + final 
SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes(); + for (Map.Entry<InternalSnapshotsInfoService.SnapshotShard, Long> snapshotShard : results.entrySet()) { + final ShardId shardId = snapshotShard.getKey().shardId(); + final ShardRouting shardRouting = clusterService.state().routingTable().index(shardId.getIndexName()) + .shard(shardId.id()).primaryShard(); + assertThat(shardRouting, notNullValue()); + + final boolean success = failedSnapshotShardSizeRetrieval.test(snapshotShard.getValue()) == false; + assertThat(snapshotShardSizeInfo.getShardSize(shardRouting), + success ? equalTo(results.get(snapshotShard.getKey())) : equalTo(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + final long defaultValue = randomNonNegativeLong(); + assertThat(snapshotShardSizeInfo.getShardSize(shardRouting, defaultValue), + success ? equalTo(results.get(snapshotShard.getKey())) : equalTo(defaultValue)); + } } public void testNoLongerMaster() throws Exception {