Balance new shard allocations more evenly across multiple path.data

This change adds a simplistic heuristic to try to balance new shard
allocations across multiple data paths on one node so that e.g. if
there are two path.data and both have roughly the same free space, if
10 shards are suddenly allocated, we will put 5 on one path and 5 on
the other (vs 10 on a single path today).

Closes #11185

Closes #11122
This commit is contained in:
Michael McCandless 2015-06-17 10:28:38 -04:00 committed by mikemccand
parent ba3540675a
commit 6fd893e10d
3 changed files with 93 additions and 19 deletions

View File

@ -781,4 +781,17 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
public Path resolveCustomLocation(@IndexSettings Settings indexSettings, final ShardId shardId) {
return resolveCustomLocation(indexSettings, shardId.index().name()).resolve(Integer.toString(shardId.id()));
}
/**
* Returns the {@code NodePath.path} for this shard.
*/
public static Path shardStatePathToDataPath(Path shardPath) {
int count = shardPath.getNameCount();
// Sanity check:
assert Integer.parseInt(shardPath.getName(count-1).toString()) >= 0;
assert "indices".equals(shardPath.getName(count-3).toString());
return shardPath.getParent().getParent().getParent();
}
}

View File

@ -60,8 +60,10 @@ import org.elasticsearch.plugins.ShardsPluginsModule;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -114,6 +116,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache,
IndexSettingsService settingsService,
IndexFieldDataService indexFieldData, BitsetFilterCache bitSetFilterCache, IndicesService indicesServices) {
super(index, indexSettings);
this.injector = injector;
this.indexSettings = indexSettings;
@ -255,6 +258,21 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
}
// NOTE: O(numShards) cost, but numShards should be smallish?
private long getAvgShardSizeInBytes() throws IOException {
long sum = 0;
int count = 0;
for(IndexShard indexShard : this) {
sum += indexShard.store().stats().sizeInBytes();
count++;
}
if (count == 0) {
return -1L;
} else {
return sum / count;
}
}
public synchronized IndexShard createShard(int sShardId, boolean primary) {
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
@ -272,7 +290,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
ShardPath path = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (path == null) {
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings);
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, getAvgShardSizeInBytes(), this);
logger.debug("{} creating using a new path [{}]", shardId, path);
} else {
logger.debug("{} creating using an existing path [{}]", shardId, path);

View File

@ -30,7 +30,9 @@ import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public final class ShardPath {
public static final String INDEX_FOLDER_NAME = "index";
@ -110,35 +112,76 @@ public final class ShardPath {
} else {
dataPath = statePath;
}
logger.debug("{} loaded data path [{}], state path [{}]", shardId, dataPath, statePath);
logger.debug("{} loaded data path [{}], state path [{}]", shardId, dataPath, statePath);
return new ShardPath(dataPath, statePath, indexUUID, shardId);
}
}
// TODO - do we need something more extensible? Yet, this does the job for now...
public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
final String indexUUID = indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
final NodeEnvironment.NodePath[] paths = env.nodePaths();
final List<Tuple<Path, Long>> minUsedPaths = new ArrayList<>();
for (NodeEnvironment.NodePath nodePath : paths) {
final Path shardPath = nodePath.resolve(shardId);
FileStore fileStore = nodePath.fileStore;
long usableSpace = fileStore.getUsableSpace();
if (minUsedPaths.isEmpty() || minUsedPaths.get(0).v2() == usableSpace) {
minUsedPaths.add(new Tuple<>(shardPath, usableSpace));
} else if (minUsedPaths.get(0).v2() < usableSpace) {
minUsedPaths.clear();
minUsedPaths.add(new Tuple<>(shardPath, usableSpace));
}
/** Maps each path.data path to a "guess" of how many bytes the shards allocated to that path might additionally use over their
* lifetime; we do this so a bunch of newly allocated shards won't just all go the path with the most free space at this moment. */
private static Map<Path,Long> getEstimatedReservedBytes(NodeEnvironment env, long avgShardSizeInBytes, Iterable<IndexShard> shards) throws IOException {
long totFreeSpace = 0;
for (NodeEnvironment.NodePath nodePath : env.nodePaths()) {
totFreeSpace += nodePath.fileStore.getUsableSpace();
}
Path minUsed = minUsedPaths.get(shardId.id() % minUsedPaths.size()).v1();
// Very rough heurisic of how much disk space we expect the shard will use over its lifetime, the max of current average
// shard size across the cluster and 5% of the total available free space on this node:
long estShardSizeInBytes = Math.max(avgShardSizeInBytes, (long) (totFreeSpace/20.0));
// Collate predicted (guessed!) disk usage on each path.data:
Map<Path,Long> reservedBytes = new HashMap<>();
for (IndexShard shard : shards) {
Path dataPath = NodeEnvironment.shardStatePathToDataPath(shard.shardPath().getShardStatePath());
// Remove indices/<index>/<shardID> subdirs from the statePath to get back to the path.data/<lockID>:
Long curBytes = reservedBytes.get(dataPath);
if (curBytes == null) {
curBytes = 0L;
}
reservedBytes.put(dataPath, curBytes + estShardSizeInBytes);
}
return reservedBytes;
}
public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shardId, @IndexSettings Settings indexSettings,
long avgShardSizeInBytes, Iterable<IndexShard> shards) throws IOException {
final Path dataPath;
final Path statePath = minUsed;
final Path statePath;
final String indexUUID = indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
if (NodeEnvironment.hasCustomDataPath(indexSettings)) {
dataPath = env.resolveCustomLocation(indexSettings, shardId);
statePath = env.nodePaths()[0].resolve(shardId);
} else {
Map<Path,Long> estReservedBytes = getEstimatedReservedBytes(env, avgShardSizeInBytes, shards);
// TODO - do we need something more extensible? Yet, this does the job for now...
final NodeEnvironment.NodePath[] paths = env.nodePaths();
NodeEnvironment.NodePath bestPath = null;
long maxUsableBytes = Long.MIN_VALUE;
for (NodeEnvironment.NodePath nodePath : paths) {
FileStore fileStore = nodePath.fileStore;
long usableBytes = fileStore.getUsableSpace();
Long reservedBytes = estReservedBytes.get(nodePath.path);
if (reservedBytes != null) {
// Deduct estimated reserved bytes from usable space:
usableBytes -= reservedBytes;
}
if (usableBytes > maxUsableBytes) {
maxUsableBytes = usableBytes;
bestPath = nodePath;
}
}
statePath = bestPath.resolve(shardId);
dataPath = statePath;
}
return new ShardPath(dataPath, statePath, indexUUID, shardId);
}