Take Shard data path into account in DiskThresholdDecider

The path that a shard is allocated on is not taken into account when
we decide to move a shard away from a node because it passed a watermark.
Even worse, we potentially relocated a shard that was not even allocated
on that disk but on another disk on the node in question. This commit
adds a ShardRouting -> dataPath mapping to ClusterInfo that makes it
possible to identify which disk each shard is allocated on.

Relates to #13106
Simon Willnauer, 2015-08-29 10:14:51 +02:00
commit a17d7500d3 (parent fc840407db)
14 changed files with 245 additions and 100 deletions
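
In short, ClusterInfo now records, per ShardRouting, the absolute data path the shard lives on, and DiskThresholdDecider consults that mapping so a shard is only forced off a node when it actually sits on the disk that crossed the watermark. Below is a minimal sketch of that check, using only accessors that appear in the diff; the helper class and method name are illustrative and not part of this commit:

    import org.elasticsearch.cluster.ClusterInfo;
    import org.elasticsearch.cluster.DiskUsage;
    import org.elasticsearch.cluster.routing.RoutingNode;
    import org.elasticsearch.cluster.routing.ShardRouting;

    final class DataPathCheckSketch {
        // Returns true only if the shard is allocated on the node's fullest disk,
        // i.e. the disk whose watermark breach we are reacting to. A null path
        // (stats not yet available) or a different disk means the high-watermark
        // check should not force this shard off the node.
        static boolean shardOnLeastAvailablePath(ClusterInfo clusterInfo, RoutingNode node, ShardRouting shard) {
            String dataPath = clusterInfo.getDataPath(shard); // may be null if the information is missing
            DiskUsage leastAvailable = clusterInfo.getNodeLeastAvailableDiskUsages().get(node.nodeId());
            return dataPath != null && leastAvailable != null && dataPath.equals(leastAvailable.getPath());
        }
    }

The new guard in canRemain (see DiskThresholdDecider.java below) performs this comparison and returns a YES decision with the reason "shard is not allocated on the most utilized disk" when the shard lives elsewhere.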

TransportClusterStatsAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.support.ActionFilters;

@@ -106,7 +107,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
             for (IndexShard indexShard : indexService) {
                 if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
                     // only report on fully started shards
-                    shardsStats.add(new ShardStats(indexShard, SHARD_STATS_FLAGS));
+                    shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats()));
                 }
             }
         }

ShardStats.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilderString;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardPath;
 import java.io.IOException;

@@ -48,13 +49,13 @@ public class ShardStats implements Streamable, ToXContent {
     ShardStats() {
     }
-    public ShardStats(IndexShard indexShard, CommonStatsFlags flags) {
-        this.shardRouting = indexShard.routingEntry();
-        this.dataPath = indexShard.shardPath().getRootDataPath().toString();
-        this.statePath = indexShard.shardPath().getRootStatePath().toString();
-        this.isCustomDataPath = indexShard.shardPath().isCustomDataPath();
-        this.commonStats = new CommonStats(indexShard, flags);
-        this.commitStats = indexShard.commitStats();
+    public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats) {
+        this.shardRouting = routing;
+        this.dataPath = shardPath.getRootDataPath().toString();
+        this.statePath = shardPath.getRootStatePath().toString();
+        this.isCustomDataPath = shardPath.isCustomDataPath();
+        this.commitStats = commitStats;
+        this.commonStats = commonStats;
     }
     /**

TransportIndicesStatsAction.java

@@ -162,6 +162,6 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
             flags.set(CommonStatsFlags.Flag.Recovery);
         }
-        return new ShardStats(indexShard, flags);
+        return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats());
     }
 }

ClusterInfo.java

@@ -33,12 +33,13 @@ import java.util.Map;
 public class ClusterInfo {
     private final Map<String, DiskUsage> leastAvailableSpaceUsage;
-    private final Map<String, DiskUsage> mostAvailabeSpaceUsage;
+    private final Map<String, DiskUsage> mostAvailableSpaceUsage;
     final Map<String, Long> shardSizes;
     public static final ClusterInfo EMPTY = new ClusterInfo();
+    private final Map<ShardRouting, String> routingToDataPath;
     protected ClusterInfo() {
-        this(Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP);
+        this(Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP);
     }
     /**

@@ -47,12 +48,14 @@
      * @param leastAvailableSpaceUsage a node id to disk usage mapping for the path that has the least available space on the node.
      * @param mostAvailableSpaceUsage a node id to disk usage mapping for the path that has the most available space on the node.
      * @param shardSizes a shardkey to size in bytes mapping per shard.
+     * @param routingToDataPath the shard routing to datapath mapping
      * @see #shardIdentifierFromRouting
      */
-    public ClusterInfo(final Map<String, DiskUsage> leastAvailableSpaceUsage, final Map<String, DiskUsage> mostAvailableSpaceUsage, final Map<String, Long> shardSizes) {
+    public ClusterInfo(final Map<String, DiskUsage> leastAvailableSpaceUsage, final Map<String, DiskUsage> mostAvailableSpaceUsage, final Map<String, Long> shardSizes, Map<ShardRouting, String> routingToDataPath) {
         this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
         this.shardSizes = shardSizes;
-        this.mostAvailabeSpaceUsage = mostAvailableSpaceUsage;
+        this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
+        this.routingToDataPath = routingToDataPath;
     }
     /**

@@ -66,7 +69,7 @@
      * Returns a node id to disk usage mapping for the path that has the most available space on the node.
      */
     public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
-        return this.mostAvailabeSpaceUsage;
+        return this.mostAvailableSpaceUsage;
     }
     /**

@@ -76,6 +79,13 @@
         return shardSizes.get(shardIdentifierFromRouting(shardRouting));
     }
+    /**
+     * Returns the nodes absolute data-path the given shard is allocated on or <code>null</code> if the information is not available.
+     */
+    public String getDataPath(ShardRouting shardRouting) {
+        return routingToDataPath.get(shardRouting);
+    }
     /**
      * Returns the shard size for the given shard routing or <code>defaultValue</code> it that metric is not available.
      */

InternalClusterInfoService.java

@@ -31,6 +31,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;

@@ -67,6 +68,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
     private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
     private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
+    private volatile Map<ShardRouting, String> shardRoutingToDataPath;
     private volatile Map<String, Long> shardSizes;
     private volatile boolean isMaster = false;
     private volatile boolean enabled;

@@ -85,6 +87,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
         super(settings);
         this.leastAvailableSpaceUsages = Collections.emptyMap();
         this.mostAvailableSpaceUsages = Collections.emptyMap();
+        this.shardRoutingToDataPath = Collections.emptyMap();
         this.shardSizes = Collections.emptyMap();
         this.transportNodesStatsAction = transportNodesStatsAction;
         this.transportIndicesStatsAction = transportIndicesStatsAction;

@@ -217,7 +220,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
     @Override
     public ClusterInfo getClusterInfo() {
-        return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes);
+        return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath);
     }
     @Override

@@ -350,16 +353,11 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
         @Override
         public void onResponse(IndicesStatsResponse indicesStatsResponse) {
             ShardStats[] stats = indicesStatsResponse.getShards();
-            HashMap<String, Long> newShardSizes = new HashMap<>();
-            for (ShardStats s : stats) {
-                long size = s.getStats().getStore().sizeInBytes();
-                String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());
-                if (logger.isTraceEnabled()) {
-                    logger.trace("shard: {} size: {}", sid, size);
-                }
-                newShardSizes.put(sid, size);
-            }
+            final HashMap<String, Long> newShardSizes = new HashMap<>();
+            final HashMap<ShardRouting, String> newShardRoutingToDataPath = new HashMap<>();
+            buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
             shardSizes = Collections.unmodifiableMap(newShardSizes);
+            shardRoutingToDataPath = Collections.unmodifiableMap(newShardRoutingToDataPath);
         }
         @Override

@@ -376,6 +374,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
             }
             // we empty the usages list, to be safe - we don't know what's going on.
             shardSizes = Collections.emptyMap();
+            shardRoutingToDataPath = Collections.emptyMap();
         }
     }
 });

@@ -404,6 +403,18 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
         }
     }
+    static void buildShardLevelInfo(ESLogger logger, ShardStats[] stats, HashMap<String, Long> newShardSizes, HashMap<ShardRouting, String> newShardRoutingToDataPath) {
+        for (ShardStats s : stats) {
+            newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
+            long size = s.getStats().getStore().sizeInBytes();
+            String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());
+            if (logger.isTraceEnabled()) {
+                logger.trace("shard: {} size: {}", sid, size);
+            }
+            newShardSizes.put(sid, size);
+        }
+    }
     static void fillDiskUsagePerNode(ESLogger logger, NodeStats[] nodeStatsArray, Map<String, DiskUsage> newLeastAvaiableUsages, Map<String, DiskUsage> newMostAvaiableUsages) {
         for (NodeStats nodeStats : nodeStatsArray) {
             if (nodeStats.getFs() == null) {

DiskThresholdDecider.java

@@ -313,13 +313,16 @@
      * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
      * of all shards
      */
-    public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway) {
+    public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway, String dataPath) {
         long totalSize = 0;
         for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
-            if (routing.initializing() && routing.relocatingNodeId() != null) {
-                totalSize += getShardSize(routing, clusterInfo);
-            } else if (subtractShardsMovingAway && routing.relocating()) {
-                totalSize -= getShardSize(routing, clusterInfo);
+            String actualPath = clusterInfo.getDataPath(routing);
+            if (dataPath.equals(actualPath)) {
+                if (routing.initializing() && routing.relocatingNodeId() != null) {
+                    totalSize += getShardSize(routing, clusterInfo);
+                } else if (subtractShardsMovingAway && routing.relocating()) {
+                    totalSize -= getShardSize(routing, clusterInfo);
+                }
             }
         }
         return totalSize;

@@ -450,15 +453,19 @@
         if (decision != null) {
             return decision;
         }
-        ClusterInfo clusterInfo = allocation.clusterInfo();
-        Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
-        DiskUsage usage = getDiskUsage(node, allocation, usages);
+        final ClusterInfo clusterInfo = allocation.clusterInfo();
+        final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
+        final DiskUsage usage = getDiskUsage(node, allocation, usages);
+        final String dataPath = clusterInfo.getDataPath(shardRouting);
         // If this node is already above the high threshold, the shard cannot remain (get it off!)
-        double freeDiskPercentage = usage.getFreeDiskAsPercentage();
-        long freeBytes = usage.getFreeBytes();
+        final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
+        final long freeBytes = usage.getFreeBytes();
         if (logger.isDebugEnabled()) {
             logger.debug("node [{}] has {}% free disk ({} bytes)", node.nodeId(), freeDiskPercentage, freeBytes);
         }
+        if (dataPath == null || usage.getPath().equals(dataPath) == false) {
+            return allocation.decision(Decision.YES, NAME, "shard is not allocated on the most utilized disk");
+        }
         if (freeBytes < freeBytesThresholdHigh.bytes()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",

@@ -493,8 +500,8 @@
         }
         if (includeRelocations) {
-            long relocatingShardsSize = sizeOfRelocatingShards(node, clusterInfo, true);
-            DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(), "_na_",
+            long relocatingShardsSize = sizeOfRelocatingShards(node, clusterInfo, true, usage.getPath());
+            DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(), usage.getPath(),
                     usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
             if (logger.isTraceEnabled()) {
                 logger.trace("usage without relocations: {}", usage);

IndicesService.java

@@ -254,7 +254,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
                 if (indexShard.routingEntry() == null) {
                     continue;
                 }
-                IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard, flags) });
+                IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats()) });
                 if (!statsByShard.containsKey(indexService.index())) {
                     statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats));
                 } else {

ClusterInfoServiceIT.java

@@ -31,10 +31,16 @@ import org.elasticsearch.action.support.ActionFilter;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.store.Store;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;

@@ -153,7 +159,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return pluginList(TestPlugin.class,
                 MockTransportService.TestPlugin.class);
     }
     @Test

@@ -161,7 +167,9 @@
         internalCluster().startNodesAsync(2,
                 Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, "200ms").build())
                 .get();
-        assertAcked(prepareCreate("test").setSettings(settingsBuilder().put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL, 0).build()));
+        assertAcked(prepareCreate("test").setSettings(settingsBuilder()
+                .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL, 0)
+                .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE).build()));
         ensureGreen("test");
         InternalTestCluster internalTestCluster = internalCluster();
         // Get the cluster info service on the master node

@@ -170,13 +178,18 @@
         infoService.addListener(listener);
         ClusterInfo info = listener.get();
         assertNotNull("info should not be null", info);
-        Map<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
-        Map<String, Long> shardSizes = info.shardSizes;
-        assertNotNull(usages);
+        final Map<String, DiskUsage> leastUsages = info.getNodeLeastAvailableDiskUsages();
+        final Map<String, DiskUsage> mostUsages = info.getNodeMostAvailableDiskUsages();
+        final Map<String, Long> shardSizes = info.shardSizes;
+        assertNotNull(leastUsages);
         assertNotNull(shardSizes);
-        assertThat("some usages are populated", usages.values().size(), Matchers.equalTo(2));
+        assertThat("some usages are populated", leastUsages.values().size(), Matchers.equalTo(2));
         assertThat("some shard sizes are populated", shardSizes.values().size(), greaterThan(0));
-        for (DiskUsage usage : usages.values()) {
+        for (DiskUsage usage : leastUsages.values()) {
+            logger.info("--> usage: {}", usage);
+            assertThat("usage has be retrieved", usage.getFreeBytes(), greaterThan(0L));
+        }
+        for (DiskUsage usage : mostUsages.values()) {
             logger.info("--> usage: {}", usage);
             assertThat("usage has be retrieved", usage.getFreeBytes(), greaterThan(0L));
         }

@@ -184,6 +197,21 @@
             logger.info("--> shard size: {}", size);
             assertThat("shard size is greater than 0", size, greaterThan(0L));
         }
+        ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getMasterName());
+        ClusterState state = clusterService.state();
+        RoutingNodes routingNodes = state.getRoutingNodes();
+        for (ShardRouting shard : routingNodes.getRoutingTable().allShards()) {
+            String dataPath = info.getDataPath(shard);
+            assertNotNull(dataPath);
+            String nodeId = shard.currentNodeId();
+            DiscoveryNode discoveryNode = state.getNodes().get(nodeId);
+            IndicesService indicesService = internalTestCluster.getInstance(IndicesService.class, discoveryNode.getName());
+            IndexService indexService = indicesService.indexService(shard.index());
+            IndexShard indexShard = indexService.shard(shard.id());
+            assertEquals(indexShard.shardPath().getRootDataPath().toString(), dataPath);
+        }
     }
     @Test

@@ -238,6 +266,7 @@
         // it is likely to update the node disk usage based on the one response that came be from local
         // node.
         assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThanOrEqualTo(1));
+        assertThat(info.getNodeMostAvailableDiskUsages().size(), greaterThanOrEqualTo(1));
         // indices is guaranteed to time out on the latch, not updating anything.
         assertThat(info.shardSizes.size(), greaterThan(1));

@@ -259,6 +288,7 @@
         info = listener.get();
         assertNotNull("info should not be null", info);
         assertThat(info.getNodeLeastAvailableDiskUsages().size(), equalTo(0));
+        assertThat(info.getNodeMostAvailableDiskUsages().size(), equalTo(0));
         assertThat(info.shardSizes.size(), equalTo(0));
         // check we recover

@@ -268,6 +298,7 @@
         info = listener.get();
         assertNotNull("info should not be null", info);
         assertThat(info.getNodeLeastAvailableDiskUsages().size(), equalTo(2));
+        assertThat(info.getNodeMostAvailableDiskUsages().size(), equalTo(2));
         assertThat(info.shardSizes.size(), greaterThan(0));
     }

DiskUsageTests.java

@@ -22,12 +22,20 @@ package org.elasticsearch.cluster;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.CommonStats;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.common.transport.DummyTransportAddress;
+import org.elasticsearch.index.shard.ShardPath;
+import org.elasticsearch.index.store.StoreStats;
 import org.elasticsearch.monitor.fs.FsInfo;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.Test;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;

@@ -87,6 +95,39 @@ public class DiskUsageTests extends ESTestCase {
             }
         }
     }
+    public void testFillShardLevelInfo() {
+        ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
+        ShardRoutingHelper.initialize(test_0, "node1");
+        ShardRoutingHelper.moveToStarted(test_0);
+        Path test0Path = createTempDir().resolve("indices").resolve("test").resolve("0");
+        CommonStats commonStats0 = new CommonStats();
+        commonStats0.store = new StoreStats(100, 1);
+        ShardRouting test_1 = ShardRouting.newUnassigned("test", 1, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
+        ShardRoutingHelper.initialize(test_1, "node2");
+        ShardRoutingHelper.moveToStarted(test_1);
+        Path test1Path = createTempDir().resolve("indices").resolve("test").resolve("1");
+        CommonStats commonStats1 = new CommonStats();
+        commonStats1.store = new StoreStats(1000, 1);
+        ShardStats[] stats = new ShardStats[] {
+                new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, "0xdeadbeef", test_0.shardId()), commonStats0, null),
+                new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, "0xdeadbeef", test_1.shardId()), commonStats1, null)
+        };
+        HashMap<String, Long> shardSizes = new HashMap<>();
+        HashMap<ShardRouting, String> routingToPath = new HashMap<>();
+        InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath);
+        assertEquals(2, shardSizes.size());
+        assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0)));
+        assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_1)));
+        assertEquals(100l, shardSizes.get(ClusterInfo.shardIdentifierFromRouting(test_0)).longValue());
+        assertEquals(1000l, shardSizes.get(ClusterInfo.shardIdentifierFromRouting(test_1)).longValue());
+        assertEquals(2, routingToPath.size());
+        assertTrue(routingToPath.containsKey(test_0));
+        assertTrue(routingToPath.containsKey(test_1));
+        assertEquals(test0Path.getParent().getParent().getParent().toAbsolutePath().toString(), routingToPath.get(test_0));
+        assertEquals(test1Path.getParent().getParent().getParent().toAbsolutePath().toString(), routingToPath.get(test_1));
+    }
     public void testFillDiskUsage() {
         Map<String, DiskUsage> newLeastAvaiableUsages = new HashMap<>();

MockInternalClusterInfoService.java

@@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
+import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDeciderTests;
 import org.elasticsearch.cluster.routing.allocation.decider.MockDiskUsagesIT;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;

@@ -63,9 +64,9 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
                                              ClusterService clusterService, ThreadPool threadPool) {
         super(settings, nodeSettingsService, transportNodesStatsAction, transportIndicesStatsAction, clusterService, threadPool);
         this.clusterName = ClusterName.clusterNameFromSettings(settings);
-        stats[0] = MockDiskUsagesIT.makeStats("node_t1", new DiskUsage("node_t1", "n1", "_na_", 100, 100));
-        stats[1] = MockDiskUsagesIT.makeStats("node_t2", new DiskUsage("node_t2", "n2", "_na_", 100, 100));
-        stats[2] = MockDiskUsagesIT.makeStats("node_t3", new DiskUsage("node_t3", "n3", "_na_", 100, 100));
+        stats[0] = MockDiskUsagesIT.makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100));
+        stats[1] = MockDiskUsagesIT.makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100));
+        stats[2] = MockDiskUsagesIT.makeStats("node_t3", new DiskUsage("node_t3", "n3", "/dev/null", 100, 100));
     }
     public void setN1Usage(String nodeName, DiskUsage newUsage) {

@@ -92,4 +93,9 @@
         // Not used, so noop
         return new CountDownLatch(0);
     }
+    public ClusterInfo getClusterInfo() {
+        ClusterInfo clusterInfo = super.getClusterInfo();
+        return new ClusterInfo(clusterInfo.getNodeLeastAvailableDiskUsages(), clusterInfo.getNodeMostAvailableDiskUsages(), clusterInfo.shardSizes, DiskThresholdDeciderTests.DEV_NULL_MAP);
+    }
 }

DiskThresholdDeciderTests.java

@@ -41,7 +41,6 @@ import org.elasticsearch.test.ESAllocationTestCase;
 import org.elasticsearch.test.gateway.NoopGatewayAllocator;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.junit.Test;
 import java.util.*;

@@ -66,15 +65,15 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
                 .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build();
         Map<String, DiskUsage> usages = new HashMap<>();
-        usages.put("node1", new DiskUsage("node1", "node1", "_na_", 100, 10)); // 90% used
-        usages.put("node2", new DiskUsage("node2", "node2", "_na_", 100, 35)); // 65% used
-        usages.put("node3", new DiskUsage("node3", "node3", "_na_", 100, 60)); // 40% used
-        usages.put("node4", new DiskUsage("node4", "node4", "_na_", 100, 80)); // 20% used
+        usages.put("node1", new DiskUsage("node1", "node1", "/dev/null", 100, 10)); // 90% used
+        usages.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 35)); // 65% used
+        usages.put("node3", new DiskUsage("node3", "node3", "/dev/null", 100, 60)); // 40% used
+        usages.put("node4", new DiskUsage("node4", "node4", "/dev/null", 100, 80)); // 20% used
         Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 10L); // 10 bytes
         shardSizes.put("[test][0][r]", 10L);
-        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
+        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), DEV_NULL_MAP);
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(

@@ -259,16 +258,16 @@
                 .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "9b").build();
         Map<String, DiskUsage> usages = new HashMap<>();
-        usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 10)); // 90% used
-        usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 10)); // 90% used
-        usages.put("node3", new DiskUsage("node3", "n3", "_na_", 100, 60)); // 40% used
-        usages.put("node4", new DiskUsage("node4", "n4", "_na_", 100, 80)); // 20% used
-        usages.put("node5", new DiskUsage("node5", "n5", "_na_", 100, 85)); // 15% used
+        usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 10)); // 90% used
+        usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 10)); // 90% used
+        usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 60)); // 40% used
+        usages.put("node4", new DiskUsage("node4", "n4", "/dev/null", 100, 80)); // 20% used
+        usages.put("node5", new DiskUsage("node5", "n5", "/dev/null", 100, 85)); // 15% used
         Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 10L); // 10 bytes
         shardSizes.put("[test][0][r]", 10L);
-        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
+        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), DEV_NULL_MAP);
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(

@@ -329,8 +328,8 @@
         logger.info("--> nodeWithoutPrimary: {}", nodeWithoutPrimary);
         // Make node without the primary now habitable to replicas
-        usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "_na_", 100, 35)); // 65% used
-        final ClusterInfo clusterInfo2 = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
+        usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "/dev/null", 100, 35)); // 65% used
+        final ClusterInfo clusterInfo2 = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), DEV_NULL_MAP);
         cis = new ClusterInfoService() {
             @Override
             public ClusterInfo getClusterInfo() {

@@ -524,12 +523,12 @@
                 .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "71%").build();
         Map<String, DiskUsage> usages = new HashMap<>();
-        usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 31)); // 69% used
-        usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 1)); // 99% used
+        usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 31)); // 69% used
+        usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 1)); // 99% used
         Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 10L); // 10 bytes
-        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
+        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), DEV_NULL_MAP);
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(

@@ -590,13 +589,13 @@
                 .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.85).build();
         Map<String, DiskUsage> usages = new HashMap<>();
-        usages.put("node2", new DiskUsage("node2", "node2", "_na_", 100, 50)); // 50% used
-        usages.put("node3", new DiskUsage("node3", "node3", "_na_", 100, 0)); // 100% used
+        usages.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 50)); // 50% used
+        usages.put("node3", new DiskUsage("node3", "node3", "/dev/null", 100, 0)); // 100% used
         Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 10L); // 10 bytes
         shardSizes.put("[test][0][r]", 10L); // 10 bytes
-        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
+        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), DEV_NULL_MAP);
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(

@@ -661,8 +660,8 @@
         DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY);
         Map<String, DiskUsage> usages = new HashMap<>();
-        usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 50)); // 50% used
-        usages.put("node3", new DiskUsage("node3", "n3", "_na_", 100, 0)); // 100% used
+        usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
+        usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used
         DiskUsage node1Usage = decider.averageUsage(rn, usages);
         assertThat(node1Usage.getTotalBytes(), equalTo(100L));

@@ -675,10 +674,10 @@
         DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY);
         Map<String, DiskUsage> usages = new HashMap<>();
-        usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 50)); // 50% used
-        usages.put("node3", new DiskUsage("node3", "n3", "_na_", 100, 0)); // 100% used
-        Double after = decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", "n2", "_na_", 100, 30), 11L);
+        usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
+        usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used
+        Double after = decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", "n2", "/dev/null", 100, 30), 11L);
         assertThat(after, equalTo(19.0));
     }

@@ -691,16 +690,16 @@
                 .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build();
         Map<String, DiskUsage> usages = new HashMap<>();
-        usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 40)); // 60% used
-        usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 40)); // 60% used
-        usages.put("node2", new DiskUsage("node3", "n3", "_na_", 100, 40)); // 60% used
+        usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used
+        usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 40)); // 60% used
+        usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 40)); // 60% used
         Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 14L); // 14 bytes
         shardSizes.put("[test][0][r]", 14L);
         shardSizes.put("[test2][0][p]", 1L); // 1 bytes
         shardSizes.put("[test2][0][r]", 1L);
-        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
+        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), DEV_NULL_MAP);
         AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
                 new HashSet<>(Arrays.asList(

@@ -797,13 +796,13 @@
         // We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available
         Map<String, DiskUsage> usages = new HashMap<>();
-        usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 20)); // 80% used
-        usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 100)); // 0% used
+        usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 20)); // 80% used
+        usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 100)); // 0% used
         Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 40L);
         shardSizes.put("[test][1][p]", 40L);
-        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
+        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), DEV_NULL_MAP);
         DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings);
         MetaData metaData = MetaData.builder()

@@ -916,4 +915,26 @@
                 rn.shardsWithState(RELOCATING),
                 rn.shardsWithState(STARTED));
     }
+    public static final Map<ShardRouting, String> DEV_NULL_MAP = Collections.unmodifiableMap(new StaticValueMap("/dev/null"));
+    // a test only map that always returns the same value no matter what key is passed
+    private static final class StaticValueMap extends AbstractMap<ShardRouting, String> {
+        private final String value;
+        private StaticValueMap(String value) {
+            this.value = value;
+        }
+        @Override
+        public String get(Object key) {
+            return value;
+        }
+        @Override
+        public Set<Entry<ShardRouting, String>> entrySet() {
+            throw new UnsupportedOperationException("this is a test-only map that only supports #get(Object key)");
+        }
+    }
 }

DiskThresholdDeciderUnitTests.java

@@ -124,18 +124,17 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
         Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 10L); // 10 bytes
-        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes));
+        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes), Collections.EMPTY_MAP);
         RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
         assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation));
         assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation));
     }
     public void testCanRemainUsesLeastAvailableSpace() {
         NodeSettingsService nss = new NodeSettingsService(Settings.EMPTY);
         ClusterInfoService cis = EmptyClusterInfoService.INSTANCE;
         DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null);
+        Map<ShardRouting, String> shardRoutingMap = new HashMap<>();
         DiscoveryNode node_0 = new DiscoveryNode("node_0", DummyTransportAddress.INSTANCE, Version.CURRENT);
         DiscoveryNode node_1 = new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT);

@@ -143,11 +142,12 @@
         ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
         ShardRoutingHelper.initialize(test_0, node_0.getId());
         ShardRoutingHelper.moveToStarted(test_0);
+        shardRoutingMap.put(test_0, "/node0/least");
         ShardRouting test_1 = ShardRouting.newUnassigned("test", 1, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
         ShardRoutingHelper.initialize(test_1, node_1.getId());
         ShardRoutingHelper.moveToStarted(test_1);
+        shardRoutingMap.put(test_1, "/node1/least");
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))

@@ -167,17 +167,19 @@
         // actual test -- after all that bloat :)
         Map<String, DiskUsage> leastAvailableUsages = new HashMap<>();
-        leastAvailableUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 10)); // 90% used
-        leastAvailableUsages.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 9)); // 91% used
+        leastAvailableUsages.put("node_0", new DiskUsage("node_0", "node_0", "/node0/least", 100, 10)); // 90% used
+        leastAvailableUsages.put("node_1", new DiskUsage("node_1", "node_1", "/node1/least", 100, 9)); // 91% used
         Map<String, DiskUsage> mostAvailableUsage = new HashMap<>();
-        mostAvailableUsage.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 90)); // 10% used
-        mostAvailableUsage.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 90)); // 10% used
+        mostAvailableUsage.put("node_0", new DiskUsage("node_0", "node_0", "/node0/most", 100, 90)); // 10% used
+        mostAvailableUsage.put("node_1", new DiskUsage("node_1", "node_1", "/node1/most", 100, 90)); // 10% used
         Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 10L); // 10 bytes
         shardSizes.put("[test][1][p]", 10L);
-        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes));
+        shardSizes.put("[test][2][p]", 10L);
+        final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes), shardRoutingMap);
         RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
         assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation));
         assertEquals(Decision.NO, decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation));

@@ -193,6 +195,17 @@
         } catch (IllegalArgumentException ex) {
             // not allocated on that node
         }
+        ShardRouting test_2 = ShardRouting.newUnassigned("test", 2, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
+        ShardRoutingHelper.initialize(test_2, node_1.getId());
+        ShardRoutingHelper.moveToStarted(test_2);
+        shardRoutingMap.put(test_2, "/node1/most");
+        assertEquals("can stay since allocated on a different path with enough space", Decision.YES, decider.canRemain(test_2, new RoutingNode("node_1", node_1), allocation));
+        ShardRouting test_3 = ShardRouting.newUnassigned("test", 3, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
+        ShardRoutingHelper.initialize(test_3, node_1.getId());
+        ShardRoutingHelper.moveToStarted(test_3);
+        assertEquals("can stay since we don't have information about this shard", Decision.YES, decider.canRemain(test_2, new RoutingNode("node_1", node_1), allocation));
     }

@@ -202,7 +215,7 @@
         shardSizes.put("[test][1][r]", 100L);
         shardSizes.put("[test][2][r]", 1000L);
         shardSizes.put("[other][0][p]", 10000L);
-        ClusterInfo info = new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP, shardSizes);
+        ClusterInfo info = new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP, shardSizes, DiskThresholdDeciderTests.DEV_NULL_MAP);
         ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
         ShardRoutingHelper.initialize(test_0, "node1");
         ShardRoutingHelper.moveToStarted(test_0);

@@ -222,8 +235,10 @@
         assertEquals(10l, DiskThresholdDecider.getShardSize(test_0, info));
         RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", LocalTransportAddress.PROTO, Version.CURRENT), Arrays.asList(test_0, test_1.buildTargetRelocatingShard(), test_2));
-        assertEquals(100l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false));
-        assertEquals(90l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true));
+        assertEquals(100l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null"));
+        assertEquals(90l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/null"));
+        assertEquals(0l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/some/other/dev"));
+        assertEquals(0l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/some/other/dev"));
         ShardRouting test_3 = ShardRouting.newUnassigned("test", 3, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
         ShardRoutingHelper.initialize(test_3, "node1");

@@ -239,11 +254,11 @@
         node = new RoutingNode("node1", new DiscoveryNode("node1", LocalTransportAddress.PROTO, Version.CURRENT), Arrays.asList(test_0, test_1.buildTargetRelocatingShard(), test_2, other_0.buildTargetRelocatingShard()));
         if (other_0.primary()) {
-            assertEquals(10100l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false));
-            assertEquals(10090l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true));
+            assertEquals(10100l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null"));
+            assertEquals(10090l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/null"));
         } else {
-            assertEquals(100l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false));
-            assertEquals(90l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true));
+            assertEquals(100l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null"));
+            assertEquals(90l, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/null"));
         }
     }

MockDiskUsagesIT.java

@@ -82,9 +82,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
         // Start with all nodes at 50% usage
         final MockInternalClusterInfoService cis = (MockInternalClusterInfoService)
                 internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName());
-        cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50));
-        cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50));
-        cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 50));
+        cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "/dev/null", 100, 50));
+        cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "/dev/null", 100, 50));
+        cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50));
         client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
                 .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, randomFrom("20b", "80%"))

@@ -172,7 +172,7 @@
     /** Create a fake NodeStats for the given node and usage */
     public static NodeStats makeStats(String nodeName, DiskUsage usage) {
         FsInfo.Path[] paths = new FsInfo.Path[1];
-        FsInfo.Path path = new FsInfo.Path("/path.data", null,
+        FsInfo.Path path = new FsInfo.Path("/dev/null", null,
                 usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeBytes());
         paths[0] = path;
         FsInfo fsInfo = new FsInfo(System.currentTimeMillis(), paths);

IndexShardTests.java

@@ -23,6 +23,7 @@ import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;

@@ -559,7 +560,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndexService test = indicesService.indexService("test");
         IndexShard shard = test.shard(0);
-        ShardStats stats = new ShardStats(shard, new CommonStatsFlags());
+        ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(shard, new CommonStatsFlags()), shard.commitStats());
         assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath());
         assertEquals(shard.shardPath().getRootStatePath().toString(), stats.getStatePath());
         assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath());