Merge pull request #16149 from s1monw/drop_multi_path_upgrader

Drop multi data path upgrade tool
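This removes the MultiDataPathUpgrader class and its tests, along with its call sites in GatewayMetaState and OldIndexBackwardsCompatibilityIT. As the diff below shows, the entry point being removed was invoked once at node startup on data nodes:

if (DiscoveryNode.dataNode(settings)) {
    ensureNoPre019ShardState(nodeEnv);
    MultiDataPathUpgrader.upgradeMultiDataPath(nodeEnv, logger);
}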
Simon Willnauer 2016-01-21 17:50:10 +01:00
commit 11f5a2ebaf
4 changed files with 2 additions and 688 deletions

View File

@@ -1,383 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.shard.ShardStateMetaData;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileStore;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
*/
public class MultiDataPathUpgrader {
private final NodeEnvironment nodeEnvironment;
private final ESLogger logger = Loggers.getLogger(getClass());
/**
* Creates a new upgrader instance
* @param nodeEnvironment the node environment to operate on
*/
public MultiDataPathUpgrader(NodeEnvironment nodeEnvironment) {
this.nodeEnvironment = nodeEnvironment;
}
/**
* Upgrades the given shard, consolidating its data from multiple shard paths into the given target path.
*
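* <p>The upgrade writes the latest shard state to the target path, moves all index files into the
* target's index folder while holding the Lucene write lock, moves the translog files, wipes the
* remaining source shard directories and finally verifies that the target index and translog
* folders are non-empty.
*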
* @see #pickShardPath(org.elasticsearch.index.shard.ShardId)
*/
public void upgrade(ShardId shard, ShardPath targetPath) throws IOException {
final Path[] paths = nodeEnvironment.availableShardPaths(shard); // custom data path doesn't need upgrading
if (isTargetPathConfigured(paths, targetPath) == false) {
throw new IllegalArgumentException("shard path must be one of the shards data paths");
}
assert needsUpgrading(shard) : "Should not upgrade a path that needs no upgrading";
logger.info("{} upgrading multi data dir to {}", shard, targetPath.getDataPath());
final ShardStateMetaData loaded = ShardStateMetaData.FORMAT.loadLatestState(logger, paths);
if (loaded == null) {
throw new IllegalStateException(shard + " no shard state found in any of: " + Arrays.toString(paths) + " please check and remove them if possible");
}
logger.info("{} loaded shard state {}", shard, loaded);
ShardStateMetaData.FORMAT.write(loaded, loaded.version, targetPath.getShardStatePath());
Files.createDirectories(targetPath.resolveIndex());
try (SimpleFSDirectory directory = new SimpleFSDirectory(targetPath.resolveIndex())) {
try (final Lock lock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
upgradeFiles(shard, targetPath, targetPath.resolveIndex(), ShardPath.INDEX_FOLDER_NAME, paths);
} catch (LockObtainFailedException ex) {
throw new IllegalStateException("Can't obtain lock on " + targetPath.resolveIndex(), ex);
}
}
upgradeFiles(shard, targetPath, targetPath.resolveTranslog(), ShardPath.TRANSLOG_FOLDER_NAME, paths);
logger.info("{} wipe upgraded directories", shard);
for (Path path : paths) {
if (path.equals(targetPath.getShardStatePath()) == false) {
logger.info("{} wipe shard directories: [{}]", shard, path);
IOUtils.rm(path);
}
}
if (FileSystemUtils.files(targetPath.resolveIndex()).length == 0) {
throw new IllegalStateException("index folder [" + targetPath.resolveIndex() + "] is empty");
}
if (FileSystemUtils.files(targetPath.resolveTranslog()).length == 0) {
throw new IllegalStateException("translog folder [" + targetPath.resolveTranslog() + "] is empty");
}
}
/**
* Runs check-index on the target shard and throws an exception if it failed
*/
public void checkIndex(ShardPath targetPath) throws IOException {
BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, StandardCharsets.UTF_8.name());
try (Directory directory = new SimpleFSDirectory(targetPath.resolveIndex());
final CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.setInfoStream(out);
CheckIndex.Status status = checkIndex.checkIndex();
out.flush();
if (!status.clean) {
logger.warn("check index [failure]\n{}", new String(os.bytes().toBytes(), StandardCharsets.UTF_8));
throw new IllegalStateException("index check failure");
}
}
}
/**
* Returns true iff the given shard needs upgrading.
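* A shard needs upgrading iff its state directory exists on more than one data path.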
*/
public boolean needsUpgrading(ShardId shard) {
final Path[] paths = nodeEnvironment.availableShardPaths(shard);
// neither custom data paths nor single-path environments need upgrading
if (paths.length > 1) {
int numPathsExist = 0;
for (Path path : paths) {
if (Files.exists(path.resolve(MetaDataStateFormat.STATE_DIR_NAME))) {
numPathsExist++;
if (numPathsExist > 1) {
return true;
}
}
}
}
return false;
}
/**
* Picks a target ShardPath to allocate and upgrade the given shard to. It picks the target based on a simple
* heuristic:
* <ul>
* <li>if the smallest data path has at least 2x the shard's total size available, the data path holding the most bytes of that shard is picked to minimize the amount of bytes to copy</li>
* <li>otherwise the data path with the most available space is used as the target, no matter how big a slice of the shard it already holds.</li>
* </ul>
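* <p>For example (illustrative numbers): a shard occupies 2gb and 1gb on two data paths (3gb total).
* If both paths have at least 6gb available (counting the shard's own bytes on that path), the 2gb
* path is picked since only 1gb has to be copied; otherwise the path with the most available
* space wins.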
*/
public ShardPath pickShardPath(ShardId shard) throws IOException {
if (needsUpgrading(shard) == false) {
throw new IllegalStateException("Shard doesn't need upgrading");
}
final NodeEnvironment.NodePath[] paths = nodeEnvironment.nodePaths();
// if we need upgrading, make sure all shard paths exist
for (NodeEnvironment.NodePath path : paths) {
Files.createDirectories(path.resolve(shard));
}
final ShardFileInfo[] shardFileInfo = getShardFileInfo(shard, paths);
long totalBytesUsedByShard = 0;
long leastUsableSpace = Long.MAX_VALUE;
long mostUsableSpace = Long.MIN_VALUE;
assert shardFileInfo.length == nodeEnvironment.availableShardPaths(shard).length;
for (ShardFileInfo info : shardFileInfo) {
totalBytesUsedByShard += info.spaceUsedByShard;
leastUsableSpace = Math.min(leastUsableSpace, info.usableSpace + info.spaceUsedByShard);
mostUsableSpace = Math.max(mostUsableSpace, info.usableSpace + info.spaceUsedByShard);
}
if (mostUsableSpace < totalBytesUsedByShard) {
throw new IllegalStateException("Can't upgrade path available space: " + new ByteSizeValue(mostUsableSpace) + " required space: " + new ByteSizeValue(totalBytesUsedByShard));
}
ShardFileInfo target = shardFileInfo[0];
if (leastUsableSpace >= (2 * totalBytesUsedByShard)) {
for (ShardFileInfo info : shardFileInfo) {
if (info.spaceUsedByShard > target.spaceUsedByShard) {
target = info;
}
}
} else {
for (ShardFileInfo info : shardFileInfo) {
if (info.usableSpace > target.usableSpace) {
target = info;
}
}
}
return new ShardPath(false, target.path, target.path, IndexMetaData.INDEX_UUID_NA_VALUE /* we don't know */, shard);
}
private ShardFileInfo[] getShardFileInfo(ShardId shard, NodeEnvironment.NodePath[] paths) throws IOException {
final ShardFileInfo[] info = new ShardFileInfo[paths.length];
for (int i = 0; i < info.length; i++) {
Path path = paths[i].resolve(shard);
final long usableSpace = getUsableSpace(paths[i]);
info[i] = new ShardFileInfo(path, usableSpace, getSpaceUsedByShard(path));
}
return info;
}
protected long getSpaceUsedByShard(Path path) throws IOException {
final long[] spaceUsedByShard = new long[] {0};
if (Files.exists(path)) {
Files.walkFileTree(path, new FileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (attrs.isRegularFile()) {
spaceUsedByShard[0] += attrs.size();
}
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
}
});
}
return spaceUsedByShard[0];
}
protected long getUsableSpace(NodeEnvironment.NodePath path) throws IOException {
FileStore fileStore = path.fileStore;
return fileStore.getUsableSpace();
}
static class ShardFileInfo {
final Path path;
final long usableSpace;
final long spaceUsedByShard;
ShardFileInfo(Path path, long usableSpace, long spaceUsedByShard) {
this.path = path;
this.usableSpace = usableSpace;
this.spaceUsedByShard = spaceUsedByShard;
}
}
private void upgradeFiles(ShardId shard, ShardPath targetPath, final Path targetDir, String folderName, Path[] paths) throws IOException {
List<Path> movedFiles = new ArrayList<>();
for (Path path : paths) {
if (path.equals(targetPath.getDataPath()) == false) {
final Path sourceDir = path.resolve(folderName);
if (Files.exists(sourceDir)) {
logger.info("{} upgrading [{}] from [{}] to [{}]", shard, folderName, sourceDir, targetDir);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(sourceDir)) {
Files.createDirectories(targetDir);
for (Path file : stream) {
if (IndexWriter.WRITE_LOCK_NAME.equals(file.getFileName().toString()) || Files.isDirectory(file)) {
continue; // skip the write.lock file and directories
}
logger.info("{} move file [{}] size: [{}]", shard, file.getFileName(), Files.size(file));
final Path targetFile = targetDir.resolve(file.getFileName());
/* We are pessimistic and do a copy first to the other path and then an atomic move to rename it, such that
in the worst case the file exists twice but is never lost or half written. */
final Path targetTempFile = Files.createTempFile(targetDir, "upgrade_", "_" + file.getFileName().toString());
Files.copy(file, targetTempFile, StandardCopyOption.COPY_ATTRIBUTES, StandardCopyOption.REPLACE_EXISTING);
Files.move(targetTempFile, targetFile, StandardCopyOption.ATOMIC_MOVE); // we are on the same FS - this must work otherwise all bets are off
Files.delete(file);
movedFiles.add(targetFile);
}
}
}
}
}
if (movedFiles.isEmpty() == false) {
// fsync all moved files at the end - some of them might already be on disk
logger.info("{} fsync files", shard);
for (Path moved : movedFiles) {
logger.info("{} syncing [{}]", shard, moved.getFileName());
IOUtils.fsync(moved, false);
}
logger.info("{} syncing directory [{}]", shard, targetDir);
IOUtils.fsync(targetDir, true);
}
}
/**
* Returns <code>true</code> iff the target path is one of the given paths.
*/
private boolean isTargetPathConfigured(final Path[] paths, ShardPath targetPath) {
for (Path path : paths) {
if (path.equals(targetPath.getDataPath())) {
return true;
}
}
return false;
}
/**
* Runs an upgrade on all shards located under the given node environment if there is more than one data.path configured;
* otherwise this method returns immediately.
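* <p>Each shard is upgraded under its shard lock, and check-index is run on the upgraded shard
* if its index folder exists.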
*/
public static void upgradeMultiDataPath(NodeEnvironment nodeEnv, ESLogger logger) throws IOException {
if (nodeEnv.nodeDataPaths().length > 1) {
final MultiDataPathUpgrader upgrader = new MultiDataPathUpgrader(nodeEnv);
final Set<String> allIndices = nodeEnv.findAllIndices();
for (String index : allIndices) {
for (ShardId shardId : findAllShardIds(nodeEnv.indexPaths(new Index(index)))) {
try (ShardLock lock = nodeEnv.shardLock(shardId, 0)) {
if (upgrader.needsUpgrading(shardId)) {
final ShardPath shardPath = upgrader.pickShardPath(shardId);
upgrader.upgrade(shardId, shardPath);
// we have to check if the index path exists since we might
// have only upgraded the shard state that is written under /indexname/shardid/_state,
// which happens when the index uses a dedicated (custom) data path
if (Files.exists(shardPath.resolveIndex())) {
upgrader.checkIndex(shardPath);
}
} else {
logger.debug("{} no upgrade needed - already upgraded", shardId);
}
}
}
}
}
}
private static Set<ShardId> findAllShardIds(Path... locations) throws IOException {
final Set<ShardId> shardIds = new HashSet<>();
for (final Path location : locations) {
if (Files.isDirectory(location)) {
shardIds.addAll(findAllShardsForIndex(location));
}
}
return shardIds;
}
private static Set<ShardId> findAllShardsForIndex(Path indexPath) throws IOException {
Set<ShardId> shardIds = new HashSet<>();
if (Files.isDirectory(indexPath)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
String currentIndex = indexPath.getFileName().toString();
for (Path shardPath : stream) {
String fileName = shardPath.getFileName().toString();
if (Files.isDirectory(shardPath) && fileName.chars().allMatch(Character::isDigit)) {
int shardId = Integer.parseInt(fileName);
ShardId id = new ShardId(currentIndex, shardId);
shardIds.add(id);
}
}
}
}
return shardIds;
}
}

View File

@@ -34,7 +34,6 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.MultiDataPathUpgrader;
 import org.elasticsearch.env.NodeEnvironment;
 import java.nio.file.DirectoryStream;
@@ -77,7 +76,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
 if (DiscoveryNode.dataNode(settings)) {
 ensureNoPre019ShardState(nodeEnv);
-MultiDataPathUpgrader.upgradeMultiDataPath(nodeEnv, logger);
 }
 if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {

View File

@@ -39,7 +39,6 @@ import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.MultiDataPathUpgrader;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
@@ -208,10 +207,6 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase {
 }
 void importIndex(String indexName) throws IOException {
-final Iterable<NodeEnvironment> instances = internalCluster().getInstances(NodeEnvironment.class);
-for (NodeEnvironment nodeEnv : instances) { // upgrade multidata path
-MultiDataPathUpgrader.upgradeMultiDataPath(nodeEnv, logger);
-}
 // force reloading dangling indices with a cluster state republish
 client().admin().cluster().prepareReroute().get();
 ensureGreen(indexName);
@@ -219,6 +214,7 @@
 // randomly distribute the files from src over dests paths
 public static void copyIndex(final ESLogger logger, final Path src, final String indexName, final Path... dests) throws IOException {
+Path destinationDataPath = dests[randomInt(dests.length - 1)];
 for (Path dest : dests) {
 Path indexDir = dest.resolve(indexName);
 assertFalse(Files.exists(indexDir));
@@ -244,7 +240,7 @@
 }
 Path relativeFile = src.relativize(file);
-Path destFile = dests[randomInt(dests.length - 1)].resolve(indexName).resolve(relativeFile);
+Path destFile = destinationDataPath.resolve(indexName).resolve(relativeFile);
 logger.trace("--> Moving " + relativeFile.toString() + " to " + destFile.toString());
 Files.move(file, destFile);
 assertFalse(Files.exists(file));

View File

@@ -1,297 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.shard.ShardStateMetaData;
import org.elasticsearch.test.ESTestCase;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
*/
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
public class MultiDataPathUpgraderTests extends ESTestCase {
public void testUpgradeRandomPaths() throws IOException {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
final String uuid = Strings.base64UUID();
final ShardId shardId = new ShardId("foo", 0);
final Path[] shardDataPaths = nodeEnvironment.availableShardPaths(shardId);
if (nodeEnvironment.nodeDataPaths().length == 1) {
MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
assertFalse(helper.needsUpgrading(shardId));
return;
}
int numIdxFiles = 0;
int numTranslogFiles = 0;
int metaStateVersion = 0;
for (Path shardPath : shardDataPaths) {
final Path translog = shardPath.resolve(ShardPath.TRANSLOG_FOLDER_NAME);
final Path idx = shardPath.resolve(ShardPath.INDEX_FOLDER_NAME);
Files.createDirectories(translog);
Files.createDirectories(idx);
int numFiles = randomIntBetween(1, 10);
for (int i = 0; i < numFiles; i++, numIdxFiles++) {
String filename = Integer.toString(numIdxFiles);
try (BufferedWriter w = Files.newBufferedWriter(idx.resolve(filename + ".tst"), StandardCharsets.UTF_8)) {
w.write(filename);
}
}
numFiles = randomIntBetween(1, 10);
for (int i = 0; i < numFiles; i++, numTranslogFiles++) {
String filename = Integer.toString(numTranslogFiles);
try (BufferedWriter w = Files.newBufferedWriter(translog.resolve(filename + ".translog"), StandardCharsets.UTF_8)) {
w.write(filename);
}
}
++metaStateVersion;
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(metaStateVersion, true, uuid, AllocationId.newInitializing()), metaStateVersion, shardDataPaths);
}
final Path path = randomFrom(shardDataPaths);
ShardPath targetPath = new ShardPath(false, path, path, uuid, new ShardId("foo", 0));
MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
helper.upgrade(shardId, targetPath);
assertFalse(helper.needsUpgrading(shardId));
if (shardDataPaths.length > 1) {
for (Path shardPath : shardDataPaths) {
if (shardPath.equals(targetPath.getDataPath())) {
continue;
}
final Path translog = shardPath.resolve(ShardPath.TRANSLOG_FOLDER_NAME);
final Path idx = shardPath.resolve(ShardPath.INDEX_FOLDER_NAME);
final Path state = shardPath.resolve(MetaDataStateFormat.STATE_DIR_NAME);
assertFalse(Files.exists(translog));
assertFalse(Files.exists(idx));
assertFalse(Files.exists(state));
assertFalse(Files.exists(shardPath));
}
}
final ShardStateMetaData stateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, targetPath.getShardStatePath());
assertEquals(metaStateVersion, stateMetaData.version);
assertTrue(stateMetaData.primary);
assertEquals(uuid, stateMetaData.indexUUID);
final Path translog = targetPath.getDataPath().resolve(ShardPath.TRANSLOG_FOLDER_NAME);
final Path idx = targetPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
Files.deleteIfExists(idx.resolve("write.lock"));
assertEquals(numTranslogFiles, FileSystemUtils.files(translog).length);
assertEquals(numIdxFiles, FileSystemUtils.files(idx).length);
final HashSet<Path> translogFiles = Sets.newHashSet(FileSystemUtils.files(translog));
for (int i = 0; i < numTranslogFiles; i++) {
final String name = Integer.toString(i);
assertTrue(translogFiles.contains(translog.resolve(name + ".translog")));
byte[] content = Files.readAllBytes(translog.resolve(name + ".translog"));
assertEquals(name, new String(content, StandardCharsets.UTF_8));
}
final HashSet<Path> idxFiles = Sets.newHashSet(FileSystemUtils.files(idx));
for (int i = 0; i < numIdxFiles; i++) {
final String name = Integer.toString(i);
assertTrue(idxFiles.contains(idx.resolve(name + ".tst")));
byte[] content = Files.readAllBytes(idx.resolve(name + ".tst"));
assertEquals(name, new String(content, StandardCharsets.UTF_8));
}
}
}
/**
* Run upgrade on a real bwc index
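* by unzipping a backwards-compatibility index, spreading its files over all data paths and
* upgrading shard 0 into a single target path.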
*/
public void testUpgradeRealIndex() throws IOException, URISyntaxException {
List<Path> indexes = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(getBwcIndicesPath(), "index-*.zip")) {
for (Path path : stream) {
indexes.add(path);
}
}
CollectionUtil.introSort(indexes, new Comparator<Path>() {
@Override
public int compare(Path o1, Path o2) {
return o1.getFileName().compareTo(o2.getFileName());
}
});
final ShardId shardId = new ShardId("test", 0);
final Path path = randomFrom(indexes);
final Path indexFile = path;
final String indexName = indexFile.getFileName().toString().replace(".zip", "").toLowerCase(Locale.ROOT);
try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
if (nodeEnvironment.nodeDataPaths().length == 1) {
MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
assertFalse(helper.needsUpgrading(shardId));
return;
}
Path unzipDir = createTempDir();
Path unzipDataDir = unzipDir.resolve("data");
// decompress the index
try (InputStream stream = Files.newInputStream(indexFile)) {
TestUtil.unzip(stream, unzipDir);
}
// check the zip contains exactly one cluster directory
assertTrue(Files.exists(unzipDataDir));
Path[] list = FileSystemUtils.files(unzipDataDir);
if (list.length != 1) {
throw new IllegalStateException("Backwards index must contain exactly one cluster but was " + list.length);
}
// the bwc script packs the indices under this path
Path src = list[0].resolve("nodes/0/indices/" + indexName);
assertTrue("[" + indexFile + "] missing index dir: " + src.toString(), Files.exists(src));
Path[] multiDataPath = new Path[nodeEnvironment.nodeDataPaths().length];
int i = 0;
for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) {
multiDataPath[i++] = nodePath.indicesPath;
}
logger.info("--> injecting index [{}] into multiple data paths", indexName);
OldIndexBackwardsCompatibilityIT.copyIndex(logger, src, indexName, multiDataPath);
final ShardPath shardPath = new ShardPath(false, nodeEnvironment.availableShardPaths(new ShardId(indexName, 0))[0], nodeEnvironment.availableShardPaths(new ShardId(indexName, 0))[0], IndexMetaData.INDEX_UUID_NA_VALUE, new ShardId(indexName, 0));
logger.info("{}", (Object)FileSystemUtils.files(shardPath.resolveIndex()));
MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
helper.upgrade(new ShardId(indexName, 0), shardPath);
helper.checkIndex(shardPath);
assertFalse(helper.needsUpgrading(new ShardId(indexName, 0)));
}
}
public void testNeedsUpgrade() throws IOException {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
String uuid = Strings.randomBase64UUID();
final ShardId shardId = new ShardId("foo", 0);
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(1, true, uuid, AllocationId.newInitializing()), 1, nodeEnvironment.availableShardPaths(shardId));
MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
boolean multiDataPaths = nodeEnvironment.nodeDataPaths().length > 1;
boolean needsUpgrading = helper.needsUpgrading(shardId);
if (multiDataPaths) {
assertTrue(needsUpgrading);
} else {
assertFalse(needsUpgrading);
}
}
}
public void testPickTargetShardPath() throws IOException {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
final ShardId shard = new ShardId("foo", 0);
final Path[] paths = nodeEnvironment.availableShardPaths(shard);
if (paths.length == 1) {
MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
try {
helper.pickShardPath(new ShardId("foo", 0));
fail("one path needs no upgrading");
} catch (IllegalStateException ex) {
// only one path
}
} else {
final Map<Path, Tuple<Long, Long>> pathToSpace = new HashMap<>();
final Path expectedPath;
if (randomBoolean()) { // path with most of the file bytes
expectedPath = randomFrom(paths);
long[] used = new long[paths.length];
long sumSpaceUsed = 0;
for (int i = 0; i < used.length; i++) {
long spaceUsed = paths[i] == expectedPath ? randomIntBetween(101, 200) : randomIntBetween(10, 100);
sumSpaceUsed += spaceUsed;
used[i] = spaceUsed;
}
for (int i = 0; i < used.length; i++) {
long available = randomIntBetween((int) (2 * sumSpaceUsed - used[i]), 4 * (int) sumSpaceUsed);
pathToSpace.put(paths[i], new Tuple<>(available, used[i]));
}
} else { // path with largest available space
expectedPath = randomFrom(paths);
long[] used = new long[paths.length];
long sumSpaceUsed = 0;
for (int i = 0; i < used.length; i++) {
long spaceUsed = randomIntBetween(10, 100);
sumSpaceUsed += spaceUsed;
used[i] = spaceUsed;
}
for (int i = 0; i < used.length; i++) {
long available = paths[i] == expectedPath ? randomIntBetween((int) sumSpaceUsed, (int) (2 * sumSpaceUsed)) : randomIntBetween(0, (int) sumSpaceUsed - 1);
pathToSpace.put(paths[i], new Tuple<>(available, used[i]));
}
}
MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment) {
@Override
protected long getUsableSpace(NodeEnvironment.NodePath path) throws IOException {
return pathToSpace.get(path.resolve(shard)).v1();
}
@Override
protected long getSpaceUsedByShard(Path path) throws IOException {
return pathToSpace.get(path).v2();
}
};
String uuid = Strings.randomBase64UUID();
ShardStateMetaData.FORMAT.write(new ShardStateMetaData(1, true, uuid, AllocationId.newInitializing()), 1, paths);
final ShardPath shardPath = helper.pickShardPath(new ShardId("foo", 0));
assertEquals(expectedPath, shardPath.getDataPath());
assertEquals(expectedPath, shardPath.getShardStatePath());
}
MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment) {
@Override
protected long getUsableSpace(NodeEnvironment.NodePath path) throws IOException {
return randomIntBetween(0, 10);
}
@Override
protected long getSpaceUsedByShard(Path path) throws IOException {
return randomIntBetween(11, 20);
}
};
try {
helper.pickShardPath(new ShardId("foo", 0));
fail("not enough space");
} catch (IllegalStateException ex) {
// not enough space
}
}
}
}