Fix refresh behavior in MockDiskUsagesIT (#57926)
Ensures that InternalClusterInfoService's internally cached stats are refreshed whenever the shard size or disk usage function (to mock out disk usage) are overridden. Closes #57888
This commit is contained in:
parent
6fc8317f07
commit
85b0b540f0
|
@ -92,10 +92,10 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
clusterInfoService.onMaster();
|
||||
|
||||
// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
|
||||
clusterInfoService.shardSizeFunction = shardRouting -> 0L;
|
||||
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);
|
||||
|
||||
// start with all nodes below the watermark
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));
|
||||
|
||||
final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
|
@ -115,8 +115,8 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
});
|
||||
|
||||
// move node2 above high watermark
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
|
||||
discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 9) : between(10, 100));
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
|
||||
discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 9) : between(10, 100)));
|
||||
|
||||
logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.get(2));
|
||||
|
||||
|
@ -128,7 +128,7 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
});
|
||||
|
||||
// move all nodes below watermark again
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));
|
||||
|
||||
logger.info("--> waiting for shards to rebalance back onto node [{}]", nodeIds.get(2));
|
||||
|
||||
|
@ -154,10 +154,10 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
clusterInfoService.onMaster();
|
||||
|
||||
// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
|
||||
clusterInfoService.shardSizeFunction = shardRouting -> 0L;
|
||||
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);
|
||||
|
||||
// start with all nodes below the low watermark
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(15, 100));
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(15, 100)));
|
||||
|
||||
final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
|
@ -184,8 +184,8 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
|
||||
// Move all nodes above the low watermark so no shard movement can occur, and at least one node above the flood stage watermark so
|
||||
// the index is blocked
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
|
||||
discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 4) : between(0, 9));
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
|
||||
discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 4) : between(0, 9)));
|
||||
|
||||
assertBusy(() -> assertBlocked(
|
||||
client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar"),
|
||||
|
@ -201,7 +201,7 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
logger.info("--> index is confirmed read-only, releasing disk space");
|
||||
|
||||
// Move all nodes below the high watermark so that the index is unblocked
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));
|
||||
|
||||
// Attempt to create a new document until DiskUsageMonitor unblocks the index
|
||||
assertBusy(() -> {
|
||||
|
@ -230,10 +230,10 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
});
|
||||
|
||||
// shards are 1 byte large
|
||||
clusterInfoService.shardSizeFunction = shardRouting -> 1L;
|
||||
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 1L);
|
||||
|
||||
// start with all nodes below the watermark
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, 1000L);
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, 1000L));
|
||||
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%")
|
||||
|
@ -260,10 +260,10 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
|
||||
|
||||
// node2 suddenly has 99 bytes free, less than 10%, but moving one shard is enough to bring it up to 100 bytes free:
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
|
||||
discoveryNode.getId().equals(nodeIds.get(2))
|
||||
? 101L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards()
|
||||
: 1000L);
|
||||
: 1000L));
|
||||
|
||||
clusterInfoService.refresh();
|
||||
|
||||
|
@ -302,13 +302,13 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%")));
|
||||
|
||||
// shards are 1 byte large
|
||||
clusterInfoService.shardSizeFunction = shardRouting -> 1L;
|
||||
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 1L);
|
||||
|
||||
// node 2 only has space for one shard
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
|
||||
discoveryNode.getId().equals(nodeIds.get(2))
|
||||
? 150L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards()
|
||||
: 1000L);
|
||||
: 1000L));
|
||||
|
||||
assertAcked(prepareCreate("test").setSettings(Settings.builder()
|
||||
.put("number_of_shards", 6)
|
||||
|
@ -352,10 +352,10 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
|
||||
|
||||
// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
|
||||
clusterInfoService.shardSizeFunction = shardRouting -> 0L;
|
||||
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);
|
||||
|
||||
// start with all paths below the watermark
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));
|
||||
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%")
|
||||
|
@ -381,15 +381,13 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
&& shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count();
|
||||
logger.info("--> shards on good path: [{}]", shardsOnGoodPath);
|
||||
|
||||
// one of the paths on node0 suddenly exceeds the high watermark
|
||||
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100L,
|
||||
fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? between(0, 9) : between(10, 100));
|
||||
|
||||
// disable rebalancing, or else we might move shards back onto the over-full path since we're not faking that
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
|
||||
|
||||
clusterInfoService.refresh();
|
||||
// one of the paths on node0 suddenly exceeds the high watermark
|
||||
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100L,
|
||||
fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? between(0, 9) : between(10, 100)));
|
||||
|
||||
logger.info("--> waiting for shards to relocate off path [{}]", pathOverWatermark);
|
||||
|
||||
|
|
|
@ -42,15 +42,25 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
|
|||
public static class TestPlugin extends Plugin {}
|
||||
|
||||
@Nullable // if no fakery should take place
|
||||
public volatile Function<ShardRouting, Long> shardSizeFunction;
|
||||
private volatile Function<ShardRouting, Long> shardSizeFunction;
|
||||
|
||||
@Nullable // if no fakery should take place
|
||||
public volatile BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction;
|
||||
private volatile BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction;
|
||||
|
||||
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
|
||||
super(settings, clusterService, threadPool, client);
|
||||
}
|
||||
|
||||
public void setDiskUsageFunctionAndRefresh(BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction) {
|
||||
this.diskUsageFunction = diskUsageFunction;
|
||||
refresh();
|
||||
}
|
||||
|
||||
public void setShardSizeFunctionAndRefresh(Function<ShardRouting, Long> shardSizeFunction) {
|
||||
this.shardSizeFunction = shardSizeFunction;
|
||||
refresh();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterInfo getClusterInfo() {
|
||||
final ClusterInfo clusterInfo = super.getClusterInfo();
|
||||
|
|
Loading…
Reference in New Issue