Expand ClusterInfo to provide min / max disk usage forn allocation decider
Today we sum up the disk usage for the allocation decider which is broken since we don't stripe across multiple data paths. Each shard has it's own private path now but the allocation deciders still treat all paths as one big disk. This commit adds allows allocation deciders to access the least used and most used path to make better allocation decidsions upon canRemain and canAllocate calls. Yet, this commit doesn't fix all the issues since we still can't tell which shard can remain and which can't. This problem is out of scope in this commit and will be solved in a followup commit. Relates to #13106
This commit is contained in:
parent
35f9ee7a62
commit
0c71328186
|
@ -32,28 +32,53 @@ import java.util.Map;
|
|||
*/
|
||||
public class ClusterInfo {
|
||||
|
||||
private final Map<String, DiskUsage> usages;
|
||||
private final Map<String, DiskUsage> leastAvailableSpaceUsage;
|
||||
private final Map<String, DiskUsage> mostAvailabeSpaceUsage;
|
||||
final Map<String, Long> shardSizes;
|
||||
public static final ClusterInfo EMPTY = new ClusterInfo();
|
||||
|
||||
private ClusterInfo() {
|
||||
this.usages = Collections.emptyMap();
|
||||
this.shardSizes = Collections.emptyMap();
|
||||
protected ClusterInfo() {
|
||||
this(Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP);
|
||||
}
|
||||
|
||||
public ClusterInfo(Map<String, DiskUsage> usages, Map<String, Long> shardSizes) {
|
||||
this.usages = usages;
|
||||
/**
|
||||
* Creates a new ClusterInfo instance.
|
||||
*
|
||||
* @param leastAvailableSpaceUsage a node id to disk usage mapping for the path that has the least available space on the node.
|
||||
* @param mostAvailableSpaceUsage a node id to disk usage mapping for the path that has the most available space on the node.
|
||||
* @param shardSizes a shardkey to size in bytes mapping per shard.
|
||||
* @see #shardIdentifierFromRouting
|
||||
*/
|
||||
public ClusterInfo(final Map<String, DiskUsage> leastAvailableSpaceUsage, final Map<String, DiskUsage> mostAvailableSpaceUsage, final Map<String, Long> shardSizes) {
|
||||
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
|
||||
this.shardSizes = shardSizes;
|
||||
this.mostAvailabeSpaceUsage = mostAvailableSpaceUsage;
|
||||
}
|
||||
|
||||
public Map<String, DiskUsage> getNodeDiskUsages() {
|
||||
return this.usages;
|
||||
/**
|
||||
* Returns a node id to disk usage mapping for the path that has the least available space on the node.
|
||||
*/
|
||||
public Map<String, DiskUsage> getNodeLeastAvailableDiskUsages() {
|
||||
return this.leastAvailableSpaceUsage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a node id to disk usage mapping for the path that has the most available space on the node.
|
||||
*/
|
||||
public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
|
||||
return this.mostAvailabeSpaceUsage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
|
||||
*/
|
||||
public Long getShardSize(ShardRouting shardRouting) {
|
||||
return shardSizes.get(shardIdentifierFromRouting(shardRouting));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the shard size for the given shard routing or <code>defaultValue</code> it that metric is not available.
|
||||
*/
|
||||
public long getShardSize(ShardRouting shardRouting, long defaultValue) {
|
||||
Long shardSize = getShardSize(shardRouting);
|
||||
return shardSize == null ? defaultValue : shardSize;
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
public class DiskUsage {
|
||||
final String nodeId;
|
||||
final String nodeName;
|
||||
final String path;
|
||||
final long totalBytes;
|
||||
final long freeBytes;
|
||||
|
||||
|
@ -35,11 +36,12 @@ public class DiskUsage {
|
|||
* Create a new DiskUsage, if {@code totalBytes} is 0, {@get getFreeDiskAsPercentage}
|
||||
* will always return 100.0% free
|
||||
*/
|
||||
public DiskUsage(String nodeId, String nodeName, long totalBytes, long freeBytes) {
|
||||
public DiskUsage(String nodeId, String nodeName, String path, long totalBytes, long freeBytes) {
|
||||
this.nodeId = nodeId;
|
||||
this.nodeName = nodeName;
|
||||
this.freeBytes = freeBytes;
|
||||
this.totalBytes = totalBytes;
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
public String getNodeId() {
|
||||
|
@ -50,6 +52,10 @@ public class DiskUsage {
|
|||
return nodeName;
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public double getFreeDiskAsPercentage() {
|
||||
// We return 100.0% in order to fail "open", in that if we have invalid
|
||||
// numbers for the total bytes, it's as if we don't know disk usage.
|
||||
|
@ -77,7 +83,7 @@ public class DiskUsage {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[" + nodeId + "][" + nodeName + "] free: " + new ByteSizeValue(getFreeBytes()) +
|
||||
return "[" + nodeId + "][" + nodeName + "][" + path + "] free: " + new ByteSizeValue(getFreeBytes()) +
|
||||
"[" + Strings.format1Decimals(getFreeDiskAsPercentage(), "%") + "]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
|
@ -35,9 +34,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.monitor.fs.FsInfo;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
@ -47,7 +46,6 @@ import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
|||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* InternalClusterInfoService provides the ClusterInfoService interface,
|
||||
|
@ -67,7 +65,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|||
|
||||
private volatile TimeValue updateFrequency;
|
||||
|
||||
private volatile Map<String, DiskUsage> usages;
|
||||
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
|
||||
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
|
||||
private volatile Map<String, Long> shardSizes;
|
||||
private volatile boolean isMaster = false;
|
||||
private volatile boolean enabled;
|
||||
|
@ -84,7 +83,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|||
TransportIndicesStatsAction transportIndicesStatsAction, ClusterService clusterService,
|
||||
ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.usages = Collections.emptyMap();
|
||||
this.leastAvailableSpaceUsages = Collections.emptyMap();
|
||||
this.mostAvailableSpaceUsages = Collections.emptyMap();
|
||||
this.shardSizes = Collections.emptyMap();
|
||||
this.transportNodesStatsAction = transportNodesStatsAction;
|
||||
this.transportIndicesStatsAction = transportIndicesStatsAction;
|
||||
|
@ -200,9 +200,16 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Removing node from cluster info: {}", removedNode.getId());
|
||||
}
|
||||
Map<String, DiskUsage> newUsages = new HashMap<>(usages);
|
||||
newUsages.remove(removedNode.getId());
|
||||
usages = Collections.unmodifiableMap(newUsages);
|
||||
if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) {
|
||||
Map<String, DiskUsage> newMaxUsages = new HashMap<>(leastAvailableSpaceUsages);
|
||||
newMaxUsages.remove(removedNode.getId());
|
||||
leastAvailableSpaceUsages = Collections.unmodifiableMap(newMaxUsages);
|
||||
}
|
||||
if (mostAvailableSpaceUsages.containsKey(removedNode.getId())) {
|
||||
Map<String, DiskUsage> newMinUsages = new HashMap<>(mostAvailableSpaceUsages);
|
||||
newMinUsages.remove(removedNode.getId());
|
||||
mostAvailableSpaceUsages = Collections.unmodifiableMap(newMinUsages);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -210,7 +217,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|||
|
||||
@Override
|
||||
public ClusterInfo getClusterInfo() {
|
||||
return new ClusterInfo(usages, shardSizes);
|
||||
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -313,27 +320,11 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|||
CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
|
||||
@Override
|
||||
public void onResponse(NodesStatsResponse nodeStatses) {
|
||||
Map<String, DiskUsage> newUsages = new HashMap<>();
|
||||
for (NodeStats nodeStats : nodeStatses.getNodes()) {
|
||||
if (nodeStats.getFs() == null) {
|
||||
logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().name());
|
||||
} else {
|
||||
long available = 0;
|
||||
long total = 0;
|
||||
|
||||
for (FsInfo.Path info : nodeStats.getFs()) {
|
||||
available += info.getAvailable().bytes();
|
||||
total += info.getTotal().bytes();
|
||||
}
|
||||
String nodeId = nodeStats.getNode().id();
|
||||
String nodeName = nodeStats.getNode().getName();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("node: [{}], total disk: {}, available disk: {}", nodeId, total, available);
|
||||
}
|
||||
newUsages.put(nodeId, new DiskUsage(nodeId, nodeName, total, available));
|
||||
}
|
||||
}
|
||||
usages = Collections.unmodifiableMap(newUsages);
|
||||
Map<String, DiskUsage> newLeastAvaiableUsages = new HashMap<>();
|
||||
Map<String, DiskUsage> newMostAvaiableUsages = new HashMap<>();
|
||||
fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages);
|
||||
leastAvailableSpaceUsages = Collections.unmodifiableMap(newLeastAvaiableUsages);
|
||||
mostAvailableSpaceUsages = Collections.unmodifiableMap(newMostAvaiableUsages);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -349,7 +340,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|||
logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
|
||||
}
|
||||
// we empty the usages list, to be safe - we don't know what's going on.
|
||||
usages = Collections.emptyMap();
|
||||
leastAvailableSpaceUsages = Collections.emptyMap();
|
||||
mostAvailableSpaceUsages = Collections.emptyMap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -412,5 +404,34 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|||
}
|
||||
}
|
||||
|
||||
static void fillDiskUsagePerNode(ESLogger logger, NodeStats[] nodeStatsArray, Map<String, DiskUsage> newLeastAvaiableUsages, Map<String, DiskUsage> newMostAvaiableUsages) {
|
||||
for (NodeStats nodeStats : nodeStatsArray) {
|
||||
if (nodeStats.getFs() == null) {
|
||||
logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().name());
|
||||
} else {
|
||||
FsInfo.Path leastAvailablePath = null;
|
||||
FsInfo.Path mostAvailablePath = null;
|
||||
for (FsInfo.Path info : nodeStats.getFs()) {
|
||||
if (leastAvailablePath == null) {
|
||||
assert mostAvailablePath == null;
|
||||
mostAvailablePath = leastAvailablePath = info;
|
||||
} else if (leastAvailablePath.getAvailable().bytes() > info.getAvailable().bytes()){
|
||||
leastAvailablePath = info;
|
||||
} else if (mostAvailablePath.getAvailable().bytes() < info.getAvailable().bytes()) {
|
||||
mostAvailablePath = info;
|
||||
}
|
||||
}
|
||||
String nodeId = nodeStats.getNode().id();
|
||||
String nodeName = nodeStats.getNode().getName();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("node: [{}], most available: total disk: {}, available disk: {} / least available: total disk: {}, available disk: {}", nodeId, mostAvailablePath.getTotal(), leastAvailablePath.getAvailable(), leastAvailablePath.getTotal(), leastAvailablePath.getAvailable());
|
||||
}
|
||||
newLeastAvaiableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, leastAvailablePath.getPath(), leastAvailablePath.getTotal().bytes(), leastAvailablePath.getAvailable().bytes()));
|
||||
newMostAvaiableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, mostAvailablePath.getPath(), mostAvailablePath.getTotal().bytes(), mostAvailablePath.getAvailable().bytes()));
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -164,7 +164,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
|
||||
@Override
|
||||
public void onNewInfo(ClusterInfo info) {
|
||||
Map<String, DiskUsage> usages = info.getNodeDiskUsages();
|
||||
Map<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
|
||||
if (usages != null) {
|
||||
boolean reroute = false;
|
||||
String explanation = "";
|
||||
|
@ -339,7 +339,9 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
|
||||
final double usedDiskThresholdLow = 100.0 - DiskThresholdDecider.this.freeDiskThresholdLow;
|
||||
final double usedDiskThresholdHigh = 100.0 - DiskThresholdDecider.this.freeDiskThresholdHigh;
|
||||
DiskUsage usage = getDiskUsage(node, allocation);
|
||||
ClusterInfo clusterInfo = allocation.clusterInfo();
|
||||
Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
|
||||
DiskUsage usage = getDiskUsage(node, allocation, usages);
|
||||
// First, check that the node currently over the low watermark
|
||||
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
|
||||
// Cache the used disk percentage for displaying disk percentages consistent with documentation
|
||||
|
@ -441,11 +443,16 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
|
||||
@Override
|
||||
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
|
||||
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
|
||||
}
|
||||
final Decision decision = earlyTerminate(allocation);
|
||||
if (decision != null) {
|
||||
return decision;
|
||||
}
|
||||
DiskUsage usage = getDiskUsage(node, allocation);
|
||||
ClusterInfo clusterInfo = allocation.clusterInfo();
|
||||
Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
|
||||
DiskUsage usage = getDiskUsage(node, allocation, usages);
|
||||
// If this node is already above the high threshold, the shard cannot remain (get it off!)
|
||||
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
|
||||
long freeBytes = usage.getFreeBytes();
|
||||
|
@ -472,9 +479,8 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
|
||||
}
|
||||
|
||||
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation) {
|
||||
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, Map<String, DiskUsage> usages) {
|
||||
ClusterInfo clusterInfo = allocation.clusterInfo();
|
||||
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
|
||||
DiskUsage usage = usages.get(node.nodeId());
|
||||
if (usage == null) {
|
||||
// If there is no usage, and we have other nodes in the cluster,
|
||||
|
@ -488,7 +494,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
|
||||
if (includeRelocations) {
|
||||
long relocatingShardsSize = sizeOfRelocatingShards(node, clusterInfo, true);
|
||||
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
|
||||
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(), "_na_",
|
||||
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("usage without relocations: {}", usage);
|
||||
|
@ -508,7 +514,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
*/
|
||||
public DiskUsage averageUsage(RoutingNode node, Map<String, DiskUsage> usages) {
|
||||
if (usages.size() == 0) {
|
||||
return new DiskUsage(node.nodeId(), node.node().name(), 0, 0);
|
||||
return new DiskUsage(node.nodeId(), node.node().name(), "_na_", 0, 0);
|
||||
}
|
||||
long totalBytes = 0;
|
||||
long freeBytes = 0;
|
||||
|
@ -516,7 +522,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
totalBytes += du.getTotalBytes();
|
||||
freeBytes += du.getFreeBytes();
|
||||
}
|
||||
return new DiskUsage(node.nodeId(), node.node().name(), totalBytes / usages.size(), freeBytes / usages.size());
|
||||
return new DiskUsage(node.nodeId(), node.node().name(), "_na_", totalBytes / usages.size(), freeBytes / usages.size());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -528,7 +534,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
*/
|
||||
public double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) {
|
||||
shardSize = (shardSize == null) ? 0 : shardSize;
|
||||
DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(),
|
||||
DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(),
|
||||
usage.getTotalBytes(), usage.getFreeBytes() - shardSize);
|
||||
return newUsage.getFreeDiskAsPercentage();
|
||||
}
|
||||
|
@ -600,7 +606,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
|
||||
}
|
||||
|
||||
final Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
|
||||
final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
|
||||
// Fail open if there are no disk usages available
|
||||
if (usages.isEmpty()) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
|
|
@ -164,7 +164,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
|
|||
infoService.addListener(listener);
|
||||
ClusterInfo info = listener.get();
|
||||
assertNotNull("info should not be null", info);
|
||||
Map<String, DiskUsage> usages = info.getNodeDiskUsages();
|
||||
Map<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
|
||||
Map<String, Long> shardSizes = info.shardSizes;
|
||||
assertNotNull(usages);
|
||||
assertNotNull(shardSizes);
|
||||
|
@ -197,7 +197,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
|
|||
infoService.updateOnce();
|
||||
ClusterInfo info = listener.get();
|
||||
assertNotNull("failed to collect info", info);
|
||||
assertThat("some usages are populated", info.getNodeDiskUsages().size(), Matchers.equalTo(2));
|
||||
assertThat("some usages are populated", info.getNodeLeastAvailableDiskUsages().size(), Matchers.equalTo(2));
|
||||
assertThat("some shard sizes are populated", info.shardSizes.size(), greaterThan(0));
|
||||
|
||||
|
||||
|
@ -231,7 +231,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
|
|||
// node info will time out both on the request level on the count down latch. this means
|
||||
// it is likely to update the node disk usage based on the one response that came be from local
|
||||
// node.
|
||||
assertThat(info.getNodeDiskUsages().size(), greaterThanOrEqualTo(1));
|
||||
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThanOrEqualTo(1));
|
||||
// indices is guaranteed to time out on the latch, not updating anything.
|
||||
assertThat(info.shardSizes.size(), greaterThan(1));
|
||||
|
||||
|
@ -252,7 +252,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
|
|||
infoService.updateOnce();
|
||||
info = listener.get();
|
||||
assertNotNull("info should not be null", info);
|
||||
assertThat(info.getNodeDiskUsages().size(), equalTo(0));
|
||||
assertThat(info.getNodeLeastAvailableDiskUsages().size(), equalTo(0));
|
||||
assertThat(info.shardSizes.size(), equalTo(0));
|
||||
|
||||
// check we recover
|
||||
|
@ -261,7 +261,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
|
|||
infoService.updateOnce();
|
||||
info = listener.get();
|
||||
assertNotNull("info should not be null", info);
|
||||
assertThat(info.getNodeDiskUsages().size(), equalTo(2));
|
||||
assertThat(info.getNodeLeastAvailableDiskUsages().size(), equalTo(2));
|
||||
assertThat(info.shardSizes.size(), greaterThan(0));
|
||||
|
||||
}
|
||||
|
|
|
@ -19,16 +19,25 @@
|
|||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.transport.DummyTransportAddress;
|
||||
import org.elasticsearch.monitor.fs.FsInfo;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class DiskUsageTests extends ESTestCase {
|
||||
|
||||
@Test
|
||||
public void diskUsageCalcTest() {
|
||||
DiskUsage du = new DiskUsage("node1", "n1", 100, 40);
|
||||
DiskUsage du = new DiskUsage("node1", "n1", "random", 100, 40);
|
||||
assertThat(du.getFreeDiskAsPercentage(), equalTo(40.0));
|
||||
assertThat(du.getUsedDiskAsPercentage(), equalTo(100.0 - 40.0));
|
||||
assertThat(du.getFreeBytes(), equalTo(40L));
|
||||
|
@ -37,19 +46,19 @@ public class DiskUsageTests extends ESTestCase {
|
|||
|
||||
// Test that DiskUsage handles invalid numbers, as reported by some
|
||||
// filesystems (ZFS & NTFS)
|
||||
DiskUsage du2 = new DiskUsage("node1", "n1", 100, 101);
|
||||
DiskUsage du2 = new DiskUsage("node1", "n1","random", 100, 101);
|
||||
assertThat(du2.getFreeDiskAsPercentage(), equalTo(101.0));
|
||||
assertThat(du2.getFreeBytes(), equalTo(101L));
|
||||
assertThat(du2.getUsedBytes(), equalTo(-1L));
|
||||
assertThat(du2.getTotalBytes(), equalTo(100L));
|
||||
|
||||
DiskUsage du3 = new DiskUsage("node1", "n1", -1, -1);
|
||||
DiskUsage du3 = new DiskUsage("node1", "n1", "random",-1, -1);
|
||||
assertThat(du3.getFreeDiskAsPercentage(), equalTo(100.0));
|
||||
assertThat(du3.getFreeBytes(), equalTo(-1L));
|
||||
assertThat(du3.getUsedBytes(), equalTo(0L));
|
||||
assertThat(du3.getTotalBytes(), equalTo(-1L));
|
||||
|
||||
DiskUsage du4 = new DiskUsage("node1", "n1", 0, 0);
|
||||
DiskUsage du4 = new DiskUsage("node1", "n1","random", 0, 0);
|
||||
assertThat(du4.getFreeDiskAsPercentage(), equalTo(100.0));
|
||||
assertThat(du4.getFreeBytes(), equalTo(0L));
|
||||
assertThat(du4.getUsedBytes(), equalTo(0L));
|
||||
|
@ -62,7 +71,7 @@ public class DiskUsageTests extends ESTestCase {
|
|||
for (int i = 1; i < iters; i++) {
|
||||
long total = between(Integer.MIN_VALUE, Integer.MAX_VALUE);
|
||||
long free = between(Integer.MIN_VALUE, Integer.MAX_VALUE);
|
||||
DiskUsage du = new DiskUsage("random", "random", total, free);
|
||||
DiskUsage du = new DiskUsage("random", "random", "random", total, free);
|
||||
if (total == 0) {
|
||||
assertThat(du.getFreeBytes(), equalTo(free));
|
||||
assertThat(du.getTotalBytes(), equalTo(0L));
|
||||
|
@ -78,4 +87,52 @@ public class DiskUsageTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testFillDiskUsage() {
|
||||
Map<String, DiskUsage> newLeastAvaiableUsages = new HashMap<>();
|
||||
Map<String, DiskUsage> newMostAvaiableUsages = new HashMap<>();
|
||||
FsInfo.Path[] node1FSInfo = new FsInfo.Path[] {
|
||||
new FsInfo.Path("/middle", "/dev/sda", 100, 90, 80),
|
||||
new FsInfo.Path("/least", "/dev/sdb", 200, 190, 70),
|
||||
new FsInfo.Path("/most", "/dev/sdc", 300, 290, 280),
|
||||
};
|
||||
FsInfo.Path[] node2FSInfo = new FsInfo.Path[] {
|
||||
new FsInfo.Path("/least_most", "/dev/sda", 100, 90, 80),
|
||||
};
|
||||
|
||||
FsInfo.Path[] node3FSInfo = new FsInfo.Path[] {
|
||||
new FsInfo.Path("/least", "/dev/sda", 100, 90, 70),
|
||||
new FsInfo.Path("/most", "/dev/sda", 100, 90, 80),
|
||||
};
|
||||
NodeStats[] nodeStats = new NodeStats[] {
|
||||
new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
|
||||
null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null),
|
||||
new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null),
|
||||
new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
|
||||
null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null)
|
||||
};
|
||||
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
|
||||
DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");
|
||||
DiskUsage mostNode_1 = newMostAvaiableUsages.get("node_1");
|
||||
assertDiskUsage(mostNode_1, node1FSInfo[2]);
|
||||
assertDiskUsage(leastNode_1, node1FSInfo[1]);
|
||||
|
||||
DiskUsage leastNode_2 = newLeastAvaiableUsages.get("node_2");
|
||||
DiskUsage mostNode_2 = newMostAvaiableUsages.get("node_2");
|
||||
assertDiskUsage(leastNode_2, node2FSInfo[0]);
|
||||
assertDiskUsage(mostNode_2, node2FSInfo[0]);
|
||||
|
||||
DiskUsage leastNode_3 = newLeastAvaiableUsages.get("node_3");
|
||||
DiskUsage mostNode_3 = newMostAvaiableUsages.get("node_3");
|
||||
assertDiskUsage(leastNode_3, node3FSInfo[0]);
|
||||
assertDiskUsage(mostNode_3, node3FSInfo[1]);
|
||||
}
|
||||
|
||||
private void assertDiskUsage(DiskUsage usage, FsInfo.Path path) {
|
||||
assertEquals(usage.toString(), usage.getPath(), path.getPath());
|
||||
assertEquals(usage.toString(), usage.getTotalBytes(), path.getTotal().bytes());
|
||||
assertEquals(usage.toString(), usage.getFreeBytes(), path.getAvailable().bytes());
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,9 +63,9 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
|
|||
ClusterService clusterService, ThreadPool threadPool) {
|
||||
super(settings, nodeSettingsService, transportNodesStatsAction, transportIndicesStatsAction, clusterService, threadPool);
|
||||
this.clusterName = ClusterName.clusterNameFromSettings(settings);
|
||||
stats[0] = MockDiskUsagesIT.makeStats("node_t1", new DiskUsage("node_t1", "n1", 100, 100));
|
||||
stats[1] = MockDiskUsagesIT.makeStats("node_t2", new DiskUsage("node_t2", "n2", 100, 100));
|
||||
stats[2] = MockDiskUsagesIT.makeStats("node_t3", new DiskUsage("node_t3", "n3", 100, 100));
|
||||
stats[0] = MockDiskUsagesIT.makeStats("node_t1", new DiskUsage("node_t1", "n1", "_na_", 100, 100));
|
||||
stats[1] = MockDiskUsagesIT.makeStats("node_t2", new DiskUsage("node_t2", "n2", "_na_", 100, 100));
|
||||
stats[2] = MockDiskUsagesIT.makeStats("node_t3", new DiskUsage("node_t3", "n3", "_na_", 100, 100));
|
||||
}
|
||||
|
||||
public void setN1Usage(String nodeName, DiskUsage newUsage) {
|
||||
|
|
|
@ -59,7 +59,7 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
|
|||
AllocationService strategy = createAllocationService(Settings.EMPTY, new ClusterInfoService() {
|
||||
@Override
|
||||
public ClusterInfo getClusterInfo() {
|
||||
return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
|
||||
return new ClusterInfo() {
|
||||
@Override
|
||||
public Long getShardSize(ShardRouting shardRouting) {
|
||||
if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) {
|
||||
|
@ -118,7 +118,7 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
|
|||
final AllocationService allocation = createAllocationService(Settings.EMPTY, new ClusterInfoService() {
|
||||
@Override
|
||||
public ClusterInfo getClusterInfo() {
|
||||
return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
|
||||
return new ClusterInfo() {
|
||||
@Override
|
||||
public Long getShardSize(ShardRouting shardRouting) {
|
||||
if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) {
|
||||
|
|
|
@ -66,7 +66,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
|
|||
new ClusterInfoService() {
|
||||
@Override
|
||||
public ClusterInfo getClusterInfo() {
|
||||
return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) {
|
||||
return new ClusterInfo() {
|
||||
@Override
|
||||
public Long getShardSize(ShardRouting shardRouting) {
|
||||
if (shardRouting.index().equals("test")) {
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.test.ESAllocationTestCase;
|
|||
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.*;
|
||||
|
@ -65,15 +66,15 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build();
|
||||
|
||||
Map<String, DiskUsage> usages = new HashMap<>();
|
||||
usages.put("node1", new DiskUsage("node1", "node1", 100, 10)); // 90% used
|
||||
usages.put("node2", new DiskUsage("node2", "node2", 100, 35)); // 65% used
|
||||
usages.put("node3", new DiskUsage("node3", "node3", 100, 60)); // 40% used
|
||||
usages.put("node4", new DiskUsage("node4", "node4", 100, 80)); // 20% used
|
||||
usages.put("node1", new DiskUsage("node1", "node1", "_na_", 100, 10)); // 90% used
|
||||
usages.put("node2", new DiskUsage("node2", "node2", "_na_", 100, 35)); // 65% used
|
||||
usages.put("node3", new DiskUsage("node3", "node3", "_na_", 100, 60)); // 40% used
|
||||
usages.put("node4", new DiskUsage("node4", "node4", "_na_", 100, 80)); // 20% used
|
||||
|
||||
Map<String, Long> shardSizes = new HashMap<>();
|
||||
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
||||
shardSizes.put("[test][0][r]", 10L);
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
|
||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
|
||||
new HashSet<>(Arrays.asList(
|
||||
|
@ -92,7 +93,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
// noop
|
||||
}
|
||||
};
|
||||
|
||||
AllocationService strategy = new AllocationService(settingsBuilder()
|
||||
.put("cluster.routing.allocation.concurrent_recoveries", 10)
|
||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
|
||||
|
@ -259,16 +259,16 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "9b").build();
|
||||
|
||||
Map<String, DiskUsage> usages = new HashMap<>();
|
||||
usages.put("node1", new DiskUsage("node1", "n1", 100, 10)); // 90% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", 100, 10)); // 90% used
|
||||
usages.put("node3", new DiskUsage("node3", "n3", 100, 60)); // 40% used
|
||||
usages.put("node4", new DiskUsage("node4", "n4", 100, 80)); // 20% used
|
||||
usages.put("node5", new DiskUsage("node5", "n5", 100, 85)); // 15% used
|
||||
usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 10)); // 90% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 10)); // 90% used
|
||||
usages.put("node3", new DiskUsage("node3", "n3", "_na_", 100, 60)); // 40% used
|
||||
usages.put("node4", new DiskUsage("node4", "n4", "_na_", 100, 80)); // 20% used
|
||||
usages.put("node5", new DiskUsage("node5", "n5", "_na_", 100, 85)); // 15% used
|
||||
|
||||
Map<String, Long> shardSizes = new HashMap<>();
|
||||
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
||||
shardSizes.put("[test][0][r]", 10L);
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
|
||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
|
||||
new HashSet<>(Arrays.asList(
|
||||
|
@ -329,8 +329,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
logger.info("--> nodeWithoutPrimary: {}", nodeWithoutPrimary);
|
||||
|
||||
// Make node without the primary now habitable to replicas
|
||||
usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", 100, 35)); // 65% used
|
||||
final ClusterInfo clusterInfo2 = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "_na_", 100, 35)); // 65% used
|
||||
final ClusterInfo clusterInfo2 = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
cis = new ClusterInfoService() {
|
||||
@Override
|
||||
public ClusterInfo getClusterInfo() {
|
||||
|
@ -524,12 +524,12 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "71%").build();
|
||||
|
||||
Map<String, DiskUsage> usages = new HashMap<>();
|
||||
usages.put("node1", new DiskUsage("node1", "n1", 100, 31)); // 69% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", 100, 1)); // 99% used
|
||||
usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 31)); // 69% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 1)); // 99% used
|
||||
|
||||
Map<String, Long> shardSizes = new HashMap<>();
|
||||
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
|
||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
|
||||
new HashSet<>(Arrays.asList(
|
||||
|
@ -590,13 +590,13 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.85).build();
|
||||
|
||||
Map<String, DiskUsage> usages = new HashMap<>();
|
||||
usages.put("node2", new DiskUsage("node2", "node2", 100, 50)); // 50% used
|
||||
usages.put("node3", new DiskUsage("node3", "node3", 100, 0)); // 100% used
|
||||
usages.put("node2", new DiskUsage("node2", "node2", "_na_", 100, 50)); // 50% used
|
||||
usages.put("node3", new DiskUsage("node3", "node3", "_na_", 100, 0)); // 100% used
|
||||
|
||||
Map<String, Long> shardSizes = new HashMap<>();
|
||||
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
||||
shardSizes.put("[test][0][r]", 10L); // 10 bytes
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
|
||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
|
||||
new HashSet<>(Arrays.asList(
|
||||
|
@ -661,8 +661,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY);
|
||||
|
||||
Map<String, DiskUsage> usages = new HashMap<>();
|
||||
usages.put("node2", new DiskUsage("node2", "n2", 100, 50)); // 50% used
|
||||
usages.put("node3", new DiskUsage("node3", "n3", 100, 0)); // 100% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 50)); // 50% used
|
||||
usages.put("node3", new DiskUsage("node3", "n3", "_na_", 100, 0)); // 100% used
|
||||
|
||||
DiskUsage node1Usage = decider.averageUsage(rn, usages);
|
||||
assertThat(node1Usage.getTotalBytes(), equalTo(100L));
|
||||
|
@ -675,10 +675,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY);
|
||||
|
||||
Map<String, DiskUsage> usages = new HashMap<>();
|
||||
usages.put("node2", new DiskUsage("node2", "n2", 100, 50)); // 50% used
|
||||
usages.put("node3", new DiskUsage("node3", "n3", 100, 0)); // 100% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 50)); // 50% used
|
||||
usages.put("node3", new DiskUsage("node3", "n3", "_na_", 100, 0)); // 100% used
|
||||
|
||||
Double after = decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", "n2", 100, 30), 11L);
|
||||
Double after = decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", "n2", "_na_", 100, 30), 11L);
|
||||
assertThat(after, equalTo(19.0));
|
||||
}
|
||||
|
||||
|
@ -691,16 +691,16 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build();
|
||||
|
||||
Map<String, DiskUsage> usages = new HashMap<>();
|
||||
usages.put("node1", new DiskUsage("node1", "n1", 100, 40)); // 60% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", 100, 40)); // 60% used
|
||||
usages.put("node2", new DiskUsage("node3", "n3", 100, 40)); // 60% used
|
||||
usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 40)); // 60% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 40)); // 60% used
|
||||
usages.put("node2", new DiskUsage("node3", "n3", "_na_", 100, 40)); // 60% used
|
||||
|
||||
Map<String, Long> shardSizes = new HashMap<>();
|
||||
shardSizes.put("[test][0][p]", 14L); // 14 bytes
|
||||
shardSizes.put("[test][0][r]", 14L);
|
||||
shardSizes.put("[test2][0][p]", 1L); // 1 bytes
|
||||
shardSizes.put("[test2][0][r]", 1L);
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
|
||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
|
||||
new HashSet<>(Arrays.asList(
|
||||
|
@ -797,13 +797,13 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
|||
|
||||
// We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available
|
||||
Map<String, DiskUsage> usages = new HashMap<>();
|
||||
usages.put("node1", new DiskUsage("node1", "n1", 100, 20)); // 80% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", 100, 100)); // 0% used
|
||||
usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 20)); // 80% used
|
||||
usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 100)); // 0% used
|
||||
|
||||
Map<String, Long> shardSizes = new HashMap<>();
|
||||
shardSizes.put("[test][0][p]", 40L);
|
||||
shardSizes.put("[test][1][p]", 40L);
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes));
|
||||
|
||||
DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings);
|
||||
MetaData metaData = MetaData.builder()
|
||||
|
|
|
@ -20,15 +20,15 @@
|
|||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterInfo;
|
||||
import org.elasticsearch.cluster.ClusterInfoService;
|
||||
import org.elasticsearch.cluster.EmptyClusterInfoService;
|
||||
import org.elasticsearch.cluster.*;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.*;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.DummyTransportAddress;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
@ -89,13 +89,120 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
|
|||
assertFalse("relocations should now be disabled", decider.isIncludeRelocations());
|
||||
}
|
||||
|
||||
public void testCanAllocateUsesMaxAvailableSpace() {
|
||||
NodeSettingsService nss = new NodeSettingsService(Settings.EMPTY);
|
||||
ClusterInfoService cis = EmptyClusterInfoService.INSTANCE;
|
||||
DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null);
|
||||
|
||||
ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
|
||||
DiscoveryNode node_0 = new DiscoveryNode("node_0", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
DiscoveryNode node_1 = new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
||||
.build();
|
||||
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsNew(metaData.index("test"))
|
||||
.build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
|
||||
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
|
||||
.put(node_0)
|
||||
.put(node_1)
|
||||
).build();
|
||||
|
||||
// actual test -- after all that bloat :)
|
||||
Map<String, DiskUsage> leastAvailableUsages = new HashMap<>();
|
||||
leastAvailableUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 0)); // all full
|
||||
leastAvailableUsages.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 0)); // all full
|
||||
|
||||
Map<String, DiskUsage> mostAvailableUsage = new HashMap<>();
|
||||
mostAvailableUsage.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, randomIntBetween(20, 100))); // 20 - 99 percent since after allocation there must be at least 10% left and shard is 10byte
|
||||
mostAvailableUsage.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, randomIntBetween(0, 10))); // this is weird and smells like a bug! it should be up to 20%?
|
||||
|
||||
Map<String, Long> shardSizes = new HashMap<>();
|
||||
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes));
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
|
||||
assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation));
|
||||
assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation));
|
||||
}
|
||||
|
||||
|
||||
public void testCanRemainUsesLeastAvailableSpace() {
|
||||
NodeSettingsService nss = new NodeSettingsService(Settings.EMPTY);
|
||||
ClusterInfoService cis = EmptyClusterInfoService.INSTANCE;
|
||||
DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null);
|
||||
|
||||
|
||||
DiscoveryNode node_0 = new DiscoveryNode("node_0", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
DiscoveryNode node_1 = new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
|
||||
ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
|
||||
ShardRoutingHelper.initialize(test_0, node_0.getId());
|
||||
ShardRoutingHelper.moveToStarted(test_0);
|
||||
|
||||
|
||||
ShardRouting test_1 = ShardRouting.newUnassigned("test", 1, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
|
||||
ShardRoutingHelper.initialize(test_1, node_1.getId());
|
||||
ShardRoutingHelper.moveToStarted(test_1);
|
||||
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
||||
.build();
|
||||
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsNew(metaData.index("test"))
|
||||
.build();
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
|
||||
|
||||
logger.info("--> adding two nodes");
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
|
||||
.put(node_0)
|
||||
.put(node_1)
|
||||
).build();
|
||||
|
||||
// actual test -- after all that bloat :)
|
||||
Map<String, DiskUsage> leastAvailableUsages = new HashMap<>();
|
||||
leastAvailableUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 10)); // 90% used
|
||||
leastAvailableUsages.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 9)); // 91% used
|
||||
|
||||
Map<String, DiskUsage> mostAvailableUsage = new HashMap<>();
|
||||
mostAvailableUsage.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 90)); // 10% used
|
||||
mostAvailableUsage.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 90)); // 10% used
|
||||
|
||||
Map<String, Long> shardSizes = new HashMap<>();
|
||||
shardSizes.put("[test][0][p]", 10L); // 10 bytes
|
||||
shardSizes.put("[test][1][p]", 10L);
|
||||
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes));
|
||||
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
|
||||
assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation));
|
||||
assertEquals(Decision.NO, decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation));
|
||||
try {
|
||||
decider.canRemain(test_0, new RoutingNode("node_1", node_1), allocation);
|
||||
fail("not allocated on this node");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
// not allocated on that node
|
||||
}
|
||||
try {
|
||||
decider.canRemain(test_1, new RoutingNode("node_0", node_0), allocation);
|
||||
fail("not allocated on this node");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
// not allocated on that node
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testShardSizeAndRelocatingSize() {
|
||||
Map<String, Long> shardSizes = new HashMap<>();
|
||||
shardSizes.put("[test][0][r]", 10L);
|
||||
shardSizes.put("[test][1][r]", 100L);
|
||||
shardSizes.put("[test][2][r]", 1000L);
|
||||
shardSizes.put("[other][0][p]", 10000L);
|
||||
ClusterInfo info = new ClusterInfo(Collections.EMPTY_MAP, shardSizes);
|
||||
ClusterInfo info = new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP, shardSizes);
|
||||
ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
|
||||
ShardRoutingHelper.initialize(test_0, "node1");
|
||||
ShardRoutingHelper.moveToStarted(test_0);
|
||||
|
|
|
@ -76,9 +76,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
// Start with all nodes at 50% usage
|
||||
final MockInternalClusterInfoService cis = (MockInternalClusterInfoService)
|
||||
internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName());
|
||||
cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", 100, 50));
|
||||
cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", 100, 50));
|
||||
cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", 100, 50));
|
||||
cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50));
|
||||
cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50));
|
||||
cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 50));
|
||||
|
||||
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
|
||||
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, randomFrom("20b", "80%"))
|
||||
|
@ -97,8 +97,8 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
@Override
|
||||
public void run() {
|
||||
ClusterInfo info = cis.getClusterInfo();
|
||||
logger.info("--> got: {} nodes", info.getNodeDiskUsages().size());
|
||||
assertThat(info.getNodeDiskUsages().size(), greaterThan(0));
|
||||
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
|
||||
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -113,9 +113,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
// Update the disk usages so one node has now passed the high watermark
|
||||
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", 100, 50));
|
||||
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", 100, 50));
|
||||
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", 100, 0)); // nothing free on node3
|
||||
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50));
|
||||
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50));
|
||||
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 0)); // nothing free on node3
|
||||
|
||||
// Retrieve the count of shards on each node
|
||||
final Map<String, Integer> nodesToShardCount = newHashMap();
|
||||
|
@ -138,9 +138,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
});
|
||||
|
||||
// Update the disk usages so one node is now back under the high watermark
|
||||
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", 100, 50));
|
||||
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", 100, 50));
|
||||
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", 100, 50)); // node3 has free space now
|
||||
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50));
|
||||
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50));
|
||||
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 50)); // node3 has free space now
|
||||
|
||||
// Retrieve the count of shards on each node
|
||||
nodesToShardCount.clear();
|
||||
|
|
Loading…
Reference in New Issue