Switch indices read-only if a node runs out of disk space (#25541)
Today when we run out of disk all kinds of crazy things can happen and nodes are becoming hard to maintain once out of disk is hit. While we try to move shards away if we hit watermarks this might not be possible in many situations. Based on the discussion in #24299 this change monitors disk utilization and adds a flood-stage watermark that causes all indices that are allocated on a node hitting the flood-stage mark to be switched read-only (with the option to be deleted). This allows users to react on the low disk situation while subsequent write requests will be rejected. Users can switch individual indices read-write once the situation is sorted out. There is no automatic read-write switch once the node has enough space. This requires user interaction. The flood-stage watermark is set to `95%` utilization by default. Closes #24299
This commit is contained in:
parent
26de905f1e
commit
6e5cc424a8
|
@ -27,15 +27,4 @@ public interface ClusterInfoService {
|
||||||
|
|
||||||
/** The latest cluster information */
|
/** The latest cluster information */
|
||||||
ClusterInfo getClusterInfo();
|
ClusterInfo getClusterInfo();
|
||||||
|
|
||||||
/** Add a listener that will be called every time new information is gathered */
|
|
||||||
void addListener(Listener listener);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Interface for listeners to implement in order to perform actions when
|
|
||||||
* new information about the cluster has been gathered
|
|
||||||
*/
|
|
||||||
interface Listener {
|
|
||||||
void onNewInfo(ClusterInfo info);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,9 +36,4 @@ public class EmptyClusterInfoService extends AbstractComponent implements Cluste
|
||||||
public ClusterInfo getClusterInfo() {
|
public ClusterInfo getClusterInfo() {
|
||||||
return ClusterInfo.EMPTY;
|
return ClusterInfo.EMPTY;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
// no-op, no new info is ever gathered, so adding listeners is useless
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* InternalClusterInfoService provides the ClusterInfoService interface,
|
* InternalClusterInfoService provides the ClusterInfoService interface,
|
||||||
|
@ -86,9 +87,10 @@ public class InternalClusterInfoService extends AbstractComponent
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final NodeClient client;
|
private final NodeClient client;
|
||||||
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
|
private final Consumer<ClusterInfo> listener;
|
||||||
|
|
||||||
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
|
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
|
||||||
|
Consumer<ClusterInfo> listener) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
|
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
|
||||||
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
|
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
|
||||||
|
@ -109,6 +111,7 @@ public class InternalClusterInfoService extends AbstractComponent
|
||||||
this.clusterService.addLocalNodeMasterListener(this);
|
this.clusterService.addLocalNodeMasterListener(this);
|
||||||
// Add to listen for state changes (when nodes are added)
|
// Add to listen for state changes (when nodes are added)
|
||||||
this.clusterService.addListener(this);
|
this.clusterService.addListener(this);
|
||||||
|
this.listener = listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setEnabled(boolean enabled) {
|
private void setEnabled(boolean enabled) {
|
||||||
|
@ -201,11 +204,6 @@ public class InternalClusterInfoService extends AbstractComponent
|
||||||
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath);
|
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
this.listeners.add(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class used to submit {@link #maybeRefresh()} on the
|
* Class used to submit {@link #maybeRefresh()} on the
|
||||||
* {@link InternalClusterInfoService} threadpool, these jobs will
|
* {@link InternalClusterInfoService} threadpool, these jobs will
|
||||||
|
@ -362,21 +360,17 @@ public class InternalClusterInfoService extends AbstractComponent
|
||||||
logger.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout);
|
logger.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout);
|
||||||
}
|
}
|
||||||
ClusterInfo clusterInfo = getClusterInfo();
|
ClusterInfo clusterInfo = getClusterInfo();
|
||||||
for (Listener l : listeners) {
|
|
||||||
try {
|
try {
|
||||||
l.onNewInfo(clusterInfo);
|
listener.accept(clusterInfo);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.info("Failed executing ClusterInfoService listener", e);
|
logger.info("Failed executing ClusterInfoService listener", e);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return clusterInfo;
|
return clusterInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
|
static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
|
||||||
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
|
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
|
||||||
MetaData meta = state.getMetaData();
|
|
||||||
for (ShardStats s : stats) {
|
for (ShardStats s : stats) {
|
||||||
IndexMetaData indexMeta = meta.index(s.getShardRouting().index());
|
|
||||||
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
|
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
|
||||||
long size = s.getStats().getStore().sizeInBytes();
|
long size = s.getStats().getStore().sizeInBytes();
|
||||||
String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());
|
String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());
|
||||||
|
|
|
@ -19,18 +19,23 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.routing.allocation;
|
package org.elasticsearch.cluster.routing.allocation;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import com.carrotsearch.hppc.ObjectLookupContainer;
|
import com.carrotsearch.hppc.ObjectLookupContainer;
|
||||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterInfo;
|
import org.elasticsearch.cluster.ClusterInfo;
|
||||||
import org.elasticsearch.cluster.ClusterInfoService;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.DiskUsage;
|
import org.elasticsearch.cluster.DiskUsage;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.set.Sets;
|
import org.elasticsearch.common.util.set.Sets;
|
||||||
|
@ -40,21 +45,19 @@ import org.elasticsearch.common.util.set.Sets;
|
||||||
* reroute if it does. Also responsible for logging about nodes that have
|
* reroute if it does. Also responsible for logging about nodes that have
|
||||||
* passed the disk watermarks
|
* passed the disk watermarks
|
||||||
*/
|
*/
|
||||||
public class DiskThresholdMonitor extends AbstractComponent implements ClusterInfoService.Listener {
|
public class DiskThresholdMonitor extends AbstractComponent {
|
||||||
private final DiskThresholdSettings diskThresholdSettings;
|
private final DiskThresholdSettings diskThresholdSettings;
|
||||||
private final Client client;
|
private final Client client;
|
||||||
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
|
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
|
||||||
|
private final Supplier<ClusterState> clusterStateSupplier;
|
||||||
private long lastRunNS;
|
private long lastRunNS;
|
||||||
|
|
||||||
// TODO: remove injection when ClusterInfoService is not injected
|
public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
|
||||||
@Inject
|
Client client) {
|
||||||
public DiskThresholdMonitor(Settings settings, ClusterSettings clusterSettings,
|
|
||||||
ClusterInfoService infoService, Client client) {
|
|
||||||
super(settings);
|
super(settings);
|
||||||
|
this.clusterStateSupplier = clusterStateSupplier;
|
||||||
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
|
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
|
||||||
this.client = client;
|
this.client = client;
|
||||||
infoService.addListener(this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -62,7 +65,10 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
|
||||||
*/
|
*/
|
||||||
private void warnAboutDiskIfNeeded(DiskUsage usage) {
|
private void warnAboutDiskIfNeeded(DiskUsage usage) {
|
||||||
// Check absolute disk values
|
// Check absolute disk values
|
||||||
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
|
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()) {
|
||||||
|
logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only",
|
||||||
|
diskThresholdSettings.getFreeBytesThresholdFloodStage(), usage);
|
||||||
|
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
|
||||||
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
|
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
|
||||||
diskThresholdSettings.getFreeBytesThresholdHigh(), usage);
|
diskThresholdSettings.getFreeBytesThresholdHigh(), usage);
|
||||||
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) {
|
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) {
|
||||||
|
@ -72,6 +78,9 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
|
||||||
|
|
||||||
// Check percentage disk values
|
// Check percentage disk values
|
||||||
if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
||||||
|
logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only",
|
||||||
|
Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdFloodStage(), "%"), usage);
|
||||||
|
} else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
||||||
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
|
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
|
||||||
Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage);
|
Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage);
|
||||||
} else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
|
} else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
|
||||||
|
@ -80,7 +89,7 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNewInfo(ClusterInfo info) {
|
public void onNewInfo(ClusterInfo info) {
|
||||||
ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
|
ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
|
||||||
if (usages != null) {
|
if (usages != null) {
|
||||||
|
@ -95,12 +104,21 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
|
||||||
nodeHasPassedWatermark.remove(node);
|
nodeHasPassedWatermark.remove(node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ClusterState state = clusterStateSupplier.get();
|
||||||
|
Set<String> indicesToMarkReadOnly = new HashSet<>();
|
||||||
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
|
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
|
||||||
String node = entry.key;
|
String node = entry.key;
|
||||||
DiskUsage usage = entry.value;
|
DiskUsage usage = entry.value;
|
||||||
warnAboutDiskIfNeeded(usage);
|
warnAboutDiskIfNeeded(usage);
|
||||||
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
|
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
|
||||||
|
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
|
||||||
|
RoutingNode routingNode = state.getRoutingNodes().node(node);
|
||||||
|
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
|
||||||
|
for (ShardRouting routing : routingNode) {
|
||||||
|
indicesToMarkReadOnly.add(routing.index().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
|
||||||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
||||||
if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
|
if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
|
||||||
lastRunNS = System.nanoTime();
|
lastRunNS = System.nanoTime();
|
||||||
|
@ -136,9 +154,23 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
|
||||||
}
|
}
|
||||||
if (reroute) {
|
if (reroute) {
|
||||||
logger.info("rerouting shards: [{}]", explanation);
|
logger.info("rerouting shards: [{}]", explanation);
|
||||||
|
reroute();
|
||||||
|
}
|
||||||
|
indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
|
||||||
|
if (indicesToMarkReadOnly.isEmpty() == false) {
|
||||||
|
markIndicesReadOnly(indicesToMarkReadOnly);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
|
||||||
|
// set read-only block but don't block on the response
|
||||||
|
client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)).
|
||||||
|
setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void reroute() {
|
||||||
// Execute an empty reroute, but don't block on the response
|
// Execute an empty reroute, but don't block on the response
|
||||||
client.admin().cluster().prepareReroute().execute();
|
client.admin().cluster().prepareReroute().execute();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,10 @@ public class DiskThresholdSettings {
|
||||||
new Setting<>("cluster.routing.allocation.disk.watermark.high", "90%",
|
new Setting<>("cluster.routing.allocation.disk.watermark.high", "90%",
|
||||||
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"),
|
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"),
|
||||||
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||||
|
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING =
|
||||||
|
new Setting<>("cluster.routing.allocation.disk.watermark.floodstage", "95%",
|
||||||
|
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.floodstage"),
|
||||||
|
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||||
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING =
|
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING =
|
||||||
Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true,
|
Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true,
|
||||||
Setting.Property.Dynamic, Setting.Property.NodeScope);;
|
Setting.Property.Dynamic, Setting.Property.NodeScope);;
|
||||||
|
@ -58,17 +62,23 @@ public class DiskThresholdSettings {
|
||||||
private volatile boolean includeRelocations;
|
private volatile boolean includeRelocations;
|
||||||
private volatile boolean enabled;
|
private volatile boolean enabled;
|
||||||
private volatile TimeValue rerouteInterval;
|
private volatile TimeValue rerouteInterval;
|
||||||
|
private volatile String floodStageRaw;
|
||||||
|
private volatile Double freeDiskThresholdFloodStage;
|
||||||
|
private volatile ByteSizeValue freeBytesThresholdFloodStage;
|
||||||
|
|
||||||
public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
|
public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
|
||||||
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
|
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
|
||||||
final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings);
|
final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings);
|
||||||
|
final String floodStage = CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.get(settings);
|
||||||
setHighWatermark(highWatermark);
|
setHighWatermark(highWatermark);
|
||||||
setLowWatermark(lowWatermark);
|
setLowWatermark(lowWatermark);
|
||||||
|
setFloodStageRaw(floodStage);
|
||||||
this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
|
this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
|
||||||
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
|
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
|
||||||
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
|
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
|
||||||
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
|
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
|
||||||
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
|
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
|
||||||
|
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStageRaw);
|
||||||
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
|
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
|
||||||
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
|
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
|
||||||
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
|
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
|
||||||
|
@ -99,7 +109,15 @@ public class DiskThresholdSettings {
|
||||||
this.highWatermarkRaw = highWatermark;
|
this.highWatermarkRaw = highWatermark;
|
||||||
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
|
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
|
||||||
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
|
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
|
||||||
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
|
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setFloodStageRaw(String floodStageRaw) {
|
||||||
|
// Watermark is expressed in terms of used data, but we need "free" data watermark
|
||||||
|
this.floodStageRaw = floodStageRaw;
|
||||||
|
this.freeDiskThresholdFloodStage = 100.0 - thresholdPercentageFromWatermark(floodStageRaw);
|
||||||
|
this.freeBytesThresholdFloodStage = thresholdBytesFromWatermark(floodStageRaw,
|
||||||
|
CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -132,6 +150,18 @@ public class DiskThresholdSettings {
|
||||||
return freeBytesThresholdHigh;
|
return freeBytesThresholdHigh;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Double getFreeDiskThresholdFloodStage() {
|
||||||
|
return freeDiskThresholdFloodStage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ByteSizeValue getFreeBytesThresholdFloodStage() {
|
||||||
|
return freeBytesThresholdFloodStage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFloodStageRaw() {
|
||||||
|
return floodStageRaw;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean includeRelocations() {
|
public boolean includeRelocations() {
|
||||||
return includeRelocations;
|
return includeRelocations;
|
||||||
}
|
}
|
||||||
|
|
|
@ -199,6 +199,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING,
|
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING,
|
||||||
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
|
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
|
||||||
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
|
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
|
||||||
|
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING,
|
||||||
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
|
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
|
||||||
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
|
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
|
||||||
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
|
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.action.update.UpdateHelper;
|
||||||
import org.elasticsearch.bootstrap.BootstrapCheck;
|
import org.elasticsearch.bootstrap.BootstrapCheck;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.client.node.NodeClient;
|
import org.elasticsearch.client.node.NodeClient;
|
||||||
|
import org.elasticsearch.cluster.ClusterInfo;
|
||||||
import org.elasticsearch.cluster.ClusterInfoService;
|
import org.elasticsearch.cluster.ClusterInfoService;
|
||||||
import org.elasticsearch.cluster.ClusterModule;
|
import org.elasticsearch.cluster.ClusterModule;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
@ -50,6 +51,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
|
||||||
import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
|
import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingService;
|
import org.elasticsearch.cluster.routing.RoutingService;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.StopWatch;
|
import org.elasticsearch.common.StopWatch;
|
||||||
import org.elasticsearch.common.component.Lifecycle;
|
import org.elasticsearch.common.component.Lifecycle;
|
||||||
|
@ -225,6 +227,7 @@ public class Node implements Closeable {
|
||||||
private final NodeClient client;
|
private final NodeClient client;
|
||||||
private final Collection<LifecycleComponent> pluginLifecycleComponents;
|
private final Collection<LifecycleComponent> pluginLifecycleComponents;
|
||||||
private final LocalNodeFactory localNodeFactory;
|
private final LocalNodeFactory localNodeFactory;
|
||||||
|
private final NodeService nodeService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a node with the given settings.
|
* Constructs a node with the given settings.
|
||||||
|
@ -333,7 +336,10 @@ public class Node implements Closeable {
|
||||||
resourcesToClose.add(clusterService);
|
resourcesToClose.add(clusterService);
|
||||||
final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
|
final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
|
||||||
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
|
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
|
||||||
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
|
final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
|
||||||
|
clusterService.getClusterSettings(), client);
|
||||||
|
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
|
||||||
|
listener::onNewInfo);
|
||||||
final UsageService usageService = new UsageService(settings);
|
final UsageService usageService = new UsageService(settings);
|
||||||
|
|
||||||
ModulesBuilder modules = new ModulesBuilder();
|
ModulesBuilder modules = new ModulesBuilder();
|
||||||
|
@ -342,7 +348,6 @@ public class Node implements Closeable {
|
||||||
modules.add(pluginModule);
|
modules.add(pluginModule);
|
||||||
}
|
}
|
||||||
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
|
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
|
||||||
modules.add(new NodeModule(this, monitorService));
|
|
||||||
ClusterModule clusterModule = new ClusterModule(settings, clusterService,
|
ClusterModule clusterModule = new ClusterModule(settings, clusterService,
|
||||||
pluginsService.filterPlugins(ClusterPlugin.class), clusterInfoService);
|
pluginsService.filterPlugins(ClusterPlugin.class), clusterInfoService);
|
||||||
modules.add(clusterModule);
|
modules.add(clusterModule);
|
||||||
|
@ -438,10 +443,11 @@ public class Node implements Closeable {
|
||||||
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
|
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
|
||||||
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
|
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
|
||||||
clusterModule.getAllocationService());
|
clusterModule.getAllocationService());
|
||||||
NodeService nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
|
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
|
||||||
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
|
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
|
||||||
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
|
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
|
||||||
modules.add(b -> {
|
modules.add(b -> {
|
||||||
|
b.bind(Node.class).toInstance(this);
|
||||||
b.bind(NodeService.class).toInstance(nodeService);
|
b.bind(NodeService.class).toInstance(nodeService);
|
||||||
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
|
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
|
||||||
b.bind(PluginsService.class).toInstance(pluginsService);
|
b.bind(PluginsService.class).toInstance(pluginsService);
|
||||||
|
@ -590,7 +596,7 @@ public class Node implements Closeable {
|
||||||
injector.getInstance(SnapshotShardsService.class).start();
|
injector.getInstance(SnapshotShardsService.class).start();
|
||||||
injector.getInstance(RoutingService.class).start();
|
injector.getInstance(RoutingService.class).start();
|
||||||
injector.getInstance(SearchService.class).start();
|
injector.getInstance(SearchService.class).start();
|
||||||
injector.getInstance(MonitorService.class).start();
|
nodeService.getMonitorService().start();
|
||||||
|
|
||||||
final ClusterService clusterService = injector.getInstance(ClusterService.class);
|
final ClusterService clusterService = injector.getInstance(ClusterService.class);
|
||||||
|
|
||||||
|
@ -704,7 +710,7 @@ public class Node implements Closeable {
|
||||||
injector.getInstance(RoutingService.class).stop();
|
injector.getInstance(RoutingService.class).stop();
|
||||||
injector.getInstance(ClusterService.class).stop();
|
injector.getInstance(ClusterService.class).stop();
|
||||||
injector.getInstance(NodeConnectionsService.class).stop();
|
injector.getInstance(NodeConnectionsService.class).stop();
|
||||||
injector.getInstance(MonitorService.class).stop();
|
nodeService.getMonitorService().stop();
|
||||||
injector.getInstance(GatewayService.class).stop();
|
injector.getInstance(GatewayService.class).stop();
|
||||||
injector.getInstance(SearchService.class).stop();
|
injector.getInstance(SearchService.class).stop();
|
||||||
injector.getInstance(TransportService.class).stop();
|
injector.getInstance(TransportService.class).stop();
|
||||||
|
@ -737,7 +743,7 @@ public class Node implements Closeable {
|
||||||
toClose.add(() -> stopWatch.start("tribe"));
|
toClose.add(() -> stopWatch.start("tribe"));
|
||||||
toClose.add(injector.getInstance(TribeService.class));
|
toClose.add(injector.getInstance(TribeService.class));
|
||||||
toClose.add(() -> stopWatch.stop().start("node_service"));
|
toClose.add(() -> stopWatch.stop().start("node_service"));
|
||||||
toClose.add(injector.getInstance(NodeService.class));
|
toClose.add(nodeService);
|
||||||
toClose.add(() -> stopWatch.stop().start("http"));
|
toClose.add(() -> stopWatch.stop().start("http"));
|
||||||
if (NetworkModule.HTTP_ENABLED.get(settings)) {
|
if (NetworkModule.HTTP_ENABLED.get(settings)) {
|
||||||
toClose.add(injector.getInstance(HttpServerTransport.class));
|
toClose.add(injector.getInstance(HttpServerTransport.class));
|
||||||
|
@ -762,7 +768,7 @@ public class Node implements Closeable {
|
||||||
toClose.add(() -> stopWatch.stop().start("discovery"));
|
toClose.add(() -> stopWatch.stop().start("discovery"));
|
||||||
toClose.add(injector.getInstance(Discovery.class));
|
toClose.add(injector.getInstance(Discovery.class));
|
||||||
toClose.add(() -> stopWatch.stop().start("monitor"));
|
toClose.add(() -> stopWatch.stop().start("monitor"));
|
||||||
toClose.add(injector.getInstance(MonitorService.class));
|
toClose.add(nodeService.getMonitorService());
|
||||||
toClose.add(() -> stopWatch.stop().start("gateway"));
|
toClose.add(() -> stopWatch.stop().start("gateway"));
|
||||||
toClose.add(injector.getInstance(GatewayService.class));
|
toClose.add(injector.getInstance(GatewayService.class));
|
||||||
toClose.add(() -> stopWatch.stop().start("search"));
|
toClose.add(() -> stopWatch.stop().start("search"));
|
||||||
|
@ -917,8 +923,8 @@ public class Node implements Closeable {
|
||||||
|
|
||||||
/** Constructs a ClusterInfoService which may be mocked for tests. */
|
/** Constructs a ClusterInfoService which may be mocked for tests. */
|
||||||
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
|
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
|
||||||
ThreadPool threadPool, NodeClient client) {
|
ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listeners) {
|
||||||
return new InternalClusterInfoService(settings, clusterService, threadPool, client);
|
return new InternalClusterInfoService(settings, clusterService, threadPool, client, listeners);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class LocalNodeFactory implements Function<BoundTransportAddress, DiscoveryNode> {
|
private static class LocalNodeFactory implements Function<BoundTransportAddress, DiscoveryNode> {
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to Elasticsearch under one or more contributor
|
|
||||||
* license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright
|
|
||||||
* ownership. Elasticsearch licenses this file to you under
|
|
||||||
* the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
* not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.node;
|
|
||||||
|
|
||||||
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
|
|
||||||
import org.elasticsearch.common.inject.AbstractModule;
|
|
||||||
import org.elasticsearch.monitor.MonitorService;
|
|
||||||
|
|
||||||
public class NodeModule extends AbstractModule {
|
|
||||||
|
|
||||||
private final Node node;
|
|
||||||
private final MonitorService monitorService;
|
|
||||||
|
|
||||||
public NodeModule(Node node, MonitorService monitorService) {
|
|
||||||
this.node = node;
|
|
||||||
this.monitorService = monitorService;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void configure() {
|
|
||||||
bind(Node.class).toInstance(node);
|
|
||||||
bind(MonitorService.class).toInstance(monitorService);
|
|
||||||
bind(DiskThresholdMonitor.class).asEagerSingleton();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.node;
|
package org.elasticsearch.node;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.elasticsearch.Build;
|
import org.elasticsearch.Build;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||||
|
@ -121,9 +122,13 @@ public class NodeService extends AbstractComponent implements Closeable {
|
||||||
return ingestService;
|
return ingestService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MonitorService getMonitorService() {
|
||||||
|
return monitorService;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
indicesService.close();
|
IOUtils.close(indicesService);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,139 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.cluster.routing.allocation;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.cluster.ClusterInfo;
|
||||||
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.DiskUsage;
|
||||||
|
import org.elasticsearch.cluster.ESAllocationTestCase;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||||
|
|
||||||
|
public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||||
|
|
||||||
|
|
||||||
|
public void testMarkFloodStageIndicesReadOnly() {
|
||||||
|
AllocationService allocation = createAllocationService(Settings.builder()
|
||||||
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
|
||||||
|
Settings settings = Settings.EMPTY;
|
||||||
|
MetaData metaData = MetaData.builder()
|
||||||
|
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
|
||||||
|
.put("index.routing.allocation.require._id", "node2")).numberOfShards(1).numberOfReplicas(0))
|
||||||
|
.put(IndexMetaData.builder("test_1").settings(settings(Version.CURRENT)
|
||||||
|
.put("index.routing.allocation.require._id", "node1")).numberOfShards(1).numberOfReplicas(0))
|
||||||
|
.put(IndexMetaData.builder("test_2").settings(settings(Version.CURRENT)
|
||||||
|
.put("index.routing.allocation.require._id", "node1")).numberOfShards(1).numberOfReplicas(0))
|
||||||
|
.build();
|
||||||
|
RoutingTable routingTable = RoutingTable.builder()
|
||||||
|
.addAsNew(metaData.index("test"))
|
||||||
|
.addAsNew(metaData.index("test_1"))
|
||||||
|
.addAsNew(metaData.index("test_2"))
|
||||||
|
|
||||||
|
.build();
|
||||||
|
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
|
||||||
|
.metaData(metaData).routingTable(routingTable).build();
|
||||||
|
logger.info("adding two nodes and performing rerouting");
|
||||||
|
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))
|
||||||
|
.add(newNode("node2"))).build();
|
||||||
|
clusterState = allocation.reroute(clusterState, "reroute");
|
||||||
|
logger.info("start primary shard");
|
||||||
|
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||||
|
ClusterState finalState = clusterState;
|
||||||
|
AtomicBoolean reroute = new AtomicBoolean(false);
|
||||||
|
AtomicReference<Set<String>> indices = new AtomicReference<>();
|
||||||
|
DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState,
|
||||||
|
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) {
|
||||||
|
@Override
|
||||||
|
protected void reroute() {
|
||||||
|
assertTrue(reroute.compareAndSet(false, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
|
||||||
|
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
|
||||||
|
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
|
||||||
|
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30));
|
||||||
|
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||||
|
assertFalse(reroute.get());
|
||||||
|
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());
|
||||||
|
|
||||||
|
indices.set(null);
|
||||||
|
builder = ImmutableOpenMap.builder();
|
||||||
|
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
|
||||||
|
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5));
|
||||||
|
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||||
|
assertTrue(reroute.get());
|
||||||
|
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(clusterState.metaData().index("test_2")).settings(Settings.builder()
|
||||||
|
.put(clusterState.metaData()
|
||||||
|
.index("test_2").getSettings())
|
||||||
|
.put(IndexMetaData.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();
|
||||||
|
|
||||||
|
// now we mark one index as read-only and assert that we don't mark it as such again
|
||||||
|
final ClusterState anotherFinalClusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
|
||||||
|
.put(clusterState.metaData().index("test"), false)
|
||||||
|
.put(clusterState.metaData().index("test_1"), false)
|
||||||
|
.put(indexMetaData, true).build())
|
||||||
|
.blocks(ClusterBlocks.builder().addBlocks(indexMetaData).build()).build();
|
||||||
|
assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
|
||||||
|
|
||||||
|
monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState,
|
||||||
|
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) {
|
||||||
|
@Override
|
||||||
|
protected void reroute() {
|
||||||
|
assertTrue(reroute.compareAndSet(false, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
|
||||||
|
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
indices.set(null);
|
||||||
|
reroute.set(false);
|
||||||
|
builder = ImmutableOpenMap.builder();
|
||||||
|
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
|
||||||
|
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5));
|
||||||
|
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||||
|
assertTrue(reroute.get());
|
||||||
|
assertEquals(new HashSet<>(Arrays.asList("test_1")), indices.get());
|
||||||
|
}
|
||||||
|
}
|
|
@ -46,10 +46,7 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
|
||||||
|
|
||||||
public void testInitializingHasExpectedSize() {
|
public void testInitializingHasExpectedSize() {
|
||||||
final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
|
final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
|
||||||
AllocationService strategy = createAllocationService(Settings.EMPTY, new ClusterInfoService() {
|
AllocationService strategy = createAllocationService(Settings.EMPTY, () -> new ClusterInfo() {
|
||||||
@Override
|
|
||||||
public ClusterInfo getClusterInfo() {
|
|
||||||
return new ClusterInfo() {
|
|
||||||
@Override
|
@Override
|
||||||
public Long getShardSize(ShardRouting shardRouting) {
|
public Long getShardSize(ShardRouting shardRouting) {
|
||||||
if (shardRouting.getIndexName().equals("test") && shardRouting.shardId().getId() == 0) {
|
if (shardRouting.getIndexName().equals("test") && shardRouting.shardId().getId() == 0) {
|
||||||
|
@ -57,12 +54,6 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
logger.info("Building initial routing table");
|
logger.info("Building initial routing table");
|
||||||
|
@ -101,10 +92,7 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
|
||||||
|
|
||||||
public void testExpectedSizeOnMove() {
|
public void testExpectedSizeOnMove() {
|
||||||
final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
|
final long byteSize = randomIntBetween(0, Integer.MAX_VALUE);
|
||||||
final AllocationService allocation = createAllocationService(Settings.EMPTY, new ClusterInfoService() {
|
final AllocationService allocation = createAllocationService(Settings.EMPTY, () -> new ClusterInfo() {
|
||||||
@Override
|
|
||||||
public ClusterInfo getClusterInfo() {
|
|
||||||
return new ClusterInfo() {
|
|
||||||
@Override
|
@Override
|
||||||
public Long getShardSize(ShardRouting shardRouting) {
|
public Long getShardSize(ShardRouting shardRouting) {
|
||||||
if (shardRouting.getIndexName().equals("test") && shardRouting.shardId().getId() == 0) {
|
if (shardRouting.getIndexName().equals("test") && shardRouting.shardId().getId() == 0) {
|
||||||
|
@ -112,12 +100,6 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
logger.info("creating an index with 1 shard, no replica");
|
logger.info("creating an index with 1 shard, no replica");
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
|
|
|
@ -57,21 +57,13 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase {
|
||||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
|
||||||
.build(),
|
.build(),
|
||||||
new ClusterInfoService() {
|
() -> new ClusterInfo() {
|
||||||
@Override
|
|
||||||
public ClusterInfo getClusterInfo() {
|
|
||||||
return new ClusterInfo() {
|
|
||||||
@Override
|
@Override
|
||||||
public Long getShardSize(ShardRouting shardRouting) {
|
public Long getShardSize(ShardRouting shardRouting) {
|
||||||
if (shardRouting.getIndexName().equals("test")) {
|
if (shardRouting.getIndexName().equals("test")) {
|
||||||
return sizes[shardRouting.getId()];
|
return sizes[shardRouting.getId()];
|
||||||
}
|
}
|
||||||
return null; }
|
return null;
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
logger.info("Building initial routing table");
|
logger.info("Building initial routing table");
|
||||||
|
|
|
@ -104,11 +104,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
logger.info("--> calling fake getClusterInfo");
|
logger.info("--> calling fake getClusterInfo");
|
||||||
return clusterInfo;
|
return clusterInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
// noop
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
AllocationService strategy = new AllocationService(Settings.builder()
|
AllocationService strategy = new AllocationService(Settings.builder()
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
|
@ -292,11 +287,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
logger.info("--> calling fake getClusterInfo");
|
logger.info("--> calling fake getClusterInfo");
|
||||||
return clusterInfo;
|
return clusterInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
// noop
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
AllocationService strategy = new AllocationService(Settings.builder()
|
AllocationService strategy = new AllocationService(Settings.builder()
|
||||||
|
@ -350,11 +340,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
logger.info("--> calling fake getClusterInfo");
|
logger.info("--> calling fake getClusterInfo");
|
||||||
return clusterInfo2;
|
return clusterInfo2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
// noop
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
strategy = new AllocationService(Settings.builder()
|
strategy = new AllocationService(Settings.builder()
|
||||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||||
|
@ -548,11 +533,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
logger.info("--> calling fake getClusterInfo");
|
logger.info("--> calling fake getClusterInfo");
|
||||||
return clusterInfo;
|
return clusterInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
// noop
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
AllocationService strategy = new AllocationService(Settings.builder()
|
AllocationService strategy = new AllocationService(Settings.builder()
|
||||||
|
@ -620,11 +600,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
logger.info("--> calling fake getClusterInfo");
|
logger.info("--> calling fake getClusterInfo");
|
||||||
return clusterInfo;
|
return clusterInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
// noop
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
AllocationService strategy = new AllocationService(Settings.builder()
|
AllocationService strategy = new AllocationService(Settings.builder()
|
||||||
|
@ -726,11 +701,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
logger.info("--> calling fake getClusterInfo");
|
logger.info("--> calling fake getClusterInfo");
|
||||||
return clusterInfo;
|
return clusterInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
// noop
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
AllocationService strategy = new AllocationService(Settings.builder()
|
AllocationService strategy = new AllocationService(Settings.builder()
|
||||||
|
@ -914,11 +884,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
logger.info("--> calling fake getClusterInfo");
|
logger.info("--> calling fake getClusterInfo");
|
||||||
return clusterInfo;
|
return clusterInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
// noop
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(
|
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(
|
||||||
new SameShardAllocationDecider(
|
new SameShardAllocationDecider(
|
||||||
|
@ -1015,10 +980,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
logger.info("--> calling fake getClusterInfo");
|
logger.info("--> calling fake getClusterInfo");
|
||||||
return clusterInfo;
|
return clusterInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addListener(Listener listener) {
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(
|
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(
|
||||||
|
|
|
@ -74,6 +74,7 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
||||||
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), randomFrom("20b", "80%"))
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), randomFrom("20b", "80%"))
|
||||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), randomFrom("10b", "90%"))
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), randomFrom("10b", "90%"))
|
||||||
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), randomFrom("0b", "100%"))
|
||||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")).get();
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")).get();
|
||||||
// Create an index with 10 shards so we can check allocation for it
|
// Create an index with 10 shards so we can check allocation for it
|
||||||
prepareCreate("test").setSettings(Settings.builder()
|
prepareCreate("test").setSettings(Settings.builder()
|
||||||
|
|
|
@ -28,6 +28,28 @@ file or updated dynamically on a live cluster with the
|
||||||
relocate shards once less than the configured amount of space is available on
|
relocate shards once less than the configured amount of space is available on
|
||||||
the node.
|
the node.
|
||||||
|
|
||||||
|
`cluster.routing.allocation.disk.watermark.floodstage`::
|
||||||
|
|
||||||
|
Controls the floodstage watermark. It defaults to 95%, meaning ES enforce a read-only
|
||||||
|
index block (`index.blocks.read_only_allow_delete`) on every index that has
|
||||||
|
one or more shards allocated on the node that has at least on disk exceeding the floodstage.
|
||||||
|
This is a last resort to prevent nodes from running out of disk space.
|
||||||
|
The index block must be released manually once there is enough disk space available
|
||||||
|
to allow indexing operations to continue.
|
||||||
|
|
||||||
|
An example of resetting the read-only index block on the `twitter` index:
|
||||||
|
|
||||||
|
[source,js]
|
||||||
|
--------------------------------------------------
|
||||||
|
PUT /twitter/_settings
|
||||||
|
{
|
||||||
|
"index.blocks.read_only_allow_delete": null
|
||||||
|
}
|
||||||
|
--------------------------------------------------
|
||||||
|
// CONSOLE
|
||||||
|
// TEST[setup:twitter]
|
||||||
|
|
||||||
|
|
||||||
NOTE: Percentage values refer to used disk space, while byte values refer to
|
NOTE: Percentage values refer to used disk space, while byte values refer to
|
||||||
free disk space. This can be confusing, since it flips the meaning of high and
|
free disk space. This can be confusing, since it flips the meaning of high and
|
||||||
low. For example, it makes sense to set the low watermark to 10gb and the high
|
low. For example, it makes sense to set the low watermark to 10gb and the high
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.cluster;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
@ -69,8 +70,9 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
|
||||||
null, null, null);
|
null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
|
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
|
||||||
super(settings, clusterService, threadPool, client);
|
Consumer<ClusterInfo> listener) {
|
||||||
|
super(settings, clusterService, threadPool, client, listener);
|
||||||
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
|
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
|
||||||
stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100));
|
stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100));
|
||||||
stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100));
|
stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100));
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.node;
|
package org.elasticsearch.node;
|
||||||
|
|
||||||
import org.elasticsearch.client.node.NodeClient;
|
import org.elasticsearch.client.node.NodeClient;
|
||||||
|
import org.elasticsearch.cluster.ClusterInfo;
|
||||||
import org.elasticsearch.cluster.ClusterInfoService;
|
import org.elasticsearch.cluster.ClusterInfoService;
|
||||||
import org.elasticsearch.cluster.MockInternalClusterInfoService;
|
import org.elasticsearch.cluster.MockInternalClusterInfoService;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
@ -47,6 +48,7 @@ import org.elasticsearch.transport.TransportService;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -124,11 +126,11 @@ public class MockNode extends Node {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
|
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
|
||||||
ThreadPool threadPool, NodeClient client) {
|
ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listener) {
|
||||||
if (getPluginsService().filterPlugins(MockInternalClusterInfoService.TestPlugin.class).isEmpty()) {
|
if (getPluginsService().filterPlugins(MockInternalClusterInfoService.TestPlugin.class).isEmpty()) {
|
||||||
return super.newClusterInfoService(settings, clusterService, threadPool, client);
|
return super.newClusterInfoService(settings, clusterService, threadPool, client, listener);
|
||||||
} else {
|
} else {
|
||||||
return new MockInternalClusterInfoService(settings, clusterService, threadPool, client);
|
return new MockInternalClusterInfoService(settings, clusterService, threadPool, client, listener);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue