In #61906 we agreed on always providing the default value ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE when the SnasphotInfoService failed to retrieve the exact size for a given snapshot shard. The motivation was to allow the shard allocation to move forward in case of failures (so that the unassigned shard does not get stuck in an unassigned state for too long) while relying on the fallback values for shard sizes. Sadly a bug in the SnapshotShardSizeInfo#getShardSize(ShardRouting, long) makes the default value to be ignored when the snapshot shard size retrieval previously failed, returning ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE instead of the provided default value. With DiskThresholdDecider also not relying on the provided default value this triggers some assertion like in #63376 which helped us to spot the bug. Closes ##63376
This commit is contained in:
parent
e4f37d96f4
commit
eac99dd594
|
@ -490,9 +490,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
return targetShardSize == 0 ? defaultValue : targetShardSize;
|
return targetShardSize == 0 ? defaultValue : targetShardSize;
|
||||||
} else {
|
} else {
|
||||||
if (shard.unassigned() && shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
|
if (shard.unassigned() && shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
|
||||||
final Long shardSize = snapshotShardSizeInfo.getShardSize(shard);
|
return snapshotShardSizeInfo.getShardSize(shard, defaultValue);
|
||||||
assert shardSize != null : "no shard size provided for " + shard;
|
|
||||||
return shardSize;
|
|
||||||
}
|
}
|
||||||
return clusterInfo.getShardSize(shard, defaultValue);
|
return clusterInfo.getShardSize(shard, defaultValue);
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,9 @@ public class SnapshotShardSizeInfo {
|
||||||
|
|
||||||
public long getShardSize(ShardRouting shardRouting, long fallback) {
|
public long getShardSize(ShardRouting shardRouting, long fallback) {
|
||||||
final Long shardSize = getShardSize(shardRouting);
|
final Long shardSize = getShardSize(shardRouting);
|
||||||
return shardSize == null ? fallback : shardSize;
|
if (shardSize == null || shardSize == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE) {
|
||||||
|
return fallback;
|
||||||
|
}
|
||||||
|
return shardSize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,7 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
|
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
|
||||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
|
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
|
||||||
|
@ -170,9 +171,11 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase {
|
||||||
verify(rerouteService, times(numberOfShards)).reroute(anyString(), any(Priority.class), any());
|
verify(rerouteService, times(numberOfShards)).reroute(anyString(), any(Priority.class), any());
|
||||||
assertThat(getShardSnapshotStatusCount.get(), equalTo(numberOfShards));
|
assertThat(getShardSnapshotStatusCount.get(), equalTo(numberOfShards));
|
||||||
|
|
||||||
|
final SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes();
|
||||||
for (int i = 0; i < numberOfShards; i++) {
|
for (int i = 0; i < numberOfShards; i++) {
|
||||||
final ShardRouting shardRouting = clusterService.state().routingTable().index(indexName).shard(i).primaryShard();
|
final ShardRouting shardRouting = clusterService.state().routingTable().index(indexName).shard(i).primaryShard();
|
||||||
assertThat(snapshotsInfoService.snapshotShardSizes().getShardSize(shardRouting), equalTo(expectedShardSizes[i]));
|
assertThat(snapshotShardSizeInfo.getShardSize(shardRouting), equalTo(expectedShardSizes[i]));
|
||||||
|
assertThat(snapshotShardSizeInfo.getShardSize(shardRouting, Long.MIN_VALUE), equalTo(expectedShardSizes[i]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,18 +185,19 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase {
|
||||||
.put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10))
|
.put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10))
|
||||||
.build(), clusterService, () -> repositoriesService, () -> rerouteService);
|
.build(), clusterService, () -> repositoriesService, () -> rerouteService);
|
||||||
|
|
||||||
final Map<InternalSnapshotsInfoService.SnapshotShard, Boolean> results = new ConcurrentHashMap<>();
|
final Map<InternalSnapshotsInfoService.SnapshotShard, Long> results = new ConcurrentHashMap<>();
|
||||||
final Repository mockRepository = new FilterRepository(mock(Repository.class)) {
|
final Repository mockRepository = new FilterRepository(mock(Repository.class)) {
|
||||||
@Override
|
@Override
|
||||||
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
|
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
|
||||||
final InternalSnapshotsInfoService.SnapshotShard snapshotShard =
|
final InternalSnapshotsInfoService.SnapshotShard snapshotShard =
|
||||||
new InternalSnapshotsInfoService.SnapshotShard(new Snapshot("_repo", snapshotId), indexId, shardId);
|
new InternalSnapshotsInfoService.SnapshotShard(new Snapshot("_repo", snapshotId), indexId, shardId);
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
results.put(snapshotShard, Boolean.FALSE);
|
results.put(snapshotShard, Long.MIN_VALUE);
|
||||||
throw new SnapshotException(snapshotShard.snapshot(), "simulated");
|
throw new SnapshotException(snapshotShard.snapshot(), "simulated");
|
||||||
} else {
|
} else {
|
||||||
results.put(snapshotShard, Boolean.TRUE);
|
final long shardSize = randomNonNegativeLong();
|
||||||
return IndexShardSnapshotStatus.newDone(0L, 0L, 0, 0, 0L, randomNonNegativeLong(), null);
|
results.put(snapshotShard, shardSize);
|
||||||
|
return IndexShardSnapshotStatus.newDone(0L, 0L, 0, 0, 0L, shardSize, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -218,13 +222,29 @@ public class InternalSnapshotsInfoServiceTests extends ESTestCase {
|
||||||
addSnapshotRestoreIndicesThread.start();
|
addSnapshotRestoreIndicesThread.start();
|
||||||
addSnapshotRestoreIndicesThread.join();
|
addSnapshotRestoreIndicesThread.join();
|
||||||
|
|
||||||
|
final Predicate<Long> failedSnapshotShardSizeRetrieval = shardSize -> shardSize == Long.MIN_VALUE;
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
assertThat(snapshotsInfoService.numberOfKnownSnapshotShardSizes(),
|
assertThat(snapshotsInfoService.numberOfKnownSnapshotShardSizes(),
|
||||||
equalTo((int) results.values().stream().filter(result -> result.equals(Boolean.TRUE)).count()));
|
equalTo((int) results.values().stream().filter(size -> failedSnapshotShardSizeRetrieval.test(size) == false).count()));
|
||||||
assertThat(snapshotsInfoService.numberOfFailedSnapshotShardSizes(),
|
assertThat(snapshotsInfoService.numberOfFailedSnapshotShardSizes(),
|
||||||
equalTo((int) results.values().stream().filter(result -> result.equals(Boolean.FALSE)).count()));
|
equalTo((int) results.values().stream().filter(failedSnapshotShardSizeRetrieval).count()));
|
||||||
assertThat(snapshotsInfoService.numberOfUnknownSnapshotShardSizes(), equalTo(0));
|
assertThat(snapshotsInfoService.numberOfUnknownSnapshotShardSizes(), equalTo(0));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
final SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes();
|
||||||
|
for (Map.Entry<InternalSnapshotsInfoService.SnapshotShard, Long> snapshotShard : results.entrySet()) {
|
||||||
|
final ShardId shardId = snapshotShard.getKey().shardId();
|
||||||
|
final ShardRouting shardRouting = clusterService.state().routingTable().index(shardId.getIndexName())
|
||||||
|
.shard(shardId.id()).primaryShard();
|
||||||
|
assertThat(shardRouting, notNullValue());
|
||||||
|
|
||||||
|
final boolean success = failedSnapshotShardSizeRetrieval.test(snapshotShard.getValue()) == false;
|
||||||
|
assertThat(snapshotShardSizeInfo.getShardSize(shardRouting),
|
||||||
|
success ? equalTo(results.get(snapshotShard.getKey())) : equalTo(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||||
|
final long defaultValue = randomNonNegativeLong();
|
||||||
|
assertThat(snapshotShardSizeInfo.getShardSize(shardRouting, defaultValue),
|
||||||
|
success ? equalTo(results.get(snapshotShard.getKey())) : equalTo(defaultValue));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testNoLongerMaster() throws Exception {
|
public void testNoLongerMaster() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue