more internal refactoring in directory providers
This commit is contained in:
parent
25fe56c462
commit
c1ca21f4d5
|
@ -27,7 +27,7 @@ import java.io.IOException;
|
|||
*/
|
||||
public interface DirectoryService {
|
||||
|
||||
Directory build() throws IOException;
|
||||
Directory[] build() throws IOException;
|
||||
|
||||
void renameFile(Directory dir, String from, String to) throws IOException;
|
||||
|
||||
|
|
|
@ -20,14 +20,17 @@
|
|||
package org.elasticsearch.index.store;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.apache.lucene.store.LockFactory;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.Unicode;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lucene.Directories;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -39,6 +42,7 @@ import org.elasticsearch.index.store.support.ForceSyncDirectory;
|
|||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -131,7 +135,9 @@ public class Store extends AbstractIndexShardComponent {
|
|||
|
||||
public void fullDelete() throws IOException {
|
||||
deleteContent();
|
||||
directoryService.fullDelete(directory.delegate());
|
||||
for (Directory delegate : directory.delegates()) {
|
||||
directoryService.fullDelete(delegate);
|
||||
}
|
||||
}
|
||||
|
||||
public StoreStats stats() throws IOException {
|
||||
|
@ -143,10 +149,13 @@ public class Store extends AbstractIndexShardComponent {
|
|||
}
|
||||
|
||||
public void renameFile(String from, String to) throws IOException {
|
||||
directoryService.renameFile(directory.delegate(), from, to);
|
||||
synchronized (mutex) {
|
||||
StoreFileMetaData fromMetaData = filesMetadata.get(from); // we should always find this one
|
||||
StoreFileMetaData toMetaData = new StoreFileMetaData(to, fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum());
|
||||
if (fromMetaData == null) {
|
||||
throw new FileNotFoundException(from);
|
||||
}
|
||||
directoryService.renameFile(fromMetaData.directory(), from, to);
|
||||
StoreFileMetaData toMetaData = new StoreFileMetaData(to, fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum(), fromMetaData.directory());
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(from).put(to, toMetaData).immutableMap();
|
||||
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
|
||||
}
|
||||
|
@ -227,7 +236,7 @@ public class Store extends AbstractIndexShardComponent {
|
|||
// update the metadata to include the checksum and write a new checksums file
|
||||
synchronized (mutex) {
|
||||
StoreFileMetaData metaData = filesMetadata.get(name);
|
||||
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum);
|
||||
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum, metaData.directory());
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
|
||||
writeChecksums();
|
||||
}
|
||||
|
@ -238,7 +247,7 @@ public class Store extends AbstractIndexShardComponent {
|
|||
synchronized (mutex) {
|
||||
for (Map.Entry<String, String> entry : checksums.entrySet()) {
|
||||
StoreFileMetaData metaData = filesMetadata.get(entry.getKey());
|
||||
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue());
|
||||
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue(), metaData.directory());
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(entry.getKey(), metaData).immutableMap();
|
||||
}
|
||||
writeChecksums();
|
||||
|
@ -250,13 +259,14 @@ public class Store extends AbstractIndexShardComponent {
|
|||
*/
|
||||
protected class StoreDirectory extends Directory implements ForceSyncDirectory {
|
||||
|
||||
private final Directory delegate;
|
||||
private final Directory[] delegates;
|
||||
|
||||
StoreDirectory(Directory delegate) throws IOException {
|
||||
this.delegate = delegate;
|
||||
StoreDirectory(Directory[] delegates) throws IOException {
|
||||
this.delegates = delegates;
|
||||
synchronized (mutex) {
|
||||
Map<String, String> checksums = readChecksums(delegate);
|
||||
MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
|
||||
Map<String, String> checksums = readChecksums(delegates[0]);
|
||||
for (Directory delegate : delegates) {
|
||||
for (String file : delegate.listAll()) {
|
||||
// BACKWARD CKS SUPPORT
|
||||
if (file.endsWith(".cks")) { // ignore checksum files here
|
||||
|
@ -279,15 +289,16 @@ public class Store extends AbstractIndexShardComponent {
|
|||
}
|
||||
}
|
||||
}
|
||||
builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum));
|
||||
builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum, delegate));
|
||||
}
|
||||
}
|
||||
filesMetadata = builder.immutableMap();
|
||||
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
|
||||
}
|
||||
}
|
||||
|
||||
public Directory delegate() {
|
||||
return delegate;
|
||||
public Directory[] delegates() {
|
||||
return delegates;
|
||||
}
|
||||
|
||||
@Override public String[] listAll() throws IOException {
|
||||
|
@ -307,15 +318,15 @@ public class Store extends AbstractIndexShardComponent {
|
|||
if (metaData.lastModified() != -1) {
|
||||
return metaData.lastModified();
|
||||
}
|
||||
return delegate.fileModified(name);
|
||||
return metaData.directory().fileModified(name);
|
||||
}
|
||||
|
||||
@Override public void touchFile(String name) throws IOException {
|
||||
delegate.touchFile(name);
|
||||
synchronized (mutex) {
|
||||
StoreFileMetaData metaData = filesMetadata.get(name);
|
||||
if (metaData != null) {
|
||||
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name), metaData.checksum());
|
||||
metaData.directory().touchFile(name);
|
||||
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.directory().fileModified(name), metaData.checksum(), metaData.directory());
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
|
||||
}
|
||||
}
|
||||
|
@ -323,9 +334,9 @@ public class Store extends AbstractIndexShardComponent {
|
|||
|
||||
public void deleteFileChecksum(String name) throws IOException {
|
||||
try {
|
||||
delegate.deleteFile(name);
|
||||
delegates[0].deleteFile(name);
|
||||
} catch (IOException e) {
|
||||
if (delegate.fileExists(name)) {
|
||||
if (delegates[0].fileExists(name)) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -340,13 +351,16 @@ public class Store extends AbstractIndexShardComponent {
|
|||
if (isChecksum(name)) {
|
||||
return;
|
||||
}
|
||||
StoreFileMetaData metaData = filesMetadata.get(name);
|
||||
if (metaData != null) {
|
||||
try {
|
||||
delegate.deleteFile(name);
|
||||
metaData.directory().deleteFile(name);
|
||||
} catch (IOException e) {
|
||||
if (delegate.fileExists(name)) {
|
||||
if (metaData.directory().fileExists(name)) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
synchronized (mutex) {
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
|
||||
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
|
||||
|
@ -362,7 +376,7 @@ public class Store extends AbstractIndexShardComponent {
|
|||
if (metaData.length() != -1) {
|
||||
return metaData.length();
|
||||
}
|
||||
return delegate.fileLength(name);
|
||||
return metaData.directory().fileLength(name);
|
||||
}
|
||||
|
||||
@Override public IndexOutput createOutput(String name) throws IOException {
|
||||
|
@ -370,21 +384,48 @@ public class Store extends AbstractIndexShardComponent {
|
|||
}
|
||||
|
||||
public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException {
|
||||
IndexOutput out = delegate.createOutput(name);
|
||||
Directory directory = null;
|
||||
if (isChecksum(name)) {
|
||||
directory = delegates[0];
|
||||
} else {
|
||||
if (delegates.length == 1) {
|
||||
directory = delegates[0];
|
||||
} else {
|
||||
long size = Long.MAX_VALUE;
|
||||
for (Directory delegate : delegates) {
|
||||
if (delegate instanceof FSDirectory) {
|
||||
long currentSize = ((FSDirectory) delegate).getDirectory().getFreeSpace();
|
||||
if (currentSize < size) {
|
||||
size = currentSize;
|
||||
directory = delegate;
|
||||
}
|
||||
} else {
|
||||
directory = delegate; // really, make sense to have multiple directories for FS
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
IndexOutput out = directory.createOutput(name);
|
||||
synchronized (mutex) {
|
||||
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null);
|
||||
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null, directory);
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
|
||||
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
|
||||
return new StoreIndexOutput(metaData, out, name, computeChecksum);
|
||||
}
|
||||
return new StoreIndexOutput(out, name, computeChecksum);
|
||||
}
|
||||
|
||||
@Override public IndexInput openInput(String name) throws IOException {
|
||||
return delegate.openInput(name);
|
||||
StoreFileMetaData metaData = filesMetadata.get(name);
|
||||
if (metaData == null) {
|
||||
throw new FileNotFoundException(name);
|
||||
}
|
||||
return metaData.directory().openInput(name);
|
||||
}
|
||||
|
||||
@Override public void close() throws IOException {
|
||||
for (Directory delegate : delegates) {
|
||||
delegate.close();
|
||||
}
|
||||
synchronized (mutex) {
|
||||
filesMetadata = ImmutableMap.of();
|
||||
files = Strings.EMPTY_ARRAY;
|
||||
|
@ -392,32 +433,51 @@ public class Store extends AbstractIndexShardComponent {
|
|||
}
|
||||
|
||||
@Override public Lock makeLock(String name) {
|
||||
return delegate.makeLock(name);
|
||||
return delegates[0].makeLock(name);
|
||||
}
|
||||
|
||||
@Override public IndexInput openInput(String name, int bufferSize) throws IOException {
|
||||
return delegate.openInput(name, bufferSize);
|
||||
StoreFileMetaData metaData = filesMetadata.get(name);
|
||||
if (metaData == null) {
|
||||
throw new FileNotFoundException(name);
|
||||
}
|
||||
return metaData.directory().openInput(name, bufferSize);
|
||||
}
|
||||
|
||||
@Override public void clearLock(String name) throws IOException {
|
||||
delegate.clearLock(name);
|
||||
delegates[0].clearLock(name);
|
||||
}
|
||||
|
||||
@Override public void setLockFactory(LockFactory lockFactory) throws IOException {
|
||||
delegate.setLockFactory(lockFactory);
|
||||
delegates[0].setLockFactory(lockFactory);
|
||||
}
|
||||
|
||||
@Override public LockFactory getLockFactory() {
|
||||
return delegate.getLockFactory();
|
||||
return delegates[0].getLockFactory();
|
||||
}
|
||||
|
||||
@Override public String getLockID() {
|
||||
return delegate.getLockID();
|
||||
return delegates[0].getLockID();
|
||||
}
|
||||
|
||||
@Override public void sync(Collection<String> names) throws IOException {
|
||||
if (sync) {
|
||||
delegate.sync(names);
|
||||
Map<Directory, Collection<String>> map = Maps.newHashMap();
|
||||
for (String name : names) {
|
||||
StoreFileMetaData metaData = filesMetadata.get(name);
|
||||
if (metaData == null) {
|
||||
throw new FileNotFoundException(name);
|
||||
}
|
||||
Collection<String> dirNames = map.get(metaData.directory());
|
||||
if (dirNames == null) {
|
||||
dirNames = new ArrayList<String>();
|
||||
map.put(metaData.directory(), dirNames);
|
||||
}
|
||||
dirNames.add(name);
|
||||
}
|
||||
for (Map.Entry<Directory, Collection<String>> entry : map.entrySet()) {
|
||||
entry.getKey().sync(entry.getValue());
|
||||
}
|
||||
}
|
||||
for (String name : names) {
|
||||
// write the checksums file when we sync on the segments file (committed)
|
||||
|
@ -430,7 +490,7 @@ public class Store extends AbstractIndexShardComponent {
|
|||
|
||||
@Override public void sync(String name) throws IOException {
|
||||
if (sync) {
|
||||
delegate.sync(name);
|
||||
sync(ImmutableList.of(name));
|
||||
}
|
||||
// write the checksums file when we sync on the segments file (committed)
|
||||
if (!name.equals("segments.gen") && name.startsWith("segments")) {
|
||||
|
@ -439,19 +499,22 @@ public class Store extends AbstractIndexShardComponent {
|
|||
}
|
||||
|
||||
@Override public void forceSync(String name) throws IOException {
|
||||
delegate.sync(name);
|
||||
sync(ImmutableList.of(name));
|
||||
}
|
||||
}
|
||||
|
||||
class StoreIndexOutput extends IndexOutput {
|
||||
|
||||
private final StoreFileMetaData metaData;
|
||||
|
||||
private final IndexOutput delegate;
|
||||
|
||||
private final String name;
|
||||
|
||||
private final Checksum digest;
|
||||
|
||||
StoreIndexOutput(IndexOutput delegate, String name, boolean computeChecksum) {
|
||||
StoreIndexOutput(StoreFileMetaData metaData, IndexOutput delegate, String name, boolean computeChecksum) {
|
||||
this.metaData = metaData;
|
||||
this.delegate = delegate;
|
||||
this.name = name;
|
||||
if (computeChecksum) {
|
||||
|
@ -480,7 +543,7 @@ public class Store extends AbstractIndexShardComponent {
|
|||
checksum = Long.toString(digest.getValue(), Character.MAX_RADIX);
|
||||
}
|
||||
synchronized (mutex) {
|
||||
StoreFileMetaData md = new StoreFileMetaData(name, directory.delegate().fileLength(name), directory.delegate().fileModified(name), checksum);
|
||||
StoreFileMetaData md = new StoreFileMetaData(name, metaData.directory().fileLength(name), metaData.directory().fileModified(name), checksum, metaData.directory());
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
|
||||
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.index.store;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -39,14 +40,25 @@ public class StoreFileMetaData implements Streamable {
|
|||
|
||||
private String checksum;
|
||||
|
||||
private transient Directory directory;
|
||||
|
||||
StoreFileMetaData() {
|
||||
}
|
||||
|
||||
public StoreFileMetaData(String name, long length, long lastModified, String checksum) {
|
||||
this(name, length, lastModified, checksum, null);
|
||||
}
|
||||
|
||||
public StoreFileMetaData(String name, long length, long lastModified, String checksum, @Nullable Directory directory) {
|
||||
this.name = name;
|
||||
this.lastModified = lastModified;
|
||||
this.length = length;
|
||||
this.checksum = checksum;
|
||||
this.directory = directory;
|
||||
}
|
||||
|
||||
public Directory directory() {
|
||||
return this.directory;
|
||||
}
|
||||
|
||||
public String name() {
|
||||
|
|
|
@ -39,9 +39,9 @@ public class MmapFsDirectoryService extends FsDirectoryService {
|
|||
super(shardId, indexSettings, indexStore);
|
||||
}
|
||||
|
||||
@Override public Directory build() throws IOException {
|
||||
@Override public Directory[] build() throws IOException {
|
||||
File location = indexStore.shardIndexLocation(shardId);
|
||||
FileSystemUtils.mkdirs(location);
|
||||
return new MMapDirectory(location, buildLockFactory());
|
||||
return new Directory[]{new MMapDirectory(location, buildLockFactory())};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,9 +39,9 @@ public class NioFsDirectoryService extends FsDirectoryService {
|
|||
super(shardId, indexSettings, indexStore);
|
||||
}
|
||||
|
||||
@Override public Directory build() throws IOException {
|
||||
@Override public Directory[] build() throws IOException {
|
||||
File location = indexStore.shardIndexLocation(shardId);
|
||||
FileSystemUtils.mkdirs(location);
|
||||
return new NIOFSDirectory(location, buildLockFactory());
|
||||
return new Directory[]{new NIOFSDirectory(location, buildLockFactory())};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,9 +39,9 @@ public class SimpleFsDirectoryService extends FsDirectoryService {
|
|||
super(shardId, indexSettings, indexStore);
|
||||
}
|
||||
|
||||
@Override public Directory build() throws IOException {
|
||||
@Override public Directory[] build() throws IOException {
|
||||
File location = indexStore.shardIndexLocation(shardId);
|
||||
FileSystemUtils.mkdirs(location);
|
||||
return new SimpleFSDirectory(location, buildLockFactory());
|
||||
return new Directory[]{new SimpleFSDirectory(location, buildLockFactory())};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,8 +46,8 @@ public class ByteBufferDirectoryService extends AbstractIndexShardComponent impl
|
|||
this.byteBufferCache = byteBufferCache;
|
||||
}
|
||||
|
||||
@Override public Directory build() {
|
||||
return new CustomByteBufferDirectory(byteBufferCache);
|
||||
@Override public Directory[] build() {
|
||||
return new Directory[]{new CustomByteBufferDirectory(byteBufferCache)};
|
||||
}
|
||||
|
||||
@Override public void renameFile(Directory dir, String from, String to) throws IOException {
|
||||
|
|
|
@ -40,8 +40,8 @@ public class RamDirectoryService extends AbstractIndexShardComponent implements
|
|||
super(shardId, indexSettings);
|
||||
}
|
||||
|
||||
@Override public Directory build() {
|
||||
return new CustomRAMDirectory();
|
||||
@Override public Directory[] build() {
|
||||
return new Directory[]{new CustomRAMDirectory()};
|
||||
}
|
||||
|
||||
@Override public void renameFile(Directory dir, String from, String to) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue