diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/DirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/DirectoryService.java index f85ed364168..5687a34b1a9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/DirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/DirectoryService.java @@ -27,7 +27,7 @@ import java.io.IOException; */ public interface DirectoryService { - Directory build() throws IOException; + Directory[] build() throws IOException; void renameFile(Directory dir, String from, String to) throws IOException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java index 04fbafa37b2..9bbd67e61a9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java @@ -20,14 +20,17 @@ package org.elasticsearch.index.store; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockFactory; import org.elasticsearch.common.Strings; import org.elasticsearch.common.Unicode; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Directories; import org.elasticsearch.common.settings.Settings; @@ -39,6 +42,7 @@ import org.elasticsearch.index.store.support.ForceSyncDirectory; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -131,7 +135,9 @@ public class Store extends AbstractIndexShardComponent { public void fullDelete() throws IOException { deleteContent(); - directoryService.fullDelete(directory.delegate()); + for (Directory delegate : directory.delegates()) { + directoryService.fullDelete(delegate); + } } public StoreStats stats() throws IOException { @@ -143,10 +149,13 @@ public class Store extends AbstractIndexShardComponent { } public void renameFile(String from, String to) throws IOException { - directoryService.renameFile(directory.delegate(), from, to); synchronized (mutex) { StoreFileMetaData fromMetaData = filesMetadata.get(from); // we should always find this one - StoreFileMetaData toMetaData = new StoreFileMetaData(to, fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum()); + if (fromMetaData == null) { + throw new FileNotFoundException(from); + } + directoryService.renameFile(fromMetaData.directory(), from, to); + StoreFileMetaData toMetaData = new StoreFileMetaData(to, fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum(), fromMetaData.directory()); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(from).put(to, toMetaData).immutableMap(); files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); } @@ -227,7 +236,7 @@ public class Store extends AbstractIndexShardComponent { // update the metadata to include the checksum and write a new checksums file synchronized (mutex) { StoreFileMetaData metaData = filesMetadata.get(name); - metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum); + metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum, metaData.directory()); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); writeChecksums(); } @@ -238,7 +247,7 @@ public class Store extends AbstractIndexShardComponent { synchronized (mutex) { for (Map.Entry entry : checksums.entrySet()) { StoreFileMetaData metaData = filesMetadata.get(entry.getKey()); - metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue()); + metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue(), metaData.directory()); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(entry.getKey(), metaData).immutableMap(); } writeChecksums(); @@ -250,44 +259,46 @@ public class Store extends AbstractIndexShardComponent { */ protected class StoreDirectory extends Directory implements ForceSyncDirectory { - private final Directory delegate; + private final Directory[] delegates; - StoreDirectory(Directory delegate) throws IOException { - this.delegate = delegate; + StoreDirectory(Directory[] delegates) throws IOException { + this.delegates = delegates; synchronized (mutex) { - Map checksums = readChecksums(delegate); MapBuilder builder = MapBuilder.newMapBuilder(); - for (String file : delegate.listAll()) { - // BACKWARD CKS SUPPORT - if (file.endsWith(".cks")) { // ignore checksum files here - continue; - } - String checksum = checksums.get(file); + Map checksums = readChecksums(delegates[0]); + for (Directory delegate : delegates) { + for (String file : delegate.listAll()) { + // BACKWARD CKS SUPPORT + if (file.endsWith(".cks")) { // ignore checksum files here + continue; + } + String checksum = checksums.get(file); - // BACKWARD CKS SUPPORT - if (checksum == null) { - if (delegate.fileExists(file + ".cks")) { - IndexInput indexInput = delegate.openInput(file + ".cks"); - try { - if (indexInput.length() > 0) { - byte[] checksumBytes = new byte[(int) indexInput.length()]; - indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false); - checksum = Unicode.fromBytes(checksumBytes); + // BACKWARD CKS SUPPORT + if (checksum == null) { + if (delegate.fileExists(file + ".cks")) { + IndexInput indexInput = delegate.openInput(file + ".cks"); + try { + if (indexInput.length() > 0) { + byte[] checksumBytes = new byte[(int) indexInput.length()]; + indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false); + checksum = Unicode.fromBytes(checksumBytes); + } + } finally { + indexInput.close(); } - } finally { - indexInput.close(); } } + builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum, delegate)); } - builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum)); } filesMetadata = builder.immutableMap(); files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); } } - public Directory delegate() { - return delegate; + public Directory[] delegates() { + return delegates; } @Override public String[] listAll() throws IOException { @@ -307,15 +318,15 @@ public class Store extends AbstractIndexShardComponent { if (metaData.lastModified() != -1) { return metaData.lastModified(); } - return delegate.fileModified(name); + return metaData.directory().fileModified(name); } @Override public void touchFile(String name) throws IOException { - delegate.touchFile(name); synchronized (mutex) { StoreFileMetaData metaData = filesMetadata.get(name); if (metaData != null) { - metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name), metaData.checksum()); + metaData.directory().touchFile(name); + metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.directory().fileModified(name), metaData.checksum(), metaData.directory()); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); } } @@ -323,9 +334,9 @@ public class Store extends AbstractIndexShardComponent { public void deleteFileChecksum(String name) throws IOException { try { - delegate.deleteFile(name); + delegates[0].deleteFile(name); } catch (IOException e) { - if (delegate.fileExists(name)) { + if (delegates[0].fileExists(name)) { throw e; } } @@ -340,11 +351,14 @@ public class Store extends AbstractIndexShardComponent { if (isChecksum(name)) { return; } - try { - delegate.deleteFile(name); - } catch (IOException e) { - if (delegate.fileExists(name)) { - throw e; + StoreFileMetaData metaData = filesMetadata.get(name); + if (metaData != null) { + try { + metaData.directory().deleteFile(name); + } catch (IOException e) { + if (metaData.directory().fileExists(name)) { + throw e; + } } } synchronized (mutex) { @@ -362,7 +376,7 @@ public class Store extends AbstractIndexShardComponent { if (metaData.length() != -1) { return metaData.length(); } - return delegate.fileLength(name); + return metaData.directory().fileLength(name); } @Override public IndexOutput createOutput(String name) throws IOException { @@ -370,21 +384,48 @@ public class Store extends AbstractIndexShardComponent { } public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException { - IndexOutput out = delegate.createOutput(name); + Directory directory = null; + if (isChecksum(name)) { + directory = delegates[0]; + } else { + if (delegates.length == 1) { + directory = delegates[0]; + } else { + long size = Long.MAX_VALUE; + for (Directory delegate : delegates) { + if (delegate instanceof FSDirectory) { + long currentSize = ((FSDirectory) delegate).getDirectory().getFreeSpace(); + if (currentSize < size) { + size = currentSize; + directory = delegate; + } + } else { + directory = delegate; // really, make sense to have multiple directories for FS + } + } + } + } + IndexOutput out = directory.createOutput(name); synchronized (mutex) { - StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null); + StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null, directory); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); + return new StoreIndexOutput(metaData, out, name, computeChecksum); } - return new StoreIndexOutput(out, name, computeChecksum); } @Override public IndexInput openInput(String name) throws IOException { - return delegate.openInput(name); + StoreFileMetaData metaData = filesMetadata.get(name); + if (metaData == null) { + throw new FileNotFoundException(name); + } + return metaData.directory().openInput(name); } @Override public void close() throws IOException { - delegate.close(); + for (Directory delegate : delegates) { + delegate.close(); + } synchronized (mutex) { filesMetadata = ImmutableMap.of(); files = Strings.EMPTY_ARRAY; @@ -392,32 +433,51 @@ public class Store extends AbstractIndexShardComponent { } @Override public Lock makeLock(String name) { - return delegate.makeLock(name); + return delegates[0].makeLock(name); } @Override public IndexInput openInput(String name, int bufferSize) throws IOException { - return delegate.openInput(name, bufferSize); + StoreFileMetaData metaData = filesMetadata.get(name); + if (metaData == null) { + throw new FileNotFoundException(name); + } + return metaData.directory().openInput(name, bufferSize); } @Override public void clearLock(String name) throws IOException { - delegate.clearLock(name); + delegates[0].clearLock(name); } @Override public void setLockFactory(LockFactory lockFactory) throws IOException { - delegate.setLockFactory(lockFactory); + delegates[0].setLockFactory(lockFactory); } @Override public LockFactory getLockFactory() { - return delegate.getLockFactory(); + return delegates[0].getLockFactory(); } @Override public String getLockID() { - return delegate.getLockID(); + return delegates[0].getLockID(); } @Override public void sync(Collection names) throws IOException { if (sync) { - delegate.sync(names); + Map> map = Maps.newHashMap(); + for (String name : names) { + StoreFileMetaData metaData = filesMetadata.get(name); + if (metaData == null) { + throw new FileNotFoundException(name); + } + Collection dirNames = map.get(metaData.directory()); + if (dirNames == null) { + dirNames = new ArrayList(); + map.put(metaData.directory(), dirNames); + } + dirNames.add(name); + } + for (Map.Entry> entry : map.entrySet()) { + entry.getKey().sync(entry.getValue()); + } } for (String name : names) { // write the checksums file when we sync on the segments file (committed) @@ -430,7 +490,7 @@ public class Store extends AbstractIndexShardComponent { @Override public void sync(String name) throws IOException { if (sync) { - delegate.sync(name); + sync(ImmutableList.of(name)); } // write the checksums file when we sync on the segments file (committed) if (!name.equals("segments.gen") && name.startsWith("segments")) { @@ -439,19 +499,22 @@ public class Store extends AbstractIndexShardComponent { } @Override public void forceSync(String name) throws IOException { - delegate.sync(name); + sync(ImmutableList.of(name)); } } class StoreIndexOutput extends IndexOutput { + private final StoreFileMetaData metaData; + private final IndexOutput delegate; private final String name; private final Checksum digest; - StoreIndexOutput(IndexOutput delegate, String name, boolean computeChecksum) { + StoreIndexOutput(StoreFileMetaData metaData, IndexOutput delegate, String name, boolean computeChecksum) { + this.metaData = metaData; this.delegate = delegate; this.name = name; if (computeChecksum) { @@ -480,7 +543,7 @@ public class Store extends AbstractIndexShardComponent { checksum = Long.toString(digest.getValue(), Character.MAX_RADIX); } synchronized (mutex) { - StoreFileMetaData md = new StoreFileMetaData(name, directory.delegate().fileLength(name), directory.delegate().fileModified(name), checksum); + StoreFileMetaData md = new StoreFileMetaData(name, metaData.directory().fileLength(name), metaData.directory().fileModified(name), checksum, metaData.directory()); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap(); files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java index c1b7815b5e9..8605a2d3c6f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.store; +import org.apache.lucene.store.Directory; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -39,14 +40,25 @@ public class StoreFileMetaData implements Streamable { private String checksum; + private transient Directory directory; + StoreFileMetaData() { } public StoreFileMetaData(String name, long length, long lastModified, String checksum) { + this(name, length, lastModified, checksum, null); + } + + public StoreFileMetaData(String name, long length, long lastModified, String checksum, @Nullable Directory directory) { this.name = name; this.lastModified = lastModified; this.length = length; this.checksum = checksum; + this.directory = directory; + } + + public Directory directory() { + return this.directory; } public String name() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java index 336e82d0ca0..c54066fc16a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java @@ -39,9 +39,9 @@ public class MmapFsDirectoryService extends FsDirectoryService { super(shardId, indexSettings, indexStore); } - @Override public Directory build() throws IOException { + @Override public Directory[] build() throws IOException { File location = indexStore.shardIndexLocation(shardId); FileSystemUtils.mkdirs(location); - return new MMapDirectory(location, buildLockFactory()); + return new Directory[]{new MMapDirectory(location, buildLockFactory())}; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java index 5d148347c75..e020b9a349e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java @@ -39,9 +39,9 @@ public class NioFsDirectoryService extends FsDirectoryService { super(shardId, indexSettings, indexStore); } - @Override public Directory build() throws IOException { + @Override public Directory[] build() throws IOException { File location = indexStore.shardIndexLocation(shardId); FileSystemUtils.mkdirs(location); - return new NIOFSDirectory(location, buildLockFactory()); + return new Directory[]{new NIOFSDirectory(location, buildLockFactory())}; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java index 7fec5e31e2e..dee62bf2637 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java @@ -39,9 +39,9 @@ public class SimpleFsDirectoryService extends FsDirectoryService { super(shardId, indexSettings, indexStore); } - @Override public Directory build() throws IOException { + @Override public Directory[] build() throws IOException { File location = indexStore.shardIndexLocation(shardId); FileSystemUtils.mkdirs(location); - return new SimpleFSDirectory(location, buildLockFactory()); + return new Directory[]{new SimpleFSDirectory(location, buildLockFactory())}; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java index 964527a5cc0..3a99a0b477b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java @@ -46,8 +46,8 @@ public class ByteBufferDirectoryService extends AbstractIndexShardComponent impl this.byteBufferCache = byteBufferCache; } - @Override public Directory build() { - return new CustomByteBufferDirectory(byteBufferCache); + @Override public Directory[] build() { + return new Directory[]{new CustomByteBufferDirectory(byteBufferCache)}; } @Override public void renameFile(Directory dir, String from, String to) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java index 5b8dea1d99e..c598cb7c352 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java @@ -40,8 +40,8 @@ public class RamDirectoryService extends AbstractIndexShardComponent implements super(shardId, indexSettings); } - @Override public Directory build() { - return new CustomRAMDirectory(); + @Override public Directory[] build() { + return new Directory[]{new CustomRAMDirectory()}; } @Override public void renameFile(Directory dir, String from, String to) throws IOException {