Avoid overshooting watermarks during relocation (#46079)

Today the `DiskThresholdDecider` attempts to account for already-relocating
shards when deciding how to allocate or relocate a shard. Its goal is to stop
relocating shards onto a node before that node exceeds the low watermark, and
to stop relocating shards away from a node as soon as the node drops below the
high watermark.
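
For context, these are the standard disk allocation watermarks; a minimal sketch of the settings involved, using the `DiskThresholdSettings` constants that also appear in the tests below (the values shown are the usual defaults — the tests override them with tiny fake disks):

```java
// For context only: the three watermarks the decider enforces.
final Settings watermarks = Settings.builder()
    // stop allocating new shards onto a node past this point
    .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "85%")
    // start moving shards off a node past this point
    .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "90%")
    // apply the read-only-allow-delete block to affected indices past this point
    .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "95%")
    .build();
```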

The decider handles multiple data paths by only accounting for relocating
shards that affect the appropriate data path. However, this mechanism does not
correctly account for _new_ relocating shards, which are unwittingly ignored.
This means that we may evict far too many shards from a node above the high
watermark, and may relocate far too many shards onto a node, causing it to blow
right past the low watermark and potentially the other watermarks too.
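
To see why, here is the shape of the old accounting in
`DiskThresholdDecider#sizeOfRelocatingShards`, condensed from the hunk below; a
shard whose data path is not yet known to the `ClusterInfo` contributes
nothing:

```java
// Old behaviour (condensed): getDataPath returns null for a shard whose
// relocation started after the last ClusterInfoService refresh, and
// dataPath.equals(null) is false, so the shard is silently skipped.
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
    String actualPath = clusterInfo.getDataPath(routing);
    if (dataPath.equals(actualPath)) {
        // ... the shard's expected size is only counted here ...
    }
}
```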

There are in fact two distinct issues that this PR fixes. New incoming shards
have an unknown data path until the `ClusterInfoService` refreshes its
statistics. New outgoing shards have a known data path, but we fail to account
for the change of the corresponding `ShardRouting` from `STARTED` to
`RELOCATING`, so the lookup misses the recorded path and treats it as unknown
here too.
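
A condensed view of the fix for the outgoing case (taken from the
`DiskThresholdDecider` hunk below): since the `ClusterInfo` recorded the path
against the shard's routing while it was `STARTED`, we retry the lookup with
the relocation reversed:

```java
String actualPath = clusterInfo.getDataPath(routing);
if (actualPath == null) {
    // we might know the path of this shard from before it started relocating
    actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
}
```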

This PR also reworks the `MockDiskUsagesIT` test to avoid using fake data paths
for all shards. With the changes here, the tests handle data paths exactly as
production does; only the reported disk and shard sizes are faked.
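
Concretely, the reworked `MockInternalClusterInfoService` exposes two pluggable
functions, so a test fakes only the sizes while the real paths flow through;
for example:

```java
// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
clusterInfoService.shardSizeFunction = shardRouting -> 0L;
// start with all paths below the watermark (only the free/total bytes are faked per FsInfo.Path)
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
```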

Fixes #45177
David Turner 2019-08-29 12:39:28 +01:00
parent b526309fbd
commit d340530a47
6 changed files with 430 additions and 221 deletions

View File

@@ -47,9 +47,8 @@ import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -88,7 +87,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
private final List<Consumer<ClusterInfo>> listeners = Collections.synchronizedList(new ArrayList<>(1));
private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
@@ -275,6 +274,11 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
}
}
// allow tests to adjust the node stats on receipt
List<NodeStats> adjustNodesStats(List<NodeStats> nodeStats) {
return nodeStats;
}
/**
* Refreshes the ClusterInfo in a blocking fashion
*/
@@ -284,12 +288,13 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
}
final CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
@Override
public void onResponse(NodesStatsResponse nodeStatses) {
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages = ImmutableOpenMap.builder();
fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages);
leastAvailableSpaceUsages = newLeastAvaiableUsages.build();
mostAvailableSpaceUsages = newMostAvaiableUsages.build();
public void onResponse(NodesStatsResponse nodesStatsResponse) {
ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsagesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsagesBuilder = ImmutableOpenMap.builder();
fillDiskUsagePerNode(logger, adjustNodesStats(nodesStatsResponse.getNodes()),
leastAvailableUsagesBuilder, mostAvailableUsagesBuilder);
leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build();
mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build();
}
@Override
@@ -402,7 +407,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
if (leastAvailablePath == null) {
assert mostAvailablePath == null;
mostAvailablePath = leastAvailablePath = info;
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()){
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) {
leastAvailablePath = info;
} else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) {
mostAvailablePath = info;

View File

@@ -90,16 +90,36 @@ public class DiskThresholdDecider extends AllocationDecider {
boolean subtractShardsMovingAway, String dataPath) {
ClusterInfo clusterInfo = allocation.clusterInfo();
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
String actualPath = clusterInfo.getDataPath(routing);
if (dataPath.equals(actualPath)) {
if (routing.initializing() && routing.relocatingNodeId() != null) {
totalSize += getExpectedShardSize(routing, allocation, 0);
} else if (subtractShardsMovingAway && routing.relocating()) {
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
if (routing.relocatingNodeId() == null) {
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
// any additional space and can be ignored here
continue;
}
final String actualPath = clusterInfo.getDataPath(routing);
// if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
// free space
if (actualPath == null || actualPath.equals(dataPath)) {
totalSize += getExpectedShardSize(routing, allocation, 0);
}
}
if (subtractShardsMovingAway) {
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING)) {
String actualPath = clusterInfo.getDataPath(routing);
if (actualPath == null) {
// we might know the path of this shard from before when it was relocating
actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
}
if (dataPath.equals(actualPath)) {
totalSize -= getExpectedShardSize(routing, allocation, 0);
}
}
}
return totalSize;
}

View File

@@ -26,7 +26,6 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -1002,4 +1001,20 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
rn.shardsWithState(RELOCATING),
rn.shardsWithState(STARTED));
}
/**
* ClusterInfo that always reports /dev/null for the shards' data paths.
*/
static class DevNullClusterInfo extends ClusterInfo {
DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage,
ImmutableOpenMap<String, Long> shardSizes) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null);
}
@Override
public String getDataPath(ShardRouting shardRouting) {
return "/dev/null";
}
}
}

View File

@@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDeciderTests.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;

View File

@@ -16,191 +16,179 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.MockInternalClusterInfoService;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MockDiskUsagesIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
// Use the mock internal cluster info service, which has fake-able disk usages
return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class);
}
@Override
public Settings indexSettings() {
// ensure that indices do not use custom data paths
return Settings.builder().put(super.indexSettings()).putNull(IndexMetaData.SETTING_DATA_PATH).build();
}
private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) {
return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes);
}
public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
List<String> nodes = internalCluster().startNodes(3);
// Start with all nodes at 50% usage
final MockInternalClusterInfoService cis = (MockInternalClusterInfoService)
internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName());
cis.setUpdateFrequency(TimeValue.timeValueMillis(200));
cis.onMaster();
cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "/dev/null", 100, 50));
cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "/dev/null", 100, 50));
cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50));
final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "20b" : "80%")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(),
watermarkBytes ? "0b" : "100%")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")).get();
// Create an index with 10 shards so we can check allocation for it
prepareCreate("test").setSettings(Settings.builder()
.put("number_of_shards", 10)
.put("number_of_replicas", 0)).get();
ensureGreen("test");
// Block until the "fake" cluster info is retrieved at least once
assertBusy(() -> {
final ClusterInfo info = cis.getClusterInfo();
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
});
final List<String> realNodeNames = new ArrayList<>();
{
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (final RoutingNode node : clusterState.getRoutingNodes()) {
realNodeNames.add(node.nodeId());
logger.info("--> node {} has {} shards",
node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
for (int i = 0; i < 3; i++) {
// ensure that each node has a single data path
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
}
// Update the disk usages so one node has now passed the high watermark
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50));
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50));
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 0)); // nothing free on node3
final List<String> nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList());
logger.info("--> waiting for shards to relocate off node [{}]", realNodeNames.get(2));
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
clusterInfoService.onMaster();
// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
clusterInfoService.shardSizeFunction = shardRouting -> 0L;
// start with all nodes below the watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), watermarkBytes ? "0b" : "100%")
.put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")));
// Create an index with 10 shards so we can check allocation for it
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 10).put("number_of_replicas", 0)));
ensureGreen("test");
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final Map<String, Integer> nodesToShardCount = new HashMap<>();
for (final RoutingNode node : clusterState.getRoutingNodes()) {
logger.info("--> node {} has {} shards",
node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(0)), greaterThanOrEqualTo(3));
assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(1)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(2)), greaterThanOrEqualTo(3));
});
// Update the disk usages so one node is now back under the high watermark
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50));
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50));
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 50)); // node3 has free space now
// move node2 above high watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 9) : between(10, 100));
logger.info("--> waiting for shards to rebalance back onto node [{}]", realNodeNames.get(2));
logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.get(2));
assertBusy(() -> {
final Map<String, Integer> nodesToShardCount = new HashMap<>();
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (final RoutingNode node : clusterState.getRoutingNodes()) {
logger.info("--> node {} has {} shards",
node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
assertThat("node0 has 5 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(5));
assertThat("node1 has 5 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(5));
assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(0));
});
// move all nodes below watermark again
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
logger.info("--> waiting for shards to rebalance back onto node [{}]", nodeIds.get(2));
assertBusy(() -> {
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(0)), greaterThanOrEqualTo(3));
assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(1)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(2)), greaterThanOrEqualTo(3));
});
}
public void testAutomaticReleaseOfIndexBlock() throws Exception {
List<String> nodes = internalCluster().startNodes(3);
for (int i = 0; i < 3; i++) {
// ensure that each node has a single data path
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
}
// Wait for all 3 nodes to be up
assertBusy(() -> {
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
assertThat(resp.getNodes().size(), equalTo(3));
});
final List<String> nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList());
// Start with all nodes at 50% usage
final MockInternalClusterInfoService cis = (MockInternalClusterInfoService)
internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName());
cis.setUpdateFrequency(TimeValue.timeValueMillis(100));
cis.onMaster();
cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "/dev/null", 100, 50));
cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "/dev/null", 100, 50));
cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50));
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
clusterInfoService.onMaster();
// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
clusterInfoService.shardSizeFunction = shardRouting -> 0L;
// start with all nodes below the low watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(15, 100));
final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "15b" : "85%")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%")
.put(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(),
watermarkBytes ? "5b" : "95%")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "150ms")).get();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), watermarkBytes ? "5b" : "95%")
.put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "150ms")));
// Create an index with 6 shards so we can check allocation for it
prepareCreate("test").setSettings(Settings.builder()
.put("number_of_shards", 6)
.put("number_of_replicas", 0)).get();
ensureGreen("test");
// Block until the "fake" cluster info is retrieved at least once
assertBusy(() -> {
ClusterInfo info = cis.getClusterInfo();
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
});
final List<String> realNodeNames = new ArrayList<>();
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
for (RoutingNode node : resp.getState().getRoutingNodes()) {
realNodeNames.add(node.nodeId());
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
{
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2));
assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2));
assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2));
}
client().prepareIndex("test", "doc", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
assertSearchHits(client().prepareSearch().get(), "1");
client().prepareIndex("test", "doc", "1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
assertSearchHits(client().prepareSearch("test").get(), "1");
// Block all nodes so that re-balancing does not occur (BalancedShardsAllocator)
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 3));
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 3));
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 3));
// Move all nodes above the low watermark so no shard movement can occur, and at least one node above the flood stage watermark so
// the index is blocked
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 4) : between(0, 14));
// Wait until index "test" is blocked
assertBusy(() -> assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar"),
assertBusy(() -> assertBlocked(
client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar"),
IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK));
assertFalse(client().admin().cluster().prepareHealth("test").setWaitForEvents(Priority.LANGUID).get().isTimedOut());
@@ -208,23 +196,234 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
// Cannot add further documents
assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("2").setSource("foo", "bar"),
IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
assertSearchHits(client().prepareSearch().get(), "1");
assertSearchHits(client().prepareSearch("test").get(), "1");
// Update the disk usages so all nodes are back under the high and flood watermarks
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 11));
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 11));
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 11));
logger.info("--> index is confirmed read-only, releasing disk space");
// Move all nodes below the high watermark so that the index is unblocked
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
// Attempt to create a new document until DiskUsageMonitor unblocks the index
assertBusy(() -> {
try {
client().prepareIndex("test", "doc", "3").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
client().prepareIndex("test", "doc", "3").setSource("foo", "bar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
} catch (ClusterBlockException e) {
throw new AssertionError("retrying", e);
}
});
assertSearchHits(client().prepareSearch().get(), "1", "3");
assertSearchHits(client().prepareSearch("test").get(), "1", "3");
}
public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception {
for (int i = 0; i < 3; i++) {
// ensure that each node has a single data path
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
}
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
final AtomicReference<ClusterState> masterAppliedClusterState = new AtomicReference<>();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
masterAppliedClusterState.set(event.state());
clusterInfoService.refresh(); // so that a subsequent reroute sees disk usage according to the current state
});
// shards are 1 byte large
clusterInfoService.shardSizeFunction = shardRouting -> 1L;
// start with all nodes below the watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, 1000L);
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "90%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%")
.put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")));
final List<String> nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList());
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));
ensureGreen("test");
assertBusy(() -> {
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2));
assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2));
assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2));
});
// disable rebalancing, or else we might move too many shards away and then rebalance them back again
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
// node2 suddenly has 99 bytes free, less than 10%, but moving one shard is enough to bring it up to 100 bytes free:
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
discoveryNode.getId().equals(nodeIds.get(2))
? 101L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards()
: 1000L);
clusterInfoService.refresh();
logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.get(2));
// must wait for relocation to start
assertBusy(() -> assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1)));
// ensure that relocations finished without moving any more shards
ensureGreen("test");
assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1));
}
public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception {
for (int i = 0; i < 3; i++) {
// ensure that each node has a single data path
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
}
final AtomicReference<ClusterState> masterAppliedClusterState = new AtomicReference<>();
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
final List<String> nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList());
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
assertThat(event.state().getRoutingNodes().node(nodeIds.get(2)).size(), lessThanOrEqualTo(1));
masterAppliedClusterState.set(event.state());
clusterInfoService.refresh(); // so that a subsequent reroute sees disk usage according to the current state
});
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "85%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "100%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%")));
// shards are 1 byte large
clusterInfoService.shardSizeFunction = shardRouting -> 1L;
// node 2 only has space for one shard
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
discoveryNode.getId().equals(nodeIds.get(2))
? 150L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards()
: 1000L);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put("number_of_shards", 6)
.put("number_of_replicas", 0)
.put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_id").getKey(), nodeIds.get(2))));
ensureGreen("test");
assertBusy(() -> {
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
assertThat("node0 has 3 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(3));
assertThat("node1 has 3 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(3));
assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(0));
});
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.putNull(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_id").getKey())));
logger.info("--> waiting for shards to relocate onto node [{}]", nodeIds.get(2));
ensureGreen("test");
assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1));
}
public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception {
// start one node with two data paths
final Path pathOverWatermark = createTempDir();
final Settings.Builder twoPathSettings = Settings.builder();
if (randomBoolean()) {
twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), createTempDir().toString(), pathOverWatermark.toString());
} else {
twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), pathOverWatermark.toString(), createTempDir().toString());
}
internalCluster().startNode(twoPathSettings);
final String nodeWithTwoPaths = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0).getNode().getId();
// other two nodes have one data path each
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
clusterInfoService.shardSizeFunction = shardRouting -> 0L;
// start with all paths below the watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "90%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%")));
final List<String> nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList());
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));
ensureGreen("test");
{
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2));
assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2));
assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2));
}
final long shardsOnGoodPath = Arrays.stream(client().admin().indices().prepareStats("test").get().getShards())
.filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths)
&& shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count();
logger.info("--> shards on good path: [{}]", shardsOnGoodPath);
// one of the paths on node0 suddenly exceeds the high watermark
clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100L,
fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? between(0, 9) : between(10, 100));
// disable rebalancing, or else we might move shards back onto the over-full path since we're not faking that
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
clusterInfoService.refresh();
logger.info("--> waiting for shards to relocate off path [{}]", pathOverWatermark);
assertBusy(() -> {
for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) {
assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString())));
}
});
ensureGreen("test");
for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) {
assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString())));
}
assertThat("should not have moved any shards off of the path that wasn't too full",
Arrays.stream(client().admin().indices().prepareStats("test").get().getShards())
.filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths)
&& shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count(), equalTo(shardsOnGoodPath));
}
private Map<String, Integer> getShardCountByNodeId() {
final Map<String, Integer> shardCountByNodeId = new HashMap<>();
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (final RoutingNode node : clusterState.getRoutingNodes()) {
logger.info("----> node {} has {} shards",
node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
shardCountByNodeId.put(node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
return shardCountByNodeId;
}
private MockInternalClusterInfoService getMockInternalClusterInfoService() {
return (MockInternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
}
}

View File

@@ -18,110 +18,80 @@
*/
package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
/**
* Fake ClusterInfoService class that allows updating the nodes stats disk
* usage with fake values
*/
public class MockInternalClusterInfoService extends InternalClusterInfoService {
/** This is a marker plugin used to trigger MockNode to use this mock info service. */
public static class TestPlugin extends Plugin {}
private final ClusterName clusterName;
private volatile NodeStats[] stats = new NodeStats[3];
@Nullable // if no fakery should take place
public volatile Function<ShardRouting, Long> shardSizeFunction;
/** Create a fake NodeStats for the given node and usage */
public static NodeStats makeStats(String nodeName, DiskUsage usage) {
FsInfo.Path[] paths = new FsInfo.Path[1];
FsInfo.Path path = new FsInfo.Path("/dev/null", null,
usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeBytes());
paths[0] = path;
FsInfo fsInfo = new FsInfo(System.currentTimeMillis(), null, paths);
return new NodeStats(
new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
System.currentTimeMillis(),
null, null, null, null, null,
fsInfo,
null, null, null,
null, null, null, null);
}
@Nullable // if no fakery should take place
public volatile BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction;
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
super(settings, clusterService, threadPool, client);
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100));
stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100));
stats[2] = makeStats("node_t3", new DiskUsage("node_t3", "n3", "/dev/null", 100, 100));
}
public void setN1Usage(String nodeName, DiskUsage newUsage) {
stats[0] = makeStats(nodeName, newUsage);
}
public void setN2Usage(String nodeName, DiskUsage newUsage) {
stats[1] = makeStats(nodeName, newUsage);
}
public void setN3Usage(String nodeName, DiskUsage newUsage) {
stats[2] = makeStats(nodeName, newUsage);
}
@Override
public CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse> listener) {
NodesStatsResponse response = new NodesStatsResponse(clusterName, Arrays.asList(stats), Collections.emptyList());
listener.onResponse(response);
return new CountDownLatch(0);
}
@Override
public CountDownLatch updateIndicesStats(final ActionListener<IndicesStatsResponse> listener) {
// Not used, so noop
return new CountDownLatch(0);
}
@Override
public ClusterInfo getClusterInfo() {
ClusterInfo clusterInfo = super.getClusterInfo();
return new DevNullClusterInfo(clusterInfo.getNodeLeastAvailableDiskUsages(),
clusterInfo.getNodeMostAvailableDiskUsages(), clusterInfo.shardSizes);
final ClusterInfo clusterInfo = super.getClusterInfo();
return new SizeFakingClusterInfo(clusterInfo);
}
/**
* ClusterInfo that always points to DevNull.
*/
public static class DevNullClusterInfo extends ClusterInfo {
public DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage, ImmutableOpenMap<String, Long> shardSizes) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null);
@Override
List<NodeStats> adjustNodesStats(List<NodeStats> nodesStats) {
final BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction = this.diskUsageFunction;
if (diskUsageFunction == null) {
return nodesStats;
}
return nodesStats.stream().map(nodeStats -> {
final DiscoveryNode discoveryNode = nodeStats.getNode();
final FsInfo oldFsInfo = nodeStats.getFs();
return new NodeStats(discoveryNode, nodeStats.getTimestamp(), nodeStats.getIndices(), nodeStats.getOs(),
nodeStats.getProcess(), nodeStats.getJvm(), nodeStats.getThreadPool(), new FsInfo(oldFsInfo.getTimestamp(),
oldFsInfo.getIoStats(),
StreamSupport.stream(oldFsInfo.spliterator(), false)
.map(fsInfoPath -> diskUsageFunction.apply(discoveryNode, fsInfoPath))
.toArray(FsInfo.Path[]::new)), nodeStats.getTransport(),
nodeStats.getHttp(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(),
nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats());
}).collect(Collectors.toList());
}
class SizeFakingClusterInfo extends ClusterInfo {
SizeFakingClusterInfo(ClusterInfo delegate) {
super(delegate.getNodeLeastAvailableDiskUsages(), delegate.getNodeMostAvailableDiskUsages(),
delegate.shardSizes, delegate.routingToDataPath);
}
@Override
public String getDataPath(ShardRouting shardRouting) {
return "/dev/null";
public Long getShardSize(ShardRouting shardRouting) {
final Function<ShardRouting, Long> shardSizeFunction = MockInternalClusterInfoService.this.shardSizeFunction;
if (shardSizeFunction == null) {
return super.getShardSize(shardRouting);
}
return shardSizeFunction.apply(shardRouting);
}
}