Mirror of https://github.com/honeymoose/OpenSearch.git, synced 2025-03-24 17:09:48 +00:00
Account for remaining recovery in disk allocator (#58800)
Today the disk-based shard allocator accounts for incoming shards by subtracting the estimated size of the incoming shard from the free space on the node. This is an overly conservative estimate if the incoming shard has almost finished its recovery, since in that case it is already consuming most of the disk space it needs. This change adds to the shard stats a measure of how much larger each store is expected to grow, computed from the ongoing recovery, and uses this to account for the disk usage of incoming shards more accurately.

Backport of #58029 to 7.x

* Picky picky
* Missing type
This commit is contained in:
parent 27d52d4d23
commit 3a234d2669
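In outline, the change lets the allocator subtract only the bytes an incoming shard still needs, rather than its full estimated size. The following is a minimal sketch of that accounting, not the actual Elasticsearch code; the class, method and parameter names are illustrative only.

    import java.util.List;

    // Minimal sketch of the accounting idea described above; names are illustrative, not the real API.
    class ReservedSpaceAccountingSketch {
        // freeBytes: free space reported for the data path
        // reservedBytes: bytes still to be copied for recoveries whose file lists are already known
        // unaccountedShardEstimates: full-size estimates for incoming shards with no reservation yet
        static long effectiveFreeBytes(long freeBytes, long reservedBytes, List<Long> unaccountedShardEstimates) {
            long conservativeEstimate = 0L;
            for (long estimate : unaccountedShardEstimates) {
                conservativeEstimate += estimate; // fall back to the old, conservative behaviour
            }
            return Math.max(0L, freeBytes - reservedBytes - conservativeEstimate);
        }
    }

Before this change every initializing shard was counted at its full estimated size; with the reservation in place, a shard that has already copied most of its files only reserves the remainder.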
@@ -245,6 +245,18 @@ Total size of all shards assigned to the node.
`size_in_bytes`::
(integer)
Total size, in bytes, of all shards assigned to the node.

`reserved`::
(<<byte-units,byte value>>)
A prediction of how much larger the shard stores on this node will eventually
grow due to ongoing peer recoveries, restoring snapshots, and similar
activities. A value of `-1b` indicates that this is not available.

`reserved_in_bytes`::
(integer)
A prediction, in bytes, of how much larger the shard stores on this node will
eventually grow due to ongoing peer recoveries, restoring snapshots, and
similar activities. A value of `-1` indicates that this is not available.
=======

`indexing`::
@@ -231,6 +231,17 @@ Total size of all shards assigned to selected nodes.
`size_in_bytes`::
(integer)
Total size, in bytes, of all shards assigned to selected nodes.

`reserved`::
(<<byte-units,byte value>>)
A prediction of how much larger the shard stores will eventually grow due to
ongoing peer recoveries, restoring snapshots, and similar activities.

`reserved_in_bytes`::
(integer)
A prediction, in bytes, of how much larger the shard stores will eventually
grow due to ongoing peer recoveries, restoring snapshots, and similar
activities.
=====

`fielddata`::
@@ -1135,7 +1146,9 @@ The API returns the following response:
},
"store": {
"size": "16.2kb",
"size_in_bytes": 16684
"size_in_bytes": 16684,
"reserved": "0b",
"reserved_in_bytes": 0
},
"fielddata": {
"memory_size": "0b",
@@ -0,0 +1,22 @@
---
"Store stats":
  - skip:
      version: " - 7.99.99"
      reason: "reserved_in_bytes field is not returned in prior versions"
      features: [arbitrary_key]

  - do:
      nodes.info:
        node_id: _master
  - set:
      nodes._arbitrary_key_: master

  - do:
      nodes.stats:
        metric: [ indices ]
        index_metric: [ store ]

  - is_false: nodes.$master.discovery
  - is_true: nodes.$master.indices.store
  - gte: { nodes.$master.indices.store.size_in_bytes: 0 }
  - gte: { nodes.$master.indices.store.reserved_in_bytes: -1 }
@@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
@@ -162,6 +163,8 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
IndexService indexService = indicesService.indexService(shard.index());
IndexShard indexShard = indexService.getShardOrNull(shard.id());
assertEquals(indexShard.shardPath().getRootDataPath().toString(), dataPath);

assertTrue(info.getReservedSpace(nodeId, dataPath).containsShardId(shard.shardId()));
}
}

@@ -232,6 +235,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
assertThat(info.getNodeLeastAvailableDiskUsages().size(), equalTo(0));
assertThat(info.getNodeMostAvailableDiskUsages().size(), equalTo(0));
assertThat(info.shardSizes.size(), equalTo(0));
assertThat(info.reservedSpace.size(), equalTo(0));

// check we recover
blockingActionFilter.blockActions();
@@ -242,5 +246,10 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
assertThat(info.getNodeMostAvailableDiskUsages().size(), equalTo(2));
assertThat(info.shardSizes.size(), greaterThan(0));

RoutingTable routingTable = client().admin().cluster().prepareState().clear().setRoutingTable(true).get().getState().routingTable();
for (ShardRouting shard : routingTable.allShards()) {
assertTrue(info.getReservedSpace(shard.currentNodeId(), info.getDataPath(shard)).containsShardId(shard.shardId()));
}

}
}
@@ -83,7 +83,9 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.NodeIndicesStats;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.flush.SyncedFlushUtil;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
@@ -146,6 +148,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
@@ -1834,4 +1837,59 @@ public class IndexRecoveryIT extends ESIntegTestCase {
}
});
}

public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception {
internalCluster().startNode();
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
String indexName = "test-index";
createIndex(indexName, Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
.put("index.routing.allocation.include._name", String.join(",", dataNodes)).build());
ensureGreen(indexName);
final List<IndexRequestBuilder> indexRequests = IntStream.range(0, between(10, 500))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("foo", "bar"))
.collect(Collectors.toList());
indexRandom(randomBoolean(), true, true, indexRequests);
assertThat(client().admin().indices().prepareFlush(indexName).get().getFailedShards(), equalTo(0));

ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
DiscoveryNode nodeWithPrimary = clusterState.nodes().get(clusterState.routingTable()
.index(indexName).shard(0).primaryShard().currentNodeId());
MockTransportService transportService = (MockTransportService) internalCluster()
.getInstance(TransportService.class, nodeWithPrimary.getName());

final AtomicBoolean fileInfoIntercepted = new AtomicBoolean();
final AtomicBoolean fileChunkIntercepted = new AtomicBoolean();
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) {
if (fileInfoIntercepted.compareAndSet(false, true)) {
final NodeIndicesStats nodeIndicesStats = client().admin().cluster().prepareNodesStats(connection.getNode().getId())
.clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Store)).get().getNodes().get(0).getIndices();
assertThat(nodeIndicesStats.getStore().getReservedSize().getBytes(), equalTo(0L));
assertThat(nodeIndicesStats.getShardStats(clusterState.metadata().index(indexName).getIndex())
.stream().flatMap(s -> Arrays.stream(s.getShards())).map(s -> s.getStats().getStore().getReservedSize().getBytes())
.collect(Collectors.toList()),
everyItem(equalTo(StoreStats.UNKNOWN_RESERVED_BYTES)));
}
} else if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
if (fileChunkIntercepted.compareAndSet(false, true)) {
assertThat(client().admin().cluster().prepareNodesStats(connection.getNode().getId()).clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Store)).get().getNodes().get(0)
.getIndices().getStore().getReservedSize().getBytes(),
greaterThan(0L));
}
}
connection.sendRequest(requestId, action, request, options);
});

assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put("index.number_of_replicas", 1)));
ensureGreen();
assertTrue(fileInfoIntercepted.get());
assertTrue(fileChunkIntercepted.get());

assertThat(client().admin().cluster().prepareNodesStats().get().getNodes().stream()
.mapToLong(n -> n.getIndices().getStore().getReservedSize().getBytes()).sum(), equalTo(0L));
}

}
@@ -19,6 +19,8 @@

package org.elasticsearch.cluster;

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.cluster.routing.ShardRouting;
@@ -29,9 +31,13 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreStats;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

/**
 * ClusterInfo is an object representing a map of nodes to {@link DiskUsage}
 * and a map of shard ids to shard sizes, see
@@ -44,9 +50,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
final ImmutableOpenMap<String, Long> shardSizes;
public static final ClusterInfo EMPTY = new ClusterInfo();
final ImmutableOpenMap<ShardRouting, String> routingToDataPath;
final ImmutableOpenMap<NodeAndPath, ReservedSpace> reservedSpace;

protected ClusterInfo() {
this(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
this(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
}

/**
@@ -56,15 +63,18 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
 * @param mostAvailableSpaceUsage a node id to disk usage mapping for the path that has the most available space on the node.
 * @param shardSizes a shardkey to size in bytes mapping per shard.
 * @param routingToDataPath the shard routing to datapath mapping
 * @param reservedSpace reserved space per shard broken down by node and data path
 * @see #shardIdentifierFromRouting
 */
public ClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage, ImmutableOpenMap<String, Long> shardSizes,
ImmutableOpenMap<ShardRouting, String> routingToDataPath) {
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage, ImmutableOpenMap<String, Long> shardSizes,
ImmutableOpenMap<ShardRouting, String> routingToDataPath,
ImmutableOpenMap<NodeAndPath, ReservedSpace> reservedSpace) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
}

public ClusterInfo(StreamInput in) throws IOException {
@@ -72,6 +82,12 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
Map<String, DiskUsage> mostMap = in.readMap(StreamInput::readString, DiskUsage::new);
Map<String, Long> sizeMap = in.readMap(StreamInput::readString, StreamInput::readLong);
Map<ShardRouting, String> routingMap = in.readMap(ShardRouting::new, StreamInput::readString);
Map<NodeAndPath, ReservedSpace> reservedSpaceMap;
if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
reservedSpaceMap = in.readMap(NodeAndPath::new, ReservedSpace::new);
} else {
reservedSpaceMap = org.elasticsearch.common.collect.Map.of();
}

ImmutableOpenMap.Builder<String, DiskUsage> leastBuilder = ImmutableOpenMap.builder();
this.leastAvailableSpaceUsage = leastBuilder.putAll(leastMap).build();
@@ -81,6 +97,8 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
this.shardSizes = sizeBuilder.putAll(sizeMap).build();
ImmutableOpenMap.Builder<ShardRouting, String> routingBuilder = ImmutableOpenMap.builder();
this.routingToDataPath = routingBuilder.putAll(routingMap).build();
ImmutableOpenMap.Builder<NodeAndPath, ReservedSpace> reservedSpaceBuilder = ImmutableOpenMap.builder();
this.reservedSpace = reservedSpaceBuilder.putAll(reservedSpaceMap).build();
}

@Override
@@ -109,6 +127,14 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
c.key.writeTo(out);
out.writeString(c.value);
}

if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
out.writeVInt(this.reservedSpace.size());
for (ObjectObjectCursor<NodeAndPath, ReservedSpace> c : this.reservedSpace) {
c.key.writeTo(out);
c.value.writeTo(out);
}
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -144,11 +170,23 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
}
}
builder.endObject(); // end "shard_paths"
builder.startArray("reserved_sizes"); {
for (ObjectObjectCursor<NodeAndPath, ReservedSpace> c : this.reservedSpace) {
builder.startObject(); {
builder.field("node_id", c.key.nodeId);
builder.field("path", c.key.path);
c.value.toXContent(builder, params);
}
builder.endObject(); // NodeAndPath
}
}
builder.endArray(); // end "reserved_sizes"
return builder;
}

/**
 * Returns a node id to disk usage mapping for the path that has the least available space on the node.
 * Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space.
 */
public ImmutableOpenMap<String, DiskUsage> getNodeLeastAvailableDiskUsages() {
return this.leastAvailableSpaceUsage;
@@ -156,6 +194,7 @@ public class ClusterInfo implements ToXContentFragment, Writeable {

/**
 * Returns a node id to disk usage mapping for the path that has the most available space on the node.
 * Note that this does not take account of reserved space: there may be another path with more available _and unreserved_ space.
 */
public ImmutableOpenMap<String, DiskUsage> getNodeMostAvailableDiskUsages() {
return this.mostAvailableSpaceUsage;
@@ -183,6 +222,14 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
return shardSize == null ? defaultValue : shardSize;
}

/**
 * Returns the reserved space for each shard on the given node/path pair
 */
public ReservedSpace getReservedSpace(String nodeId, String dataPath) {
final ReservedSpace result = reservedSpace.get(new NodeAndPath(nodeId, dataPath));
return result == null ? ReservedSpace.EMPTY : result;
}

/**
 * Method that incorporates the ShardId for the shard into a string that
 * includes a 'p' or 'r' depending on whether the shard is a primary.
@@ -190,4 +237,126 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
static String shardIdentifierFromRouting(ShardRouting shardRouting) {
return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]";
}

/**
 * Represents a data path on a node
 */
public static class NodeAndPath {
public final String nodeId;
public final String path;

public NodeAndPath(String nodeId, String path) {
this.nodeId = Objects.requireNonNull(nodeId);
this.path = Objects.requireNonNull(path);
}

public NodeAndPath(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.path = in.readString();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NodeAndPath that = (NodeAndPath) o;
return nodeId.equals(that.nodeId) && path.equals(that.path);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, path);
}

public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
out.writeString(path);
}
}

/**
 * Represents the total amount of "reserved" space on a particular data path, together with the set of shards considered.
 */
public static class ReservedSpace {

public static final ReservedSpace EMPTY = new ReservedSpace(0, new ObjectHashSet<>());

private final long total;
private final ObjectHashSet<ShardId> shardIds;

private ReservedSpace(long total, ObjectHashSet<ShardId> shardIds) {
this.total = total;
this.shardIds = shardIds;
}

ReservedSpace(StreamInput in) throws IOException {
total = in.readVLong();
final int shardIdCount = in.readVInt();
shardIds = new ObjectHashSet<>(shardIdCount);
for (int i = 0; i < shardIdCount; i++) {
shardIds.add(new ShardId(in));
}
}

void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);
out.writeVInt(shardIds.size());
for (ObjectCursor<ShardId> shardIdCursor : shardIds) {
shardIdCursor.value.writeTo(out);
}
}

public long getTotal() {
return total;
}

public boolean containsShardId(ShardId shardId) {
return shardIds.contains(shardId);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReservedSpace that = (ReservedSpace) o;
return total == that.total &&
shardIds.equals(that.shardIds);
}

@Override
public int hashCode() {
return Objects.hash(total, shardIds);
}

void toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("total", total);
builder.startArray("shards"); {
for (ObjectCursor<ShardId> shardIdCursor : shardIds) {
shardIdCursor.value.toXContent(builder, params);
}
}
builder.endArray(); // end "shards"
}

public static class Builder {
private long total;
private ObjectHashSet<ShardId> shardIds = new ObjectHashSet<>();

public ReservedSpace build() {
assert shardIds != null : "already built";
final ReservedSpace reservedSpace = new ReservedSpace(total, shardIds);
shardIds = null;
return reservedSpace;
}

public Builder add(ShardId shardId, long reservedBytes) {
assert shardIds != null : "already built";
assert reservedBytes >= 0 : reservedBytes;
shardIds.add(shardId);
total += reservedBytes;
return this;
}
}
}

}
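A small usage sketch of the `ReservedSpace` API introduced above (assuming the Elasticsearch 7.9 classes are on the classpath; the index name, UUID and byte counts are made up):

    import org.elasticsearch.cluster.ClusterInfo;
    import org.elasticsearch.index.shard.ShardId;

    public class ReservedSpaceExample {
        public static void main(String[] args) {
            // One shard on this node/path still expects roughly 123 bytes to arrive.
            ShardId incoming = new ShardId("my-index", "my-index-uuid", 0);
            ClusterInfo.ReservedSpace reserved = new ClusterInfo.ReservedSpace.Builder()
                .add(incoming, 123L)
                .build();

            // The disk allocator reads the total and checks membership per shard.
            assert reserved.getTotal() == 123L;
            assert reserved.containsShardId(incoming);
            assert reserved.containsShardId(new ShardId("other-index", "other-uuid", 0)) == false;
        }
    }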
@@ -44,11 +44,14 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -80,8 +83,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode

private volatile ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsages;
private volatile ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsages;
private volatile ImmutableOpenMap<ShardRouting, String> shardRoutingToDataPath;
private volatile ImmutableOpenMap<String, Long> shardSizes;
private volatile IndicesStatsSummary indicesStatsSummary;
private volatile boolean isMaster = false;
private volatile boolean enabled;
private volatile TimeValue fetchTimeout;
@@ -93,8 +95,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
this.shardRoutingToDataPath = ImmutableOpenMap.of();
this.shardSizes = ImmutableOpenMap.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
@@ -200,7 +201,9 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode

@Override
public ClusterInfo getClusterInfo() {
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath);
final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages,
indicesStatsSummary.shardSizes, indicesStatsSummary.shardRoutingToDataPath, indicesStatsSummary.reservedSpace);
}

/**
@@ -321,12 +324,19 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
final CountDownLatch indicesLatch = updateIndicesStats(new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
ShardStats[] stats = indicesStatsResponse.getShards();
ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
shardSizes = newShardSizes.build();
shardRoutingToDataPath = newShardRoutingToDataPath.build();
final ShardStats[] stats = indicesStatsResponse.getShards();
final ImmutableOpenMap.Builder<String, Long> shardSizeByIdentifierBuilder = ImmutableOpenMap.builder();
final ImmutableOpenMap.Builder<ShardRouting, String> dataPathByShardRoutingBuilder = ImmutableOpenMap.builder();
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders = new HashMap<>();
buildShardLevelInfo(logger, stats, shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, reservedSpaceBuilders);

final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> rsrvdSpace = ImmutableOpenMap.builder();
reservedSpaceBuilders.forEach((nodeAndPath, builder) -> rsrvdSpace.put(nodeAndPath, builder.build()));

indicesStatsSummary = new IndicesStatsSummary(
shardSizeByIdentifierBuilder.build(),
dataPathByShardRoutingBuilder.build(),
rsrvdSpace.build());
}

@Override
@@ -342,8 +352,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
}
// we empty the usages list, to be safe - we don't know what's going on.
shardSizes = ImmutableOpenMap.of();
shardRoutingToDataPath = ImmutableOpenMap.of();
indicesStatsSummary = IndicesStatsSummary.EMPTY;
}
}
});
@@ -383,16 +392,27 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
listeners.add(clusterInfoConsumer);
}

static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> shardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath,
Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceByShard) {
for (ShardStats s : stats) {
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
long size = s.getStats().getStore().sizeInBytes();
String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());
if (logger.isTraceEnabled()) {
logger.trace("shard: {} size: {}", sid, size);
final ShardRouting shardRouting = s.getShardRouting();
newShardRoutingToDataPath.put(shardRouting, s.getDataPath());

final StoreStats storeStats = s.getStats().getStore();
final long size = storeStats.sizeInBytes();
final long reserved = storeStats.getReservedSize().getBytes();

final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved);
shardSizes.put(shardIdentifier, size);

if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) {
final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent(
new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()),
t -> new ClusterInfo.ReservedSpace.Builder());
reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
}
newShardSizes.put(sid, size);
}
}

@@ -446,5 +466,21 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
}
}

private static class IndicesStatsSummary {
static final IndicesStatsSummary EMPTY
= new IndicesStatsSummary(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());

final ImmutableOpenMap<String, Long> shardSizes;
final ImmutableOpenMap<ShardRouting, String> shardRoutingToDataPath;
final ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace;

IndicesStatsSummary(ImmutableOpenMap<String, Long> shardSizes,
ImmutableOpenMap<ShardRouting, String> shardRoutingToDataPath,
ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace) {
this.shardSizes = shardSizes;
this.shardRoutingToDataPath = shardRoutingToDataPath;
this.reservedSpace = reservedSpace;
}
}

}
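One detail worth noting in the hunk above: three separate volatile maps are replaced by a single volatile `IndicesStatsSummary`, so `getClusterInfo()` reads one field once and gets shard sizes, data paths and reserved space that all come from the same stats response. The following is a generic sketch of that "single volatile read" pattern, not Elasticsearch code; the names are illustrative and plain `java.util.Map` stands in for `ImmutableOpenMap`.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    class StatsHolder {
        static final class Summary {
            final Map<String, Long> shardSizes;
            final Map<String, String> dataPaths;
            Summary(Map<String, Long> shardSizes, Map<String, String> dataPaths) {
                this.shardSizes = Collections.unmodifiableMap(new HashMap<>(shardSizes));
                this.dataPaths = Collections.unmodifiableMap(new HashMap<>(dataPaths));
            }
        }

        private volatile Summary summary = new Summary(new HashMap<>(), new HashMap<>());

        void update(Map<String, Long> sizes, Map<String, String> paths) {
            summary = new Summary(sizes, paths); // one volatile write publishes a consistent pair
        }

        Summary snapshot() {
            return summary; // one volatile read; the two maps always belong together
        }
    }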
@@ -166,11 +166,11 @@ public class DiskThresholdMonitor {
logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only",
diskThresholdSettings.describeFloodStageThreshold(), usage);

} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
continue;
}

nodesOverLowThreshold.add(node);
nodesOverHighThreshold.add(node);
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {

if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
for (ShardRouting routing : routingNode) {
@@ -178,6 +178,18 @@ public class DiskThresholdMonitor {
indicesNotToAutoRelease.add(indexName);
}
}
}

final long reservedSpace = info.getReservedSpace(usage.getNodeId(), usage.getPath()).getTotal();
final DiskUsage usageWithReservedSpace = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(),
usage.getTotalBytes(), Math.max(0L, usage.getFreeBytes() - reservedSpace));

if (usageWithReservedSpace.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
usageWithReservedSpace.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {

nodesOverLowThreshold.add(node);
nodesOverHighThreshold.add(node);

if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
reroute = true;
explanation = "high disk watermark exceeded on one or more nodes";
@@ -189,8 +201,8 @@ public class DiskThresholdMonitor {
node, diskThresholdSettings.getRerouteInterval());
}

} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
} else if (usageWithReservedSpace.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() ||
usageWithReservedSpace.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {

nodesOverHighThresholdAndRelocating.remove(node);
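To see the effect of the `usageWithReservedSpace` adjustment above with illustrative numbers: on a 100-byte path with 12 bytes free and a high watermark requiring at least 10 free bytes, the node previously looked healthy. If 5 bytes are now reserved for an incoming recovery, the adjusted free space is max(0, 12 - 5) = 7 bytes, which is below the threshold, so the monitor treats the node as over the high watermark and may trigger a reroute before the disk actually fills up.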
@@ -44,6 +44,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;

import java.util.List;
import java.util.Set;

import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
@@ -98,9 +99,16 @@ public class DiskThresholdDecider extends AllocationDecider {
 */
public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShardsMovingAway, String dataPath, ClusterInfo clusterInfo,
Metadata metadata, RoutingTable routingTable) {
long totalSize = 0L;
// Account for reserved space wherever it is available
final ClusterInfo.ReservedSpace reservedSpace = clusterInfo.getReservedSpace(node.nodeId(), dataPath);
long totalSize = reservedSpace.getTotal();
// NB this counts all shards on the node when the ClusterInfoService retrieved the node stats, which may include shards that are
// no longer initializing because their recovery failed or was cancelled.

for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
// Where reserved space is unavailable (e.g. stats are out-of-sync) compute a conservative estimate for initialising shards
final List<ShardRouting> initializingShards = node.shardsWithState(ShardRoutingState.INITIALIZING);
initializingShards.removeIf(shardRouting -> reservedSpace.containsShardId(shardRouting.shardId()));
for (ShardRouting routing : initializingShards) {
if (routing.relocatingNodeId() == null) {
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
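An illustrative example of the accounting above: suppose a node has two initializing shards, A with an 80 MB reservation recorded in the `ClusterInfo` (its recovery is under way and 80 MB remain to be copied) and B with no reservation and a 500 MB expected size. `sizeOfRelocatingShards` now starts from totalSize = 80 MB, drops A from the list of initializing shards because the reservation already covers it, and adds the conservative 500 MB estimate only for B, instead of counting A at its full expected size as before.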
@@ -372,7 +372,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
long sum = 0;
int count = 0;
for (IndexShard indexShard : this) {
sum += indexShard.store().stats().sizeInBytes();
sum += indexShard.store().stats(0L).sizeInBytes();
count++;
}
if (count == 0) {
@@ -236,7 +236,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final RetentionLeaseSyncer retentionLeaseSyncer;

@Nullable
private RecoveryState recoveryState;
private volatile RecoveryState recoveryState;

private final RecoveryStats recoveryStats = new RecoveryStats();
private final MeanMetric refreshMetric = new MeanMetric();
@@ -1033,7 +1033,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

public StoreStats storeStats() {
try {
return store.stats();
final RecoveryState recoveryState = this.recoveryState;
final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover();
return store.stats(bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover);
} catch (IOException e) {
failShard("Failing shard because of exception during storeStats", e);
throw new ElasticsearchException("io exception while building 'store stats'", e);
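The effect of the `storeStats()` change above, step by step with illustrative numbers: if the shard has no `RecoveryState` at all, the reserved size is reported as `StoreStats.UNKNOWN_RESERVED_BYTES` (-1); while a recovery is running but the file list is not yet complete, `bytesStillToRecover()` is -1 and the same sentinel is reported; once the file details are complete, the remaining bytes (total minus recovered, over the non-reused files) are reported, e.g. 20 MB if 80 of 100 MB have arrived; and a fully recovered shard reports 0.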
@@ -266,7 +266,7 @@ public final class ShardPath {
long maxUsableBytes = Long.MIN_VALUE;
for (NodeEnvironment.NodePath nodePath : paths) {
FileStore fileStore = nodePath.fileStore;
long usableBytes = fileStore.getUsableSpace();
long usableBytes = fileStore.getUsableSpace(); // NB usable bytes doesn't account for reserved space (e.g. incoming recoveries)
assert usableBytes >= 0 : "usable bytes must be >= 0, got: " + usableBytes;

if (bestPath == null || usableBytes > maxUsableBytes) {
@@ -175,6 +175,7 @@ final class StoreRecovery {

try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
writer.addIndexes(sources);
indexRecoveryStats.setFileDetailsComplete();
if (split) {
writer.deleteDocuments(new ShardSplittingQuery(indexMetadata, shardId, hasNested));
}
@@ -414,14 +415,15 @@ final class StoreRecovery {
writeEmptyRetentionLeasesFile(indexShard);
}
// since we recover from local, just fill the files and size
final RecoveryState.Index index = recoveryState.getIndex();
try {
final RecoveryState.Index index = recoveryState.getIndex();
if (si != null) {
addRecoveredFileDetails(si, store, index);
}
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
index.setFileDetailsComplete();
} else {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
final String translogUUID = Translog.createEmptyTranslog(
@@ -429,6 +431,7 @@ final class StoreRecovery {
indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
indexShard.recoveryState().getIndex().setFileDetailsComplete();
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
@@ -355,9 +355,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
}

public StoreStats stats() throws IOException {
/**
 * @param reservedBytes a prediction of how much larger the store is expected to grow, or {@link StoreStats#UNKNOWN_RESERVED_BYTES}.
 */
public StoreStats stats(long reservedBytes) throws IOException {
ensureOpen();
return new StoreStats(directory.estimateSize());
return new StoreStats(directory.estimateSize(), reservedBytes);
}

/**
@@ -31,7 +31,16 @@ import java.io.IOException;

public class StoreStats implements Writeable, ToXContentFragment {

/**
 * Sentinel value for cases where the shard does not yet know its reserved size so we must fall back to an estimate, for instance
 * prior to receiving the list of files in a peer recovery.
 */
public static final long UNKNOWN_RESERVED_BYTES = -1L;

public static final Version RESERVED_BYTES_VERSION = Version.V_7_9_0;

private long sizeInBytes;
private long reservedSize;

public StoreStats() {

@@ -42,10 +51,21 @@ public class StoreStats implements Writeable, ToXContentFragment {
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readVLong(); // throttleTimeInNanos
}
if (in.getVersion().onOrAfter(RESERVED_BYTES_VERSION)) {
reservedSize = in.readZLong();
} else {
reservedSize = UNKNOWN_RESERVED_BYTES;
}
}

public StoreStats(long sizeInBytes) {
/**
 * @param sizeInBytes the size of the store in bytes
 * @param reservedSize a prediction of how much larger the store is expected to grow, or {@link StoreStats#UNKNOWN_RESERVED_BYTES}.
 */
public StoreStats(long sizeInBytes, long reservedSize) {
assert reservedSize == UNKNOWN_RESERVED_BYTES || reservedSize >= 0 : reservedSize;
this.sizeInBytes = sizeInBytes;
this.reservedSize = reservedSize;
}

public void add(StoreStats stats) {
@@ -53,8 +73,12 @@ public class StoreStats implements Writeable, ToXContentFragment {
return;
}
sizeInBytes += stats.sizeInBytes;
reservedSize = ignoreIfUnknown(reservedSize) + ignoreIfUnknown(stats.reservedSize);
}

private static long ignoreIfUnknown(long reservedSize) {
return reservedSize == UNKNOWN_RESERVED_BYTES ? 0L : reservedSize;
}

public long sizeInBytes() {
return sizeInBytes;
@@ -72,18 +96,31 @@ public class StoreStats implements Writeable, ToXContentFragment {
return size();
}

/**
 * A prediction of how much larger this store will eventually grow. For instance, if we are currently doing a peer recovery or restoring
 * a snapshot into this store then we can account for the rest of the recovery using this field. A value of {@code -1B} indicates that
 * the reserved size is unknown.
 */
public ByteSizeValue getReservedSize() {
return new ByteSizeValue(reservedSize);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(sizeInBytes);
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
out.writeVLong(0L); // throttleTimeInNanos
}
if (out.getVersion().onOrAfter(RESERVED_BYTES_VERSION)) {
out.writeZLong(reservedSize);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.STORE);
builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, size());
builder.humanReadableField(Fields.RESERVED_IN_BYTES, Fields.RESERVED, getReservedSize());
builder.endObject();
return builder;
}
@@ -92,5 +129,7 @@ public class StoreStats implements Writeable, ToXContentFragment {
static final String STORE = "store";
static final String SIZE = "size";
static final String SIZE_IN_BYTES = "size_in_bytes";
static final String RESERVED = "reserved";
static final String RESERVED_IN_BYTES = "reserved_in_bytes";
}
}
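A short sketch of how the new reserved-bytes field behaves when shard-level stats are summed (assuming the 7.9 `StoreStats` class above is on the classpath; the byte values are made up):

    import org.elasticsearch.index.store.StoreStats;

    public class StoreStatsExample {
        public static void main(String[] args) {
            // A shard mid-recovery: 50 kB on disk, 20 kB still expected to arrive.
            StoreStats recovering = new StoreStats(50_000L, 20_000L);
            // A shard whose recovery has not yet listed its files: reservation unknown.
            StoreStats unknown = new StoreStats(10_000L, StoreStats.UNKNOWN_RESERVED_BYTES);

            recovering.add(unknown); // unknown reservations count as zero when summing

            assert recovering.sizeInBytes() == 60_000L;
            assert recovering.getReservedSize().getBytes() == 20_000L;
        }
    }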
@@ -247,6 +247,14 @@ public class NodeIndicesStats implements Writeable, ToXContentFragment {
return statsMap;
}

public List<IndexShardStats> getShardStats(Index index) {
if (statsByShard == null) {
return null;
} else {
return statsByShard.get(index);
}
}

static final class Fields {
static final String INDICES = "indices";
}
@@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreStats;

import java.io.IOException;
import java.util.ArrayList;
@@ -203,6 +204,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
getTranslog().start();
break;
case FINALIZE:
assert getIndex().bytesStillToRecover() >= 0 : "moving to stage FINALIZE without completing file details";
validateAndSetStage(Stage.TRANSLOG, stage);
getTranslog().stop();
break;
@@ -693,6 +695,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
public static class Index extends Timer implements ToXContentFragment, Writeable {

private final Map<String, File> fileDetails = new HashMap<>();
private boolean fileDetailsComplete;

public static final long UNKNOWN = -1L;

@@ -709,6 +712,15 @@ public class RecoveryState implements ToXContentFragment, Writeable {
File file = new File(in);
fileDetails.put(file.name, file);
}
if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
fileDetailsComplete = in.readBoolean();
} else {
// This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not
// then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete
// so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path
// anyway since they always use IndexShard#getRecoveryState which is never transported over the wire.
fileDetailsComplete = fileDetails.isEmpty() == false;
}
sourceThrottlingInNanos = in.readLong();
targetThrottleTimeInNanos = in.readLong();
}
@@ -721,6 +733,9 @@ public class RecoveryState implements ToXContentFragment, Writeable {
for (File file : files) {
file.writeTo(out);
}
if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
out.writeBoolean(fileDetailsComplete);
}
out.writeLong(sourceThrottlingInNanos);
out.writeLong(targetThrottleTimeInNanos);
}
@@ -732,16 +747,22 @@ public class RecoveryState implements ToXContentFragment, Writeable {
public synchronized void reset() {
super.reset();
fileDetails.clear();
fileDetailsComplete = false;
sourceThrottlingInNanos = UNKNOWN;
targetThrottleTimeInNanos = UNKNOWN;
}

public synchronized void addFileDetail(String name, long length, boolean reused) {
assert fileDetailsComplete == false : "addFileDetail for [" + name + "] when file details are already complete";
File file = new File(name, length, reused);
File existing = fileDetails.put(name, file);
assert existing == null : "file [" + name + "] is already reported";
}

public synchronized void setFileDetailsComplete() {
fileDetailsComplete = true;
}

public synchronized void addRecoveredBytesToFile(String name, long bytes) {
File file = fileDetails.get(name);
file.addRecoveredBytes(bytes);
@@ -865,6 +886,23 @@ public class RecoveryState implements ToXContentFragment, Writeable {
return total;
}

/**
 * @return number of bytes still to recover, i.e. {@link Index#totalRecoverBytes()} minus {@link Index#recoveredBytes()}, or
 * {@code -1} if the full set of files to recover is not yet known
 */
public synchronized long bytesStillToRecover() {
if (fileDetailsComplete == false) {
return -1L;
}
long total = 0L;
for (File file : fileDetails.values()) {
if (file.reused() == false) {
total += file.length() - file.recovered();
}
}
return total;
}

/**
 * percent of bytes recovered out of total files bytes *to be* recovered
 */
@@ -288,6 +288,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
@Override
public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
state().getIndex().setFileDetailsComplete(); // ops-based recoveries don't send the file details
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().openEngineAndSkipTranslogRecovery();
return null;
@@ -403,6 +404,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
for (int i = 0; i < phase1FileNames.size(); i++) {
index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
}
index.setFileDetailsComplete();
state().getTranslog().totalOperations(totalTranslogOps);
state().getTranslog().totalOperationsOnStart(totalTranslogOps);
return null;
@@ -129,6 +129,8 @@ public abstract class FileRestoreContext {
}
}

recoveryState.getIndex().setFileDetailsComplete();

if (filesToRecover.isEmpty()) {
logger.trace("[{}] [{}] no files to recover, all exist within the local store", shardId, snapshotId);
}
@@ -30,8 +30,8 @@ public class ClusterInfoTests extends ESTestCase {

public void testSerialization() throws Exception {
ClusterInfo clusterInfo = new ClusterInfo(
randomDiskUsage(), randomDiskUsage(), randomShardSizes(), randomRoutingToDataPath()
);
randomDiskUsage(), randomDiskUsage(), randomShardSizes(), randomRoutingToDataPath(),
randomReservedSpace());
BytesStreamOutput output = new BytesStreamOutput();
clusterInfo.writeTo(output);

@@ -40,6 +40,7 @@ public class ClusterInfoTests extends ESTestCase {
assertEquals(clusterInfo.getNodeMostAvailableDiskUsages(), result.getNodeMostAvailableDiskUsages());
assertEquals(clusterInfo.shardSizes, result.shardSizes);
assertEquals(clusterInfo.routingToDataPath, result.routingToDataPath);
assertEquals(clusterInfo.reservedSpace, result.reservedSpace);
}

private static ImmutableOpenMap<String, DiskUsage> randomDiskUsage() {
@@ -78,4 +79,19 @@ public class ClusterInfoTests extends ESTestCase {
return builder.build();
}

private static ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> randomReservedSpace() {
int numEntries = randomIntBetween(0, 128);
ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> builder = ImmutableOpenMap.builder(numEntries);
for (int i = 0; i < numEntries; i++) {
final ClusterInfo.NodeAndPath key = new ClusterInfo.NodeAndPath(randomAlphaOfLength(10), randomAlphaOfLength(10));
final ClusterInfo.ReservedSpace.Builder valueBuilder = new ClusterInfo.ReservedSpace.Builder();
for (int j = between(0,10); j > 0; j--) {
ShardId shardId = new ShardId(randomAlphaOfLength(32), randomAlphaOfLength(32), randomIntBetween(0, Integer.MAX_VALUE));
valueBuilder.add(shardId, between(0, Integer.MAX_VALUE));
}
builder.put(key, valueBuilder.build());
}
return builder.build();
}

}
@@ -38,6 +38,7 @@ import org.elasticsearch.test.ESTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

import static java.util.Collections.emptyMap;
@@ -104,14 +105,14 @@ public class DiskUsageTests extends ESTestCase {
test_0 = ShardRoutingHelper.moveToStarted(test_0);
Path test0Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("0");
CommonStats commonStats0 = new CommonStats();
commonStats0.store = new StoreStats(100);
commonStats0.store = new StoreStats(100, 0L);
ShardRouting test_1 = ShardRouting.newUnassigned(new ShardId(index, 1), false, PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
test_1 = ShardRoutingHelper.initialize(test_1, "node2");
test_1 = ShardRoutingHelper.moveToStarted(test_1);
Path test1Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("1");
CommonStats commonStats1 = new CommonStats();
commonStats1.store = new StoreStats(1000);
commonStats1.store = new StoreStats(1000, 0L);
ShardStats[] stats = new ShardStats[] {
new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, test_0.shardId()), commonStats0 , null, null, null),
new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, test_1.shardId()), commonStats1 , null, null, null)
@@ -119,7 +120,7 @@ public class DiskUsageTests extends ESTestCase {
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> routingToPath = ImmutableOpenMap.builder();
ClusterState state = ClusterState.builder(new ClusterName("blarg")).version(0).build();
InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath);
InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath, new HashMap<>());
assertEquals(2, shardSizes.size());
assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0)));
assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_1)));
@@ -41,6 +41,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;

@@ -100,7 +101,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30));
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
monitor.onNewInfo(clusterInfo(builder.build()));
assertFalse(reroute.get());
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());

@@ -109,7 +110,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5));
currentTime.addAndGet(randomLongBetween(60001, 120000));
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
monitor.onNewInfo(clusterInfo(builder.build()));
assertTrue(reroute.get());
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());
IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder()
@@ -145,7 +146,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5));
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
monitor.onNewInfo(clusterInfo(builder.build()));
assertTrue(reroute.get());
assertEquals(Collections.singleton("test_1"), indices.get());
}
@@ -181,12 +182,12 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {

// should not reroute when all disks are ok
currentTime.addAndGet(randomLongBetween(0, 120000));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
monitor.onNewInfo(clusterInfo(allDisksOk));
assertNull(listenerReference.get());

// should reroute when one disk goes over the watermark
currentTime.addAndGet(randomLongBetween(0, 120000));
monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null));
monitor.onNewInfo(clusterInfo(oneDiskAboveWatermark));
assertNotNull(listenerReference.get());
listenerReference.getAndSet(null).onResponse(clusterState);

@@ -194,20 +195,20 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
// should not re-route again within the reroute interval
currentTime.addAndGet(randomLongBetween(0,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis()));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
monitor.onNewInfo(clusterInfo(allDisksOk));
assertNull(listenerReference.get());
}

// should reroute again when one disk is still over the watermark
currentTime.addAndGet(randomLongBetween(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000));
monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null));
monitor.onNewInfo(clusterInfo(oneDiskAboveWatermark));
assertNotNull(listenerReference.get());
final ActionListener<ClusterState> rerouteListener1 = listenerReference.getAndSet(null);

// should not re-route again before reroute has completed
currentTime.addAndGet(randomLongBetween(0, 120000));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
monitor.onNewInfo(clusterInfo(allDisksOk));
assertNull(listenerReference.get());

// complete reroute
@@ -217,21 +218,34 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
// should not re-route again within the reroute interval
currentTime.addAndGet(randomLongBetween(0,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis()));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
monitor.onNewInfo(clusterInfo(allDisksOk));
assertNull(listenerReference.get());
}

// should reroute again after the reroute interval
currentTime.addAndGet(randomLongBetween(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
monitor.onNewInfo(clusterInfo(allDisksOk));
assertNotNull(listenerReference.get());
listenerReference.getAndSet(null).onResponse(null);

// should not reroute again when it is not required
currentTime.addAndGet(randomLongBetween(0, 120000));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
monitor.onNewInfo(clusterInfo(allDisksOk));
assertNull(listenerReference.get());

// should reroute again when one disk has reserved space that pushes it over the high watermark
final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> builder = ImmutableOpenMap.builder(1);
builder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"),
new ClusterInfo.ReservedSpace.Builder().add(new ShardId("baz", "quux", 0), between(41, 100)).build());
final ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpaces = builder.build();

currentTime.addAndGet(randomLongBetween(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000));
monitor.onNewInfo(clusterInfo(allDisksOk, reservedSpaces));
assertNotNull(listenerReference.get());
listenerReference.getAndSet(null).onResponse(null);

}

public void testAutoReleaseIndices() {
@@ -253,6 +267,16 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(), allocation);
assertThat(clusterState.getRoutingTable().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));

final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpacesBuilder
= ImmutableOpenMap.builder();
final int reservedSpaceNode1 = between(0, 10);
reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"),
new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode1).build());
final int reservedSpaceNode2 = between(0, 10);
reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node2", "/foo/bar"),
new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode2).build());
ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpaces = reservedSpacesBuilder.build();

DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
(reason, priority, listener) -> {
@@ -275,10 +299,20 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
|
||||
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get());
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
// Reserved space is ignored when applying block
|
||||
indicesToMarkReadOnly.set(null);
|
||||
indicesToRelease.set(null);
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 90)));
|
||||
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 90)));
|
||||
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
|
||||
assertNull(indicesToMarkReadOnly.get());
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
// Change cluster state so that "test_2" index is blocked (read only)
|
||||
IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder()
|
||||
.put(clusterState.metadata()
|
||||
@ -313,17 +347,17 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100)));
|
||||
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
|
||||
assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
// When free disk on node1 and node2 goes above 10% high watermark, then only release index block
|
||||
// When free disk on node1 and node2 goes above 10% high watermark then release index block, ignoring reserved space
|
||||
indicesToMarkReadOnly.set(null);
|
||||
indicesToRelease.set(null);
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
|
||||
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
|
||||
assertNull(indicesToMarkReadOnly.get());
|
||||
assertThat(indicesToRelease.get(), contains("test_2"));
|
||||
|
||||
@ -332,7 +366,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||
indicesToRelease.set(null);
|
||||
builder = ImmutableOpenMap.builder();
|
||||
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
monitor.onNewInfo(clusterInfo(builder.build()));
|
||||
assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
@ -345,7 +379,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||
if (randomBoolean()) {
|
||||
builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100)));
|
||||
}
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
monitor.onNewInfo(clusterInfo(builder.build()));
|
||||
assertNull(indicesToMarkReadOnly.get());
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
@ -357,7 +391,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||
if (randomBoolean()) {
|
||||
builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100)));
|
||||
}
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
monitor.onNewInfo(clusterInfo(builder.build()));
|
||||
assertNull(indicesToMarkReadOnly.get());
|
||||
assertNull(indicesToRelease.get());
|
||||
|
||||
@ -369,7 +403,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||
if (randomBoolean()) {
|
||||
builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100)));
|
||||
}
|
||||
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
|
||||
monitor.onNewInfo(clusterInfo(builder.build()));
|
||||
assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
|
||||
assertNull(indicesToRelease.get());
|
||||
}
|
||||
@ -492,7 +526,6 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||
|
||||
assertSingleInfoMessage(monitor, aboveLowWatermark,
|
||||
"high disk watermark [90%] no longer exceeded on * but low disk watermark [85%] is still exceeded");
|
||||
|
||||
}
|
||||
|
||||
private void assertNoLogging(DiskThresholdMonitor monitor,
|
||||
@ -514,7 +547,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||
Loggers.addAppender(diskThresholdMonitorLogger, mockAppender);
|
||||
|
||||
for (int i = between(1, 3); i >= 0; i--) {
|
||||
monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null));
|
||||
monitor.onNewInfo(clusterInfo(diskUsages));
|
||||
}
|
||||
|
||||
mockAppender.assertAllExpectationsMatched();
|
||||
@ -564,10 +597,20 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
|
||||
Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class);
|
||||
Loggers.addAppender(diskThresholdMonitorLogger, mockAppender);
|
||||
|
||||
monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null));
|
||||
monitor.onNewInfo(clusterInfo(diskUsages));
|
||||
|
||||
mockAppender.assertAllExpectationsMatched();
|
||||
Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender);
|
||||
mockAppender.stop();
|
||||
}
|
||||
|
||||
private static ClusterInfo clusterInfo(ImmutableOpenMap<String, DiskUsage> diskUsages) {
|
||||
return clusterInfo(diskUsages, ImmutableOpenMap.of());
|
||||
}
|
||||
|
||||
private static ClusterInfo clusterInfo(ImmutableOpenMap<String, DiskUsage> diskUsages,
|
||||
ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace) {
|
||||
return new ClusterInfo(diskUsages, null, null, null, reservedSpace);
|
||||
}
|
||||
|
||||
}
|
||||
|

@ -56,6 +56,7 @@ import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

@ -69,6 +70,7 @@ import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocat
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;

public class DiskThresholdDeciderTests extends ESAllocationTestCase {

@ -763,6 +765,30 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {

strategy.reroute(clusterState, "foo"); // ensure reroute doesn't fail even though there is negative free space
}

{
clusterInfoReference.set(overfullClusterInfo);
clusterState = applyStartedShardsUntilNoChange(clusterState, strategy);
final List<ShardRouting> startedShardsWithOverfullDisk = clusterState.getRoutingNodes().shardsWithState(STARTED);
assertThat(startedShardsWithOverfullDisk.size(), equalTo(4));
for (ShardRouting shardRouting : startedShardsWithOverfullDisk) {
// no shards on node3 since it has no free space
assertThat(shardRouting.toString(), shardRouting.currentNodeId(), oneOf("node1", "node2"));
}

// reset free space on node 3 and reserve space on node1
clusterInfoReference.set(new DevNullClusterInfo(usages, usages, shardSizes,
(new ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace>()).fPut(
new ClusterInfo.NodeAndPath("node1", "/dev/null"),
new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), between(51, 200)).build()).build()));
clusterState = applyStartedShardsUntilNoChange(clusterState, strategy);
final List<ShardRouting> startedShardsWithReservedSpace = clusterState.getRoutingNodes().shardsWithState(STARTED);
assertThat(startedShardsWithReservedSpace.size(), equalTo(4));
for (ShardRouting shardRouting : startedShardsWithReservedSpace) {
// no shards on node1 since all its free space is reserved
assertThat(shardRouting.toString(), shardRouting.currentNodeId(), oneOf("node2", "node3"));
}
}
}

public void testCanRemainWithShardRelocatingAway() {
@ -1123,7 +1149,13 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage,
ImmutableOpenMap<String, Long> shardSizes) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null);
this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, ImmutableOpenMap.of());
}

DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage,
ImmutableOpenMap<String, Long> shardSizes, ImmutableOpenMap<NodeAndPath, ReservedSpace> reservedSpace) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace);
}

@Override

@ -104,7 +104,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(),
mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of());
mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)),
clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime());
allocation.debugDecision(true);
@ -159,7 +159,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
final long shardSize = randomIntBetween(110, 1000);
shardSizes.put("[test][0][p]", shardSize);
ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(),
shardSizes.build(), ImmutableOpenMap.of());
shardSizes.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)),
clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime());
allocation.debugDecision(true);
@ -240,7 +240,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
shardSizes.put("[test][2][p]", 10L);

final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(),
shardSizes.build(), shardRoutingMap.build());
shardSizes.build(), shardRoutingMap.build(), ImmutableOpenMap.of());
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)),
clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime());
allocation.debugDecision(true);

@ -2365,6 +2365,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
}
recoveryState.getIndex().setFileDetailsComplete();
return null;
});
}
@ -2659,6 +2660,7 @@ public class IndexShardTests extends IndexShardTestCase {
shard.prepareForIndexRecovery();
// Shard is still inactive since we haven't started recovering yet
assertFalse(shard.isActive());
shard.recoveryState().getIndex().setFileDetailsComplete();
shard.openEngineAndRecoverFromTranslog();
// Shard should now be active since we did recover:
assertTrue(shard.isActive());

@ -463,7 +463,7 @@ public class StoreTests extends ESTestCase {
public void assertDeleteContent(Store store, Directory dir) throws IOException {
deleteContent(store.directory());
assertThat(Arrays.toString(store.directory().listAll()), store.directory().listAll().length, equalTo(0));
assertThat(store.stats().sizeInBytes(), equalTo(0L));
assertThat(store.stats(0L).sizeInBytes(), equalTo(0L));
assertThat(dir.listAll().length, equalTo(0));
}

@ -748,8 +748,20 @@ public class StoreTests extends ESTestCase {
assertTrue("expected extraFS file but got: " + extraFiles, extraFiles.startsWith("extra"));
initialStoreSize += store.directory().fileLength(extraFiles);
}
StoreStats stats = store.stats();
assertEquals(stats.getSize().getBytes(), initialStoreSize);
final long reservedBytes = randomBoolean() ? StoreStats.UNKNOWN_RESERVED_BYTES : randomLongBetween(0L, Integer.MAX_VALUE);
StoreStats stats = store.stats(reservedBytes);
assertEquals(initialStoreSize, stats.getSize().getBytes());
assertEquals(reservedBytes, stats.getReservedSize().getBytes());

stats.add(null);
assertEquals(initialStoreSize, stats.getSize().getBytes());
assertEquals(reservedBytes, stats.getReservedSize().getBytes());

final long otherStatsBytes = randomLongBetween(0L, Integer.MAX_VALUE);
final long otherStatsReservedBytes = randomBoolean() ? StoreStats.UNKNOWN_RESERVED_BYTES : randomLongBetween(0L, Integer.MAX_VALUE);
stats.add(new StoreStats(otherStatsBytes, otherStatsReservedBytes));
assertEquals(initialStoreSize + otherStatsBytes, stats.getSize().getBytes());
assertEquals(Math.max(reservedBytes, 0L) + Math.max(otherStatsReservedBytes, 0L), stats.getReservedSize().getBytes());

Directory dir = store.directory();
final long length;
@ -763,7 +775,7 @@ public class StoreTests extends ESTestCase {
}

assertTrue(numNonExtraFiles(store) > 0);
stats = store.stats();
stats = store.stats(0L);
assertEquals(stats.getSizeInBytes(), length + initialStoreSize);

deleteContent(store.directory());

@ -200,6 +200,7 @@ public class RecoveryTargetTests extends ESTestCase {

Collections.shuffle(Arrays.asList(files), random());
final RecoveryState.Index index = new RecoveryState.Index();
assertThat(index.bytesStillToRecover(), equalTo(-1L));

if (randomBoolean()) {
// initialize with some data and then reset
@ -213,13 +214,15 @@ public class RecoveryTargetTests extends ESTestCase {
index.addTargetThrottling(randomIntBetween(0, 20));
}
}
if (randomBoolean()) {
index.setFileDetailsComplete();
}
if (randomBoolean()) {
index.stop();
}
index.reset();
}

// before we start we must report 0
assertThat(index.recoveredFilesPercent(), equalTo((float) 0.0));
assertThat(index.recoveredBytesPercent(), equalTo((float) 0.0));
@ -242,7 +245,10 @@ public class RecoveryTargetTests extends ESTestCase {
assertThat(index.recoveredBytes(), equalTo(0L));
assertThat(index.recoveredFilesPercent(), equalTo(filesToRecover.size() == 0 ? 100.0f : 0.0f));
assertThat(index.recoveredBytesPercent(), equalTo(filesToRecover.size() == 0 ? 100.0f : 0.0f));
assertThat(index.bytesStillToRecover(), equalTo(-1L));

index.setFileDetailsComplete();
assertThat(index.bytesStillToRecover(), equalTo(totalFileBytes - totalReusedBytes));

long bytesToRecover = totalFileBytes - totalReusedBytes;
boolean completeRecovery = bytesToRecover == 0 || randomBoolean();
@ -322,6 +328,7 @@ public class RecoveryTargetTests extends ESTestCase {
assertThat(index.recoveredBytes(), equalTo(recoveredBytes));
assertThat(index.targetThrottling().nanos(), equalTo(targetThrottling));
assertThat(index.sourceThrottling().nanos(), equalTo(sourceThrottling));
assertThat(index.bytesStillToRecover(), equalTo(totalFileBytes - totalReusedBytes - recoveredBytes));
if (index.totalRecoverFiles() == 0) {
assertThat((double) index.recoveredFilesPercent(), equalTo(100.0));
assertThat((double) index.recoveredBytesPercent(), equalTo(100.0));
@ -351,6 +358,9 @@ public class RecoveryTargetTests extends ESTestCase {
RecoveryState state = new RecoveryState(shardRouting, discoveryNode,
shardRouting.recoverySource().getType() == RecoverySource.Type.PEER ? discoveryNode : null);
for (Stage stage : stages) {
if (stage == Stage.FINALIZE) {
state.getIndex().setFileDetailsComplete();
}
state.setStage(stage);
}
fail("succeeded in performing the illegal sequence [" + Strings.arrayToCommaDelimitedString(stages) + "]");
@ -369,6 +379,9 @@ public class RecoveryTargetTests extends ESTestCase {
shardRouting.recoverySource().getType() == RecoverySource.Type.PEER ? discoveryNode : null);
for (Stage stage : list) {
state.setStage(stage);
if (stage == Stage.INDEX) {
state.getIndex().setFileDetailsComplete();
}
}

assertThat(state.getStage(), equalTo(Stage.DONE));

@ -91,7 +91,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
class SizeFakingClusterInfo extends ClusterInfo {
SizeFakingClusterInfo(ClusterInfo delegate) {
super(delegate.getNodeLeastAvailableDiskUsages(), delegate.getNodeMostAvailableDiskUsages(),
delegate.shardSizes, delegate.routingToDataPath);
delegate.shardSizes, delegate.routingToDataPath, delegate.reservedSpace);
}

@Override

@ -473,6 +473,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT);
}
}
recoveryState.getIndex().setFileDetailsComplete();
return null;
});
}

@ -142,6 +142,7 @@ public class FollowEngineIndexShardTests extends IndexShardTestCase {
}
targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
}
recoveryState.getIndex().setFileDetailsComplete();
return null;
});
}

@ -409,7 +409,8 @@ public class ClusterStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Cl
+ "\"deleted\":0"
+ "},"
+ "\"store\":{"
+ "\"size_in_bytes\":0"
+ "\"size_in_bytes\":0,"
+ "\"reserved_in_bytes\":0"
+ "},"
+ "\"fielddata\":{"
+ "\"memory_size_in_bytes\":0,"

@ -325,7 +325,7 @@ public class IndexStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestC
commonStats.getMerge().add(no, no, no, ++iota, no, no, no, no, no, no);
commonStats.getQueryCache().add(new QueryCacheStats(++iota, ++iota, ++iota, ++iota, no));
commonStats.getRequestCache().add(new RequestCacheStats(++iota, ++iota, ++iota, ++iota));
commonStats.getStore().add(new StoreStats(++iota));
commonStats.getStore().add(new StoreStats(++iota, no));
commonStats.getRefresh().add(new RefreshStats(no, ++iota, no, ++iota, (int) no));

final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, no, no, no, no, no, no, false, ++iota);

@ -144,7 +144,7 @@ public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTes
private CommonStats mockCommonStats() {
final CommonStats commonStats = new CommonStats(CommonStatsFlags.ALL);
commonStats.getDocs().add(new DocsStats(1L, 0L, randomNonNegativeLong()));
commonStats.getStore().add(new StoreStats(2L));
commonStats.getStore().add(new StoreStats(2L, 0L));

final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L);
commonStats.getIndexing().add(new IndexingStats(indexingStats, null));

@ -291,7 +291,7 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa
final CommonStats indicesCommonStats = new CommonStats(CommonStatsFlags.ALL);
indicesCommonStats.getDocs().add(new DocsStats(++iota, no, randomNonNegativeLong()));
indicesCommonStats.getFieldData().add(new FieldDataStats(++iota, ++iota, null));
indicesCommonStats.getStore().add(new StoreStats(++iota));
indicesCommonStats.getStore().add(new StoreStats(++iota, no));

final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, ++iota, no, no, no, no, no, false, ++iota);
indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats, null));