diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java index a5efd0e9113..d8cad05f09d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java @@ -120,6 +120,14 @@ public class FileSystemUtils { return false; } + public static boolean deleteRecursively(File[] roots) { + boolean deleted = true; + for (File root : roots) { + deleted &= deleteRecursively(root); + } + return deleted; + } + public static boolean deleteRecursively(File root) { return deleteRecursively(root, true); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java index d259c5dad61..e917cf8437b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java @@ -492,6 +492,7 @@ public class ImmutableSettings implements Settings { * @return The builder */ public Builder putArray(String setting, String... values) { + remove(setting); int counter = 0; while (true) { String value = map.remove(setting + '.' + (counter++)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/env/Environment.java b/modules/elasticsearch/src/main/java/org/elasticsearch/env/Environment.java index 953d3de29be..ae1826bc12c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/env/Environment.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/env/Environment.java @@ -46,9 +46,9 @@ public class Environment { private final File workWithClusterFile; - private final File dataFile; + private final File[] dataFiles; - private final File dataWithClusterFile; + private final File[] dataWithClusterFiles; private final File configFile; @@ -86,12 +86,18 @@ public class Environment { } workWithClusterFile = new File(workFile, ClusterName.clusterNameFromSettings(settings).value()); - if (settings.get("path.data") != null) { - dataFile = new File(cleanPath(settings.get("path.data"))); + String[] dataPaths = settings.getAsArray("path.data"); + if (dataPaths.length > 0) { + dataFiles = new File[dataPaths.length]; + dataWithClusterFiles = new File[dataPaths.length]; + for (int i = 0; i < dataPaths.length; i++) { + dataFiles[i] = new File(dataPaths[i]); + dataWithClusterFiles[i] = new File(dataFiles[i], ClusterName.clusterNameFromSettings(settings).value()); + } } else { - dataFile = new File(homeFile, "data"); + dataFiles = new File[]{new File(homeFile, "data")}; + dataWithClusterFiles = new File[]{new File(new File(homeFile, "data"), ClusterName.clusterNameFromSettings(settings).value())}; } - dataWithClusterFile = new File(dataFile, ClusterName.clusterNameFromSettings(settings).value()); if (settings.get("path.logs") != null) { logsFile = new File(cleanPath(settings.get("path.logs"))); @@ -124,15 +130,15 @@ public class Environment { /** * The data location. */ - public File dataFile() { - return dataFile; + public File[] dataFiles() { + return dataFiles; } /** * The data location with the cluster name as a sub directory. */ - public File dataWithClusterFile() { - return dataWithClusterFile; + public File[] dataWithClusterFiles() { + return dataWithClusterFiles; } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java index f7bf85f32c5..3a4c273f673 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -38,9 +38,10 @@ import java.io.IOException; */ public class NodeEnvironment extends AbstractComponent { - private final File nodeFile; + private final File[] nodeFiles; + private final File[] nodeIndicesLocations; - private final Lock lock; + private final Lock[] locks; private final int localNodeId; @@ -48,46 +49,83 @@ public class NodeEnvironment extends AbstractComponent { super(settings); if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) { - nodeFile = null; - lock = null; + nodeFiles = null; + nodeIndicesLocations = null; + locks = null; localNodeId = -1; return; } - Lock lock = null; - File dir = null; + File[] nodesFiles = new File[environment.dataWithClusterFiles().length]; + Lock[] locks = new Lock[environment.dataWithClusterFiles().length]; int localNodeId = -1; IOException lastException = null; - for (int i = 0; i < 50; i++) { - dir = new File(new File(environment.dataWithClusterFile(), "nodes"), Integer.toString(i)); - if (!dir.exists()) { - FileSystemUtils.mkdirs(dir); - } - logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath()); - try { - NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir); - Lock tmpLock = lockFactory.makeLock("node.lock"); - boolean obtained = tmpLock.obtain(); - if (obtained) { - lock = tmpLock; - localNodeId = i; - break; - } else { - logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath()); + for (int possibleLockId = 0; possibleLockId < 50; possibleLockId++) { + for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) { + File dir = new File(new File(environment.dataWithClusterFiles()[dirIndex], "nodes"), Integer.toString(possibleLockId)); + if (!dir.exists()) { + FileSystemUtils.mkdirs(dir); } - } catch (IOException e) { - logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath()); - lastException = e; + logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath()); + try { + NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir); + Lock tmpLock = lockFactory.makeLock("node.lock"); + boolean obtained = tmpLock.obtain(); + if (obtained) { + locks[dirIndex] = tmpLock; + nodesFiles[dirIndex] = dir; + localNodeId = possibleLockId; + } else { + logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath()); + // release all the ones that were obtained up until now + for (int i = 0; i < locks.length; i++) { + if (locks[i] != null) { + try { + locks[i].release(); + } catch (Exception e1) { + // ignore + } + } + locks[i] = null; + } + break; + } + } catch (IOException e) { + logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath()); + lastException = e; + // release all the ones that were obtained up until now + for (int i = 0; i < locks.length; i++) { + if (locks[i] != null) { + try { + locks[i].release(); + } catch (Exception e1) { + // ignore + } + } + locks[i] = null; + } + break; + } + } + if (locks[0] != null) { + // we found a lock, break + break; } } - if (lock == null) { + if (locks[0] == null) { throw new IOException("Failed to obtain node lock", lastException); } + this.localNodeId = localNodeId; - this.lock = lock; - this.nodeFile = dir; + this.locks = locks; + this.nodeFiles = nodesFiles; if (logger.isDebugEnabled()) { - logger.debug("using node location [{}], local_node_id [{}]", dir, localNodeId); + logger.debug("using node location [{}], local_node_id [{}]", nodesFiles, localNodeId); + } + + this.nodeIndicesLocations = new File[nodeFiles.length]; + for (int i = 0; i < nodeFiles.length; i++) { + nodeIndicesLocations[i] = new File(nodeFiles[i], "indices"); } } @@ -96,34 +134,44 @@ public class NodeEnvironment extends AbstractComponent { } public boolean hasNodeFile() { - return nodeFile != null && lock != null; + return nodeFiles != null && locks != null; } - public File nodeDataLocation() { - if (nodeFile == null || lock == null) { + public File[] nodeDataLocations() { + if (nodeFiles == null || locks == null) { throw new ElasticSearchIllegalStateException("node is not configured to store local location"); } - return nodeFile; + return nodeFiles; } - public File indicesLocation() { - return new File(nodeDataLocation(), "indices"); + public File[] indicesLocations() { + return nodeIndicesLocations; } - public File indexLocation(Index index) { - return new File(indicesLocation(), index.name()); + public File[] indexLocations(Index index) { + File[] indexLocations = new File[nodeFiles.length]; + for (int i = 0; i < nodeFiles.length; i++) { + indexLocations[i] = new File(new File(nodeFiles[i], "indices"), index.name()); + } + return indexLocations; } - public File shardLocation(ShardId shardId) { - return new File(indexLocation(shardId.index()), Integer.toString(shardId.id())); + public File[] shardLocations(ShardId shardId) { + File[] shardLocations = new File[nodeFiles.length]; + for (int i = 0; i < nodeFiles.length; i++) { + shardLocations[i] = new File(new File(new File(nodeFiles[i], "indices"), shardId.index().name()), Integer.toString(shardId.id())); + } + return shardLocations; } public void close() { - if (lock != null) { - try { - lock.release(); - } catch (IOException e) { - // ignore + if (locks != null) { + for (Lock lock : locks) { + try { + lock.release(); + } catch (IOException e) { + // ignore + } } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java index e4ff0e3432c..102b0888bce 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java @@ -53,7 +53,7 @@ public class FsGateway extends BlobStoreGateway { String location = componentSettings.get("location"); if (location == null) { logger.warn("using local fs location for gateway, should be changed to be a shared location across nodes"); - gatewayFile = new File(environment.dataFile(), "gateway"); + gatewayFile = new File(environment.dataFiles()[0], "gateway"); } else { gatewayFile = new File(location); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 758d9d77266..1a96c949495 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -180,7 +180,7 @@ public class LocalGateway extends AbstractLifecycleComponent implements } @Override public void reset() throws Exception { - FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocation()); + FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocations()); } @Override public void clusterChanged(final ClusterChangedEvent event) { @@ -263,7 +263,8 @@ public class LocalGateway extends AbstractLifecycleComponent implements location = null; } else { // create the location where the state will be stored - this.location = new File(nodeEnv.nodeDataLocation(), "_state"); + // TODO: we might want to persist states on all data locations + this.location = new File(nodeEnv.nodeDataLocations()[0], "_state"); FileSystemUtils.mkdirs(this.location); if (clusterService.localNode().masterNode()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index f008039b28f..6e42255ff6e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -128,19 +128,30 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen // move an existing translog, if exists, to "recovering" state, and start reading from it FsTranslog translog = (FsTranslog) indexShard.translog(); - File recoveringTranslogFile = new File(translog.location(), "translog-" + translogId + ".recovering"); - if (!recoveringTranslogFile.exists()) { - File translogFile = new File(translog.location(), "translog-" + translogId); - if (translogFile.exists()) { - for (int i = 0; i < 3; i++) { - if (translogFile.renameTo(recoveringTranslogFile)) { - break; + String translogName = "translog-" + translogId; + String recoverTranslogName = translogName + ".recovering"; + + + File recoveringTranslogFile = null; + for (File translogLocation : translog.locations()) { + File tmpRecoveringFile = new File(translogLocation, recoverTranslogName); + if (!tmpRecoveringFile.exists()) { + File tmpTranslogFile = new File(translogLocation, translogName); + if (tmpTranslogFile.exists()) { + for (int i = 0; i < 3; i++) { + if (tmpTranslogFile.renameTo(tmpRecoveringFile)) { + recoveringTranslogFile = tmpRecoveringFile; + break; + } } } + } else { + recoveringTranslogFile = tmpRecoveringFile; + break; } } - if (!recoveringTranslogFile.exists()) { + if (recoveringTranslogFile == null || !recoveringTranslogFile.exists()) { // no translog to recovery from, start and bail // no translog files, bail indexShard.start("post recovery from gateway, no translog"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index ce3882a5020..12cf19c415c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -436,7 +436,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde // delete the shard location if needed if (delete || indexGateway.type().equals(NoneGateway.TYPE)) { - FileSystemUtils.deleteRecursively(nodeEnv.shardLocation(sId)); + FileSystemUtils.deleteRecursively(nodeEnv.shardLocations(sId)); } } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java index 9bbd67e61a9..d890a03b365 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java @@ -35,11 +35,13 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Directories; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.support.ForceSyncDirectory; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -161,7 +163,22 @@ public class Store extends AbstractIndexShardComponent { } } - public static Map readChecksums(Directory dir) throws IOException { + public static Map readChecksums(File[] locations) throws IOException { + for (File location : locations) { + FSDirectory directory = FSDirectory.open(location); + try { + Map checksums = readChecksums(directory, null); + if (checksums != null) { + return checksums; + } + } finally { + directory.close(); + } + } + return null; + } + + static Map readChecksums(Directory dir, Map defaultValue) throws IOException { long lastFound = -1; for (String name : dir.listAll()) { if (!isChecksum(name)) { @@ -173,7 +190,7 @@ public class Store extends AbstractIndexShardComponent { } } if (lastFound == -1) { - return ImmutableMap.of(); + return defaultValue; } IndexInput indexInput = dir.openInput(CHECKSUMS_PREFIX + lastFound); try { @@ -181,7 +198,7 @@ public class Store extends AbstractIndexShardComponent { return indexInput.readStringStringMap(); } catch (Exception e) { // failed to load checksums, ignore and return an empty map - return new HashMap(); + return defaultValue; } finally { indexInput.close(); } @@ -265,7 +282,7 @@ public class Store extends AbstractIndexShardComponent { this.delegates = delegates; synchronized (mutex) { MapBuilder builder = MapBuilder.newMapBuilder(); - Map checksums = readChecksums(delegates[0]); + Map checksums = readChecksums(delegates[0], new HashMap()); for (Directory delegate : delegates) { for (String file : delegate.listAll()) { // BACKWARD CKS SUPPORT @@ -398,6 +415,8 @@ public class Store extends AbstractIndexShardComponent { if (currentSize < size) { size = currentSize; directory = delegate; + } else if (currentSize == size && ThreadLocalRandom.current().nextBoolean()) { + directory = delegate; } } else { directory = delegate; // really, make sense to have multiple directories for FS diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java index 46135e70282..c184135572a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java @@ -40,15 +40,15 @@ public abstract class FsIndexStore extends AbstractIndexStore { private final NodeEnvironment nodeEnv; - private final File location; + private final File[] locations; public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { super(index, indexSettings, indexService); this.nodeEnv = nodeEnv; if (nodeEnv.hasNodeFile()) { - this.location = nodeEnv.indexLocation(index); + this.locations = nodeEnv.indexLocations(index); } else { - this.location = null; + this.locations = null; } } @@ -57,58 +57,73 @@ public abstract class FsIndexStore extends AbstractIndexStore { } @Override public ByteSizeValue backingStoreTotalSpace() { - if (location == null) { + if (locations == null) { return new ByteSizeValue(0); } - long totalSpace = location.getTotalSpace(); - if (totalSpace == 0) { - totalSpace = 0; + long totalSpace = 0; + for (File location : locations) { + totalSpace += location.getTotalSpace(); } return new ByteSizeValue(totalSpace); } @Override public ByteSizeValue backingStoreFreeSpace() { - if (location == null) { + if (locations == null) { return new ByteSizeValue(0); } - long usableSpace = location.getUsableSpace(); - if (usableSpace == 0) { - usableSpace = 0; + long usableSpace = 0; + for (File location : locations) { + usableSpace += location.getUsableSpace(); } return new ByteSizeValue(usableSpace); } @Override public boolean canDeleteUnallocated(ShardId shardId) { - if (location == null) { + if (locations == null) { return false; } if (indexService.hasShard(shardId.id())) { return false; } - return shardLocation(shardId).exists(); + for (File location : shardLocations(shardId)) { + if (location.exists()) { + return true; + } + } + return false; } @Override public void deleteUnallocated(ShardId shardId) throws IOException { - if (location == null) { + if (locations == null) { return; } if (indexService.hasShard(shardId.id())) { throw new ElasticSearchIllegalStateException(shardId + " allocated, can't be deleted"); } - FileSystemUtils.deleteRecursively(shardLocation(shardId)); + FileSystemUtils.deleteRecursively(shardLocations(shardId)); } - public File shardLocation(ShardId shardId) { - return nodeEnv.shardLocation(shardId); + public File[] shardLocations(ShardId shardId) { + return nodeEnv.shardLocations(shardId); } - public File shardIndexLocation(ShardId shardId) { - return new File(shardLocation(shardId), "index"); + public File[] shardIndexLocations(ShardId shardId) { + File[] shardLocations = shardLocations(shardId); + File[] shardIndexLocations = new File[shardLocations.length]; + for (int i = 0; i < shardLocations.length; i++) { + shardIndexLocations[i] = new File(shardLocations[i], "index"); + } + return shardIndexLocations; } // not used currently, but here to state that this store also defined a file based translog location - public File shardTranslogLocation(ShardId shardId) { - return new File(shardLocation(shardId), "translog"); + public File[] shardTranslogLocations(ShardId shardId) { + File[] shardLocations = shardLocations(shardId); + File[] shardTranslogLocations = new File[shardLocations.length]; + for (int i = 0; i < shardLocations.length; i++) { + shardTranslogLocations[i] = new File(shardLocations[i], "translog"); + } + return shardTranslogLocations; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java index c54066fc16a..7a12ca5754e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java @@ -40,8 +40,12 @@ public class MmapFsDirectoryService extends FsDirectoryService { } @Override public Directory[] build() throws IOException { - File location = indexStore.shardIndexLocation(shardId); - FileSystemUtils.mkdirs(location); - return new Directory[]{new MMapDirectory(location, buildLockFactory())}; + File[] locations = indexStore.shardIndexLocations(shardId); + Directory[] dirs = new Directory[locations.length]; + for (int i = 0; i < dirs.length; i++) { + FileSystemUtils.mkdirs(locations[i]); + dirs[i] = new MMapDirectory(locations[i], buildLockFactory()); + } + return dirs; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java index e020b9a349e..5f1efb09c22 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java @@ -40,8 +40,12 @@ public class NioFsDirectoryService extends FsDirectoryService { } @Override public Directory[] build() throws IOException { - File location = indexStore.shardIndexLocation(shardId); - FileSystemUtils.mkdirs(location); - return new Directory[]{new NIOFSDirectory(location, buildLockFactory())}; + File[] locations = indexStore.shardIndexLocations(shardId); + Directory[] dirs = new Directory[locations.length]; + for (int i = 0; i < dirs.length; i++) { + FileSystemUtils.mkdirs(locations[i]); + dirs[i] = new NIOFSDirectory(locations[i], buildLockFactory()); + } + return dirs; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java index dee62bf2637..16514438bc4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java @@ -40,8 +40,12 @@ public class SimpleFsDirectoryService extends FsDirectoryService { } @Override public Directory[] build() throws IOException { - File location = indexStore.shardIndexLocation(shardId); - FileSystemUtils.mkdirs(location); - return new Directory[]{new SimpleFSDirectory(location, buildLockFactory())}; + File[] locations = indexStore.shardIndexLocations(shardId); + Directory[] dirs = new Directory[locations.length]; + for (int i = 0; i < dirs.length; i++) { + FileSystemUtils.mkdirs(locations[i]); + dirs[i] = new SimpleFSDirectory(locations[i], buildLockFactory()); + } + return dirs; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 324713782fd..5489ab6824f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; @@ -44,7 +45,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class FsTranslog extends AbstractIndexShardComponent implements Translog { private final ReadWriteLock rwl = new ReentrantReadWriteLock(); - private final File location; + private final File[] locations; private volatile FsTranslogFile current; private volatile FsTranslogFile trans; @@ -53,18 +54,22 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog @Inject public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) { super(shardId, indexSettings); - this.location = new File(nodeEnv.shardLocation(shardId), "translog"); - FileSystemUtils.mkdirs(this.location); + File[] shardLocations = nodeEnv.shardLocations(shardId); + this.locations = new File[shardLocations.length]; + for (int i = 0; i < shardLocations.length; i++) { + locations[i] = new File(shardLocations[i], "translog"); + FileSystemUtils.mkdirs(locations[i]); + } } public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location) { super(shardId, indexSettings); - this.location = location; - FileSystemUtils.mkdirs(this.location); + this.locations = new File[]{location}; + FileSystemUtils.mkdirs(location); } - public File location() { - return location; + public File[] locations() { + return locations; } @Override public long currentId() { @@ -98,19 +103,21 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog @Override public void clearUnreferenced() { rwl.writeLock().lock(); try { - File[] files = location.listFiles(); - if (files != null) { - for (File file : files) { - if (file.getName().equals("translog-" + current.id())) { - continue; - } - if (trans != null && file.getName().equals("translog-" + trans.id())) { - continue; - } - try { - file.delete(); - } catch (Exception e) { - // ignore + for (File location : locations) { + File[] files = location.listFiles(); + if (files != null) { + for (File file : files) { + if (file.getName().equals("translog-" + current.id())) { + continue; + } + if (trans != null && file.getName().equals("translog-" + trans.id())) { + continue; + } + try { + file.delete(); + } catch (Exception e) { + // ignore + } } } } @@ -123,6 +130,17 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog rwl.writeLock().lock(); try { FsTranslogFile newFile; + long size = Long.MAX_VALUE; + File location = null; + for (File file : locations) { + long currentFree = file.getFreeSpace(); + if (currentFree < size) { + size = currentFree; + location = file; + } else if (currentFree == size && ThreadLocalRandom.current().nextBoolean()) { + location = file; + } + } try { newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); } catch (IOException e) { @@ -147,6 +165,17 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog rwl.writeLock().lock(); try { assert this.trans == null; + long size = Long.MAX_VALUE; + File location = null; + for (File file : locations) { + long currentFree = file.getFreeSpace(); + if (currentFree < size) { + size = currentFree; + location = file; + } else if (currentFree == size && ThreadLocalRandom.current().nextBoolean()) { + location = file; + } + } this.trans = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 1ac25b53553..44c1a66cd2b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -352,7 +352,7 @@ public class InternalIndicesService extends AbstractLifecycleComponentof()); } - File indexFile = new File(nodeEnv.shardLocation(shardId), "index"); - if (!indexFile.exists()) { + File[] shardLocations = nodeEnv.shardLocations(shardId); + File[] shardIndexLocations = new File[shardLocations.length]; + for (int i = 0; i < shardLocations.length; i++) { + shardIndexLocations[i] = new File(shardLocations[i], "index"); + } + boolean exists = false; + for (File shardIndexLocation : shardIndexLocations) { + if (shardIndexLocation.exists()) { + exists = true; + break; + } + } + if (!exists) { return new StoreFilesMetaData(false, shardId, ImmutableMap.of()); } + + Map checksums = Store.readChecksums(shardIndexLocations); + if (checksums == null) { + checksums = ImmutableMap.of(); + } + Map files = Maps.newHashMap(); - // read the checksums file - FSDirectory directory = FSDirectory.open(indexFile); - Map checksums = null; - try { - checksums = Store.readChecksums(directory); - for (File file : indexFile.listFiles()) { + for (File shardIndexLocation : shardIndexLocations) { + File[] listedFiles = shardIndexLocation.listFiles(); + if (listedFiles == null) { + continue; + } + for (File file : listedFiles) { // BACKWARD CKS SUPPORT if (file.getName().endsWith(".cks")) { continue; @@ -183,28 +197,6 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio } files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksums.get(file.getName()))); } - } finally { - directory.close(); - } - - // BACKWARD CKS SUPPORT - for (File file : indexFile.listFiles()) { - if (file.getName().endsWith(".cks")) { - continue; - } - if (file.getName().startsWith("_checksums")) { - continue; - } - // try and load the checksum - String checksum = null; - File checksumFile = new File(file.getParentFile(), file.getName() + ".cks"); - if (checksumFile.exists() && (checksums == null || !checksums.containsKey(file.getName()))) { - byte[] checksumBytes = Streams.copyToByteArray(checksumFile); - if (checksumBytes.length > 0) { - checksum = Unicode.fromBytes(checksumBytes); - } - files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksum)); - } } return new StoreFilesMetaData(false, shardId, files); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalSettingsPerparer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalSettingsPerparer.java index 50194a4a913..735a30db7ee 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalSettingsPerparer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalSettingsPerparer.java @@ -108,9 +108,11 @@ public class InternalSettingsPerparer { settingsBuilder = settingsBuilder().put(v1); settingsBuilder.put("path.home", cleanPath(environment.homeFile().getAbsolutePath())); settingsBuilder.put("path.work", cleanPath(environment.workFile().getAbsolutePath())); - settingsBuilder.put("path.work_with_cluster", cleanPath(environment.workWithClusterFile().getAbsolutePath())); - settingsBuilder.put("path.data", cleanPath(environment.dataFile().getAbsolutePath())); - settingsBuilder.put("path.data_with_cluster", cleanPath(environment.dataWithClusterFile().getAbsolutePath())); + String[] paths = new String[environment.dataFiles().length]; + for (int i = 0; i < environment.dataFiles().length; i++) { + paths[i] = cleanPath(environment.dataFiles()[i].getAbsolutePath()); + } + settingsBuilder.putArray("path.data", paths); settingsBuilder.put("path.logs", cleanPath(environment.logsFile().getAbsolutePath())); v1 = settingsBuilder.build(); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java index 3cccbebe0b0..a75ce07e12d 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java @@ -165,7 +165,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests logger.info("Closing the server"); closeNode("server1"); logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway"); - FileSystemUtils.deleteRecursively(environment.dataWithClusterFile()); + FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles()); logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir"); startNode("server1"); @@ -282,7 +282,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests closeNode("server1"); if (fullRecovery) { logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway"); - FileSystemUtils.deleteRecursively(environment.dataWithClusterFile()); + FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles()); logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir"); } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java index 133ca1d8313..7a46104c97c 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java @@ -32,8 +32,7 @@ import org.testng.annotations.Test; import java.io.File; import static org.elasticsearch.client.Requests.*; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; - +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -118,7 +117,7 @@ public class IndicesStoreTests extends AbstractNodesTests { private File shardDirectory(String server, String index, int shard) { InternalNode node = ((InternalNode) node(server)); NodeEnvironment env = node.injector().getInstance(NodeEnvironment.class); - return env.shardLocation(new ShardId(index, shard)); + return env.shardLocations(new ShardId(index, shard))[0]; } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java index ac8e60afada..41d1b9588fc 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java @@ -197,10 +197,10 @@ public class FullRestartStressTest { client.close(); for (Node node : nodes) { - File nodeWork = ((InternalNode) node).injector().getInstance(NodeEnvironment.class).nodeDataLocation(); + File[] nodeDatas = ((InternalNode) node).injector().getInstance(NodeEnvironment.class).nodeDataLocations(); node.close(); if (clearNodeWork && !settings.get("gateway.type").equals("local")) { - FileSystemUtils.deleteRecursively(nodeWork); + FileSystemUtils.deleteRecursively(nodeDatas); } } @@ -221,6 +221,7 @@ public class FullRestartStressTest { .put("gateway.type", "local") .put("gateway.recover_after_nodes", numberOfNodes) .put("index.number_of_shards", 1) + .put("path.data", "data/data1,data/data2") .build(); FullRestartStressTest test = new FullRestartStressTest() diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java index ed730d00576..8e6fb3567a4 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java @@ -167,7 +167,7 @@ public class RollingRestartStressTest { // start doing the rolling restart int nodeIndex = 0; while (true) { - File nodeData = ((InternalNode) nodes[nodeIndex]).injector().getInstance(NodeEnvironment.class).nodeDataLocation(); + File[] nodeData = ((InternalNode) nodes[nodeIndex]).injector().getInstance(NodeEnvironment.class).nodeDataLocations(); nodes[nodeIndex].close(); if (clearNodeData) { FileSystemUtils.deleteRecursively(nodeData); @@ -310,7 +310,7 @@ public class RollingRestartStressTest { } private void indexDoc() throws Exception { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); XContentBuilder json = XContentFactory.jsonBuilder().startObject() .field("field", "value" + ThreadLocalRandom.current().nextInt()); @@ -341,6 +341,7 @@ public class RollingRestartStressTest { Settings settings = settingsBuilder() .put("index.shard.check_index", true) .put("gateway.type", "none") + .put("path.data", "data/data1,data/data2") .build(); RollingRestartStressTest test = new RollingRestartStressTest()