Reroute shards when a node goes under disk watermarks

Previously we issued a reroute when a node went over the high watermark
in order to move shards away from the node. This change tracks nodes
that have previously been over the high or low watermarks and issues a
reroute when the node goes back underneath the watermark.

This allows shards that may be unassigned to be assigned back to a node
that was previously over the low watermark but no longer is.

Resolves #12422
This commit is contained in:
Lee Hinman 2015-07-24 10:57:21 -06:00
parent ba63d57e82
commit 5a8356c86e
2 changed files with 91 additions and 24 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfo;
@ -37,6 +38,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.Map;
import java.util.Set;
/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
@ -128,6 +130,8 @@ public class DiskThresholdDecider extends AllocationDecider {
*/
class DiskListener implements ClusterInfoService.Listener {
private final Client client;
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
private long lastRunNS;
DiskListener(Client client) {
@ -162,21 +166,55 @@ public class DiskThresholdDecider extends AllocationDecider {
Map<String, DiskUsage> usages = info.getNodeDiskUsages();
if (usages != null) {
boolean reroute = false;
for (DiskUsage entry : usages.values()) {
warnAboutDiskIfNeeded(entry);
if (entry.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes() ||
entry.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) {
String explanation = "";
// Garbage collect nodes that have been removed from the cluster
// from the map that tracks watermark crossing
Set<String> nodes = usages.keySet();
for (String node : nodeHasPassedWatermark) {
if (nodes.contains(node) == false) {
nodeHasPassedWatermark.remove(node);
}
}
for (Map.Entry<String, DiskUsage> entry : usages.entrySet()) {
String node = entry.getKey();
DiskUsage usage = entry.getValue();
warnAboutDiskIfNeeded(usage);
if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes() ||
usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) {
if ((System.nanoTime() - lastRunNS) > DiskThresholdDecider.this.rerouteInterval.nanos()) {
lastRunNS = System.nanoTime();
reroute = true;
explanation = "high disk watermark exceeded on one or more nodes";
} else {
logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute",
entry, DiskThresholdDecider.this.rerouteInterval);
node, DiskThresholdDecider.this.rerouteInterval);
}
nodeHasPassedWatermark.add(node);
} else if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdLow.bytes() ||
usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdLow) {
nodeHasPassedWatermark.add(node);
} else {
if (nodeHasPassedWatermark.contains(node)) {
// The node has previously been over the high or
// low watermark, but is no longer, so we should
// reroute so any unassigned shards can be allocated
// if they are able to be
if ((System.nanoTime() - lastRunNS) > DiskThresholdDecider.this.rerouteInterval.nanos()) {
lastRunNS = System.nanoTime();
reroute = true;
explanation = "one or more nodes has gone under the high or low watermark";
nodeHasPassedWatermark.remove(node);
} else {
logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred in the last [{}], skipping reroute",
node, DiskThresholdDecider.this.rerouteInterval);
}
}
}
}
if (reroute) {
logger.info("high disk watermark exceeded on one or more nodes, rerouting shards");
logger.info("rerouting shards: [{}]", explanation);
// Execute an empty reroute, but don't block on the response
client.admin().cluster().prepareReroute().execute();
}

View File

@ -48,6 +48,7 @@ import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
public class MockDiskUsagesTests extends ElasticsearchIntegrationTest {
@ -59,13 +60,13 @@ public class MockDiskUsagesTests extends ElasticsearchIntegrationTest {
// Use the mock internal cluster info service, which has fake-able disk usages
.put(ClusterModule.CLUSTER_SERVICE_IMPL, MockInternalClusterInfoService.class.getName())
// Update more frequently
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, "2s")
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, "1s")
.build();
}
@Test
//@TestLogging("org.elasticsearch.cluster:TRACE,org.elasticsearch.cluster.routing.allocation.decider:TRACE")
public void testRerouteOccursOnDiskpassingHighWatermark() throws Exception {
public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(3).get();
// Wait for all 3 nodes to be up
@ -87,7 +88,7 @@ public class MockDiskUsagesTests extends ElasticsearchIntegrationTest {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, randomFrom("20b", "80%"))
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, randomFrom("10b", "90%"))
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, "1s")).get();
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, "1ms")).get();
// Create an index with 10 shards so we can check allocation for it
prepareCreate("test").setSettings(settingsBuilder()
@ -106,7 +107,7 @@ public class MockDiskUsagesTests extends ElasticsearchIntegrationTest {
}
});
List<String> realNodeNames = newArrayList();
final List<String> realNodeNames = newArrayList();
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
while (iter.hasNext()) {
@ -121,22 +122,50 @@ public class MockDiskUsagesTests extends ElasticsearchIntegrationTest {
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", 100, 50));
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", 100, 0)); // nothing free on node3
// Cluster info gathering interval is 2 seconds, give reroute 2 seconds to kick in
Thread.sleep(4000);
// Retrieve the count of shards on each node
final Map<String, Integer> nodesToShardCount = newHashMap();
assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
while (iter.hasNext()) {
RoutingNode node = iter.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
}
});
// Update the disk usages so one node is now back under the high watermark
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", 100, 50));
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", 100, 50));
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", 100, 50)); // node3 has free space now
// Retrieve the count of shards on each node
resp = client().admin().cluster().prepareState().get();
iter = resp.getState().getRoutingNodes().iterator();
Map<String, Integer> nodesToShardCount = newHashMap();
while (iter.hasNext()) {
RoutingNode node = iter.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
nodesToShardCount.clear();
assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
while (iter.hasNext()) {
RoutingNode node = iter.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
}
});
}
/** Create a fake NodeStats for the given node and usage */