Balance shards for an index more evenly across multiple data paths (#26654)
* Balance shards for an index more evenly across multiple data paths When a node has multiple data paths configured, and is assigned all of the shards for a particular index, it's possible now that all shards will be assigned to the same path (see #16763). This change keeps the same behavior around determining the "best" path for a shard based on space, however, it enforces limits for the number of shards on a path for an index from the single-node perspective. For example: Assume you had a node with 4 data paths, where `/path1` has a tremendously high amount of disk space available compared to the other paths. If you create an index with 5 primary shards, the previous behavior would be to assign all 5 shards to `/path1`. This change would enforce a limit of 2 shards to each data path for that particular node, so you would end up with the following distribution: - `/path1` - 2 shards (because it has the most usable space) - `/path2` - 1 shard - `/path3` - 1 shard - `/path4` - 1 shard Note, however, that this limit is only enforced at the local node level for simplicity in implementation, so if you had multiple nodes, the "limit" for the node is still 2, so assuming you had enough nodes that there was only 2 shards for this index assigned to this node, they would still both be assigned to `/path1`. * Switch from ObjectLongHashMap to regular HashMap * Remove unneeded Files.isDirectory check * Skip iterating directories when not necessary * Add message to assert * Implement different (better) ranking for node paths This is the method we discussed * Remove unused pathHasEnoughSpace method * Use findFirst instead of .get(0); * Update for master merge to fix compilation Settings.putArray -> Settings.putList
This commit is contained in:
parent
62bf3c11a9
commit
78c54c4560
|
@ -845,6 +845,29 @@ public final class NodeEnvironment implements Closeable {
|
||||||
return shardIds;
|
return shardIds;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find all the shards for this index, returning a map of the {@code NodePath} to the number of shards on that path
|
||||||
|
* @param index the index by which to filter shards
|
||||||
|
* @return a map of NodePath to count of the shards for the index on that path
|
||||||
|
* @throws IOException if an IOException occurs
|
||||||
|
*/
|
||||||
|
public Map<NodePath, Long> shardCountPerPath(final Index index) throws IOException {
|
||||||
|
assert index != null;
|
||||||
|
if (nodePaths == null || locks == null) {
|
||||||
|
throw new IllegalStateException("node is not configured to store local location");
|
||||||
|
}
|
||||||
|
assertEnvIsLocked();
|
||||||
|
final Map<NodePath, Long> shardCountPerPath = new HashMap<>();
|
||||||
|
final String indexUniquePathId = index.getUUID();
|
||||||
|
for (final NodePath nodePath : nodePaths) {
|
||||||
|
Path indexLocation = nodePath.indicesPath.resolve(indexUniquePathId);
|
||||||
|
if (Files.isDirectory(indexLocation)) {
|
||||||
|
shardCountPerPath.put(nodePath, (long) findAllShardsForIndex(indexLocation, index).size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return shardCountPerPath;
|
||||||
|
}
|
||||||
|
|
||||||
private static Set<ShardId> findAllShardsForIndex(Path indexPath, Index index) throws IOException {
|
private static Set<ShardId> findAllShardsForIndex(Path indexPath, Index index) throws IOException {
|
||||||
assert indexPath.getFileName().toString().equals(index.getUUID());
|
assert indexPath.getFileName().toString().equals(index.getUUID());
|
||||||
Set<ShardId> shardIds = new HashSet<>();
|
Set<ShardId> shardIds = new HashSet<>();
|
||||||
|
|
|
@ -31,7 +31,11 @@ import java.math.BigInteger;
|
||||||
import java.nio.file.FileStore;
|
import java.nio.file.FileStore;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public final class ShardPath {
|
public final class ShardPath {
|
||||||
public static final String INDEX_FOLDER_NAME = "index";
|
public static final String INDEX_FOLDER_NAME = "index";
|
||||||
|
@ -189,23 +193,44 @@ public final class ShardPath {
|
||||||
|
|
||||||
// TODO - do we need something more extensible? Yet, this does the job for now...
|
// TODO - do we need something more extensible? Yet, this does the job for now...
|
||||||
final NodeEnvironment.NodePath[] paths = env.nodePaths();
|
final NodeEnvironment.NodePath[] paths = env.nodePaths();
|
||||||
NodeEnvironment.NodePath bestPath = null;
|
|
||||||
BigInteger maxUsableBytes = BigInteger.valueOf(Long.MIN_VALUE);
|
// If no better path is chosen, use the one with the most space by default
|
||||||
|
NodeEnvironment.NodePath bestPath = getPathWithMostFreeSpace(env);
|
||||||
|
|
||||||
|
if (paths.length != 1) {
|
||||||
|
int shardCount = indexSettings.getNumberOfShards();
|
||||||
|
// Maximum number of shards that a path should have for a particular index assuming
|
||||||
|
// all the shards were assigned to this node. For example, with a node with 4 data
|
||||||
|
// paths and an index with 9 primary shards, the maximum number of shards per path
|
||||||
|
// would be 3.
|
||||||
|
int maxShardsPerPath = Math.floorDiv(shardCount, paths.length) + ((shardCount % paths.length) == 0 ? 0 : 1);
|
||||||
|
|
||||||
|
Map<NodeEnvironment.NodePath, Long> pathToShardCount = env.shardCountPerPath(shardId.getIndex());
|
||||||
|
|
||||||
|
// Compute how much space there is on each path
|
||||||
|
final Map<NodeEnvironment.NodePath, BigInteger> pathsToSpace = new HashMap<>(paths.length);
|
||||||
for (NodeEnvironment.NodePath nodePath : paths) {
|
for (NodeEnvironment.NodePath nodePath : paths) {
|
||||||
FileStore fileStore = nodePath.fileStore;
|
FileStore fileStore = nodePath.fileStore;
|
||||||
|
|
||||||
BigInteger usableBytes = BigInteger.valueOf(fileStore.getUsableSpace());
|
BigInteger usableBytes = BigInteger.valueOf(fileStore.getUsableSpace());
|
||||||
assert usableBytes.compareTo(BigInteger.ZERO) >= 0;
|
pathsToSpace.put(nodePath, usableBytes);
|
||||||
|
}
|
||||||
|
|
||||||
// Deduct estimated reserved bytes from usable space:
|
bestPath = Arrays.stream(paths)
|
||||||
Integer count = dataPathToShardCount.get(nodePath.path);
|
// Filter out paths that have enough space
|
||||||
if (count != null) {
|
.filter((path) -> pathsToSpace.get(path).subtract(estShardSizeInBytes).compareTo(BigInteger.ZERO) > 0)
|
||||||
usableBytes = usableBytes.subtract(estShardSizeInBytes.multiply(BigInteger.valueOf(count)));
|
// Sort by the number of shards for this index
|
||||||
}
|
.sorted((p1, p2) -> {
|
||||||
if (bestPath == null || usableBytes.compareTo(maxUsableBytes) > 0) {
|
int cmp = Long.compare(pathToShardCount.getOrDefault(p1, 0L), pathToShardCount.getOrDefault(p2, 0L));
|
||||||
maxUsableBytes = usableBytes;
|
if (cmp == 0) {
|
||||||
bestPath = nodePath;
|
// if the number of shards is equal, tie-break with the usable bytes
|
||||||
|
cmp = pathsToSpace.get(p2).compareTo(pathsToSpace.get(p1));
|
||||||
}
|
}
|
||||||
|
return cmp;
|
||||||
|
})
|
||||||
|
// Return the first result
|
||||||
|
.findFirst()
|
||||||
|
// Or the existing best path if there aren't any that fit the criteria
|
||||||
|
.orElse(bestPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
statePath = bestPath.resolve(shardId);
|
statePath = bestPath.resolve(shardId);
|
||||||
|
@ -214,6 +239,24 @@ public final class ShardPath {
|
||||||
return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId);
|
return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static NodeEnvironment.NodePath getPathWithMostFreeSpace(NodeEnvironment env) throws IOException {
|
||||||
|
final NodeEnvironment.NodePath[] paths = env.nodePaths();
|
||||||
|
NodeEnvironment.NodePath bestPath = null;
|
||||||
|
long maxUsableBytes = Long.MIN_VALUE;
|
||||||
|
for (NodeEnvironment.NodePath nodePath : paths) {
|
||||||
|
FileStore fileStore = nodePath.fileStore;
|
||||||
|
long usableBytes = fileStore.getUsableSpace();
|
||||||
|
assert usableBytes >= 0 : "usable bytes must be >= 0, got: " + usableBytes;
|
||||||
|
|
||||||
|
if (bestPath == null || usableBytes > maxUsableBytes) {
|
||||||
|
// This path has been determined to be "better" based on the usable bytes
|
||||||
|
maxUsableBytes = usableBytes;
|
||||||
|
bestPath = nodePath;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bestPath;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o) {
|
if (this == o) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.lucene.mockfile.FilterFileSystemProvider;
|
import org.apache.lucene.mockfile.FilterFileSystemProvider;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.io.PathUtils;
|
import org.elasticsearch.common.io.PathUtils;
|
||||||
import org.elasticsearch.common.io.PathUtilsForTesting;
|
import org.elasticsearch.common.io.PathUtilsForTesting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -33,8 +34,10 @@ import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.math.BigInteger;
|
||||||
import java.nio.file.FileStore;
|
import java.nio.file.FileStore;
|
||||||
import java.nio.file.FileSystem;
|
import java.nio.file.FileSystem;
|
||||||
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.attribute.FileAttributeView;
|
import java.nio.file.attribute.FileAttributeView;
|
||||||
import java.nio.file.attribute.FileStoreAttributeView;
|
import java.nio.file.attribute.FileStoreAttributeView;
|
||||||
|
@ -45,6 +48,9 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
/** Separate test class from ShardPathTests because we need static (BeforeClass) setup to install mock filesystems... */
|
/** Separate test class from ShardPathTests because we need static (BeforeClass) setup to install mock filesystems... */
|
||||||
public class NewPathForShardTests extends ESTestCase {
|
public class NewPathForShardTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -158,6 +164,10 @@ public class NewPathForShardTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void createFakeShard(ShardPath path) throws IOException {
|
||||||
|
Files.createDirectories(path.resolveIndex().getParent());
|
||||||
|
}
|
||||||
|
|
||||||
public void testSelectNewPathForShard() throws Exception {
|
public void testSelectNewPathForShard() throws Exception {
|
||||||
Path path = PathUtils.get(createTempDir().toString());
|
Path path = PathUtils.get(createTempDir().toString());
|
||||||
|
|
||||||
|
@ -199,8 +209,10 @@ public class NewPathForShardTests extends ESTestCase {
|
||||||
|
|
||||||
Map<Path,Integer> dataPathToShardCount = new HashMap<>();
|
Map<Path,Integer> dataPathToShardCount = new HashMap<>();
|
||||||
ShardPath result1 = ShardPath.selectNewPathForShard(nodeEnv, shardId, INDEX_SETTINGS, 100, dataPathToShardCount);
|
ShardPath result1 = ShardPath.selectNewPathForShard(nodeEnv, shardId, INDEX_SETTINGS, 100, dataPathToShardCount);
|
||||||
|
createFakeShard(result1);
|
||||||
dataPathToShardCount.put(NodeEnvironment.shardStatePathToDataPath(result1.getDataPath()), 1);
|
dataPathToShardCount.put(NodeEnvironment.shardStatePathToDataPath(result1.getDataPath()), 1);
|
||||||
ShardPath result2 = ShardPath.selectNewPathForShard(nodeEnv, shardId, INDEX_SETTINGS, 100, dataPathToShardCount);
|
ShardPath result2 = ShardPath.selectNewPathForShard(nodeEnv, shardId, INDEX_SETTINGS, 100, dataPathToShardCount);
|
||||||
|
createFakeShard(result2);
|
||||||
|
|
||||||
// #11122: this was the original failure: on a node with 2 disks that have nearly equal
|
// #11122: this was the original failure: on a node with 2 disks that have nearly equal
|
||||||
// free space, we would always allocate all N incoming shards to the one path that
|
// free space, we would always allocate all N incoming shards to the one path that
|
||||||
|
@ -210,4 +222,86 @@ public class NewPathForShardTests extends ESTestCase {
|
||||||
|
|
||||||
nodeEnv.close();
|
nodeEnv.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSelectNewPathForShardEvenly() throws Exception {
|
||||||
|
Path path = PathUtils.get(createTempDir().toString());
|
||||||
|
|
||||||
|
// Use 2 data paths:
|
||||||
|
String[] paths = new String[] {path.resolve("a").toString(),
|
||||||
|
path.resolve("b").toString()};
|
||||||
|
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put(Environment.PATH_HOME_SETTING.getKey(), path)
|
||||||
|
.putList(Environment.PATH_DATA_SETTING.getKey(), paths).build();
|
||||||
|
NodeEnvironment nodeEnv = new NodeEnvironment(settings, new Environment(settings));
|
||||||
|
|
||||||
|
// Make sure all our mocking above actually worked:
|
||||||
|
NodePath[] nodePaths = nodeEnv.nodePaths();
|
||||||
|
assertEquals(2, nodePaths.length);
|
||||||
|
|
||||||
|
assertEquals("mocka", nodePaths[0].fileStore.name());
|
||||||
|
assertEquals("mockb", nodePaths[1].fileStore.name());
|
||||||
|
|
||||||
|
// Path a has lots of free space, but b has little, so new shard should go to a:
|
||||||
|
aFileStore.usableSpace = 100000;
|
||||||
|
bFileStore.usableSpace = 10000;
|
||||||
|
|
||||||
|
ShardId shardId = new ShardId("index", "uid1", 0);
|
||||||
|
ShardPath result = ShardPath.selectNewPathForShard(nodeEnv, shardId, INDEX_SETTINGS, 100, Collections.<Path,Integer>emptyMap());
|
||||||
|
createFakeShard(result);
|
||||||
|
// First shard should go to a
|
||||||
|
assertThat(result.getDataPath().toString(), containsString(aPathPart));
|
||||||
|
|
||||||
|
shardId = new ShardId("index", "uid1", 1);
|
||||||
|
result = ShardPath.selectNewPathForShard(nodeEnv, shardId, INDEX_SETTINGS, 100, Collections.<Path,Integer>emptyMap());
|
||||||
|
createFakeShard(result);
|
||||||
|
// Second shard should go to b
|
||||||
|
assertThat(result.getDataPath().toString(), containsString(bPathPart));
|
||||||
|
|
||||||
|
Map<Path,Integer> dataPathToShardCount = new HashMap<>();
|
||||||
|
shardId = new ShardId("index2", "uid2", 0);
|
||||||
|
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index2",
|
||||||
|
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3).build());
|
||||||
|
ShardPath result1 = ShardPath.selectNewPathForShard(nodeEnv, shardId, idxSettings, 100, dataPathToShardCount);
|
||||||
|
createFakeShard(result1);
|
||||||
|
dataPathToShardCount.put(NodeEnvironment.shardStatePathToDataPath(result1.getDataPath()), 1);
|
||||||
|
shardId = new ShardId("index2", "uid2", 1);
|
||||||
|
ShardPath result2 = ShardPath.selectNewPathForShard(nodeEnv, shardId, idxSettings, 100, dataPathToShardCount);
|
||||||
|
createFakeShard(result2);
|
||||||
|
dataPathToShardCount.put(NodeEnvironment.shardStatePathToDataPath(result2.getDataPath()), 1);
|
||||||
|
shardId = new ShardId("index2", "uid2", 2);
|
||||||
|
ShardPath result3 = ShardPath.selectNewPathForShard(nodeEnv, shardId, idxSettings, 100, dataPathToShardCount);
|
||||||
|
createFakeShard(result3);
|
||||||
|
// 2 shards go to 'a' and 1 to 'b'
|
||||||
|
assertThat(result1.getDataPath().toString(), containsString(aPathPart));
|
||||||
|
assertThat(result2.getDataPath().toString(), containsString(bPathPart));
|
||||||
|
assertThat(result3.getDataPath().toString(), containsString(aPathPart));
|
||||||
|
|
||||||
|
nodeEnv.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGettingPathWithMostFreeSpace() throws Exception {
|
||||||
|
Path path = PathUtils.get(createTempDir().toString());
|
||||||
|
|
||||||
|
// Use 2 data paths:
|
||||||
|
String[] paths = new String[] {path.resolve("a").toString(),
|
||||||
|
path.resolve("b").toString()};
|
||||||
|
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put(Environment.PATH_HOME_SETTING.getKey(), path)
|
||||||
|
.putList(Environment.PATH_DATA_SETTING.getKey(), paths).build();
|
||||||
|
NodeEnvironment nodeEnv = new NodeEnvironment(settings, new Environment(settings));
|
||||||
|
|
||||||
|
aFileStore.usableSpace = 100000;
|
||||||
|
bFileStore.usableSpace = 1000;
|
||||||
|
|
||||||
|
assertThat(ShardPath.getPathWithMostFreeSpace(nodeEnv), equalTo(nodeEnv.nodePaths()[0]));
|
||||||
|
|
||||||
|
aFileStore.usableSpace = 10000;
|
||||||
|
bFileStore.usableSpace = 20000;
|
||||||
|
|
||||||
|
assertThat(ShardPath.getPathWithMostFreeSpace(nodeEnv), equalTo(nodeEnv.nodePaths()[1]));
|
||||||
|
|
||||||
|
nodeEnv.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue