Auto-release flood-stage write block (#42559)
If a node exceeds the flood-stage disk watermark then we add a block to all of its indices to prevent further writes as a last-ditch attempt to prevent the node completely exhausting its disk space. However today this block remains in place until manually removed, and this block is a source of confusion for users who current have ample disk space and did not even realise they nearly ran out at some point in the past. This commit changes our behaviour to automatically remove this block when a node drops below the high watermark again. The expectation is that the high watermark is some distance below the flood-stage watermark and therefore the disk space problem is truly resolved. Fixes #39334
This commit is contained in:
parent
a869342910
commit
cd304c4def
|
@ -91,3 +91,22 @@ _cluster/health?wait_for_no_relocating_shards` APIs would return only once all
|
|||
pending reroutes have completed too, but starting in version 7.4 if you want to
|
||||
wait for the rerouting process to completely finish you should add the
|
||||
`wait_for_events=languid` query parameter when calling these APIs.
|
||||
|
||||
[float]
|
||||
[[breaking_74_allocation_changes]]
|
||||
=== Allocation changes
|
||||
|
||||
[float]
|
||||
==== Auto-release of read-only-allow-delete block
|
||||
|
||||
If a node exceeds the flood-stage disk watermark then we add a block to all of
|
||||
its indices to prevent further writes as a last-ditch attempt to prevent the
|
||||
node completely exhausting its disk space. In earlier versions this block would
|
||||
remain in place until manually removed, causing confusion for users who
|
||||
currently have ample disk space and are not aware that they nearly ran out at
|
||||
some point in the past. From 7.4 onwards the block is automatically removed
|
||||
when a node drops below the high watermark again, with the expectation that the
|
||||
high watermark is some distance below the flood-stage watermark and therefore
|
||||
the disk space problem is truly resolved. This behaviour can be disabled by
|
||||
setting the system property `es.disk.auto_release_flood_stage_block` to
|
||||
`false`.
|
||||
|
|
|
@ -40,8 +40,10 @@ Elasticsearch enforces a read-only index block
|
|||
(`index.blocks.read_only_allow_delete`) on every index that has one or more
|
||||
shards allocated on the node that has at least one disk exceeding the flood
|
||||
stage. This is a last resort to prevent nodes from running out of disk space.
|
||||
The index block must be released manually once there is enough disk space
|
||||
available to allow indexing operations to continue.
|
||||
The index block is automatically released once the disk utilization falls below
|
||||
the high watermark.
|
||||
The automatic release can however be disabled in 7.x through a system property
|
||||
`es.disk.auto_release_flood_stage_block`
|
||||
|
||||
NOTE: You can not mix the usage of percentage values and byte values within
|
||||
these settings. Either all are set to percentage values, or all are set to byte
|
||||
|
|
|
@ -23,6 +23,8 @@ import com.carrotsearch.hppc.ObjectLookupContainer;
|
|||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.GroupedActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -33,10 +35,12 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.RerouteService;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
|
@ -47,6 +51,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
/**
|
||||
* Listens for a node to go over the high watermark and kicks off an empty
|
||||
|
@ -65,6 +71,7 @@ public class DiskThresholdMonitor {
|
|||
private final RerouteService rerouteService;
|
||||
private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
|
||||
private final AtomicBoolean checkInProgress = new AtomicBoolean();
|
||||
private final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
|
||||
|
||||
public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
|
||||
Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) {
|
||||
|
@ -73,6 +80,10 @@ public class DiskThresholdMonitor {
|
|||
this.rerouteService = rerouteService;
|
||||
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
|
||||
this.client = client;
|
||||
if (diskThresholdSettings.isAutoReleaseIndexEnabled() == false) {
|
||||
deprecationLogger.deprecated("[{}] will be removed in version {}",
|
||||
DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY, Version.V_7_4_0.major + 1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -136,21 +147,33 @@ public class DiskThresholdMonitor {
|
|||
}
|
||||
final ClusterState state = clusterStateSupplier.get();
|
||||
final Set<String> indicesToMarkReadOnly = new HashSet<>();
|
||||
RoutingNodes routingNodes = state.getRoutingNodes();
|
||||
Set<String> indicesNotToAutoRelease = new HashSet<>();
|
||||
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease);
|
||||
|
||||
for (final ObjectObjectCursor<String, DiskUsage> entry : usages) {
|
||||
final String node = entry.key;
|
||||
final DiskUsage usage = entry.value;
|
||||
warnAboutDiskIfNeeded(usage);
|
||||
RoutingNode routingNode = routingNodes.node(node);
|
||||
// Only unblock index if all nodes that contain shards of it are below the high disk watermark
|
||||
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
|
||||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
|
||||
final RoutingNode routingNode = state.getRoutingNodes().node(node);
|
||||
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
|
||||
for (ShardRouting routing : routingNode) {
|
||||
indicesToMarkReadOnly.add(routing.index().getName());
|
||||
String indexName = routing.index().getName();
|
||||
indicesToMarkReadOnly.add(indexName);
|
||||
indicesNotToAutoRelease.add(indexName);
|
||||
}
|
||||
}
|
||||
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
|
||||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
|
||||
if (routingNode != null) {
|
||||
for (ShardRouting routing : routingNode) {
|
||||
String indexName = routing.index().getName();
|
||||
indicesNotToAutoRelease.add(indexName);
|
||||
}
|
||||
}
|
||||
if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
|
||||
reroute = true;
|
||||
explanation = "high disk watermark exceeded on one or more nodes";
|
||||
|
@ -182,7 +205,7 @@ public class DiskThresholdMonitor {
|
|||
}
|
||||
}
|
||||
|
||||
final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 2);
|
||||
final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3);
|
||||
|
||||
if (reroute) {
|
||||
logger.info("rerouting shards: [{}]", explanation);
|
||||
|
@ -197,30 +220,70 @@ public class DiskThresholdMonitor {
|
|||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
Set<String> indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting()
|
||||
.spliterator(), false)
|
||||
.map(c -> c.key)
|
||||
.filter(index -> indicesNotToAutoRelease.contains(index) == false)
|
||||
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
|
||||
if (indicesToMarkReadOnly.isEmpty() == false) {
|
||||
markIndicesReadOnly(indicesToMarkReadOnly, ActionListener.wrap(r -> {
|
||||
setLastRunTimeMillis();
|
||||
listener.onResponse(r);
|
||||
}, e -> {
|
||||
logger.debug("marking indices readonly failed", e);
|
||||
setLastRunTimeMillis();
|
||||
listener.onFailure(e);
|
||||
}));
|
||||
if (indicesToAutoRelease.isEmpty() == false) {
|
||||
if (diskThresholdSettings.isAutoReleaseIndexEnabled()) {
|
||||
logger.info("releasing read-only-allow-delete block on indices: [{}]", indicesToAutoRelease);
|
||||
updateIndicesReadOnly(indicesToAutoRelease, listener, false);
|
||||
} else {
|
||||
deprecationLogger.deprecated("[{}] will be removed in version {}",
|
||||
DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY, Version.V_7_4_0.major + 1);
|
||||
logger.debug("[{}] disabled, not releasing read-only-allow-delete block on indices: [{}]",
|
||||
DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY, indicesToAutoRelease);
|
||||
listener.onResponse(null);
|
||||
}
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
|
||||
if (indicesToMarkReadOnly.isEmpty() == false) {
|
||||
updateIndicesReadOnly(indicesToMarkReadOnly, listener, true);
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap<String, DiskUsage> usages,
|
||||
Set<String> indicesToMarkIneligibleForAutoRelease) {
|
||||
for (RoutingNode routingNode : routingNodes) {
|
||||
if (usages.containsKey(routingNode.nodeId()) == false) {
|
||||
if (routingNode != null) {
|
||||
for (ShardRouting routing : routingNode) {
|
||||
String indexName = routing.index().getName();
|
||||
indicesToMarkIneligibleForAutoRelease.add(indexName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void setLastRunTimeMillis() {
|
||||
lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
|
||||
}
|
||||
|
||||
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
|
||||
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
|
||||
// set read-only block but don't block on the response
|
||||
client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY))
|
||||
.setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build())
|
||||
.execute(ActionListener.map(listener, r -> null));
|
||||
ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
|
||||
setLastRunTimeMillis();
|
||||
listener.onResponse(r);
|
||||
}, e -> {
|
||||
logger.debug(new ParameterizedMessage("setting indices [{}] read-only failed", readOnly), e);
|
||||
setLastRunTimeMillis();
|
||||
listener.onFailure(e);
|
||||
});
|
||||
Settings readOnlySettings = readOnly ? Settings.builder()
|
||||
.put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString()).build() :
|
||||
Settings.builder().putNull(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE).build();
|
||||
client.admin().indices().prepareUpdateSettings(indicesToUpdate.toArray(Strings.EMPTY_ARRAY))
|
||||
.setSettings(readOnlySettings)
|
||||
.execute(ActionListener.map(wrappedListener, r -> null));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,6 +72,20 @@ public class DiskThresholdSettings {
|
|||
private volatile TimeValue rerouteInterval;
|
||||
private volatile Double freeDiskThresholdFloodStage;
|
||||
private volatile ByteSizeValue freeBytesThresholdFloodStage;
|
||||
private static final boolean autoReleaseIndexEnabled;
|
||||
public static final String AUTO_RELEASE_INDEX_ENABLED_KEY = "es.disk.auto_release_flood_stage_block";
|
||||
|
||||
static {
|
||||
final String property = System.getProperty(AUTO_RELEASE_INDEX_ENABLED_KEY);
|
||||
if (property == null) {
|
||||
autoReleaseIndexEnabled = true;
|
||||
} else if (Boolean.FALSE.toString().equals(property)){
|
||||
autoReleaseIndexEnabled = false;
|
||||
} else {
|
||||
throw new IllegalArgumentException(AUTO_RELEASE_INDEX_ENABLED_KEY + " may only be unset or set to [false] but was [" +
|
||||
property + "]");
|
||||
}
|
||||
}
|
||||
|
||||
public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
|
||||
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
|
||||
|
@ -286,6 +300,10 @@ public class DiskThresholdSettings {
|
|||
return freeBytesThresholdFloodStage;
|
||||
}
|
||||
|
||||
public boolean isAutoReleaseIndexEnabled() {
|
||||
return autoReleaseIndexEnabled;
|
||||
}
|
||||
|
||||
public boolean includeRelocations() {
|
||||
return includeRelocations;
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
|
@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||
|
@ -51,7 +53,6 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|||
public void testMarkFloodStageIndicesReadOnly() {
|
||||
AllocationService allocation = createAllocationService(Settings.builder()
|
||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
|
||||
Settings settings = Settings.EMPTY;
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
|
||||
.put("index.routing.allocation.require._id", "node2")).numberOfShards(1).numberOfReplicas(0))
|
||||
|
@ -65,28 +66,25 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|||
.addAsNew(metaData.index("test_1"))
|
||||
.addAsNew(metaData.index("test_2"))
|
||||
.build();
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
|
||||
.metaData(metaData).routingTable(routingTable).build();
|
||||
logger.info("adding two nodes and performing rerouting");
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))
|
||||
.add(newNode("node2"))).build();
|
||||
clusterState = allocation.reroute(clusterState, "reroute");
|
||||
logger.info("start primary shard");
|
||||
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
|
||||
ClusterState finalState = clusterState;
|
||||
final ClusterState clusterState = applyStartedShardsUntilNoChange(
|
||||
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
|
||||
.metaData(metaData).routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(), allocation);
|
||||
AtomicBoolean reroute = new AtomicBoolean(false);
|
||||
AtomicReference<Set<String>> indices = new AtomicReference<>();
|
||||
AtomicLong currentTime = new AtomicLong();
|
||||
DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState,
|
||||
DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get,
|
||||
(reason, priority, listener) -> {
|
||||
assertTrue(reroute.compareAndSet(false, true));
|
||||
assertThat(priority, equalTo(Priority.HIGH));
|
||||
listener.onResponse(null);
|
||||
}) {
|
||||
|
||||
@Override
|
||||
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
|
||||
protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
|
||||
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
|
||||
assertTrue(readOnly);
|
||||
listener.onResponse(null);
|
||||
}
|
||||
};
|
||||
|
@ -119,7 +117,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|||
.blocks(ClusterBlocks.builder().addBlocks(indexMetaData).build()).build();
|
||||
assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
|
||||
|
||||
monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState,
|
||||
monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> anotherFinalClusterState,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get,
|
||||
(reason, priority, listener) -> {
|
||||
assertTrue(reroute.compareAndSet(false, true));
|
||||
|
@ -127,8 +125,9 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|||
listener.onResponse(null);
|
||||
}) {
|
||||
@Override
|
||||
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
|
||||
protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
|
||||
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
|
||||
assertTrue(readOnly);
|
||||
listener.onResponse(null);
|
||||
}
|
||||
};
|
||||
|
@ -156,15 +155,15 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|||
assertTrue(listenerReference.compareAndSet(null, listener));
|
||||
}) {
|
||||
@Override
|
||||
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
|
||||
protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
|
||||
throw new AssertionError("unexpected");
|
||||
}
|
||||
};
|
||||
|
||||
final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
|
||||
allDisksOkBuilder = ImmutableOpenMap.builder();
|
||||
allDisksOkBuilder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 50));
|
||||
allDisksOkBuilder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 50));
|
||||
allDisksOkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 50));
|
||||
allDisksOkBuilder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 50));
|
||||
final ImmutableOpenMap<String, DiskUsage> allDisksOk = allDisksOkBuilder.build();
|
||||
|
||||
final ImmutableOpenMap.Builder<String, DiskUsage> oneDiskAboveWatermarkBuilder = ImmutableOpenMap.builder();
|
||||
|
@ -226,4 +225,144 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
|||
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
|
||||
assertNull(listenerReference.get());
|
||||
}
|
||||
|
||||
public void testAutoReleaseIndices() {
|
||||
AtomicReference<Set<String>> indicesToMarkReadOnly = new AtomicReference<>();
|
||||
AtomicReference<Set<String>> indicesToRelease = new AtomicReference<>();
|
||||
AllocationService allocation = createAllocationService(Settings.builder()
|
||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test_1").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
|
||||
.put(IndexMetaData.builder("test_2").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
|
||||
.build();
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsNew(metaData.index("test_1"))
|
||||
.addAsNew(metaData.index("test_2"))
|
||||
.build();
|
||||
final ClusterState clusterState = applyStartedShardsUntilNoChange(
|
||||
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
|
||||
.metaData(metaData).routingTable(routingTable)
|
||||
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(), allocation);
|
||||
assertThat(clusterState.getRoutingTable().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));
|
||||
|
||||
DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
|
||||
(reason, priority, listener) -> {
|
||||
assertNotNull(listener);
|
||||
assertThat(priority, equalTo(Priority.HIGH));
|
||||
listener.onResponse(null);
|
||||
}) {
|
||||
@Override
|
||||
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
|
||||
if (readOnly) {
|
||||
assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
|
||||
} else {
|
||||
assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
|
||||
}
|
||||
listener.onResponse(null);
|
||||
}
|
||||
};
|
||||
indicesToMarkReadOnly.set(null);
|
||||
indicesToRelease.set(null);
|
||||
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
|
||||
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get());
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
// Change cluster state so that "test_2" index is blocked (read only)
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(clusterState.metaData().index("test_2")).settings(Settings.builder()
|
||||
.put(clusterState.metaData()
|
||||
.index("test_2").getSettings())
|
||||
.put(IndexMetaData.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();
|
||||
|
||||
ClusterState clusterStateWithBlocks = ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
|
||||
.put(indexMetaData, true).build())
|
||||
.blocks(ClusterBlocks.builder().addBlocks(indexMetaData).build()).build();
|
||||
|
||||
assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
|
||||
monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterStateWithBlocks,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
|
||||
(reason, priority, listener) -> {
|
||||
assertNotNull(listener);
|
||||
assertThat(priority, equalTo(Priority.HIGH));
|
||||
listener.onResponse(null);
|
||||
}) {
|
||||
@Override
|
||||
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
|
||||
if (readOnly) {
|
||||
assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
|
||||
} else {
|
||||
assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
|
||||
}
|
||||
listener.onResponse(null);
|
||||
}
|
||||
};
|
||||
// When free disk on any of node1 or node2 goes below 5% flood watermark, then apply index block on indices not having the block
|
||||
indicesToMarkReadOnly.set(null);
|
||||
indicesToRelease.set(null);
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100)));
|
||||
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
// When free disk on node1 and node2 goes above 10% high watermark, then only release index block
|
||||
indicesToMarkReadOnly.set(null);
|
||||
indicesToRelease.set(null);
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
|
||||
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
assertNull(indicesToMarkReadOnly.get());
|
||||
assertThat(indicesToRelease.get(), contains("test_2"));
|
||||
|
||||
// When no usage information is present for node2, we don't release the block
|
||||
indicesToMarkReadOnly.set(null);
|
||||
indicesToRelease.set(null);
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
// When disk usage on one node is between the high and flood-stage watermarks, nothing changes
|
||||
indicesToMarkReadOnly.set(null);
|
||||
indicesToRelease.set(null);
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 9)));
|
||||
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 100)));
|
||||
if (randomBoolean()) {
|
||||
builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100)));
|
||||
}
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
assertNull(indicesToMarkReadOnly.get());
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
// When disk usage on one node is missing and the other is below the high watermark, nothing changes
|
||||
indicesToMarkReadOnly.set(null);
|
||||
indicesToRelease.set(null);
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 100)));
|
||||
if (randomBoolean()) {
|
||||
builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100)));
|
||||
}
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
assertNull(indicesToMarkReadOnly.get());
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
// When disk usage on one node is missing and the other is above the flood-stage watermark, affected indices are blocked
|
||||
indicesToMarkReadOnly.set(null);
|
||||
indicesToRelease.set(null);
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
|
||||
if (randomBoolean()) {
|
||||
builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100)));
|
||||
}
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
|
||||
assertNull(indicesToRelease.get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,9 @@ public class DiskThresholdSettingsTests extends ESTestCase {
|
|||
assertEquals(60L, diskThresholdSettings.getRerouteInterval().seconds());
|
||||
assertTrue(diskThresholdSettings.isEnabled());
|
||||
assertTrue(diskThresholdSettings.includeRelocations());
|
||||
assertEquals(zeroBytes, diskThresholdSettings.getFreeBytesThresholdFloodStage());
|
||||
assertEquals(5.0D, diskThresholdSettings.getFreeDiskThresholdFloodStage(), 0.0D);
|
||||
assertTrue(diskThresholdSettings.isAutoReleaseIndexEnabled());
|
||||
}
|
||||
|
||||
public void testUpdate() {
|
||||
|
|
|
@ -19,15 +19,21 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.cluster.ClusterInfo;
|
||||
import org.elasticsearch.cluster.ClusterInfoService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.DiskUsage;
|
||||
import org.elasticsearch.cluster.MockInternalClusterInfoService;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
|
@ -35,12 +41,15 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
public class MockDiskUsagesIT extends ESIntegTestCase {
|
||||
|
@ -133,4 +142,91 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
|||
assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
|
||||
});
|
||||
}
|
||||
|
||||
public void testAutomaticReleaseOfIndexBlock() throws Exception {
|
||||
List<String> nodes = internalCluster().startNodes(3);
|
||||
|
||||
// Wait for all 3 nodes to be up
|
||||
assertBusy(() -> {
|
||||
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
|
||||
assertThat(resp.getNodes().size(), equalTo(3));
|
||||
});
|
||||
|
||||
// Start with all nodes at 50% usage
|
||||
final MockInternalClusterInfoService cis = (MockInternalClusterInfoService)
|
||||
internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName());
|
||||
cis.setUpdateFrequency(TimeValue.timeValueMillis(100));
|
||||
cis.onMaster();
|
||||
cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "/dev/null", 100, 50));
|
||||
cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "/dev/null", 100, 50));
|
||||
cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50));
|
||||
|
||||
final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings
|
||||
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "15b" : "85%")
|
||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%")
|
||||
.put(
|
||||
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(),
|
||||
watermarkBytes ? "5b" : "95%")
|
||||
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "150ms")).get();
|
||||
// Create an index with 6 shards so we can check allocation for it
|
||||
prepareCreate("test").setSettings(Settings.builder()
|
||||
.put("number_of_shards", 6)
|
||||
.put("number_of_replicas", 0)).get();
|
||||
ensureGreen("test");
|
||||
|
||||
// Block until the "fake" cluster info is retrieved at least once
|
||||
assertBusy(() -> {
|
||||
ClusterInfo info = cis.getClusterInfo();
|
||||
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
|
||||
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
|
||||
});
|
||||
|
||||
final List<String> realNodeNames = new ArrayList<>();
|
||||
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
|
||||
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
|
||||
while (iter.hasNext()) {
|
||||
RoutingNode node = iter.next();
|
||||
realNodeNames.add(node.nodeId());
|
||||
logger.info("--> node {} has {} shards",
|
||||
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
|
||||
}
|
||||
|
||||
client().prepareIndex("test", "doc", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
|
||||
assertSearchHits(client().prepareSearch().get(), "1");
|
||||
|
||||
// Block all nodes so that re-balancing does not occur (BalancedShardsAllocator)
|
||||
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 3));
|
||||
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 3));
|
||||
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 3));
|
||||
|
||||
// Wait until index "test" is blocked
|
||||
assertBusy(() -> {
|
||||
assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar"),
|
||||
IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
|
||||
});
|
||||
|
||||
// Cannot add further documents
|
||||
assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("2").setSource("foo", "bar"),
|
||||
IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
|
||||
assertSearchHits(client().prepareSearch().get(), "1");
|
||||
|
||||
// Update the disk usages so all nodes are back under the high and flood watermarks
|
||||
cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 11));
|
||||
cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 11));
|
||||
cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 11));
|
||||
|
||||
// Attempt to create a new document until DiskUsageMonitor unblocks the index
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
client().prepareIndex("test", "doc", "3").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
|
||||
} catch (ClusterBlockException e) {
|
||||
throw new AssertionError("retrying", e);
|
||||
}
|
||||
});
|
||||
assertSearchHits(client().prepareSearch().get(), "1", "3");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue