simplify API for ShardPath.selectNewPathForShard to enable unit testing: don't pass IndexShard
This commit is contained in:
parent
8f2ae59316
commit
4d38856f70
|
@ -22,6 +22,7 @@ package org.elasticsearch.index;
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Iterators;
|
import com.google.common.collect.Iterators;
|
||||||
|
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
@ -56,8 +57,10 @@ import org.elasticsearch.plugins.PluginsService;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Path;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -314,8 +317,23 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (path == null) {
|
if (path == null) {
|
||||||
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), this);
|
// TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard
|
||||||
|
// that's being relocated/replicated we know how large it will become once it's done copying:
|
||||||
|
|
||||||
|
// Count up how many shards are currently on each data path:
|
||||||
|
Map<Path,Integer> dataPathToShardCount = new HashMap<>();
|
||||||
|
for(IndexShard shard : this) {
|
||||||
|
Path dataPath = NodeEnvironment.shardStatePathToDataPath(shard.shardPath().getShardStatePath());
|
||||||
|
Integer curCount = dataPathToShardCount.get(dataPath);
|
||||||
|
if (curCount == null) {
|
||||||
|
curCount = 0;
|
||||||
|
}
|
||||||
|
dataPathToShardCount.put(dataPath, curCount+1);
|
||||||
|
}
|
||||||
|
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(),
|
||||||
|
dataPathToShardCount);
|
||||||
logger.debug("{} creating using a new path [{}]", shardId, path);
|
logger.debug("{} creating using a new path [{}]", shardId, path);
|
||||||
} else {
|
} else {
|
||||||
logger.debug("{} creating using an existing path [{}]", shardId, path);
|
logger.debug("{} creating using an existing path [{}]", shardId, path);
|
||||||
|
|
|
@ -199,7 +199,7 @@ public final class ShardPath {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shardId, @IndexSettings Settings indexSettings,
|
public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shardId, @IndexSettings Settings indexSettings,
|
||||||
long avgShardSizeInBytes, Iterable<IndexShard> shards) throws IOException {
|
long avgShardSizeInBytes, Map<Path,Integer> dataPathToShardCount) throws IOException {
|
||||||
|
|
||||||
final Path dataPath;
|
final Path dataPath;
|
||||||
final Path statePath;
|
final Path statePath;
|
||||||
|
@ -211,7 +211,17 @@ public final class ShardPath {
|
||||||
statePath = env.nodePaths()[0].resolve(shardId);
|
statePath = env.nodePaths()[0].resolve(shardId);
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
Map<Path,Long> estReservedBytes = getEstimatedReservedBytes(env, avgShardSizeInBytes, shards);
|
long totFreeSpace = 0;
|
||||||
|
for (NodeEnvironment.NodePath nodePath : env.nodePaths()) {
|
||||||
|
totFreeSpace += nodePath.fileStore.getUsableSpace();
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: this is a hack!! We should instead keep track of incoming (relocated) shards since we know
|
||||||
|
// how large they will be once they're done copying, instead of a silly guess for such cases:
|
||||||
|
|
||||||
|
// Very rough heurisic of how much disk space we expect the shard will use over its lifetime, the max of current average
|
||||||
|
// shard size across the cluster and 5% of the total available free space on this node:
|
||||||
|
long estShardSizeInBytes = Math.max(avgShardSizeInBytes, (long) (totFreeSpace/20.0));
|
||||||
|
|
||||||
// TODO - do we need something more extensible? Yet, this does the job for now...
|
// TODO - do we need something more extensible? Yet, this does the job for now...
|
||||||
final NodeEnvironment.NodePath[] paths = env.nodePaths();
|
final NodeEnvironment.NodePath[] paths = env.nodePaths();
|
||||||
|
@ -220,10 +230,11 @@ public final class ShardPath {
|
||||||
for (NodeEnvironment.NodePath nodePath : paths) {
|
for (NodeEnvironment.NodePath nodePath : paths) {
|
||||||
FileStore fileStore = nodePath.fileStore;
|
FileStore fileStore = nodePath.fileStore;
|
||||||
long usableBytes = fileStore.getUsableSpace();
|
long usableBytes = fileStore.getUsableSpace();
|
||||||
Long reservedBytes = estReservedBytes.get(nodePath.path);
|
|
||||||
if (reservedBytes != null) {
|
|
||||||
// Deduct estimated reserved bytes from usable space:
|
// Deduct estimated reserved bytes from usable space:
|
||||||
usableBytes -= reservedBytes;
|
Integer count = dataPathToShardCount.get(nodePath.path);
|
||||||
|
if (count != null) {
|
||||||
|
usableBytes -= estShardSizeInBytes * count;
|
||||||
}
|
}
|
||||||
if (usableBytes > maxUsableBytes) {
|
if (usableBytes > maxUsableBytes) {
|
||||||
maxUsableBytes = usableBytes;
|
maxUsableBytes = usableBytes;
|
||||||
|
|
Loading…
Reference in New Issue