Encapsualte common code in methods

This commit groups duplicated code in methods to make the actual decision
easier to read. There is no change in functionality in this change.
This commit is contained in:
Simon Willnauer 2015-06-23 15:24:59 +02:00
parent fe330b868a
commit 49bbc42ac8
1 changed files with 74 additions and 102 deletions

View File

@ -295,63 +295,14 @@ public class DiskThresholdDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
double usedDiskThresholdLow = 100.0 - DiskThresholdDecider.this.freeDiskThresholdLow;
double usedDiskThresholdHigh = 100.0 - DiskThresholdDecider.this.freeDiskThresholdHigh;
// Always allow allocation if the decider is disabled
if (!enabled) {
return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
}
// Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) {
if (logger.isTraceEnabled()) {
logger.trace("Only a single node is present, allowing allocation");
}
return allocation.decision(Decision.YES, NAME, "only a single node is present");
}
// Fail open there is no info available
ClusterInfo clusterInfo = allocation.clusterInfo();
if (clusterInfo == null) {
if (logger.isTraceEnabled()) {
logger.trace("Cluster info unavailable for disk threshold decider, allowing allocation.");
}
return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
}
// Fail open if there are no disk usages available
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
if (usages.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("Unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return allocation.decision(Decision.YES, NAME, "disk usages unavailable");
}
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
// use the average usage for all nodes as the usage for this node
usage = averageUsage(node, usages);
if (logger.isDebugEnabled()) {
logger.debug("Unable to determine disk usage for [{}], defaulting to average across nodes [{} total] [{} free] [{}% free]",
node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage());
}
}
if (includeRelocations) {
long relocatingShardsSize = sizeOfRelocatingShards(node, shardSizes, false);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
logger.trace("usage without relocations: {}", usage);
logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
}
usage = usageIncludingRelocations;
final Decision decision = earlyTerminate(allocation);
if (decision != null) {
return decision;
}
final double usedDiskThresholdLow = 100.0 - DiskThresholdDecider.this.freeDiskThresholdLow;
final double usedDiskThresholdHigh = 100.0 - DiskThresholdDecider.this.freeDiskThresholdHigh;
DiskUsage usage = getDiskUsage(node, allocation);
// First, check that the node currently over the low watermark
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
// Cache the used disk percentage for displaying disk percentages consistent with documentation
@ -432,9 +383,10 @@ public class DiskThresholdDecider extends AllocationDecider {
}
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
Map<String, Long> shardSizes = allocation.clusterInfo().getShardSizes();
Long shardSize = shardSizes.get(shardIdentifierFromRouting(shardRouting));
shardSize = shardSize == null ? 0 : shardSize;
double freeSpaceAfterShard = this.freeDiskPercentageAfterShardAssigned(usage, shardSize);
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) {
logger.warn("After allocating, node [{}] would have less than the required {} free bytes threshold ({} bytes free), preventing allocation",
@ -454,53 +406,11 @@ public class DiskThresholdDecider extends AllocationDecider {
@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (!enabled) {
return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
final Decision decision = earlyTerminate(allocation);
if (decision != null) {
return decision;
}
// Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) {
return allocation.decision(Decision.YES, NAME, "only a single node is present");
}
ClusterInfo clusterInfo = allocation.clusterInfo();
if (clusterInfo == null) {
if (logger.isTraceEnabled()) {
logger.trace("Cluster info unavailable for disk threshold decider, allowing allocation.");
}
return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
}
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
if (usages.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("Unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return allocation.decision(Decision.YES, NAME, "disk usages unavailable");
}
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
// use the average usage for all nodes as the usage for this node
usage = averageUsage(node, usages);
if (logger.isDebugEnabled()) {
logger.debug("Unable to determine disk usage for {}, defaulting to average across nodes [{} total] [{} free] [{}% free]",
node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage());
}
}
if (includeRelocations) {
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
long relocatingShardsSize = sizeOfRelocatingShards(node, shardSizes, true);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
logger.trace("usage without relocations: {}", usage);
logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
}
usage = usageIncludingRelocations;
}
DiskUsage usage = getDiskUsage(node, allocation);
// If this node is already above the high threshold, the shard cannot remain (get it off!)
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
long freeBytes = usage.getFreeBytes();
@ -527,6 +437,34 @@ public class DiskThresholdDecider extends AllocationDecider {
return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
}
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
// use the average usage for all nodes as the usage for this node
usage = averageUsage(node, usages);
if (logger.isDebugEnabled()) {
logger.debug("Unable to determine disk usage for {}, defaulting to average across nodes [{} total] [{} free] [{}% free]",
node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage());
}
}
if (includeRelocations) {
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
long relocatingShardsSize = sizeOfRelocatingShards(node, shardSizes, true);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
logger.trace("usage without relocations: {}", usage);
logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
}
usage = usageIncludingRelocations;
}
return usage;
}
/**
* Returns a {@link DiskUsage} for the {@link RoutingNode} using the
* average usage of other nodes in the disk usage map.
@ -604,4 +542,38 @@ public class DiskThresholdDecider extends AllocationDecider {
}
}
}
private Decision earlyTerminate(RoutingAllocation allocation) {
// Always allow allocation if the decider is disabled
if (!enabled) {
return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
}
// Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) {
if (logger.isTraceEnabled()) {
logger.trace("Only a single node is present, allowing allocation");
}
return allocation.decision(Decision.YES, NAME, "only a single node is present");
}
// Fail open there is no info available
final ClusterInfo clusterInfo = allocation.clusterInfo();
if (clusterInfo == null) {
if (logger.isTraceEnabled()) {
logger.trace("Cluster info unavailable for disk threshold decider, allowing allocation.");
}
return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
}
final Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
// Fail open if there are no disk usages available
if (usages.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("Unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return allocation.decision(Decision.YES, NAME, "disk usages unavailable");
}
return null;
}
}