Allow striping the data location over multiple locations, closes #1356.

Shay Banon 2011-09-23 00:35:59 +03:00
parent c1ca21f4d5
commit 8d7aaa704a
22 changed files with 326 additions and 178 deletions

FileSystemUtils.java

@@ -120,6 +120,14 @@ public class FileSystemUtils {
         return false;
     }

+    public static boolean deleteRecursively(File[] roots) {
+        boolean deleted = true;
+        for (File root : roots) {
+            deleted &= deleteRecursively(root);
+        }
+        return deleted;
+    }
+
     public static boolean deleteRecursively(File root) {
         return deleteRecursively(root, true);
     }
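A hypothetical call site for the new overload (a sketch, not from the commit): all striped roots are handed over at once, and the result is true only if every root was fully deleted.

```java
import java.io.File;

import org.elasticsearch.common.io.FileSystemUtils;

// Sketch: wipe every striped data root in one call; deletion success is
// aggregated across all roots by the new File[] overload.
class WipeDataDirs {
    static boolean wipe(File[] dataLocations) {
        return FileSystemUtils.deleteRecursively(dataLocations);
    }
}
```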

ImmutableSettings.java

@@ -492,6 +492,7 @@ public class ImmutableSettings implements Settings {
      * @return The builder
      */
     public Builder putArray(String setting, String... values) {
+        remove(setting);
         int counter = 0;
         while (true) {
             String value = map.remove(setting + '.' + (counter++));
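The added remove(setting) matters because putArray stores its values under indexed keys (setting.0, setting.1, ...), as the while loop above suggests; without the removal, a plain value put earlier under the bare key would linger next to the new array. A minimal sketch of the intended behavior, with illustrative paths:

```java
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

// Sketch: a bare "path.data" value set earlier is now cleared by putArray(),
// leaving only the indexed path.data.0 / path.data.1 entries.
class PutArraySketch {
    static Settings build() {
        return ImmutableSettings.settingsBuilder()
                .put("path.data", "/data/single")            // old single-location form
                .putArray("path.data", "/data/a", "/data/b") // clears it, then stores indexed keys
                .build();
    }
}
```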

Environment.java

@@ -46,9 +46,9 @@ public class Environment {

     private final File workWithClusterFile;

-    private final File dataFile;
+    private final File[] dataFiles;

-    private final File dataWithClusterFile;
+    private final File[] dataWithClusterFiles;

     private final File configFile;

@@ -86,12 +86,18 @@ public class Environment {
         }
         workWithClusterFile = new File(workFile, ClusterName.clusterNameFromSettings(settings).value());
-        if (settings.get("path.data") != null) {
-            dataFile = new File(cleanPath(settings.get("path.data")));
+        String[] dataPaths = settings.getAsArray("path.data");
+        if (dataPaths.length > 0) {
+            dataFiles = new File[dataPaths.length];
+            dataWithClusterFiles = new File[dataPaths.length];
+            for (int i = 0; i < dataPaths.length; i++) {
+                dataFiles[i] = new File(dataPaths[i]);
+                dataWithClusterFiles[i] = new File(dataFiles[i], ClusterName.clusterNameFromSettings(settings).value());
+            }
         } else {
-            dataFile = new File(homeFile, "data");
+            dataFiles = new File[]{new File(homeFile, "data")};
+            dataWithClusterFiles = new File[]{new File(new File(homeFile, "data"), ClusterName.clusterNameFromSettings(settings).value())};
         }
-        dataWithClusterFile = new File(dataFile, ClusterName.clusterNameFromSettings(settings).value());
         if (settings.get("path.logs") != null) {
             logsFile = new File(cleanPath(settings.get("path.logs")));

@@ -124,15 +130,15 @@ public class Environment {
     /**
      * The data location.
      */
-    public File dataFile() {
-        return dataFile;
+    public File[] dataFiles() {
+        return dataFiles;
     }

     /**
      * The data location with the cluster name as a sub directory.
      */
-    public File dataWithClusterFile() {
-        return dataWithClusterFile;
+    public File[] dataWithClusterFiles() {
+        return dataWithClusterFiles;
     }

     /**
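With Environment reading path.data through getAsArray, multiple locations can be supplied as a settings array or, as the stress tests at the end of this commit do, as one comma-separated string. A hedged sketch (it assumes getAsArray splits the plain value on commas, and that Environment has a Settings constructor):

```java
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;

// Sketch: two assumed mount points configured as a single comma-separated
// string, yielding two entries from dataFiles().
class MultiPathEnv {
    static Environment twoStripes() {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("path.data", "/disk1/es,/disk2/es") // assumed paths
                .build();
        return new Environment(settings); // dataFiles().length == 2
    }
}
```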

NodeEnvironment.java

@@ -38,9 +38,10 @@ import java.io.IOException;
  */
 public class NodeEnvironment extends AbstractComponent {

-    private final File nodeFile;
+    private final File[] nodeFiles;
+    private final File[] nodeIndicesLocations;

-    private final Lock lock;
+    private final Lock[] locks;

     private final int localNodeId;

@@ -48,46 +49,83 @@ public class NodeEnvironment extends AbstractComponent {
         super(settings);

         if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
-            nodeFile = null;
-            lock = null;
+            nodeFiles = null;
+            nodeIndicesLocations = null;
+            locks = null;
             localNodeId = -1;
             return;
         }

-        Lock lock = null;
-        File dir = null;
+        File[] nodesFiles = new File[environment.dataWithClusterFiles().length];
+        Lock[] locks = new Lock[environment.dataWithClusterFiles().length];
         int localNodeId = -1;
         IOException lastException = null;
-        for (int i = 0; i < 50; i++) {
-            dir = new File(new File(environment.dataWithClusterFile(), "nodes"), Integer.toString(i));
-            if (!dir.exists()) {
-                FileSystemUtils.mkdirs(dir);
-            }
-            logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath());
-            try {
-                NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir);
-                Lock tmpLock = lockFactory.makeLock("node.lock");
-                boolean obtained = tmpLock.obtain();
-                if (obtained) {
-                    lock = tmpLock;
-                    localNodeId = i;
-                    break;
-                } else {
-                    logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath());
+        for (int possibleLockId = 0; possibleLockId < 50; possibleLockId++) {
+            for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) {
+                File dir = new File(new File(environment.dataWithClusterFiles()[dirIndex], "nodes"), Integer.toString(possibleLockId));
+                if (!dir.exists()) {
+                    FileSystemUtils.mkdirs(dir);
                 }
-            } catch (IOException e) {
-                logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath());
-                lastException = e;
+                logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath());
+                try {
+                    NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir);
+                    Lock tmpLock = lockFactory.makeLock("node.lock");
+                    boolean obtained = tmpLock.obtain();
+                    if (obtained) {
+                        locks[dirIndex] = tmpLock;
+                        nodesFiles[dirIndex] = dir;
+                        localNodeId = possibleLockId;
+                    } else {
+                        logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath());
+                        // release all the ones that were obtained up until now
+                        for (int i = 0; i < locks.length; i++) {
+                            if (locks[i] != null) {
+                                try {
+                                    locks[i].release();
+                                } catch (Exception e1) {
+                                    // ignore
+                                }
+                            }
+                            locks[i] = null;
+                        }
+                        break;
+                    }
+                } catch (IOException e) {
+                    logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath());
+                    lastException = e;
+                    // release all the ones that were obtained up until now
+                    for (int i = 0; i < locks.length; i++) {
+                        if (locks[i] != null) {
+                            try {
+                                locks[i].release();
+                            } catch (Exception e1) {
+                                // ignore
+                            }
+                        }
+                        locks[i] = null;
+                    }
+                    break;
+                }
+            }
+            if (locks[0] != null) {
+                // we found a lock, break
+                break;
             }
         }

-        if (lock == null) {
+        if (locks[0] == null) {
             throw new IOException("Failed to obtain node lock", lastException);
         }
         this.localNodeId = localNodeId;
-        this.lock = lock;
-        this.nodeFile = dir;
+        this.locks = locks;
+        this.nodeFiles = nodesFiles;
         if (logger.isDebugEnabled()) {
-            logger.debug("using node location [{}], local_node_id [{}]", dir, localNodeId);
+            logger.debug("using node location [{}], local_node_id [{}]", nodesFiles, localNodeId);
         }
+
+        this.nodeIndicesLocations = new File[nodeFiles.length];
+        for (int i = 0; i < nodeFiles.length; i++) {
+            nodeIndicesLocations[i] = new File(nodeFiles[i], "indices");
+        }
     }

@@ -96,34 +134,44 @@ public class NodeEnvironment extends AbstractComponent {
     }

     public boolean hasNodeFile() {
-        return nodeFile != null && lock != null;
+        return nodeFiles != null && locks != null;
     }

-    public File nodeDataLocation() {
-        if (nodeFile == null || lock == null) {
+    public File[] nodeDataLocations() {
+        if (nodeFiles == null || locks == null) {
             throw new ElasticSearchIllegalStateException("node is not configured to store local location");
         }
-        return nodeFile;
+        return nodeFiles;
     }

-    public File indicesLocation() {
-        return new File(nodeDataLocation(), "indices");
+    public File[] indicesLocations() {
+        return nodeIndicesLocations;
     }

-    public File indexLocation(Index index) {
-        return new File(indicesLocation(), index.name());
+    public File[] indexLocations(Index index) {
+        File[] indexLocations = new File[nodeFiles.length];
+        for (int i = 0; i < nodeFiles.length; i++) {
+            indexLocations[i] = new File(new File(nodeFiles[i], "indices"), index.name());
+        }
+        return indexLocations;
     }

-    public File shardLocation(ShardId shardId) {
-        return new File(indexLocation(shardId.index()), Integer.toString(shardId.id()));
+    public File[] shardLocations(ShardId shardId) {
+        File[] shardLocations = new File[nodeFiles.length];
+        for (int i = 0; i < nodeFiles.length; i++) {
+            shardLocations[i] = new File(new File(new File(nodeFiles[i], "indices"), shardId.index().name()), Integer.toString(shardId.id()));
+        }
+        return shardLocations;
     }

     public void close() {
-        if (lock != null) {
-            try {
-                lock.release();
-            } catch (IOException e) {
-                // ignore
+        if (locks != null) {
+            for (Lock lock : locks) {
+                try {
+                    lock.release();
+                } catch (IOException e) {
+                    // ignore
+                }
             }
         }
     }
 }
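The locking scheme generalizes from one lock to one lock per data location: a node owns local node id N only if it can lock nodes/N under every configured directory, releasing any partial acquisitions and moving on to the next id otherwise. A standalone, pure-JDK sketch of that idea (the commit itself uses Lucene's NativeFSLockFactory; the limit of 50 slots comes from the loop above):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;

// Standalone sketch: claim slot N only if nodes/N can be locked in *every*
// data directory; on any failure, release what was obtained and try slot N+1.
public class NodeSlotLockSketch {
    public static void main(String[] args) throws Exception {
        File[] dataDirs = {new File("data1"), new File("data2")}; // assumed paths
        outer:
        for (int slot = 0; slot < 50; slot++) {
            FileLock[] locks = new FileLock[dataDirs.length];
            for (int d = 0; d < dataDirs.length; d++) {
                File dir = new File(new File(dataDirs[d], "nodes"), Integer.toString(slot));
                dir.mkdirs();
                RandomAccessFile raf = new RandomAccessFile(new File(dir, "node.lock"), "rw");
                FileLock lock = raf.getChannel().tryLock();
                if (lock == null) { // slot is taken: back out and try the next one
                    for (FileLock held : locks) {
                        if (held != null) {
                            held.release();
                        }
                    }
                    raf.close();
                    continue outer;
                }
                locks[d] = lock;
            }
            System.out.println("acquired local node id " + slot);
            return;
        }
        throw new IOException("Failed to obtain node lock");
    }
}
```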

FsGateway.java

@@ -53,7 +53,7 @@ public class FsGateway extends BlobStoreGateway {
         String location = componentSettings.get("location");
         if (location == null) {
             logger.warn("using local fs location for gateway, should be changed to be a shared location across nodes");
-            gatewayFile = new File(environment.dataFile(), "gateway");
+            gatewayFile = new File(environment.dataFiles()[0], "gateway");
         } else {
             gatewayFile = new File(location);
         }

LocalGateway.java

@@ -180,7 +180,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
     }

     @Override public void reset() throws Exception {
-        FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocation());
+        FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocations());
     }

     @Override public void clusterChanged(final ClusterChangedEvent event) {

@@ -263,7 +263,8 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
             location = null;
         } else {
             // create the location where the state will be stored
-            this.location = new File(nodeEnv.nodeDataLocation(), "_state");
+            // TODO: we might want to persist states on all data locations
+            this.location = new File(nodeEnv.nodeDataLocations()[0], "_state");
             FileSystemUtils.mkdirs(this.location);

             if (clusterService.localNode().masterNode()) {

LocalIndexShardGateway.java

@@ -128,19 +128,30 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
         // move an existing translog, if exists, to "recovering" state, and start reading from it
         FsTranslog translog = (FsTranslog) indexShard.translog();
-        File recoveringTranslogFile = new File(translog.location(), "translog-" + translogId + ".recovering");
-        if (!recoveringTranslogFile.exists()) {
-            File translogFile = new File(translog.location(), "translog-" + translogId);
-            if (translogFile.exists()) {
-                for (int i = 0; i < 3; i++) {
-                    if (translogFile.renameTo(recoveringTranslogFile)) {
-                        break;
+        String translogName = "translog-" + translogId;
+        String recoverTranslogName = translogName + ".recovering";
+
+        File recoveringTranslogFile = null;
+        for (File translogLocation : translog.locations()) {
+            File tmpRecoveringFile = new File(translogLocation, recoverTranslogName);
+            if (!tmpRecoveringFile.exists()) {
+                File tmpTranslogFile = new File(translogLocation, translogName);
+                if (tmpTranslogFile.exists()) {
+                    for (int i = 0; i < 3; i++) {
+                        if (tmpTranslogFile.renameTo(tmpRecoveringFile)) {
+                            recoveringTranslogFile = tmpRecoveringFile;
+                            break;
+                        }
                     }
                 }
+            } else {
+                recoveringTranslogFile = tmpRecoveringFile;
+                break;
             }
         }

-        if (!recoveringTranslogFile.exists()) {
-            // no translog to recovery from, start and bail
+        if (recoveringTranslogFile == null || !recoveringTranslogFile.exists()) {
             // no translog files, bail
             indexShard.start("post recovery from gateway, no translog");

InternalIndexService.java

@@ -436,7 +436,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
             // delete the shard location if needed
             if (delete || indexGateway.type().equals(NoneGateway.TYPE)) {
-                FileSystemUtils.deleteRecursively(nodeEnv.shardLocation(sId));
+                FileSystemUtils.deleteRecursively(nodeEnv.shardLocations(sId));
             }
         }
     }

Store.java

@@ -35,11 +35,13 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lucene.Directories;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.support.ForceSyncDirectory;

+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;

@@ -161,7 +163,22 @@ public class Store extends AbstractIndexShardComponent {
         }
     }

-    public static Map<String, String> readChecksums(Directory dir) throws IOException {
+    public static Map<String, String> readChecksums(File[] locations) throws IOException {
+        for (File location : locations) {
+            FSDirectory directory = FSDirectory.open(location);
+            try {
+                Map<String, String> checksums = readChecksums(directory, null);
+                if (checksums != null) {
+                    return checksums;
+                }
+            } finally {
+                directory.close();
+            }
+        }
+        return null;
+    }
+
+    static Map<String, String> readChecksums(Directory dir, Map<String, String> defaultValue) throws IOException {
         long lastFound = -1;
         for (String name : dir.listAll()) {
             if (!isChecksum(name)) {

@@ -173,7 +190,7 @@ public class Store extends AbstractIndexShardComponent {
             }
         }
         if (lastFound == -1) {
-            return ImmutableMap.of();
+            return defaultValue;
         }
         IndexInput indexInput = dir.openInput(CHECKSUMS_PREFIX + lastFound);
         try {

@@ -181,7 +198,7 @@ public class Store extends AbstractIndexShardComponent {
             return indexInput.readStringStringMap();
         } catch (Exception e) {
             // failed to load checksums, ignore and return an empty map
-            return new HashMap<String, String>();
+            return defaultValue;
         } finally {
             indexInput.close();
         }

@@ -265,7 +282,7 @@ public class Store extends AbstractIndexShardComponent {
             this.delegates = delegates;
             synchronized (mutex) {
                 MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
-                Map<String, String> checksums = readChecksums(delegates[0]);
+                Map<String, String> checksums = readChecksums(delegates[0], new HashMap<String, String>());
                 for (Directory delegate : delegates) {
                     for (String file : delegate.listAll()) {
                         // BACKWARD CKS SUPPORT

@@ -398,6 +415,8 @@ public class Store extends AbstractIndexShardComponent {
                     if (currentSize < size) {
                         size = currentSize;
                         directory = delegate;
+                    } else if (currentSize == size && ThreadLocalRandom.current().nextBoolean()) {
+                        directory = delegate;
                     }
                 } else {
                     directory = delegate; // really, make sense to have multiple directories for FS
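The new else-if branch gives equally sized delegates a coin flip, so stripes fill evenly instead of always favoring the first match. The placement policy in isolation (sketched with the JDK's ThreadLocalRandom; the commit itself bundles the jsr166y backport):

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the placement policy: write each new file to the delegate
// directory holding the least data, breaking ties randomly so directories
// of equal size are picked with equal probability over time.
class LeastUsedPicker {
    static int pick(long[] directorySizes) {
        int chosen = 0;
        long size = Long.MAX_VALUE;
        for (int i = 0; i < directorySizes.length; i++) {
            if (directorySizes[i] < size) {
                size = directorySizes[i];
                chosen = i;
            } else if (directorySizes[i] == size && ThreadLocalRandom.current().nextBoolean()) {
                chosen = i;
            }
        }
        return chosen;
    }
}
```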

FsIndexStore.java

@@ -40,15 +40,15 @@ public abstract class FsIndexStore extends AbstractIndexStore {

     private final NodeEnvironment nodeEnv;

-    private final File location;
+    private final File[] locations;

     public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) {
         super(index, indexSettings, indexService);
         this.nodeEnv = nodeEnv;
         if (nodeEnv.hasNodeFile()) {
-            this.location = nodeEnv.indexLocation(index);
+            this.locations = nodeEnv.indexLocations(index);
         } else {
-            this.location = null;
+            this.locations = null;
         }
     }

@@ -57,58 +57,73 @@ public abstract class FsIndexStore extends AbstractIndexStore {
     }

     @Override public ByteSizeValue backingStoreTotalSpace() {
-        if (location == null) {
+        if (locations == null) {
             return new ByteSizeValue(0);
         }
-        long totalSpace = location.getTotalSpace();
-        if (totalSpace == 0) {
-            totalSpace = 0;
+        long totalSpace = 0;
+        for (File location : locations) {
+            totalSpace += location.getTotalSpace();
         }
         return new ByteSizeValue(totalSpace);
     }

     @Override public ByteSizeValue backingStoreFreeSpace() {
-        if (location == null) {
+        if (locations == null) {
             return new ByteSizeValue(0);
         }
-        long usableSpace = location.getUsableSpace();
-        if (usableSpace == 0) {
-            usableSpace = 0;
+        long usableSpace = 0;
+        for (File location : locations) {
+            usableSpace += location.getUsableSpace();
         }
         return new ByteSizeValue(usableSpace);
     }

     @Override public boolean canDeleteUnallocated(ShardId shardId) {
-        if (location == null) {
+        if (locations == null) {
             return false;
         }
         if (indexService.hasShard(shardId.id())) {
             return false;
         }
-        return shardLocation(shardId).exists();
+        for (File location : shardLocations(shardId)) {
+            if (location.exists()) {
+                return true;
+            }
+        }
+        return false;
     }

     @Override public void deleteUnallocated(ShardId shardId) throws IOException {
-        if (location == null) {
+        if (locations == null) {
             return;
         }
         if (indexService.hasShard(shardId.id())) {
             throw new ElasticSearchIllegalStateException(shardId + " allocated, can't be deleted");
         }
-        FileSystemUtils.deleteRecursively(shardLocation(shardId));
+        FileSystemUtils.deleteRecursively(shardLocations(shardId));
     }

-    public File shardLocation(ShardId shardId) {
-        return nodeEnv.shardLocation(shardId);
+    public File[] shardLocations(ShardId shardId) {
+        return nodeEnv.shardLocations(shardId);
     }

-    public File shardIndexLocation(ShardId shardId) {
-        return new File(shardLocation(shardId), "index");
+    public File[] shardIndexLocations(ShardId shardId) {
+        File[] shardLocations = shardLocations(shardId);
+        File[] shardIndexLocations = new File[shardLocations.length];
+        for (int i = 0; i < shardLocations.length; i++) {
+            shardIndexLocations[i] = new File(shardLocations[i], "index");
+        }
+        return shardIndexLocations;
     }

     // not used currently, but here to state that this store also defined a file based translog location
-    public File shardTranslogLocation(ShardId shardId) {
-        return new File(shardLocation(shardId), "translog");
+    public File[] shardTranslogLocations(ShardId shardId) {
+        File[] shardLocations = shardLocations(shardId);
+        File[] shardTranslogLocations = new File[shardLocations.length];
+        for (int i = 0; i < shardLocations.length; i++) {
+            shardTranslogLocations[i] = new File(shardLocations[i], "translog");
+        }
+        return shardTranslogLocations;
     }
 }

MmapFsDirectoryService.java

@@ -40,8 +40,12 @@ public class MmapFsDirectoryService extends FsDirectoryService {
     }

     @Override public Directory[] build() throws IOException {
-        File location = indexStore.shardIndexLocation(shardId);
-        FileSystemUtils.mkdirs(location);
-        return new Directory[]{new MMapDirectory(location, buildLockFactory())};
+        File[] locations = indexStore.shardIndexLocations(shardId);
+        Directory[] dirs = new Directory[locations.length];
+        for (int i = 0; i < dirs.length; i++) {
+            FileSystemUtils.mkdirs(locations[i]);
+            dirs[i] = new MMapDirectory(locations[i], buildLockFactory());
+        }
+        return dirs;
     }
 }

NioFsDirectoryService.java

@@ -40,8 +40,12 @@ public class NioFsDirectoryService extends FsDirectoryService {
     }

     @Override public Directory[] build() throws IOException {
-        File location = indexStore.shardIndexLocation(shardId);
-        FileSystemUtils.mkdirs(location);
-        return new Directory[]{new NIOFSDirectory(location, buildLockFactory())};
+        File[] locations = indexStore.shardIndexLocations(shardId);
+        Directory[] dirs = new Directory[locations.length];
+        for (int i = 0; i < dirs.length; i++) {
+            FileSystemUtils.mkdirs(locations[i]);
+            dirs[i] = new NIOFSDirectory(locations[i], buildLockFactory());
+        }
+        return dirs;
     }
 }

SimpleFsDirectoryService.java

@@ -40,8 +40,12 @@ public class SimpleFsDirectoryService extends FsDirectoryService {
     }

     @Override public Directory[] build() throws IOException {
-        File location = indexStore.shardIndexLocation(shardId);
-        FileSystemUtils.mkdirs(location);
-        return new Directory[]{new SimpleFSDirectory(location, buildLockFactory())};
+        File[] locations = indexStore.shardIndexLocations(shardId);
+        Directory[] dirs = new Directory[locations.length];
+        for (int i = 0; i < dirs.length; i++) {
+            FileSystemUtils.mkdirs(locations[i]);
+            dirs[i] = new SimpleFSDirectory(locations[i], buildLockFactory());
+        }
+        return dirs;
    }
 }
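All three FS directory services now share the same shape: one Lucene Directory per stripe, with the directory created on demand. A standalone sketch of the pattern (NIOFSDirectory shown, single-argument constructor as in Lucene 3.x; the Store layer then distributes files across these delegates):

```java
import java.io.File;
import java.io.IOException;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NIOFSDirectory;

// Sketch: build() returns one Directory per stripe; each stripe directory is
// created first so Lucene can open it.
class StripedDirectories {
    static Directory[] build(File[] locations) throws IOException {
        Directory[] dirs = new Directory[locations.length];
        for (int i = 0; i < locations.length; i++) {
            locations[i].mkdirs(); // ensure the stripe exists
            dirs[i] = new NIOFSDirectory(locations[i]);
        }
        return dirs;
    }
}
```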

FsTranslog.java

@@ -24,6 +24,7 @@ import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.CachedStreamOutput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;

@@ -44,7 +45,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class FsTranslog extends AbstractIndexShardComponent implements Translog {

     private final ReadWriteLock rwl = new ReentrantReadWriteLock();
-    private final File location;
+    private final File[] locations;

     private volatile FsTranslogFile current;
     private volatile FsTranslogFile trans;

@@ -53,18 +54,22 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog

     @Inject public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
         super(shardId, indexSettings);
-        this.location = new File(nodeEnv.shardLocation(shardId), "translog");
-        FileSystemUtils.mkdirs(this.location);
+        File[] shardLocations = nodeEnv.shardLocations(shardId);
+        this.locations = new File[shardLocations.length];
+        for (int i = 0; i < shardLocations.length; i++) {
+            locations[i] = new File(shardLocations[i], "translog");
+            FileSystemUtils.mkdirs(locations[i]);
+        }
     }

     public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location) {
         super(shardId, indexSettings);
-        this.location = location;
-        FileSystemUtils.mkdirs(this.location);
+        this.locations = new File[]{location};
+        FileSystemUtils.mkdirs(location);
     }

-    public File location() {
-        return location;
+    public File[] locations() {
+        return locations;
     }

     @Override public long currentId() {

@@ -98,19 +103,21 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
     @Override public void clearUnreferenced() {
         rwl.writeLock().lock();
         try {
-            File[] files = location.listFiles();
-            if (files != null) {
-                for (File file : files) {
-                    if (file.getName().equals("translog-" + current.id())) {
-                        continue;
-                    }
-                    if (trans != null && file.getName().equals("translog-" + trans.id())) {
-                        continue;
-                    }
-                    try {
-                        file.delete();
-                    } catch (Exception e) {
-                        // ignore
+            for (File location : locations) {
+                File[] files = location.listFiles();
+                if (files != null) {
+                    for (File file : files) {
+                        if (file.getName().equals("translog-" + current.id())) {
+                            continue;
+                        }
+                        if (trans != null && file.getName().equals("translog-" + trans.id())) {
+                            continue;
+                        }
+                        try {
+                            file.delete();
+                        } catch (Exception e) {
+                            // ignore
+                        }
                     }
                 }
             }

@@ -123,6 +130,17 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
         rwl.writeLock().lock();
         try {
             FsTranslogFile newFile;
+            long size = Long.MAX_VALUE;
+            File location = null;
+            for (File file : locations) {
+                long currentFree = file.getFreeSpace();
+                if (currentFree < size) {
+                    size = currentFree;
+                    location = file;
+                } else if (currentFree == size && ThreadLocalRandom.current().nextBoolean()) {
+                    location = file;
+                }
+            }
             try {
                 newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
             } catch (IOException e) {

@@ -147,6 +165,17 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
         rwl.writeLock().lock();
         try {
             assert this.trans == null;
+            long size = Long.MAX_VALUE;
+            File location = null;
+            for (File file : locations) {
+                long currentFree = file.getFreeSpace();
+                if (currentFree < size) {
+                    size = currentFree;
+                    location = file;
+                } else if (currentFree == size && ThreadLocalRandom.current().nextBoolean()) {
+                    location = file;
+                }
+            }
             this.trans = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
         } catch (IOException e) {
             throw new TranslogException(shardId, "failed to create new translog file", e);
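The identical selection block appears in both translog-creation paths above; sketched here as one helper. Note that, as committed, the comparison keeps the location with the least free space, breaking ties randomly (shown with the JDK's ThreadLocalRandom rather than the bundled jsr166y one):

```java
import java.io.File;
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the per-translog location choice: scan all stripes, tracking the
// smallest getFreeSpace() value seen, with a coin flip on exact ties.
class TranslogLocationPicker {
    static File pick(File[] locations) {
        long size = Long.MAX_VALUE;
        File location = null;
        for (File file : locations) {
            long currentFree = file.getFreeSpace();
            if (currentFree < size) {
                size = currentFree;
                location = file;
            } else if (currentFree == size && ThreadLocalRandom.current().nextBoolean()) {
                location = file;
            }
        }
        return location;
    }
}
```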

InternalIndicesService.java

@@ -352,7 +352,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
         indicesLifecycle.afterIndexClosed(indexService.index(), delete);
         if (delete) {
-            FileSystemUtils.deleteRecursively(nodeEnv.indexLocation(new Index(index)));
+            FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(index)));
         }
     }

IndicesStore.java

@@ -131,25 +131,28 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
                 }
                 if (shardCanBeDeleted) {
                     ShardId shardId = indexShardRoutingTable.shardId();
-                    File shardLocation = nodeEnv.shardLocation(shardId);
-                    if (shardLocation.exists()) {
-                        logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id());
-                        FileSystemUtils.deleteRecursively(shardLocation);
+                    for (File shardLocation : nodeEnv.shardLocations(shardId)) {
+                        if (shardLocation.exists()) {
+                            logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id());
+                            FileSystemUtils.deleteRecursively(shardLocation);
+                        }
                     }
                 }
             }
         }

         // delete indices that are no longer part of the metadata
-        File[] files = nodeEnv.indicesLocation().listFiles();
-        if (files != null) {
-            for (File file : files) {
-                // if we have the index on the metadata, don't delete it
-                if (event.state().metaData().hasIndex(file.getName())) {
-                    continue;
+        for (File indicesLocation : nodeEnv.indicesLocations()) {
+            File[] files = indicesLocation.listFiles();
+            if (files != null) {
+                for (File file : files) {
+                    // if we have the index on the metadata, don't delete it
+                    if (event.state().metaData().hasIndex(file.getName())) {
+                        continue;
+                    }
+                    logger.debug("[{}] deleting index that is no longer in the cluster meta_date from [{}]", file.getName(), file);
+                    FileSystemUtils.deleteRecursively(file);
                 }
-                logger.debug("[{}] deleting index that is no longer in the cluster meta_date", file.getName());
-                FileSystemUtils.deleteRecursively(file);
             }
         }
     }

TransportNodesListShardStoreMetaData.java

@@ -19,7 +19,6 @@

 package org.elasticsearch.indices.store;

-import org.apache.lucene.store.FSDirectory;
 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.FailedNodeException;

@@ -33,12 +32,10 @@ import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Unicode;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.collect.Lists;
 import org.elasticsearch.common.collect.Maps;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;

@@ -163,17 +160,34 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
         if (!storeType.contains("fs")) {
             return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
         }
-        File indexFile = new File(nodeEnv.shardLocation(shardId), "index");
-        if (!indexFile.exists()) {
+        File[] shardLocations = nodeEnv.shardLocations(shardId);
+        File[] shardIndexLocations = new File[shardLocations.length];
+        for (int i = 0; i < shardLocations.length; i++) {
+            shardIndexLocations[i] = new File(shardLocations[i], "index");
+        }
+        boolean exists = false;
+        for (File shardIndexLocation : shardIndexLocations) {
+            if (shardIndexLocation.exists()) {
+                exists = true;
+                break;
+            }
+        }
+        if (!exists) {
             return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
         }
+
+        Map<String, String> checksums = Store.readChecksums(shardIndexLocations);
+        if (checksums == null) {
+            checksums = ImmutableMap.of();
+        }
+
         Map<String, StoreFileMetaData> files = Maps.newHashMap();
-        // read the checksums file
-        FSDirectory directory = FSDirectory.open(indexFile);
-        Map<String, String> checksums = null;
-        try {
-            checksums = Store.readChecksums(directory);
-            for (File file : indexFile.listFiles()) {
+        for (File shardIndexLocation : shardIndexLocations) {
+            File[] listedFiles = shardIndexLocation.listFiles();
+            if (listedFiles == null) {
+                continue;
+            }
+            for (File file : listedFiles) {
                 // BACKWARD CKS SUPPORT
                 if (file.getName().endsWith(".cks")) {
                     continue;

@@ -183,28 +197,6 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
                 }
                 files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksums.get(file.getName())));
             }
-        } finally {
-            directory.close();
-        }
-
-        // BACKWARD CKS SUPPORT
-        for (File file : indexFile.listFiles()) {
-            if (file.getName().endsWith(".cks")) {
-                continue;
-            }
-            if (file.getName().startsWith("_checksums")) {
-                continue;
-            }
-            // try and load the checksum
-            String checksum = null;
-            File checksumFile = new File(file.getParentFile(), file.getName() + ".cks");
-            if (checksumFile.exists() && (checksums == null || !checksums.containsKey(file.getName()))) {
-                byte[] checksumBytes = Streams.copyToByteArray(checksumFile);
-                if (checksumBytes.length > 0) {
-                    checksum = Unicode.fromBytes(checksumBytes);
-                }
-                files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksum));
-            }
-        }
         }

         return new StoreFilesMetaData(false, shardId, files);

InternalSettingsPerparer.java

@@ -108,9 +108,11 @@ public class InternalSettingsPerparer {
         settingsBuilder = settingsBuilder().put(v1);
         settingsBuilder.put("path.home", cleanPath(environment.homeFile().getAbsolutePath()));
         settingsBuilder.put("path.work", cleanPath(environment.workFile().getAbsolutePath()));
-        settingsBuilder.put("path.work_with_cluster", cleanPath(environment.workWithClusterFile().getAbsolutePath()));
-        settingsBuilder.put("path.data", cleanPath(environment.dataFile().getAbsolutePath()));
-        settingsBuilder.put("path.data_with_cluster", cleanPath(environment.dataWithClusterFile().getAbsolutePath()));
+        String[] paths = new String[environment.dataFiles().length];
+        for (int i = 0; i < environment.dataFiles().length; i++) {
+            paths[i] = cleanPath(environment.dataFiles()[i].getAbsolutePath());
+        }
+        settingsBuilder.putArray("path.data", paths);
         settingsBuilder.put("path.logs", cleanPath(environment.logsFile().getAbsolutePath()));
         v1 = settingsBuilder.build();
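The preparer now writes the resolved data locations back as an array, so downstream consumers read them uniformly through getAsArray, the same accessor Environment uses above. A hedged round trip with illustrative paths:

```java
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

// Sketch: values stored via putArray come back through getAsArray; the
// indexed path.data.0 / path.data.1 storage is an implementation detail.
class PathDataRoundTrip {
    static String[] paths() {
        Settings settings = ImmutableSettings.settingsBuilder()
                .putArray("path.data", "/disk1/es/data", "/disk2/es/data") // assumed paths
                .build();
        return settings.getAsArray("path.data");
    }
}
```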

AbstractSimpleIndexGatewayTests.java

@@ -165,7 +165,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
         logger.info("Closing the server");
         closeNode("server1");
         logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway");
-        FileSystemUtils.deleteRecursively(environment.dataWithClusterFile());
+        FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles());
         logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir");
         startNode("server1");

@@ -282,7 +282,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
         closeNode("server1");
         if (fullRecovery) {
             logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway");
-            FileSystemUtils.deleteRecursively(environment.dataWithClusterFile());
+            FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles());
             logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir");
         }

IndicesStoreTests.java

@@ -32,8 +32,7 @@ import org.testng.annotations.Test;
 import java.io.File;

 import static org.elasticsearch.client.Requests.*;
-import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.common.settings.ImmutableSettings.*;
 import static org.hamcrest.MatcherAssert.*;
 import static org.hamcrest.Matchers.*;

@@ -118,7 +117,7 @@ public class IndicesStoreTests extends AbstractNodesTests {
     private File shardDirectory(String server, String index, int shard) {
         InternalNode node = ((InternalNode) node(server));
         NodeEnvironment env = node.injector().getInstance(NodeEnvironment.class);
-        return env.shardLocation(new ShardId(index, shard));
+        return env.shardLocations(new ShardId(index, shard))[0];
     }

FullRestartStressTest.java

@@ -197,10 +197,10 @@ public class FullRestartStressTest {
         client.close();
         for (Node node : nodes) {
-            File nodeWork = ((InternalNode) node).injector().getInstance(NodeEnvironment.class).nodeDataLocation();
+            File[] nodeDatas = ((InternalNode) node).injector().getInstance(NodeEnvironment.class).nodeDataLocations();
             node.close();
             if (clearNodeWork && !settings.get("gateway.type").equals("local")) {
-                FileSystemUtils.deleteRecursively(nodeWork);
+                FileSystemUtils.deleteRecursively(nodeDatas);
             }
         }

@@ -221,6 +221,7 @@ public class FullRestartStressTest {
                 .put("gateway.type", "local")
                 .put("gateway.recover_after_nodes", numberOfNodes)
                 .put("index.number_of_shards", 1)
+                .put("path.data", "data/data1,data/data2")
                 .build();

         FullRestartStressTest test = new FullRestartStressTest()

RollingRestartStressTest.java

@@ -167,7 +167,7 @@ public class RollingRestartStressTest {
         // start doing the rolling restart
         int nodeIndex = 0;
         while (true) {
-            File nodeData = ((InternalNode) nodes[nodeIndex]).injector().getInstance(NodeEnvironment.class).nodeDataLocation();
+            File[] nodeData = ((InternalNode) nodes[nodeIndex]).injector().getInstance(NodeEnvironment.class).nodeDataLocations();
             nodes[nodeIndex].close();
             if (clearNodeData) {
                 FileSystemUtils.deleteRecursively(nodeData);

@@ -310,7 +310,7 @@ public class RollingRestartStressTest {
     }

     private void indexDoc() throws Exception {
-        StringBuffer sb = new StringBuffer();
+        StringBuilder sb = new StringBuilder();
         XContentBuilder json = XContentFactory.jsonBuilder().startObject()
                 .field("field", "value" + ThreadLocalRandom.current().nextInt());

@@ -341,6 +341,7 @@ public class RollingRestartStressTest {
         Settings settings = settingsBuilder()
                 .put("index.shard.check_index", true)
                 .put("gateway.type", "none")
+                .put("path.data", "data/data1,data/data2")
                 .build();

         RollingRestartStressTest test = new RollingRestartStressTest()