Reroute shards automatically when high disk watermark is exceeded
This adds a Listener interface to the ClusterInfoService; it is used by the DiskThresholdDecider, which registers a listener that checks for nodes passing the high disk watermark. If a node is past the high watermark, an empty reroute is issued so shards can be reallocated if desired. A reroute will only be issued once every `cluster.routing.allocation.disk.reroute_interval`, which is "60s" by default.

Refactors InternalClusterInfoService to delegate the node stats and indices stats gathering into separate methods so they can be overridden by extending classes. Each stat-gathering method returns a CountDownLatch that can be used to wait until processing for that part has finished before calling the listeners.

Fixes #8146
parent 1645434af5
commit 4ac7b02ce7
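The throttling described above ("only once every `cluster.routing.allocation.disk.reroute_interval`") reduces to a last-run timestamp check. A minimal standalone sketch of that pattern (class and field names are illustrative, not the committed code, which lives in DiskThresholdDecider.DiskListener below):

    // Sketch of the reroute throttle used by the DiskListener in this commit.
    class RerouteThrottle {
        private final long intervalMillis; // stands in for the reroute_interval setting
        private long lastRun;              // wall-clock time of the last issued reroute

        RerouteThrottle(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        // True (and records the run) only if the interval has elapsed since the last reroute.
        synchronized boolean tryAcquire() {
            long now = System.currentTimeMillis();
            if (now - lastRun > intervalMillis) {
                lastRun = now;
                return true;
            }
            return false;
        }
    }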
@@ -0,0 +1,55 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action;

import java.util.concurrent.CountDownLatch;

/**
 * An action listener that allows passing in a {@link CountDownLatch} that
 * will be counted down after onResponse or onFailure is called
 */
public final class LatchedActionListener<T> implements ActionListener<T> {

    private final ActionListener<T> delegate;
    private final CountDownLatch latch;

    public LatchedActionListener(ActionListener<T> delegate, CountDownLatch latch) {
        this.delegate = delegate;
        this.latch = latch;
    }

    @Override
    public void onResponse(T t) {
        try {
            delegate.onResponse(t);
        } finally {
            latch.countDown();
        }
    }

    @Override
    public void onFailure(Throwable e) {
        try {
            delegate.onFailure(e);
        } finally {
            latch.countDown();
        }
    }
}
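A typical call site pairs the latch handed to LatchedActionListener with a bounded await, which is how the ClusterInfoUpdateJob later in this commit consumes it. A short sketch — the request, delegate, and transportNodesStatsAction variables are assumed to exist at the call site:

    CountDownLatch latch = new CountDownLatch(1);
    // Either onResponse or onFailure counts the latch down, so the await below
    // is released on success and on failure alike.
    transportNodesStatsAction.execute(request, new LatchedActionListener<>(delegate, latch));
    try {
        latch.await(15, TimeUnit.SECONDS); // bounded wait for the async action to finish
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }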
@@ -19,10 +19,25 @@

package org.elasticsearch.cluster;

/**
 * Interface for a class used to gather information about a cluster at
 * regular intervals
 */
public interface ClusterInfoService {

+    public static ClusterInfoService EMPTY = EmptyClusterInfoService.getInstance();
+
+    /** The latest cluster information */
    public ClusterInfo getClusterInfo();
+
+    /** Add a listener that will be called every time new information is gathered */
+    public void addListener(Listener listener);
+
+    /**
+     * Interface for listeners to implement in order to perform actions when
+     * new information about the cluster has been gathered
+     */
+    public interface Listener {
+        public void onNewInfo(ClusterInfo info);
+    }
}
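Consumers register against this interface. A minimal hypothetical listener (the clusterInfoService variable and the printing body are illustrative) might look like:

    clusterInfoService.addListener(new ClusterInfoService.Listener() {
        @Override
        public void onNewInfo(ClusterInfo info) {
            // Runs after every stats-gathering cycle; exceptions thrown here are
            // caught and logged by the service, as the update job below shows.
            System.out.println("disk usage known for " + info.getNodeDiskUsages().size() + " nodes");
        }
    });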
@@ -47,6 +47,7 @@ import java.util.Set;
public class ClusterModule extends AbstractModule implements SpawnModules {

    private final Settings settings;
+    public static final String CLUSTER_SERVICE_IMPL = "cluster.info.service.type";

    private Set<Class<? extends IndexTemplateFilter>> indexTemplateFilters = new HashSet<>();

@@ -87,7 +88,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
        bind(NodeMappingRefreshAction.class).asEagerSingleton();
        bind(MappingUpdatedAction.class).asEagerSingleton();

-        bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton();
+        bind(ClusterInfoService.class).to(settings.getAsClass(CLUSTER_SERVICE_IMPL, InternalClusterInfoService.class)).asEagerSingleton();

        Multibinder<IndexTemplateFilter> mbinder = Multibinder.newSetBinder(binder(), IndexTemplateFilter.class);
        for (Class<? extends IndexTemplateFilter> indexTemplateFilter : indexTemplateFilters) {
@@ -55,6 +55,6 @@ public class DiskUsage {
    }

    public String toString() {
-        return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "]";
+        return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "%]";
    }
}
@@ -46,4 +46,9 @@ public class EmptyClusterInfoService extends AbstractComponent implements ClusterInfoService {
    public ClusterInfo getClusterInfo() {
        return emptyClusterInfo;
    }
+
+    @Override
+    public void addListener(Listener listener) {
+        // no-op, no new info is ever gathered, so adding listeners is useless
+    }
}
@@ -21,6 +21,7 @@ package org.elasticsearch.cluster;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -42,8 +43,9 @@ import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;

-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;

/**
 * InternalClusterInfoService provides the ClusterInfoService interface,
@@ -56,7 +58,7 @@ import java.util.Map;
 * Every time the timer runs, gathers information about the disk usage and
 * shard sizes across the cluster.
 */
-public final class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {
+public class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {

    public static final String INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL = "cluster.info.update.interval";

@@ -70,6 +72,7 @@ public final class InternalClusterInfoService
    private final TransportIndicesStatsAction transportIndicesStatsAction;
    private final ClusterService clusterService;
    private final ThreadPool threadPool;
+    private final Set<Listener> listeners = Collections.synchronizedSet(new HashSet<Listener>());

    @Inject
    public InternalClusterInfoService(Settings settings, NodeSettingsService nodeSettingsService,
@@ -188,6 +191,11 @@ public final class InternalClusterInfoService
        return new ClusterInfo(usages, shardSizes);
    }

+    @Override
+    public void addListener(Listener listener) {
+        this.listeners.add(listener);
+    }
+
    /**
     * Class used to submit {@link ClusterInfoUpdateJob}s on the
     * {@link InternalClusterInfoService} threadpool, these jobs will
@@ -210,6 +218,34 @@ public final class InternalClusterInfoService
        }
    }

+    /**
+     * Retrieve the latest nodes stats, calling the listener when complete
+     * @return a latch that can be used to wait for the nodes stats to complete if desired
+     */
+    protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse> listener) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
+        nodesStatsRequest.clear();
+        nodesStatsRequest.fs(true);
+        nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15));
+
+        transportNodesStatsAction.execute(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
+        return latch;
+    }
+
+    /**
+     * Retrieve the latest indices stats, calling the listener when complete
+     * @return a latch that can be used to wait for the indices stats to complete if desired
+     */
+    protected CountDownLatch updateIndicesStats(final ActionListener<IndicesStatsResponse> listener) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
+        indicesStatsRequest.clear();
+        indicesStatsRequest.store(true);
+
+        transportIndicesStatsAction.execute(indicesStatsRequest, new LatchedActionListener<>(listener, latch));
+        return latch;
+    }
+
    /**
     * Runnable class that performs a {@link NodesStatsRequest} to retrieve
@@ -252,12 +288,7 @@ public final class InternalClusterInfoService
                return;
            }

-            NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
-            nodesStatsRequest.clear();
-            nodesStatsRequest.fs(true);
-            nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15));
-
-            transportNodesStatsAction.execute(nodesStatsRequest, new ActionListener<NodesStatsResponse>() {
+            CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
                @Override
                public void onResponse(NodesStatsResponse nodeStatses) {
                    Map<String, DiskUsage> newUsages = new HashMap<>();
@@ -294,10 +325,7 @@ public final class InternalClusterInfoService
                }
            });

-            IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
-            indicesStatsRequest.clear();
-            indicesStatsRequest.store(true);
-            transportIndicesStatsAction.execute(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
+            CountDownLatch indicesLatch = updateIndicesStats(new ActionListener<IndicesStatsResponse>() {
                @Override
                public void onResponse(IndicesStatsResponse indicesStatsResponse) {
                    ShardStats[] stats = indicesStatsResponse.getShards();
@@ -325,8 +353,24 @@ public final class InternalClusterInfoService
                }
            });

-            if (logger.isTraceEnabled()) {
-                logger.trace("Finished ClusterInfoUpdateJob");
+            try {
+                nodeLatch.await(15, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                logger.warn("Failed to update node information for ClusterInfoUpdateJob within 15s timeout");
+            }
+
+            try {
+                indicesLatch.await(15, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                logger.warn("Failed to update shard information for ClusterInfoUpdateJob within 15s timeout");
+            }
+
+            for (Listener l : listeners) {
+                try {
+                    l.onNewInfo(getClusterInfo());
+                } catch (Exception e) {
+                    logger.info("Failed executing ClusterInfoService listener", e);
+                }
+            }
        }
    }
@@ -53,10 +53,10 @@ public class AllocationDecidersModule extends AbstractModule {
    protected void configure() {
        Multibinder<AllocationDecider> allocationMultibinder = Multibinder.newSetBinder(binder(), AllocationDecider.class);
        for (Class<? extends AllocationDecider> deciderClass : DEFAULT_ALLOCATION_DECIDERS) {
-            allocationMultibinder.addBinding().to(deciderClass);
+            allocationMultibinder.addBinding().to(deciderClass).asEagerSingleton();
        }
        for (Class<? extends AllocationDecider> allocation : allocations) {
-            allocationMultibinder.addBinding().to(allocation);
+            allocationMultibinder.addBinding().to(allocation).asEagerSingleton();
        }

        bind(AllocationDeciders.class).asEagerSingleton();
@@ -20,7 +20,9 @@
package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfo;
+import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -30,6 +32,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RatioValue;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.settings.NodeSettingsService;

import java.util.List;
@@ -70,11 +73,13 @@ public class DiskThresholdDecider extends AllocationDecider {
    private volatile ByteSizeValue freeBytesThresholdHigh;
    private volatile boolean includeRelocations;
    private volatile boolean enabled;
+    private volatile TimeValue rerouteInterval;

    public static final String CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED = "cluster.routing.allocation.disk.threshold_enabled";
    public static final String CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.low";
    public static final String CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.high";
    public static final String CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS = "cluster.routing.allocation.disk.include_relocations";
+    public static final String CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL = "cluster.routing.allocation.disk.reroute_interval";

    class ApplySettings implements NodeSettingsService.Listener {
        @Override
@@ -83,6 +88,7 @@ public class DiskThresholdDecider extends AllocationDecider {
            String newHighWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, null);
            Boolean newRelocationsSetting = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, null);
            Boolean newEnableSetting = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, null);
+            TimeValue newRerouteInterval = settings.getAsTime(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, null);

            if (newEnableSetting != null) {
                logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED,
@@ -110,15 +116,59 @@ public class DiskThresholdDecider extends AllocationDecider {
                DiskThresholdDecider.this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(newHighWatermark);
                DiskThresholdDecider.this.freeBytesThresholdHigh = thresholdBytesFromWatermark(newHighWatermark);
            }
+            if (newRerouteInterval != null) {
+                logger.info("updating [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, newRerouteInterval);
+                DiskThresholdDecider.this.rerouteInterval = newRerouteInterval;
+            }
        }
    }

+    /**
+     * Listens for a node to go over the high watermark and kicks off an empty
+     * reroute if it does
+     */
+    class DiskListener implements ClusterInfoService.Listener {
+        private final Client client;
+        private long lastRun;
+
+        DiskListener(Client client) {
+            this.client = client;
+        }
+
+        @Override
+        public void onNewInfo(ClusterInfo info) {
+            Map<String, DiskUsage> usages = info.getNodeDiskUsages();
+            if (usages != null) {
+                for (DiskUsage entry : usages.values()) {
+                    if (entry.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes()) {
+                        if ((System.currentTimeMillis() - lastRun) > DiskThresholdDecider.this.rerouteInterval.millis()) {
+                            lastRun = System.currentTimeMillis();
+                            logger.info("high watermark [{}/{}%] exceeded on {}, rerouting shards",
+                                    DiskThresholdDecider.this.freeBytesThresholdHigh, DiskThresholdDecider.this.freeDiskThresholdHigh, entry);
+                            // Execute an empty reroute, but don't block on the response
+                            client.admin().cluster().prepareReroute().execute();
+                            // Only one reroute is required, short circuit
+                            return;
+                        } else {
+                            logger.debug("high watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute",
+                                    entry, DiskThresholdDecider.this.rerouteInterval);
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
    public DiskThresholdDecider(Settings settings) {
-        this(settings, new NodeSettingsService(settings));
+        // It's okay the Client is null here, because the empty cluster info
+        // service will never actually call the listener where the client is
+        // needed. Also this constructor is only used for tests
+        this(settings, new NodeSettingsService(settings), ClusterInfoService.EMPTY, null);
    }

    @Inject
-    public DiskThresholdDecider(Settings settings, NodeSettingsService nodeSettingsService) {
+    public DiskThresholdDecider(Settings settings, NodeSettingsService nodeSettingsService, ClusterInfoService infoService, Client client) {
        super(settings);
        String lowWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "85%");
        String highWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "90%");
@@ -136,9 +186,11 @@ public class DiskThresholdDecider extends AllocationDecider {
        this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark);
        this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark);
        this.includeRelocations = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true);
+        this.rerouteInterval = settings.getAsTime(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, TimeValue.timeValueSeconds(60));

        this.enabled = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true);
        nodeSettingsService.addListener(new ApplySettings());
+        infoService.addListener(new DiskListener(client));
    }

    /**
@@ -206,9 +258,9 @@ public class DiskThresholdDecider extends AllocationDecider {
        if (includeRelocations) {
            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
            DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
-            if (logger.isDebugEnabled()) {
-                logger.debug("usage without relocations: {}", usage);
-                logger.debug("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
+            if (logger.isTraceEnabled()) {
+                logger.trace("usage without relocations: {}", usage);
+                logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
            }
            usage = usageIncludingRelocations;
        }
@@ -348,9 +400,9 @@ public class DiskThresholdDecider extends AllocationDecider {
            Map<String, Long> shardSizes = clusterInfo.getShardSizes();
            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
            DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
-            if (logger.isDebugEnabled()) {
-                logger.debug("usage without relocations: {}", usage);
-                logger.debug("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
+            if (logger.isTraceEnabled()) {
+                logger.trace("usage without relocations: {}", usage);
+                logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
            }
            usage = usageIncludingRelocations;
        }
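The "empty reroute" in DiskListener is a ClusterRerouteRequest with no explicit move/allocate commands; it simply forces the allocation deciders to run again. A caller that wanted to block on the outcome, rather than fire-and-forget as DiskListener deliberately does, might write (a sketch; the client variable is assumed):

    // Hypothetical blocking variant of DiskListener's fire-and-forget reroute.
    ClusterRerouteResponse response = client.admin().cluster().prepareReroute().get();
    // 'acknowledged' means the master accepted the (possibly shard-moving) routing table.
    System.out.println("reroute acknowledged: " + response.isAcknowledged());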
@@ -87,7 +87,8 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
        clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK);
        clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED);
        clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS);
-        clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME);
+        clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, Validator.TIME_NON_NEGATIVE);
+        clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME_NON_NEGATIVE);
        clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED);
        clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
        clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
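Both intervals are now validated as non-negative times and, being dynamic, can be changed on a live cluster; the integration test at the end of this commit does exactly this with transient settings. A minimal sketch against an existing client (the client variable and the chosen values are illustrative):

    // Hypothetical runtime tuning of the new dynamic settings.
    client.admin().cluster().prepareUpdateSettings()
            .setTransientSettings(ImmutableSettings.settingsBuilder()
                    .put("cluster.routing.allocation.disk.reroute_interval", "30s")
                    .put("cluster.info.update.interval", "10s"))
            .get();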
@@ -56,6 +56,25 @@ public class FsStats implements Iterable<FsStats.Info>, Streamable, ToXContent {
        double diskQueue = -1;
        double diskServiceTime = -1;

+        public Info() {
+        }
+
+        public Info(String path, @Nullable String mount, @Nullable String dev, long total, long free, long available, long diskReads,
+                    long diskWrites, long diskReadBytes, long diskWriteBytes, double diskQueue, double diskServiceTime) {
+            this.path = path;
+            this.mount = mount;
+            this.dev = dev;
+            this.total = total;
+            this.free = free;
+            this.available = available;
+            this.diskReads = diskReads;
+            this.diskWrites = diskWrites;
+            this.diskReadBytes = diskReadBytes;
+            this.diskWriteBytes = diskWriteBytes;
+            this.diskQueue = diskQueue;
+            this.diskServiceTime = diskServiceTime;
+        }
+
        static public Info readInfoFrom(StreamInput in) throws IOException {
            Info i = new Info();
            i.readFrom(in);
@@ -286,7 +305,7 @@ public class FsStats implements Iterable<FsStats.Info>, Streamable, ToXContent {

    }

-    FsStats(long timestamp, Info[] infos) {
+    public FsStats(long timestamp, Info[] infos) {
        this.timestamp = timestamp;
        this.infos = infos;
        this.total = null;
@@ -82,6 +82,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
                logger.info("--> calling fake getClusterInfo");
                return clusterInfo;
            }
+
+            @Override
+            public void addListener(Listener listener) {
+                // noop
+            }
        };

        AllocationService strategy = new AllocationService(settingsBuilder()
@@ -273,6 +278,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
                logger.info("--> calling fake getClusterInfo");
                return clusterInfo;
            }
+
+            @Override
+            public void addListener(Listener listener) {
+                // noop
+            }
        };

        AllocationService strategy = new AllocationService(settingsBuilder()
@@ -324,6 +334,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
                logger.info("--> calling fake getClusterInfo");
                return clusterInfo2;
            }
+
+            @Override
+            public void addListener(Listener listener) {
+                // noop
+            }
        };
        strategy = new AllocationService(settingsBuilder()
                .put("cluster.routing.allocation.concurrent_recoveries", 10)
@@ -524,6 +539,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
                logger.info("--> calling fake getClusterInfo");
                return clusterInfo;
            }
+
+            @Override
+            public void addListener(Listener listener) {
+                // noop
+            }
        };

        AllocationService strategy = new AllocationService(settingsBuilder()
@@ -586,6 +606,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
                logger.info("--> calling fake getClusterInfo");
                return clusterInfo;
            }
+
+            @Override
+            public void addListener(Listener listener) {
+                // noop
+            }
        };

        AllocationService strategy = new AllocationService(settingsBuilder()
@@ -685,6 +710,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
                logger.info("--> calling fake getClusterInfo");
                return clusterInfo;
            }
+
+            @Override
+            public void addListener(Listener listener) {
+                // noop
+            }
        };

        AllocationService strategy = new AllocationService(settingsBuilder()
@@ -0,0 +1,203 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Test;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
public class MockDiskUsagesTests extends ElasticsearchIntegrationTest {

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return ImmutableSettings.builder()
                .put(super.nodeSettings(nodeOrdinal))
                // Use the mock internal cluster info service, which has fake-able disk usages
                .put(ClusterModule.CLUSTER_SERVICE_IMPL, MockInternalClusterInfoService.class.getName())
                // Update more frequently
                .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, "2s")
                .build();
    }

    @Test
    //@TestLogging("org.elasticsearch.cluster:TRACE,org.elasticsearch.cluster.routing.allocation.decider:TRACE")
    public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
        List<String> nodes = internalCluster().startNodesAsync(3).get();

        // Wait for all 3 nodes to be up
        assertBusy(new Runnable() {
            @Override
            public void run() {
                NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
                assertThat(resp.getNodes().length, equalTo(3));
            }
        });

        // Start with all nodes at 50% usage
        final MockInternalClusterInfoService cis = (MockInternalClusterInfoService)
                internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName());
        cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), 100, 50));
        cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), 100, 50));
        cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), 100, 50));

        client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "20b")
                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "10b")
                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, "1s")).get();

        // Create an index with 10 shards so we can check allocation for it
        prepareCreate("test").setSettings(settingsBuilder()
                .put("number_of_shards", 10)
                .put("number_of_replicas", 0)
                .put("index.routing.allocation.exclude._name", "")).get();
        ensureGreen("test");

        // Block until the "fake" cluster info is retrieved at least once
        assertBusy(new Runnable() {
            @Override
            public void run() {
                ClusterInfo info = cis.getClusterInfo();
                logger.info("--> got: {} nodes", info.getNodeDiskUsages().size());
                assertThat(info.getNodeDiskUsages().size(), greaterThan(0));
            }
        });

        List<String> realNodeNames = newArrayList();
        ClusterStateResponse resp = client().admin().cluster().prepareState().get();
        Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
        while (iter.hasNext()) {
            RoutingNode node = iter.next();
            realNodeNames.add(node.nodeId());
            logger.info("--> node {} has {} shards",
                    node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
        }

        // Update the disk usages so one node has now passed the high watermark
        cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), 100, 50));
        cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), 100, 50));
        cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), 100, 0)); // nothing free on node3

        // Cluster info gathering interval is 2 seconds, give reroute 2 seconds to kick in
        Thread.sleep(4000);

        // Retrieve the count of shards on each node
        resp = client().admin().cluster().prepareState().get();
        iter = resp.getState().getRoutingNodes().iterator();
        Map<String, Integer> nodesToShardCount = newHashMap();
        while (iter.hasNext()) {
            RoutingNode node = iter.next();
            logger.info("--> node {} has {} shards",
                    node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
            nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
        }
        assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
        assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
        assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
    }

    /** Create a fake NodeStats for the given node and usage */
    public static NodeStats makeStats(String nodeName, DiskUsage usage) {
        FsStats.Info[] infos = new FsStats.Info[1];
        FsStats.Info info = new FsStats.Info("/path.data", null, null,
                usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeBytes(), -1, -1, -1, -1, -1, -1);
        infos[0] = info;
        FsStats fsStats = new FsStats(System.currentTimeMillis(), infos);
        return new NodeStats(new DiscoveryNode(nodeName, null, Version.V_2_0_0),
                System.currentTimeMillis(),
                null, null, null, null, null, null,
                fsStats,
                null, null, null);
    }

    /**
     * Fake ClusterInfoService class that allows updating the nodes stats disk
     * usage with fake values
     */
    public static class MockInternalClusterInfoService extends InternalClusterInfoService {

        private final ClusterName clusterName;
        private volatile NodeStats[] stats = new NodeStats[3];

        @Inject
        public MockInternalClusterInfoService(Settings settings, NodeSettingsService nodeSettingsService,
                                              TransportNodesStatsAction transportNodesStatsAction,
                                              TransportIndicesStatsAction transportIndicesStatsAction,
                                              ClusterService clusterService, ThreadPool threadPool) {
            super(settings, nodeSettingsService, transportNodesStatsAction, transportIndicesStatsAction, clusterService, threadPool);
            this.clusterName = ClusterName.clusterNameFromSettings(settings);
            stats[0] = makeStats("node_t1", new DiskUsage("node_t1", 100, 100));
            stats[1] = makeStats("node_t2", new DiskUsage("node_t2", 100, 100));
            stats[2] = makeStats("node_t3", new DiskUsage("node_t3", 100, 100));
        }

        public void setN1Usage(String nodeName, DiskUsage newUsage) {
            stats[0] = makeStats(nodeName, newUsage);
        }

        public void setN2Usage(String nodeName, DiskUsage newUsage) {
            stats[1] = makeStats(nodeName, newUsage);
        }

        public void setN3Usage(String nodeName, DiskUsage newUsage) {
            stats[2] = makeStats(nodeName, newUsage);
        }

        @Override
        public CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse> listener) {
            NodesStatsResponse response = new NodesStatsResponse(clusterName, stats);
            listener.onResponse(response);
            return new CountDownLatch(0);
        }

        @Override
        public CountDownLatch updateIndicesStats(final ActionListener<IndicesStatsResponse> listener) {
            // Not used, so noop
            return new CountDownLatch(0);
        }
    }
}