From 4ac7b02ce7d2c07e1f217788ac676cac1516fc32 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 23 Oct 2014 12:39:18 +0200 Subject: [PATCH] Reroute shards automatically when high disk watermark is exceeded This adds a Listener interface to the ClusterInfoService; this is used by the DiskThresholdDecider, which adds a listener to check for nodes passing the high watermark. If a node is past the high watermark an empty reroute is issued so shards can be reallocated if desired. A reroute will only be issued once every `cluster.routing.allocation.disk.reroute_interval`, which is "60s" by default. Refactors InternalClusterInfoService to delegate the nodes stats and indices stats gathering into separate methods so they can be overridden by extending classes. Each stat gathering method returns a CountDownLatch that can be used to wait until processing for that part is successful before calling the listeners. Fixes #8146 --- .../action/LatchedActionListener.java | 55 +++++ .../cluster/ClusterInfoService.java | 15 ++ .../elasticsearch/cluster/ClusterModule.java | 3 +- .../org/elasticsearch/cluster/DiskUsage.java | 2 +- .../cluster/EmptyClusterInfoService.java | 5 + .../cluster/InternalClusterInfoService.java | 74 +++++-- .../decider/AllocationDecidersModule.java | 4 +- .../decider/DiskThresholdDecider.java | 68 +++++- .../ClusterDynamicSettingsModule.java | 3 +- .../org/elasticsearch/monitor/fs/FsStats.java | 21 +- .../decider/DiskThresholdDeciderTests.java | 30 +++ .../decider/MockDiskUsagesTests.java | 203 ++++++++++++++++++ 12 files changed, 454 insertions(+), 29 deletions(-) create mode 100644 src/main/java/org/elasticsearch/action/LatchedActionListener.java create mode 100644 src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesTests.java diff --git a/src/main/java/org/elasticsearch/action/LatchedActionListener.java b/src/main/java/org/elasticsearch/action/LatchedActionListener.java new file mode 100644 index 00000000000..fa17ae5c8fe --- 
/dev/null +++ b/src/main/java/org/elasticsearch/action/LatchedActionListener.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import java.util.concurrent.CountDownLatch; + +/** + * An action listener that forwards to a delegate listener and counts down the given {@link CountDownLatch} + * once the delegate's onResponse or onFailure has returned or thrown (the count down happens in a finally block) + */ +public final class LatchedActionListener<T> implements ActionListener<T> { + + private final ActionListener<T> delegate; + private final CountDownLatch latch; + + public LatchedActionListener(ActionListener<T> delegate, CountDownLatch latch) { + this.delegate = delegate; + this.latch = latch; + } + + @Override + public void onResponse(T t) { + try { + delegate.onResponse(t); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Throwable e) { + try { + delegate.onFailure(e); + } finally { + latch.countDown(); + } + } +} diff --git a/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java b/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java index 568a5582dc1..e17b2326386 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java +++ 
b/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java @@ -19,10 +19,25 @@ package org.elasticsearch.cluster; +/** + * Interface for a class used to gather information about a cluster at + * regular intervals + */ public interface ClusterInfoService { public static ClusterInfoService EMPTY = EmptyClusterInfoService.getInstance(); + /** The latest cluster information */ public ClusterInfo getClusterInfo(); + /** Add a listener that will be called every time new information is gathered */ + public void addListener(Listener listener); + + /** + * Interface for listeners to implement in order to perform actions when + * new information about the cluster has been gathered + */ + public interface Listener { + public void onNewInfo(ClusterInfo info); + } } diff --git a/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 085992ff23a..3798079fe77 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -47,6 +47,7 @@ import java.util.Set; public class ClusterModule extends AbstractModule implements SpawnModules { private final Settings settings; + public static final String CLUSTER_SERVICE_IMPL = "cluster.info.service.type"; private Set> indexTemplateFilters = new HashSet<>(); @@ -87,7 +88,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules { bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); - bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton(); + bind(ClusterInfoService.class).to(settings.getAsClass(CLUSTER_SERVICE_IMPL, InternalClusterInfoService.class)).asEagerSingleton(); Multibinder mbinder = Multibinder.newSetBinder(binder(), IndexTemplateFilter.class); for (Class indexTemplateFilter : indexTemplateFilters) { diff --git a/src/main/java/org/elasticsearch/cluster/DiskUsage.java 
b/src/main/java/org/elasticsearch/cluster/DiskUsage.java index 2751c616206..acea8594ae0 100644 --- a/src/main/java/org/elasticsearch/cluster/DiskUsage.java +++ b/src/main/java/org/elasticsearch/cluster/DiskUsage.java @@ -55,6 +55,6 @@ public class DiskUsage { } public String toString() { - return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "]"; + return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "%]"; } } diff --git a/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java b/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java index 92a2b0af4a9..096ea43e9b1 100644 --- a/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java +++ b/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java @@ -46,4 +46,9 @@ public class EmptyClusterInfoService extends AbstractComponent implements Cluste public ClusterInfo getClusterInfo() { return emptyClusterInfo; } + + @Override + public void addListener(Listener listener) { + // no-op, no new info is ever gathered, so adding listeners is useless + } } diff --git a/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 6c4ed216be3..1281084eb5e 100644 --- a/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster; import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; @@ -42,8 +43,9 @@ import org.elasticsearch.monitor.fs.FsStats; import 
org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; -import java.util.HashMap; -import java.util.Map; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * InternalClusterInfoService provides the ClusterInfoService interface, @@ -56,7 +58,7 @@ import java.util.Map; * Every time the timer runs, gathers information about the disk usage and * shard sizes across the cluster. */ -public final class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener { +public class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener { public static final String INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL = "cluster.info.update.interval"; @@ -70,6 +72,7 @@ public final class InternalClusterInfoService extends AbstractComponent implemen private final TransportIndicesStatsAction transportIndicesStatsAction; private final ClusterService clusterService; private final ThreadPool threadPool; + private final Set listeners = Collections.synchronizedSet(new HashSet()); @Inject public InternalClusterInfoService(Settings settings, NodeSettingsService nodeSettingsService, @@ -188,6 +191,11 @@ public final class InternalClusterInfoService extends AbstractComponent implemen return new ClusterInfo(usages, shardSizes); } + @Override + public void addListener(Listener listener) { + this.listeners.add(listener); + } + /** * Class used to submit {@link ClusterInfoUpdateJob}s on the * {@link InternalClusterInfoService} threadpool, these jobs will @@ -210,6 +218,34 @@ public final class InternalClusterInfoService extends AbstractComponent implemen } } + /** + * Retrieve the latest nodes stats, calling the listener when complete + * @return a latch that can be used to wait for the nodes stats to complete if desired + */ + protected CountDownLatch 
updateNodeStats(final ActionListener listener) { + final CountDownLatch latch = new CountDownLatch(1); + final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true"); + nodesStatsRequest.clear(); + nodesStatsRequest.fs(true); + nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15)); + + transportNodesStatsAction.execute(nodesStatsRequest, new LatchedActionListener<>(listener, latch)); + return latch; + } + + /** + * Retrieve the latest indices stats, calling the listener when complete + * @return a latch that can be used to wait for the indices stats to complete if desired + */ + protected CountDownLatch updateIndicesStats(final ActionListener listener) { + final CountDownLatch latch = new CountDownLatch(1); + final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.clear(); + indicesStatsRequest.store(true); + + transportIndicesStatsAction.execute(indicesStatsRequest, new LatchedActionListener<>(listener, latch)); + return latch; + } /** * Runnable class that performs a {@Link NodesStatsRequest} to retrieve @@ -252,12 +288,7 @@ public final class InternalClusterInfoService extends AbstractComponent implemen return; } - NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true"); - nodesStatsRequest.clear(); - nodesStatsRequest.fs(true); - nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15)); - - transportNodesStatsAction.execute(nodesStatsRequest, new ActionListener() { + CountDownLatch nodeLatch = updateNodeStats(new ActionListener() { @Override public void onResponse(NodesStatsResponse nodeStatses) { Map newUsages = new HashMap<>(); @@ -294,10 +325,7 @@ public final class InternalClusterInfoService extends AbstractComponent implemen } }); - IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); - indicesStatsRequest.clear(); - indicesStatsRequest.store(true); - transportIndicesStatsAction.execute(indicesStatsRequest, new ActionListener() { + CountDownLatch indicesLatch = 
updateIndicesStats(new ActionListener() { @Override public void onResponse(IndicesStatsResponse indicesStatsResponse) { ShardStats[] stats = indicesStatsResponse.getShards(); @@ -325,8 +353,30 @@ public final class InternalClusterInfoService extends AbstractComponent implemen } }); - if (logger.isTraceEnabled()) { - logger.trace("Finished ClusterInfoUpdateJob"); + try { + if (!nodeLatch.await(15, TimeUnit.SECONDS)) { // await returns false when the timeout elapses + logger.warn("Failed to update node information for ClusterInfoUpdateJob within 15s timeout"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // restore interrupt status + logger.warn("Interrupted while waiting for node information for ClusterInfoUpdateJob", e); + } + + try { + if (!indicesLatch.await(15, TimeUnit.SECONDS)) { // await returns false when the timeout elapses + logger.warn("Failed to update shard information for ClusterInfoUpdateJob within 15s timeout"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // restore interrupt status + logger.warn("Interrupted while waiting for shard information for ClusterInfoUpdateJob", e); + } + + for (Listener l : listeners.toArray(new Listener[0])) { // iterate a snapshot: a synchronized set must not be iterated without holding its lock + try { + l.onNewInfo(getClusterInfo()); + } catch (Exception e) { + logger.info("Failed executing ClusterInfoService listener", e); + } } } } diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java index 6f19e9557d7..b634dd58f44 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java @@ -53,10 +53,10 @@ public class AllocationDecidersModule extends AbstractModule { protected void configure() { Multibinder allocationMultibinder = Multibinder.newSetBinder(binder(), AllocationDecider.class); for (Class deciderClass : DEFAULT_ALLOCATION_DECIDERS) { - allocationMultibinder.addBinding().to(deciderClass); + allocationMultibinder.addBinding().to(deciderClass).asEagerSingleton(); } for (Class allocation : allocations) { - allocationMultibinder.addBinding().to(allocation); + allocationMultibinder.addBinding().to(allocation).asEagerSingleton(); } bind(AllocationDeciders.class).asEagerSingleton(); diff --git 
a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 1923264e5cb..2201ab1f981 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -20,7 +20,9 @@ package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -30,6 +32,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.RatioValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.settings.NodeSettingsService; import java.util.List; @@ -70,11 +73,13 @@ public class DiskThresholdDecider extends AllocationDecider { private volatile ByteSizeValue freeBytesThresholdHigh; private volatile boolean includeRelocations; private volatile boolean enabled; + private volatile TimeValue rerouteInterval; public static final String CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED = "cluster.routing.allocation.disk.threshold_enabled"; public static final String CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.low"; public static final String CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.high"; public static final String CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS = "cluster.routing.allocation.disk.include_relocations"; + public static final 
String CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL = "cluster.routing.allocation.disk.reroute_interval"; class ApplySettings implements NodeSettingsService.Listener { @Override @@ -83,6 +88,7 @@ public class DiskThresholdDecider extends AllocationDecider { String newHighWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, null); Boolean newRelocationsSetting = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, null); Boolean newEnableSetting = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, null); + TimeValue newRerouteInterval = settings.getAsTime(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, null); if (newEnableSetting != null) { logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, @@ -110,15 +116,59 @@ public class DiskThresholdDecider extends AllocationDecider { DiskThresholdDecider.this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(newHighWatermark); DiskThresholdDecider.this.freeBytesThresholdHigh = thresholdBytesFromWatermark(newHighWatermark); } + if (newRerouteInterval != null) { + logger.info("updating [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, newRerouteInterval); + DiskThresholdDecider.this.rerouteInterval = newRerouteInterval; + } + } + } + + /** + * Listens for a node to go over the high watermark (checked both as free bytes and as + * free disk percentage) and kicks off an empty reroute if it does + */ + class DiskListener implements ClusterInfoService.Listener { + private final Client client; + private long lastRun; + + DiskListener(Client client) { + this.client = client; + } + + @Override + public void onNewInfo(ClusterInfo info) { + Map usages = info.getNodeDiskUsages(); + if (usages != null) { + for (DiskUsage entry : usages.values()) { + if (entry.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes() || entry.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) { + if ((System.currentTimeMillis() - lastRun) > DiskThresholdDecider.this.rerouteInterval.millis()) { + lastRun = 
System.currentTimeMillis(); + logger.info("high watermark [{}/{}%] exceeded on {}, rerouting shards", + DiskThresholdDecider.this.freeBytesThresholdHigh, DiskThresholdDecider.this.freeDiskThresholdHigh, entry); + // Execute an empty reroute, but don't block on the response + client.admin().cluster().prepareReroute().execute(); + // Only one reroute is required, short circuit + return; + } else { + logger.debug("high watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute", + entry, DiskThresholdDecider.this.rerouteInterval); + return; + } + } + } + } } } public DiskThresholdDecider(Settings settings) { - this(settings, new NodeSettingsService(settings)); + // It's okay the Client is null here, because the empty cluster info + // service will never actually call the listener where the client is + // needed. Also this constructor is only used for tests + this(settings, new NodeSettingsService(settings), ClusterInfoService.EMPTY, null); } @Inject - public DiskThresholdDecider(Settings settings, NodeSettingsService nodeSettingsService) { + public DiskThresholdDecider(Settings settings, NodeSettingsService nodeSettingsService, ClusterInfoService infoService, Client client) { super(settings); String lowWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "85%"); String highWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "90%"); @@ -136,9 +186,11 @@ public class DiskThresholdDecider extends AllocationDecider { this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark); this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark); this.includeRelocations = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true); + this.rerouteInterval = settings.getAsTime(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, TimeValue.timeValueSeconds(60)); this.enabled = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true); 
nodeSettingsService.addListener(new ApplySettings()); + infoService.addListener(new DiskListener(client)); } /** @@ -206,9 +258,9 @@ public class DiskThresholdDecider extends AllocationDecider { if (includeRelocations) { long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes); DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); - if (logger.isDebugEnabled()) { - logger.debug("usage without relocations: {}", usage); - logger.debug("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations); + if (logger.isTraceEnabled()) { + logger.trace("usage without relocations: {}", usage); + logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations); } usage = usageIncludingRelocations; } @@ -348,9 +400,9 @@ public class DiskThresholdDecider extends AllocationDecider { Map shardSizes = clusterInfo.getShardSizes(); long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes); DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); - if (logger.isDebugEnabled()) { - logger.debug("usage without relocations: {}", usage); - logger.debug("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations); + if (logger.isTraceEnabled()) { + logger.trace("usage without relocations: {}", usage); + logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations); } usage = usageIncludingRelocations; } diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java index 8d038f5e40b..64c5bc409ae 100644 --- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java +++ 
b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java @@ -87,7 +87,8 @@ public class ClusterDynamicSettingsModule extends AbstractModule { clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK); clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED); clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS); - clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME); + clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, Validator.TIME_NON_NEGATIVE); + clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME_NON_NEGATIVE); clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED); clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME); clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE); diff --git a/src/main/java/org/elasticsearch/monitor/fs/FsStats.java b/src/main/java/org/elasticsearch/monitor/fs/FsStats.java index 0c323ca0236..f6e7290fef5 100644 --- a/src/main/java/org/elasticsearch/monitor/fs/FsStats.java +++ b/src/main/java/org/elasticsearch/monitor/fs/FsStats.java @@ -56,6 +56,25 @@ public class FsStats implements Iterable, Streamable, ToXContent { double diskQueue = -1; double diskServiceTime = -1; + public Info() { + } + + public Info(String path, @Nullable String mount, @Nullable String dev, long total, long free, long available, long diskReads, + long diskWrites, long diskReadBytes, long diskWriteBytes, double diskQueue, double diskServiceTime) { + this.path = path; + this.mount = mount; + this.dev = dev; + this.total = total; + 
this.free = free; + this.available = available; + this.diskReads = diskReads; + this.diskWrites = diskWrites; + this.diskReadBytes = diskReadBytes; + this.diskWriteBytes = diskWriteBytes; + this.diskQueue = diskQueue; + this.diskServiceTime = diskServiceTime; + } + static public Info readInfoFrom(StreamInput in) throws IOException { Info i = new Info(); i.readFrom(in); @@ -286,7 +305,7 @@ public class FsStats implements Iterable, Streamable, ToXContent { } - FsStats(long timestamp, Info[] infos) { + public FsStats(long timestamp, Info[] infos) { this.timestamp = timestamp; this.infos = infos; this.total = null; diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 3264ce427de..8f846b17296 100644 --- a/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -82,6 +82,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } + + @Override + public void addListener(Listener listener) { + // noop + } }; AllocationService strategy = new AllocationService(settingsBuilder() @@ -273,6 +278,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } + + @Override + public void addListener(Listener listener) { + // noop + } }; AllocationService strategy = new AllocationService(settingsBuilder() @@ -324,6 +334,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase { logger.info("--> calling fake getClusterInfo"); return clusterInfo2; } + + @Override + public void addListener(Listener listener) { + // noop + } }; strategy = new 
AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) @@ -524,6 +539,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } + + @Override + public void addListener(Listener listener) { + // noop + } }; AllocationService strategy = new AllocationService(settingsBuilder() @@ -586,6 +606,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } + + @Override + public void addListener(Listener listener) { + // noop + } }; AllocationService strategy = new AllocationService(settingsBuilder() @@ -685,6 +710,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } + + @Override + public void addListener(Listener listener) { + // noop + } }; AllocationService strategy = new AllocationService(settingsBuilder() diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesTests.java new file mode 100644 index 00000000000..8fafa2e2e13 --- /dev/null +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesTests.java @@ -0,0 +1,203 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; +import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.monitor.fs.FsStats; +import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Test; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Maps.newHashMap; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.Matchers.equalTo; +import static 
org.hamcrest.Matchers.greaterThan; + +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) +public class MockDiskUsagesTests extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder() + .put(super.nodeSettings(nodeOrdinal)) + // Use the mock internal cluster info service, which has fake-able disk usages + .put(ClusterModule.CLUSTER_SERVICE_IMPL, MockInternalClusterInfoService.class.getName()) + // Update more frequently + .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, "2s") + .build(); + } + + @Test + //@TestLogging("org.elasticsearch.cluster:TRACE,org.elasticsearch.cluster.routing.allocation.decider:TRACE") + public void testRerouteOccursOnDiskpassingHighWatermark() throws Exception { + List nodes = internalCluster().startNodesAsync(3).get(); + + // Wait for all 3 nodes to be up + assertBusy(new Runnable() { + @Override + public void run() { + NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get(); + assertThat(resp.getNodes().length, equalTo(3)); + } + }); + + // Start with all nodes at 50% usage + final MockInternalClusterInfoService cis = (MockInternalClusterInfoService) + internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName()); + cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), 100, 50)); + cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), 100, 50)); + cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), 100, 50)); + + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder() + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "20b") + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "10b") + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, "1s")).get(); + + // Create an index with 10 shards so we can check allocation for 
it + prepareCreate("test").setSettings(settingsBuilder() + .put("number_of_shards", 10) + .put("number_of_replicas", 0) + .put("index.routing.allocation.exclude._name", "")).get(); + ensureGreen("test"); + + // Block until the "fake" cluster info is retrieved at least once + assertBusy(new Runnable() { + @Override + public void run() { + ClusterInfo info = cis.getClusterInfo(); + logger.info("--> got: {} nodes", info.getNodeDiskUsages().size()); + assertThat(info.getNodeDiskUsages().size(), greaterThan(0)); + } + }); + + List realNodeNames = newArrayList(); + ClusterStateResponse resp = client().admin().cluster().prepareState().get(); + Iterator iter = resp.getState().getRoutingNodes().iterator(); + while (iter.hasNext()) { + RoutingNode node = iter.next(); + realNodeNames.add(node.nodeId()); + logger.info("--> node {} has {} shards", + node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + } + + // Update the disk usages so one node has now passed the high watermark + cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), 100, 50)); + cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), 100, 50)); + cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), 100, 0)); // nothing free on node3 + + // Cluster info gathering interval is 2 seconds, give reroute 2 seconds to kick in + Thread.sleep(4000); + + // Retrieve the count of shards on each node + resp = client().admin().cluster().prepareState().get(); + iter = resp.getState().getRoutingNodes().iterator(); + Map nodesToShardCount = newHashMap(); + while (iter.hasNext()) { + RoutingNode node = iter.next(); + logger.info("--> node {} has {} shards", + node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + } + assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), 
equalTo(5)); + assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5)); + assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0)); + } + + /** Create a fake NodeStats for the given node and usage */ + public static NodeStats makeStats(String nodeName, DiskUsage usage) { + FsStats.Info[] infos = new FsStats.Info[1]; + FsStats.Info info = new FsStats.Info("/path.data", null, null, + usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeBytes(), -1, -1, -1, -1, -1, -1); + infos[0] = info; + FsStats fsStats = new FsStats(System.currentTimeMillis(), infos); + return new NodeStats(new DiscoveryNode(nodeName, null, Version.V_2_0_0), + System.currentTimeMillis(), + null, null, null, null, null, null, + fsStats, + null, null, null); + } + + /** + * Fake ClusterInfoService class that allows updating the nodes stats disk + * usage with fake values + */ + public static class MockInternalClusterInfoService extends InternalClusterInfoService { + + private final ClusterName clusterName; + private volatile NodeStats[] stats = new NodeStats[3]; + + @Inject + public MockInternalClusterInfoService(Settings settings, NodeSettingsService nodeSettingsService, + TransportNodesStatsAction transportNodesStatsAction, + TransportIndicesStatsAction transportIndicesStatsAction, + ClusterService clusterService, ThreadPool threadPool) { + super(settings, nodeSettingsService, transportNodesStatsAction, transportIndicesStatsAction, clusterService, threadPool); + this.clusterName = ClusterName.clusterNameFromSettings(settings); + stats[0] = makeStats("node_t1", new DiskUsage("node_t1", 100, 100)); + stats[1] = makeStats("node_t2", new DiskUsage("node_t2", 100, 100)); + stats[2] = makeStats("node_t3", new DiskUsage("node_t3", 100, 100)); + } + + public void setN1Usage(String nodeName, DiskUsage newUsage) { + stats[0] = makeStats(nodeName, newUsage); + } + + public void setN2Usage(String nodeName, DiskUsage newUsage) { + stats[1] 
= makeStats(nodeName, newUsage); + } + + public void setN3Usage(String nodeName, DiskUsage newUsage) { + stats[2] = makeStats(nodeName, newUsage); + } + + @Override + public CountDownLatch updateNodeStats(final ActionListener listener) { + NodesStatsResponse response = new NodesStatsResponse(clusterName, stats); + listener.onResponse(response); + return new CountDownLatch(0); + } + + @Override + public CountDownLatch updateIndicesStats(final ActionListener listener) { + // Not used, so noop + return new CountDownLatch(0); + } + } +}