Add AllocationDecider that takes free disk space into account

This commit adds two main pieces, the first is a ClusterInfoService
that provides a service running on the master nodes that fetches the
total/free bytes for each data node in the cluster as well as the
sizes of all shards in the cluster. This information is gathered by
default every 30 seconds, and can be changed dynamically by setting
the `cluster.info.update.interval` setting. This ClusterInfoService
can hopefully be used in the future to weight nodes for allocation
based on their disk usage, if desired.

The second main piece is the DiskThresholdDecider, which can disallow
a shard from being allocated to a node, or from remaining on the node
depending on configuration parameters. There are three main
configuration parameters for the DiskThresholdDecider:

`cluster.routing.allocation.disk.threshold_enabled` controls whether
the decider is enabled. It defaults to false (disabled). Note that the
decider is also disabled for clusters with only a single data node.

`cluster.routing.allocation.disk.watermark.low` controls the low
watermark for disk usage. It defaults to 0.70, meaning ES will not
allocate new shards to nodes once they have more than 70% disk
used. It can also be set to an absolute byte value (like 500mb) to
prevent ES from allocating shards if less than the configured amount
of space is available.

`cluster.routing.allocation.disk.watermark.high` controls the high
watermark. It defaults to 0.85, meaning ES will attempt to relocate
shards to another node if the node disk usage rises above 85%. It can
also be set to an absolute byte value (similar to the low watermark)
to relocate shards once less than the configured amount of space is
available on the node.

Closes #3480
This commit is contained in:
Lee Hinman 2013-08-16 12:20:56 -06:00
parent 563111f0f9
commit 7d52d58747
19 changed files with 1574 additions and 35 deletions

View File

@ -93,3 +93,41 @@ The `index.routing.allocation.total_shards_per_node` setting allows to
control how many total shards for an index will be allocated per node.
It can be dynamically set on a live index using the update index
settings API.
[float]
=== Disk-based Shard Allocation
In 0.90.4 and later, Elasticsearch con be configured to prevent shard
allocation on nodes depending on disk usage for the node. This
functionality is disabled by default, and can be changed either in the
configuration file, or dynamically using:
[source,js]
--------------------------------------------------
curl -XPUT localhost:9200/_cluster/settings -d '{
"transient" : {
"cluster.routing.allocation.disk.threshold_enabled" : true
}
}'
--------------------------------------------------
Once enabled, Elasticsearch uses two watermarks to decide whether
shards should be allocated or can remain on the node.
`cluster.routing.allocation.disk.watermark.low` controls the low
watermark for disk usage. It defaults to 0.70, meaning ES will not
allocate new shards to nodes once they have more than 70% disk
used. It can also be set to an absolute byte value (like 500mb) to
prevent ES from allocating shards if less than the configured amount
of space is available.
`cluster.routing.allocation.disk.watermark.high` controls the high
watermark. It defaults to 0.85, meaning ES will attempt to relocate
shards to another node if the node disk usage rises above 85%. It can
also be set to an absolute byte value (similar to the low watermark)
to relocate shards once less than the configured amount of space is
available on the node.
Both watermark settings can be changed dynamically using the cluster
settings API. By default, Elasticsearch will retrieve information
about the disk usage of the nodes every 30 seconds. This can also be
changed by setting the `cluster.info.update.interval` setting.

View File

