diff --git a/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 66be2de2750..44090eff69d 100644 --- a/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -781,4 +781,17 @@ public class NodeEnvironment extends AbstractComponent implements Closeable { public Path resolveCustomLocation(@IndexSettings Settings indexSettings, final ShardId shardId) { return resolveCustomLocation(indexSettings, shardId.index().name()).resolve(Integer.toString(shardId.id())); } + + /** + * Returns the {@code NodePath.path} for this shard. + */ + public static Path shardStatePathToDataPath(Path shardPath) { + int count = shardPath.getNameCount(); + + // Sanity check: + assert Integer.parseInt(shardPath.getName(count-1).toString()) >= 0; + assert "indices".equals(shardPath.getName(count-3).toString()); + + return shardPath.getParent().getParent().getParent(); + } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index bfffbc551ae..009b1a81571 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -60,8 +60,10 @@ import org.elasticsearch.plugins.ShardsPluginsModule; import java.io.Closeable; import java.io.IOException; +import java.nio.file.Path; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,6 +116,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexSettingsService settingsService, IndexFieldDataService indexFieldData, BitsetFilterCache bitSetFilterCache, IndicesService indicesServices) { + super(index, indexSettings); this.injector = injector; this.indexSettings = indexSettings; @@ -255,6 +258,21 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE); } + // NOTE: O(numShards) cost, but numShards should be smallish? + private long getAvgShardSizeInBytes() throws IOException { + long sum = 0; + int count = 0; + for(IndexShard indexShard : this) { + sum += indexShard.store().stats().sizeInBytes(); + count++; + } + if (count == 0) { + return -1L; + } else { + return sum / count; + } + } + public synchronized IndexShard createShard(int sShardId, boolean primary) { /* * TODO: we execute this in parallel but it's a synced method. Yet, we might @@ -272,7 +290,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone ShardPath path = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings); if (path == null) { - path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings); + path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, getAvgShardSizeInBytes(), this); logger.debug("{} creating using a new path [{}]", shardId, path); } else { logger.debug("{} creating using an existing path [{}]", shardId, path); diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardPath.java b/core/src/main/java/org/elasticsearch/index/shard/ShardPath.java index 870283017c8..109a15c9099 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardPath.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardPath.java @@ -30,7 +30,9 @@ import java.nio.file.FileStore; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public final class ShardPath { public static final String INDEX_FOLDER_NAME = "index"; @@ -110,35 +112,76 @@ public final class ShardPath { } else { dataPath = statePath; } - logger.debug("{} loaded data path [{}], state path [{}]", shardId, dataPath, statePath); + logger.debug("{} loaded data path [{}], state path [{}]", shardId, dataPath, statePath); return new ShardPath(dataPath, statePath, indexUUID, shardId); } } - // TODO - do we need something more extensible? Yet, this does the job for now... - public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shardId, @IndexSettings Settings indexSettings) throws IOException { - final String indexUUID = indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE); - final NodeEnvironment.NodePath[] paths = env.nodePaths(); - final List> minUsedPaths = new ArrayList<>(); - for (NodeEnvironment.NodePath nodePath : paths) { - final Path shardPath = nodePath.resolve(shardId); - FileStore fileStore = nodePath.fileStore; - long usableSpace = fileStore.getUsableSpace(); - if (minUsedPaths.isEmpty() || minUsedPaths.get(0).v2() == usableSpace) { - minUsedPaths.add(new Tuple<>(shardPath, usableSpace)); - } else if (minUsedPaths.get(0).v2() < usableSpace) { - minUsedPaths.clear(); - minUsedPaths.add(new Tuple<>(shardPath, usableSpace)); - } + /** Maps each path.data path to a "guess" of how many bytes the shards allocated to that path might additionally use over their + * lifetime; we do this so a bunch of newly allocated shards won't just all go the path with the most free space at this moment. */ + private static Map getEstimatedReservedBytes(NodeEnvironment env, long avgShardSizeInBytes, Iterable shards) throws IOException { + long totFreeSpace = 0; + for (NodeEnvironment.NodePath nodePath : env.nodePaths()) { + totFreeSpace += nodePath.fileStore.getUsableSpace(); } - Path minUsed = minUsedPaths.get(shardId.id() % minUsedPaths.size()).v1(); + + // Very rough heurisic of how much disk space we expect the shard will use over its lifetime, the max of current average + // shard size across the cluster and 5% of the total available free space on this node: + long estShardSizeInBytes = Math.max(avgShardSizeInBytes, (long) (totFreeSpace/20.0)); + + // Collate predicted (guessed!) disk usage on each path.data: + Map reservedBytes = new HashMap<>(); + for (IndexShard shard : shards) { + Path dataPath = NodeEnvironment.shardStatePathToDataPath(shard.shardPath().getShardStatePath()); + + // Remove indices// subdirs from the statePath to get back to the path.data/: + Long curBytes = reservedBytes.get(dataPath); + if (curBytes == null) { + curBytes = 0L; + } + reservedBytes.put(dataPath, curBytes + estShardSizeInBytes); + } + + return reservedBytes; + } + + public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shardId, @IndexSettings Settings indexSettings, + long avgShardSizeInBytes, Iterable shards) throws IOException { + final Path dataPath; - final Path statePath = minUsed; + final Path statePath; + + final String indexUUID = indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE); + if (NodeEnvironment.hasCustomDataPath(indexSettings)) { dataPath = env.resolveCustomLocation(indexSettings, shardId); + statePath = env.nodePaths()[0].resolve(shardId); } else { + + Map estReservedBytes = getEstimatedReservedBytes(env, avgShardSizeInBytes, shards); + + // TODO - do we need something more extensible? Yet, this does the job for now... + final NodeEnvironment.NodePath[] paths = env.nodePaths(); + NodeEnvironment.NodePath bestPath = null; + long maxUsableBytes = Long.MIN_VALUE; + for (NodeEnvironment.NodePath nodePath : paths) { + FileStore fileStore = nodePath.fileStore; + long usableBytes = fileStore.getUsableSpace(); + Long reservedBytes = estReservedBytes.get(nodePath.path); + if (reservedBytes != null) { + // Deduct estimated reserved bytes from usable space: + usableBytes -= reservedBytes; + } + if (usableBytes > maxUsableBytes) { + maxUsableBytes = usableBytes; + bestPath = nodePath; + } + } + + statePath = bestPath.resolve(shardId); dataPath = statePath; } + return new ShardPath(dataPath, statePath, indexUUID, shardId); }