@ -0,0 +1,49 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
/**
* ClusterInfo is an object representing a map of nodes to {@link DiskUsage}
* and a map of shard ids to shard sizes, see
* <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
* for the key used in the shardSizes map
*/
public class ClusterInfo {
private final ImmutableMap<String, DiskUsage> usages;
private final ImmutableMap<String, Long> shardSizes;
public ClusterInfo(ImmutableMap<String, DiskUsage> usages, ImmutableMap<String, Long> shardSizes) {
this.usages = usages;
this.shardSizes = shardSizes;
}
public Map<String, DiskUsage> getNodeDiskUsages() {
return this.usages;
}
public Map<String, Long> getShardSizes() {
return this.shardSizes;
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
public interface ClusterInfoService {
public static ClusterInfoService EMPTY = EmptyClusterInfoService.getInstance();
public ClusterInfo getClusterInfo();
}

View File

@ -78,5 +78,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(NodeAliasesUpdatedAction.class).asEagerSingleton();
bind(NodeIndicesStateUpdatedAction.class).asEagerSingleton();
bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
/**
* Encapsulation class used to represent the amount of disk used on a node.
*/
public class DiskUsage {
final String nodeId;
final long totalBytes;
final long freeBytes;
public DiskUsage(String nodeId, long totalBytes, long freeBytes) {
if ((totalBytes < freeBytes) || (totalBytes < 0)) {
throw new IllegalStateException("Free bytes [" + freeBytes +
"] cannot be less than 0 or greater than total bytes [" + totalBytes + "]");
}
this.nodeId = nodeId;
this.totalBytes = totalBytes;
this.freeBytes = freeBytes;
}
public double getFreeDiskAsPercentage() {
double freePct = 100.0 * ((double)freeBytes / totalBytes);
return freePct;
}
public long getFreeBytes() {
return freeBytes;
}
public long getTotalBytes() {
return totalBytes;
}
public long getUsedBytes() {
return getTotalBytes() - getFreeBytes();
}
public String toString() {
return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "]";
}
}

View File

@ -0,0 +1,30 @@
package org.elasticsearch.cluster;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ImmutableSettings;
/**
* ClusterInfoService that provides empty maps for disk usage and shard sizes
*/
public class EmptyClusterInfoService extends AbstractComponent implements ClusterInfoService {
private final static class Holder {
private final static EmptyClusterInfoService instance = new EmptyClusterInfoService();
}
private final ClusterInfo emptyClusterInfo;
private EmptyClusterInfoService() {
super(ImmutableSettings.EMPTY);
emptyClusterInfo = new ClusterInfo(ImmutableMap.<String, DiskUsage>of(), ImmutableMap.<String, Long>of());
}
public static EmptyClusterInfoService getInstance() {
return Holder.instance;
}
@Override
public ClusterInfo getClusterInfo() {
return emptyClusterInfo;
}
}

View File

@ -0,0 +1,311 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.HashMap;
import java.util.Map;
/**
* InternalClusterInfoService provides the ClusterInfoService interface,
* routinely updated on a timer. The timer can be dynamically changed by
* setting the <code>cluster.info.update.interval</code> setting (defaulting
* to 30 seconds). The InternalClusterInfoService only runs on the master node.
* Listens for changes in the number of data nodes and immediately submits a
* ClusterInfoUpdateJob if a node has been added.
*
* Every time the timer runs, gathers information about the disk usage and
* shard sizes across the cluster.
*/
public final class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {
public static final String INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL = "cluster.info.update.interval";
private volatile TimeValue updateFrequency;
private volatile ImmutableMap<String, DiskUsage> usages;
private volatile ImmutableMap<String, Long> shardSizes;
private volatile boolean isMaster = false;
private volatile boolean enabled;
private final TransportNodesStatsAction transportNodesStatsAction;
private final TransportIndicesStatsAction transportIndicesStatsAction;
private final ClusterService clusterService;
private final ThreadPool threadPool;
@Inject
public InternalClusterInfoService(Settings settings, NodeSettingsService nodeSettingsService,
TransportNodesStatsAction transportNodesStatsAction,
TransportIndicesStatsAction transportIndicesStatsAction, ClusterService clusterService,
ThreadPool threadPool) {
super(settings);
this.usages = ImmutableMap.of();
this.shardSizes = ImmutableMap.of();
this.transportNodesStatsAction = transportNodesStatsAction;
this.transportIndicesStatsAction = transportIndicesStatsAction;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.updateFrequency = settings.getAsTime(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, TimeValue.timeValueSeconds(30));
this.enabled = settings.getAsBoolean(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, false);
nodeSettingsService.addListener(new ApplySettings());
// Add InternalClusterInfoService to listen for Master changes
this.clusterService.add((LocalNodeMasterListener)this);
// Add to listen for state changes (when nodes are added)
this.clusterService.add((ClusterStateListener)this);
}
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
TimeValue newUpdateFrequency = settings.getAsTime(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, null);
// ClusterInfoService is only enabled if the DiskThresholdDecider is enabled
Boolean newEnabled = settings.getAsBoolean(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, null);
if (newUpdateFrequency != null) {
if (newUpdateFrequency.getMillis() < TimeValue.timeValueSeconds(10).getMillis()) {
logger.warn("[{}] set too low [{}] (< 10s)", INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, newUpdateFrequency);
throw new IllegalStateException("Unable to set " + INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL + " less than 10 seconds");
} else {
logger.info("updating [{}] from [{}] to [{}]", INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, updateFrequency, newUpdateFrequency);
InternalClusterInfoService.this.updateFrequency = newUpdateFrequency;
}
}
// We don't log about enabling it here, because the DiskThresholdDecider will already be logging about enable/disable
if (newEnabled != null) {
InternalClusterInfoService.this.enabled = newEnabled;
}
}
}
@Override
public void onMaster() {
this.isMaster = true;
if (logger.isTraceEnabled()) {
logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
}
// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
if (clusterService.state().getNodes().getDataNodes().size() > 1) {
// Submit an info update job to be run immediately
threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false));
}
}
@Override
public void offMaster() {
this.isMaster = false;
}
@Override
public String executorName() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (!this.enabled) {
return;
}
// Check whether it was a data node that was added
boolean dataNodeAdded = false;
for (DiscoveryNode addedNode : event.nodesDelta().addedNodes()) {
if (addedNode.dataNode()) {
dataNodeAdded = true;
break;
}
}
if (this.isMaster && dataNodeAdded && clusterService.state().getNodes().getDataNodes().size() > 1) {
if (logger.isDebugEnabled()) {
logger.debug("data node was added, retrieving new cluster info");
}
threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false));
}
if (this.isMaster && event.nodesRemoved()) {
for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
if (removedNode.dataNode()) {
if (logger.isTraceEnabled()) {
logger.trace("Removing node from cluster info: {}", removedNode.getId());
}
Map<String, DiskUsage> newUsages = new HashMap<String, DiskUsage>(usages);
newUsages.remove(removedNode.getId());
usages = ImmutableMap.copyOf(newUsages);
}
}
}
}
@Override
public ClusterInfo getClusterInfo() {
return new ClusterInfo(usages, shardSizes);
}
/**
* Class used to submit {@link ClusterInfoUpdateJob}s on the
* {@link InternalClusterInfoService} threadpool, these jobs will
* reschedule themselves by placing a new instance of this class onto the
* scheduled threadpool.
*/
public class SubmitReschedulingClusterInfoUpdatedJob implements Runnable {
@Override
public void run() {
if (logger.isTraceEnabled()) {
logger.trace("Submitting new rescheduling cluster info update job");
}
threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(true));
}
}
/**
* Runnable class that performs a {@Link NodesStatsRequest} to retrieve
* disk usages for nodes in the cluster and an {@link IndicesStatsRequest}
* to retrieve the sizes of all shards to ensure they can fit on nodes
* during shard balancing.
*/
public class ClusterInfoUpdateJob implements Runnable {
// This boolean is used to signal to the ClusterInfoUpdateJob that it
// needs to reschedule itself to run again at a later time. It can be
// set to false to only run once
private final boolean reschedule;
public ClusterInfoUpdateJob(boolean reschedule) {
this.reschedule = reschedule;
}
@Override
public void run() {
if (logger.isTraceEnabled()) {
logger.trace("Performing ClusterInfoUpdateJob");
}
if (isMaster && this.reschedule) {
if (logger.isTraceEnabled()) {
logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString());
}
threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
}
if (!enabled) {
// Short-circuit if not enabled
if (logger.isTraceEnabled()) {
logger.trace("Skipping ClusterInfoUpdatedJob since it is disabled");
}
return;
}
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
nodesStatsRequest.clear();
nodesStatsRequest.fs(true);
nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15));
transportNodesStatsAction.execute(nodesStatsRequest, new ActionListener<NodesStatsResponse>() {
@Override
public void onResponse(NodesStatsResponse nodeStatses) {
Map<String, DiskUsage> newUsages = new HashMap<String, DiskUsage>();
for (NodeStats nodeStats : nodeStatses.getNodes()) {
if (nodeStats.getFs() == null) {
logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().name());
} else {
long available = 0;
long total = 0;
for (FsStats.Info info : nodeStats.getFs()) {
available += info.getAvailable().bytes();
total += info.getTotal().bytes();
}
String nodeId = nodeStats.getNode().id();
if (logger.isTraceEnabled()) {
logger.trace("node: [{}], total disk: {}, available disk: {}", nodeId, total, available);
}
newUsages.put(nodeId, new DiskUsage(nodeId, total, available));
}
}
usages = ImmutableMap.copyOf(newUsages);
}
@Override
public void onFailure(Throwable e) {
logger.error("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
}
});
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear();
indicesStatsRequest.store(true);
transportIndicesStatsAction.execute(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
ShardStats[] stats = indicesStatsResponse.getShards();
HashMap<String, Long> newShardSizes = new HashMap<String, Long>();
for (ShardStats s : stats) {
long size = s.getStats().getStore().sizeInBytes();
String sid = shardIdentifierFromRouting(s.getShardRouting());
if (logger.isTraceEnabled()) {
logger.trace("shard: {} size: {}", sid, size);
}
newShardSizes.put(sid, size);
}
shardSizes = ImmutableMap.copyOf(newShardSizes);
}
@Override
public void onFailure(Throwable e) {
logger.error("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
}
});
if (logger.isTraceEnabled()) {
logger.trace("Finished ClusterInfoUpdateJob");
}
}
}
/**
* Method that incorporates the ShardId for the shard into a string that
* includes a 'p' or 'r' depending on whether the shard is a primary.
*/
public static String shardIdentifierFromRouting(ShardRouting shardRouting) {
return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]";
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
@ -53,7 +54,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
public class AllocationService extends AbstractComponent {
private final AllocationDeciders allocationDeciders;
private final ClusterInfoService clusterInfoService;
private final ShardsAllocators shardsAllocators;
public AllocationService() {
@ -63,15 +64,15 @@ public class AllocationService extends AbstractComponent {
public AllocationService(Settings settings) {
this(settings,
new AllocationDeciders(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)),
new ShardsAllocators(settings)
);
new ShardsAllocators(settings), ClusterInfoService.EMPTY);
}
@Inject
public AllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators) {
public AllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) {
super(settings);
this.allocationDeciders = allocationDeciders;
this.shardsAllocators = shardsAllocators;
this.clusterInfoService = clusterInfoService;
}
/**
@ -87,7 +88,7 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
Collections.shuffle(routingNodes.unassigned());
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), startedShards);
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), startedShards, clusterInfoService.getClusterInfo());
boolean changed = applyStartedShards(routingNodes, startedShards);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
@ -112,7 +113,7 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
Collections.shuffle(routingNodes.unassigned());
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShards);
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShards, clusterInfoService.getClusterInfo());
boolean changed = false;
for (ShardRouting failedShard : failedShards) {
changed |= applyFailedShard(allocation, failedShard, true);
@ -130,7 +131,7 @@ public class AllocationService extends AbstractComponent {
// we don't shuffle the unassigned shards here, to try and get as close as possible to
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
// we ignore disable allocation, because commands are explicit
allocation.ignoreDisable(true);
commands.execute(allocation);
@ -151,7 +152,7 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
Collections.shuffle(routingNodes.unassigned());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
@ -167,7 +168,7 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
Collections.shuffle(routingNodes.unassigned());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
Iterable<DiscoveryNode> dataNodes = allocation.nodes().dataNodes().values();
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -34,8 +35,8 @@ public class FailedRerouteAllocation extends RoutingAllocation {
private final List<ShardRouting> failedShards;
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<ShardRouting> failedShards) {
super(deciders, routingNodes, nodes);
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<ShardRouting> failedShards, ClusterInfo clusterInfo) {
super(deciders, routingNodes, nodes, clusterInfo);
this.failedShards = failedShards;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
@ -92,6 +93,8 @@ public class RoutingAllocation {
private final AllocationExplanation explanation = new AllocationExplanation();
private final ClusterInfo clusterInfo;
private Map<ShardId, String> ignoredShardToNodes = null;
private boolean ignoreDisable = false;
@ -103,10 +106,11 @@ public class RoutingAllocation {
* @param routingNodes Routing nodes in the current cluster
* @param nodes TODO: Documentation
*/
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes) {
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ClusterInfo clusterInfo) {
this.deciders = deciders;
this.routingNodes = routingNodes;
this.nodes = nodes;
this.clusterInfo = clusterInfo;
}
/**
@ -149,6 +153,10 @@ public class RoutingAllocation {
return nodes;
}
public ClusterInfo clusterInfo() {
return clusterInfo;
}
/**
* Get explanations of current routing
* @return explanation of routing

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -34,8 +35,8 @@ public class StartedRerouteAllocation extends RoutingAllocation {
private final List<? extends ShardRouting> startedShards;
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
super(deciders, routingNodes, nodes);
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards, ClusterInfo clusterInfo) {
super(deciders, routingNodes, nodes, clusterInfo);
this.startedShards = startedShards;
}

View File

@ -54,6 +54,7 @@ public class AllocationDeciders extends AllocationDecider {
.add(new DisableAllocationDecider(settings, nodeSettingsService))
.add(new AwarenessAllocationDecider(settings, nodeSettingsService))
.add(new ShardsLimitAllocationDecider(settings))
.add(new DiskThresholdDecider(settings, nodeSettingsService))
.build()
);
}

View File

@ -60,6 +60,7 @@ public class AllocationDecidersModule extends AbstractModule {
allocationMultibinder.addBinding().to(DisableAllocationDecider.class);
allocationMultibinder.addBinding().to(AwarenessAllocationDecider.class);
allocationMultibinder.addBinding().to(ShardsLimitAllocationDecider.class);
allocationMultibinder.addBinding().to(DiskThresholdDecider.class);
for (Class<? extends AllocationDecider> allocation : allocations) {
allocationMultibinder.addBinding().to(allocation);
}

View File

@ -0,0 +1,345 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.Map;
import static org.elasticsearch.cluster.InternalClusterInfoService.shardIdentifierFromRouting;
/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
* being allocated to has enough disk space.
*
* It has three configurable settings, all of which can be changed dynamically:
*
* <code>cluster.routing.allocation.disk.watermark.low</code> is the low disk
* watermark. New shards will not allocated to a node with usage higher than this,
* although this watermark may be passed by allocating a shard. It defaults to
* 0.70 (70.0%).
*
* <code>cluster.routing.allocation.disk.watermark.high</code> is the high disk
* watermark. If a node has usage higher than this, shards are not allowed to
* remain on the node. In addition, if allocating a shard to a node causes the
* node to pass this watermark, it will not be allowed. It defaults to
* 0.85 (85.0%).
*
* Both watermark settings are expressed in terms of used disk percentage, or
* exact byte values for free space (like "500mb")
*
* <code>cluster.routing.allocation.disk.threshold_enabled</code> is used to
* enable or disable this decider. It defaults to false (disabled).
*/
public class DiskThresholdDecider extends AllocationDecider {
private volatile Double freeDiskThresholdLow;
private volatile Double freeDiskThresholdHigh;
private volatile ByteSizeValue freeBytesThresholdLow;
private volatile ByteSizeValue freeBytesThresholdHigh;
private volatile boolean enabled;
public static final String CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED = "cluster.routing.allocation.disk.threshold_enabled";
public static final String CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.low";
public static final String CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.high";
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
String newLowWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, null);
String newHighWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, null);
Boolean newEnableSetting = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, null);
if (newEnableSetting != null) {
logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED,
DiskThresholdDecider.this.enabled, newEnableSetting);
DiskThresholdDecider.this.enabled = newEnableSetting;
}
if (newLowWatermark != null) {
if (!validWatermarkSetting(newLowWatermark)) {
throw new ElasticSearchParseException("Unable to parse low watermark: [" + newLowWatermark + "]");
}
logger.info("updating [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, newLowWatermark);
DiskThresholdDecider.this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(newLowWatermark);
DiskThresholdDecider.this.freeBytesThresholdLow = thresholdBytesFromWatermark(newLowWatermark);
}
if (newHighWatermark != null) {
if (!validWatermarkSetting(newHighWatermark)) {
throw new ElasticSearchParseException("Unable to parse high watermark: [" + newHighWatermark + "]");
}
logger.info("updating [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, newHighWatermark);
DiskThresholdDecider.this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(newHighWatermark);
DiskThresholdDecider.this.freeBytesThresholdHigh = thresholdBytesFromWatermark(newHighWatermark);
}
}
}
public DiskThresholdDecider() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
}
public DiskThresholdDecider(Settings settings) {
this(settings, new NodeSettingsService(settings));
}
@Inject
protected DiskThresholdDecider(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
String lowWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "0.7");
String highWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "0.85");
if (!validWatermarkSetting(lowWatermark)) {
throw new ElasticSearchParseException("Unable to parse low watermark: [" + lowWatermark + "]");
}
if (!validWatermarkSetting(highWatermark)) {
throw new ElasticSearchParseException("Unable to parse high watermark: [" + highWatermark + "]");
}
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark);
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark);
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark);
this.enabled = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, false);
nodeSettingsService.addListener(new ApplySettings());
}
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (!enabled) {
return Decision.YES;
}
// Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) {
return Decision.YES;
}
ClusterInfo clusterInfo = allocation.clusterInfo();
if (clusterInfo == null) {
if (logger.isTraceEnabled()) {
logger.trace("Cluster info unavailable for disk threshold decider, allowing allocation.");
}
return Decision.YES;
}
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
if (usages.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("Unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return Decision.YES;
}
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
// use the average usage for all nodes as the usage for this node
usage = averageUsage(node, usages);
if (logger.isDebugEnabled()) {
logger.debug("Unable to determine disk usage for [{}], defaulting to average across nodes [{} total] [{} free] [{}% free]",
node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage());
}
}
// First, check that the node currently over the low watermark
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
long freeBytes = usage.getFreeBytes();
if (logger.isDebugEnabled()) {
logger.debug("Node [{}] has {}% free disk", node.nodeId(), freeDiskPercentage);
}
if (freeBytes < freeBytesThresholdLow.bytes()) {
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
return Decision.NO;
}
if (freeDiskPercentage < freeDiskThresholdLow) {
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node [{}], preventing allocation",
freeDiskThresholdLow, freeDiskPercentage, node.nodeId());
}
return Decision.NO;
}
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
Long shardSize = shardSizes.get(shardIdentifierFromRouting(shardRouting));
shardSize = shardSize == null ? 0 : shardSize;
double freeSpaceAfterShard = this.freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) {
logger.warn("After allocating, node [{}] would have less than the required {} free bytes threshold ({} bytes free), preventing allocation",
node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard);
return Decision.NO;
}
if (freeSpaceAfterShard < freeDiskThresholdHigh) {
logger.warn("After allocating, node [{}] would have less than the required {}% free disk threshold ({}% free), preventing allocation",
node.nodeId(), freeDiskThresholdHigh, freeSpaceAfterShard);
return Decision.NO;
}
return Decision.YES;
}
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (!enabled) {
return Decision.YES;
}
// Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) {
return Decision.YES;
}
ClusterInfo clusterInfo = allocation.clusterInfo();
if (clusterInfo == null) {
if (logger.isTraceEnabled()) {
logger.trace("Cluster info unavailable for disk threshold decider, allowing allocation.");
}
return Decision.YES;
}
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
if (usages.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("Unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return Decision.YES;
}
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
// use the average usage for all nodes as the usage for this node
usage = averageUsage(node, usages);
if (logger.isDebugEnabled()) {
logger.debug("Unable to determine disk usage for {}, defaulting to average across nodes [{} total] [{} free] [{}% free]",
node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage());
}
}
// If this node is already above the high threshold, the shard cannot remain (get it off!)
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
long freeBytes = usage.getFreeBytes();
if (logger.isDebugEnabled()) {
logger.debug("Node [{}] has {}% free disk ({} bytes)", node.nodeId(), freeDiskPercentage, freeBytes);
}
if (freeBytes < freeBytesThresholdHigh.bytes()) {
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
return Decision.NO;
}
if (freeDiskPercentage < freeDiskThresholdHigh) {
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain",
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
}
return Decision.NO;
}
return Decision.YES;
}
/**
* Returns a {@link DiskUsage} for the {@link RoutingNode} using the
* average usage of other nodes in the disk usage map.
* @param node Node to return an averaged DiskUsage object for
* @param usages Map of nodeId to DiskUsage for all known nodes
* @return DiskUsage representing given node using the average disk usage
*/
public DiskUsage averageUsage(RoutingNode node, Map<String, DiskUsage> usages) {
long totalBytes = 0;
long freeBytes = 0;
for (DiskUsage du : usages.values()) {
totalBytes += du.getTotalBytes();
freeBytes += du.getFreeBytes();
}
return new DiskUsage(node.nodeId(), totalBytes / usages.size(), freeBytes / usages.size());
}
/**
* Given the DiskUsage for a node and the size of the shard, return the
* percentage of free disk if the shard were to be allocated to the node.
* @param usage A DiskUsage for the node to have space computed for
* @param shardSize Size in bytes of the shard
* @return Percentage of free space after the shard is assigned to the node
*/
public double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) {
shardSize = (shardSize == null) ? 0 : shardSize;
return 100.0 - (((double)(usage.getUsedBytes() + shardSize) / usage.getTotalBytes()) * 100.0);
}
/**
* Attempts to parse the watermark into a percentage, returning 100.0% if
* it cannot be parsed.
*/
public double thresholdPercentageFromWatermark(String watermark) {
try {
return 100.0 * Double.parseDouble(watermark);
} catch (NumberFormatException ex) {
return 100.0;
}
}
/**
* Attempts to parse the watermark into a {@link ByteSizeValue}, returning
* a ByteSizeValue of 0 bytes if the value cannot be parsed.
*/
public ByteSizeValue thresholdBytesFromWatermark(String watermark) {
try {
return ByteSizeValue.parseBytesSizeValue(watermark);
} catch (ElasticSearchParseException ex) {
return ByteSizeValue.parseBytesSizeValue("0b");
}
}
/**
* Checks if a watermark string is a valid percentage or byte size value,
* returning true if valid, false if invalid.
*/
public boolean validWatermarkSetting(String watermark) {
try {
double w = Double.parseDouble(watermark);
if (w < 0 || w > 1.0) {
return false;
}
return true;
} catch (NumberFormatException e) {
try {
ByteSizeValue.parseBytesSizeValue(watermark);
return true;
} catch (ElasticSearchParseException ex) {
return false;
}
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.settings;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.*;
@ -68,6 +69,10 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*");
clusterDynamicSettings.addDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER);
clusterDynamicSettings.addDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, Validator.INTEGER);
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK);
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK);
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED);
clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME);
}
public void addDynamicSettings(String... settings) {

View File

@ -0,0 +1,60 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.test.integration.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
public class DiskUsageTests extends ElasticsearchTestCase {
@Test
public void diskUsageCalcTest() {
DiskUsage du = new DiskUsage("node1", 100, 40);
assertThat(du.getFreeDiskAsPercentage(), equalTo(40.0));
assertThat(du.getFreeBytes(), equalTo(40L));
assertThat(du.getUsedBytes(), equalTo(60L));
assertThat(du.getTotalBytes(), equalTo(100L));
}
@Test
public void randomDiskUsageTest() {
for (int i=1;i<1000000;i++) {
long total = between(Integer.MIN_VALUE, Integer.MAX_VALUE);
long free = between(Integer.MIN_VALUE, Integer.MAX_VALUE);
if (free > total || total <= 0) {
try {
new DiskUsage("random", total, free);
fail("should never reach this");
} catch (IllegalStateException e) {
assert true;
}
} else {
DiskUsage du = new DiskUsage("random", total, free);
assertThat(du.getFreeBytes(), equalTo(free));
assertThat(du.getTotalBytes(), equalTo(total));
assertThat(du.getUsedBytes(), equalTo(total - free));
assertThat(du.getFreeDiskAsPercentage(), equalTo(100.0 * ((double)free / total)));
}
}
}
}

View File

@ -0,0 +1,601 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.integration.ElasticsearchTestCase;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.Matchers.equalTo;
public class DiskThresholdDeciderTests extends ElasticsearchTestCase {
@Test
public void diskThresholdTest() {
Settings diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.7)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build();
Map<String, DiskUsage> usages = new HashMap<String, DiskUsage>();
usages.put("node1", new DiskUsage("node1", 100, 10)); // 90% used
usages.put("node2", new DiskUsage("node2", 100, 35)); // 65% used
usages.put("node3", new DiskUsage("node3", 100, 60)); // 40% used
usages.put("node4", new DiskUsage("node4", 100, 80)); // 20% used
Map<String, Long> shardSizes = new HashMap<String, Long>();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
shardSizes.put("[test][0][r]", 10L);
final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));
AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<AllocationDecider>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
new DiskThresholdDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding two nodes");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node2"))
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Primary shard should be initializing, replica should not
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> start the shards (primaries)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that we're able to start the primary
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
// Assert that node1 didn't get any shards because its disk usage is too high
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
logger.info("--> start the shards (replicas)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that the replica couldn't be started since node1 doesn't have enough space
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
logger.info("--> adding node3");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
.put(newNode("node3"))
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that the replica is initialized now that node3 is available with enough space
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
logger.info("--> start the shards (replicas)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that the replica couldn't be started since node1 doesn't have enough space
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shards().size(), equalTo(1));
logger.info("--> changing decider settings");
// Set the low threshold to 60 instead of 70
// Set the high threshold to 70 instead of 80
// node2 now should not have new shards allocated to it, but shards can remain
diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.6)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.7).build();
deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<AllocationDecider>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
new DiskThresholdDecider(diskSettings))));
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Shards remain started
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shards().size(), equalTo(1));
logger.info("--> changing settings again");
// Set the low threshold to 50 instead of 60
// Set the high threshold to 60 instead of 70
// node2 now should not have new shards allocated to it, and shards cannot remain
diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.5)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.6).build();
deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<AllocationDecider>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
new DiskThresholdDecider(diskSettings))));
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Shards remain started
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
// Shard hasn't been moved off of node2 yet because there's nowhere for it to go
assertThat(clusterState.getRoutingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shards().size(), equalTo(1));
logger.info("--> adding node4");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
.put(newNode("node4"))
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Shards remain started
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> apply INITIALIZING shards");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
// Node4 is available now, so the shard is moved off of node2
assertThat(clusterState.getRoutingNodes().node("node2").shards().size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node3").shards().size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node4").shards().size(), equalTo(1));
}
@Test
public void diskThresholdWithAbsoluteSizesTest() {
Settings diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "30b")
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "20b").build();
Map<String, DiskUsage> usages = new HashMap<String, DiskUsage>();
usages.put("node1", new DiskUsage("node1", 100, 10)); // 90% used
usages.put("node2", new DiskUsage("node2", 100, 35)); // 65% used
usages.put("node3", new DiskUsage("node3", 100, 60)); // 40% used
usages.put("node4", new DiskUsage("node4", 100, 80)); // 20% used
Map<String, Long> shardSizes = new HashMap<String, Long>();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
shardSizes.put("[test][0][r]", 10L);
final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));
AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<AllocationDecider>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
new DiskThresholdDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding two nodes");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node2"))
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Primary shard should be initializing, replica should not
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> start the shards (primaries)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that we're able to start the primary
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
// Assert that node1 didn't get any shards because its disk usage is too high
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
logger.info("--> start the shards (replicas)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that the replica couldn't be started since node1 doesn't have enough space
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
logger.info("--> adding node3");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
.put(newNode("node3"))
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that the replica is initialized now that node3 is available with enough space
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
logger.info("--> start the shards (replicas)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that the replica couldn't be started since node1 doesn't have enough space
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shards().size(), equalTo(1));
logger.info("--> changing decider settings");
// Set the low threshold to 60 instead of 70
// Set the high threshold to 70 instead of 80
// node2 now should not have new shards allocated to it, but shards can remain
diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "40b")
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "30b").build();
deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<AllocationDecider>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
new DiskThresholdDecider(diskSettings))));
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Shards remain started
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shards().size(), equalTo(1));
logger.info("--> changing settings again");
// Set the low threshold to 50 instead of 60
// Set the high threshold to 60 instead of 70
// node2 now should not have new shards allocated to it, and shards cannot remain
diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "50b")
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "40b").build();
deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<AllocationDecider>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
new DiskThresholdDecider(diskSettings))));
strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Shards remain started
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
// Shard hasn't been moved off of node2 yet because there's nowhere for it to go
assertThat(clusterState.getRoutingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shards().size(), equalTo(1));
logger.info("--> adding node4");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes())
.put(newNode("node4"))
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Shards remain started
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> apply INITIALIZING shards");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
assertThat(clusterState.getRoutingNodes().node("node1").shards().size(), equalTo(0));
// Node4 is available now, so the shard is moved off of node2
assertThat(clusterState.getRoutingNodes().node("node2").shards().size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node3").shards().size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node4").shards().size(), equalTo(1));
}
@Test
public void diskThresholdWithShardSizes() {
Settings diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.7)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.71).build();
Map<String, DiskUsage> usages = new HashMap<String, DiskUsage>();
usages.put("node1", new DiskUsage("node1", 100, 31)); // 69% used
usages.put("node2", new DiskUsage("node2", 100, 1)); // 99% used
Map<String, Long> shardSizes = new HashMap<String, Long>();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));
AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<AllocationDecider>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
new DiskThresholdDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = routingTable()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding node1");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node2")) // node2 is added because DiskThresholdDecider automatically ignore single-node clusters
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("--> start the shards (primaries)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Shard can't be allocated to node1 (or node2) because it would cause too much usage
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
// No shards are started, no nodes have enough disk for allocation
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(0));
}
@Test
public void unknownDiskUsageTest() {
Settings diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.7)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.85).build();
Map<String, DiskUsage> usages = new HashMap<String, DiskUsage>();
usages.put("node2", new DiskUsage("node2", 100, 50)); // 50% used
usages.put("node3", new DiskUsage("node3", 100, 0)); // 100% used
Map<String, Long> shardSizes = new HashMap<String, Long>();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
shardSizes.put("[test][0][r]", 10L); // 10 bytes
final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));
AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<AllocationDecider>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
new DiskThresholdDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = routingTable()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding node1");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
.put(newNode("node1"))
.put(newNode("node3")) // node3 is added because DiskThresholdDecider automatically ignore single-node clusters
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("--> start the shards (primaries)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Shard can't be allocated to node1 (or node2) because the average usage is 75% > 70%
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
// No shards are started, node1 doesn't have enough disk usage
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(0));
}
@Test
public void averageUsageUnitTest() {
RoutingNode rn = new RoutingNode("node1", newNode("node1"));
DiskThresholdDecider decider = new DiskThresholdDecider(ImmutableSettings.EMPTY);
Map<String, DiskUsage> usages = new HashMap<String, DiskUsage>();
usages.put("node2", new DiskUsage("node2", 100, 50)); // 50% used
usages.put("node3", new DiskUsage("node3", 100, 0)); // 100% used
DiskUsage node1Usage = decider.averageUsage(rn, usages);
assertThat(node1Usage.getTotalBytes(), equalTo(100L));
assertThat(node1Usage.getFreeBytes(), equalTo(25L));
}
@Test
public void freeDiskPercentageAfterShardAssignedUnitTest() {
RoutingNode rn = new RoutingNode("node1", newNode("node1"));
DiskThresholdDecider decider = new DiskThresholdDecider(ImmutableSettings.EMPTY);
Map<String, DiskUsage> usages = new HashMap<String, DiskUsage>();
usages.put("node2", new DiskUsage("node2", 100, 50)); // 50% used
usages.put("node3", new DiskUsage("node3", 100, 0)); // 100% used
Double after = decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", 100, 30), 11L);
assertThat(after, equalTo(19.0));
}
public void logShardStates(ClusterState state) {
RoutingNodes rn = state.routingNodes();
logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",
rn.shards(new Predicate<MutableShardRouting>() {
@Override
public boolean apply(org.elasticsearch.cluster.routing.MutableShardRouting input) {
return true;
}
}).size(),
rn.shardsWithState(UNASSIGNED).size(),
rn.shardsWithState(INITIALIZING).size(),
rn.shardsWithState(RELOCATING).size(),
rn.shardsWithState(STARTED).size());
logger.info("--> unassigned: {}, initializing: {}, relocating: {}, started: {}",
rn.shardsWithState(UNASSIGNED),
rn.shardsWithState(INITIALIZING),
rn.shardsWithState(RELOCATING),
rn.shardsWithState(STARTED));
}
}

View File

@ -19,30 +19,13 @@
package org.elasticsearch.test.unit.cluster.routing.allocation;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.List;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -60,6 +43,19 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.List;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
public class BalanceConfigurationTests {
private final ESLogger logger = Loggers.getLogger(BalanceConfigurationTests.class);
@ -460,7 +456,7 @@ public class BalanceConfigurationTests {
unassigned.clear();
return changed;
}
}));
}), ClusterInfoService.EMPTY);
MetaData.Builder metaDataBuilder = newMetaDataBuilder();
RoutingTable.Builder routingTableBuilder = routingTable();
IndexMetaData.Builder indexMeta = newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1);

View File

@ -19,6 +19,7 @@
package org.elasticsearch.test.unit.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Builder;
@ -65,7 +66,7 @@ public class RandomAllocationDeciderTests extends ElasticsearchTestCase {
RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom());
AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<AllocationDecider>(Arrays.asList(new SameShardAllocationDecider(ImmutableSettings.EMPTY),
randomAllocationDecider))), new ShardsAllocators());
randomAllocationDecider))), new ShardsAllocators(), ClusterInfoService.EMPTY);
int indices = between(1, 20);
Builder metaBuilder = newMetaDataBuilder();
int maxNumReplicas = 1;