refactor creation of lucene directory and simplify different directories implemenation strcture

This commit is contained in:
Shay Banon 2011-09-16 18:39:23 +03:00
parent a7e43005bb
commit bdfa07934e
25 changed files with 741 additions and 1557 deletions

View File

@ -1,333 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.index.engine;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LoadFirstFieldSelector;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.bloom.none.NoneBloomCache;
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.robin.RobinEngine;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.memory.ByteBufferStore;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.lucene.DocumentBuilder.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
/**
* @author kimchy (Shay Banon)
*/
public class SimpleEngineBenchmark {
private final Store store;
private final Engine engine;
private final AtomicInteger idGenerator = new AtomicInteger();
private String[] contentItems = new String[]{"test1", "test2", "test3"};
private static byte[] TRANSLOG_PAYLOAD = new byte[12];
private volatile int lastRefreshedId = 0;
private boolean create = false;
private int searcherIterations = 10;
private Thread[] searcherThreads = new Thread[1];
private int writerIterations = 10;
private Thread[] writerThreads = new Thread[1];
private TimeValue refreshSchedule = new TimeValue(1, TimeUnit.SECONDS);
private TimeValue flushSchedule = new TimeValue(1, TimeUnit.MINUTES);
private CountDownLatch latch;
private CyclicBarrier barrier1;
private CyclicBarrier barrier2;
// scheduled thread pool for both refresh and flush operations
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
public SimpleEngineBenchmark(Store store, Engine engine) {
this.store = store;
this.engine = engine;
}
public SimpleEngineBenchmark numberOfContentItems(int numberOfContentItems) {
contentItems = new String[numberOfContentItems];
for (int i = 0; i < contentItems.length; i++) {
contentItems[i] = "content" + i;
}
return this;
}
public SimpleEngineBenchmark searcherThreads(int numberOfSearcherThreads) {
searcherThreads = new Thread[numberOfSearcherThreads];
return this;
}
public SimpleEngineBenchmark searcherIterations(int searcherIterations) {
this.searcherIterations = searcherIterations;
return this;
}
public SimpleEngineBenchmark writerThreads(int numberOfWriterThreads) {
writerThreads = new Thread[numberOfWriterThreads];
return this;
}
public SimpleEngineBenchmark writerIterations(int writerIterations) {
this.writerIterations = writerIterations;
return this;
}
public SimpleEngineBenchmark refreshSchedule(TimeValue refreshSchedule) {
this.refreshSchedule = refreshSchedule;
return this;
}
public SimpleEngineBenchmark flushSchedule(TimeValue flushSchedule) {
this.flushSchedule = flushSchedule;
return this;
}
public SimpleEngineBenchmark create(boolean create) {
this.create = create;
return this;
}
public SimpleEngineBenchmark build() {
for (int i = 0; i < searcherThreads.length; i++) {
searcherThreads[i] = new Thread(new SearcherThread(), "Searcher[" + i + "]");
}
for (int i = 0; i < writerThreads.length; i++) {
writerThreads[i] = new Thread(new WriterThread(), "Writer[" + i + "]");
}
latch = new CountDownLatch(searcherThreads.length + writerThreads.length);
barrier1 = new CyclicBarrier(searcherThreads.length + writerThreads.length + 1);
barrier2 = new CyclicBarrier(searcherThreads.length + writerThreads.length + 1);
// warmup by indexing all content items
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (String contentItem : contentItems) {
int id = idGenerator.incrementAndGet();
String sId = Integer.toString(id);
Document doc = doc().add(field("_id", sId))
.add(field("content", contentItem)).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) {
engine.create(new Engine.Create(null, new Term("_id", sId), pDoc));
} else {
engine.index(new Engine.Index(null, new Term("_id", sId), pDoc));
}
}
engine.refresh(new Engine.Refresh(true));
stopWatch.stop();
System.out.println("Warmup of [" + contentItems.length + "] content items, took " + stopWatch.totalTime());
return this;
}
public void run() throws Exception {
for (Thread t : searcherThreads) {
t.start();
}
for (Thread t : writerThreads) {
t.start();
}
barrier1.await();
Refresher refresher = new Refresher();
scheduledExecutorService.scheduleWithFixedDelay(refresher, refreshSchedule.millis(), refreshSchedule.millis(), TimeUnit.MILLISECONDS);
Flusher flusher = new Flusher();
scheduledExecutorService.scheduleWithFixedDelay(flusher, flushSchedule.millis(), flushSchedule.millis(), TimeUnit.MILLISECONDS);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
barrier2.await();
latch.await();
stopWatch.stop();
System.out.println("Summary");
System.out.println(" -- Readers [" + searcherThreads.length + "] with [" + searcherIterations + "] iterations");
System.out.println(" -- Writers [" + writerThreads.length + "] with [" + writerIterations + "] iterations");
System.out.println(" -- Took: " + stopWatch.totalTime());
System.out.println(" -- Refresh [" + refresher.id + "] took: " + refresher.stopWatch.totalTime());
System.out.println(" -- Flush [" + flusher.id + "] took: " + flusher.stopWatch.totalTime());
System.out.println(" -- Store size " + store.estimateSize());
scheduledExecutorService.shutdown();
engine.refresh(new Engine.Refresh(true));
stopWatch = new StopWatch();
stopWatch.start();
Engine.Searcher searcher = engine.searcher();
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), idGenerator.get() + 1);
stopWatch.stop();
System.out.println(" -- Indexed [" + idGenerator.get() + "] docs, found [" + topDocs.totalHits + "] hits, took " + stopWatch.totalTime());
searcher.release();
}
private String content(long number) {
return contentItems[((int) (number % contentItems.length))];
}
private class Flusher implements Runnable {
StopWatch stopWatch = new StopWatch();
private int id;
@Override public void run() {
stopWatch.start("" + ++id);
engine.flush(new Engine.Flush());
stopWatch.stop();
}
}
private class Refresher implements Runnable {
StopWatch stopWatch = new StopWatch();
private int id;
@Override public synchronized void run() {
stopWatch.start("" + ++id);
int lastId = idGenerator.get();
engine.refresh(new Engine.Refresh(true));
lastRefreshedId = lastId;
stopWatch.stop();
}
}
private class SearcherThread implements Runnable {
@Override public void run() {
try {
barrier1.await();
barrier2.await();
for (int i = 0; i < searcherIterations; i++) {
Engine.Searcher searcher = engine.searcher();
TopDocs topDocs = searcher.searcher().search(new TermQuery(new Term("content", content(i))), 10);
// read one
searcher.searcher().doc(topDocs.scoreDocs[0].doc, new LoadFirstFieldSelector());
searcher.release();
}
} catch (Exception e) {
System.out.println("Searcher thread failed");
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
private class WriterThread implements Runnable {
@Override public void run() {
try {
barrier1.await();
barrier2.await();
for (int i = 0; i < writerIterations; i++) {
int id = idGenerator.incrementAndGet();
String sId = Integer.toString(id);
Document doc = doc().add(field("_id", sId))
.add(field("content", content(id))).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) {
engine.create(new Engine.Create(null, new Term("_id", sId), pDoc));
} else {
engine.index(new Engine.Index(null, new Term("_id", sId), pDoc));
}
}
} catch (Exception e) {
System.out.println("Writer thread failed");
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
public static void main(String[] args) throws Exception {
ShardId shardId = new ShardId(new Index("index"), 1);
Settings settings = EMPTY_SETTINGS;
// Store store = new RamStore(shardId, settings);
Store store = new ByteBufferStore(shardId, settings, null, new ByteBufferCache(settings));
// Store store = new NioFsStore(shardId, settings);
store.deleteContent();
ThreadPool threadPool = new ThreadPool();
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
Engine engine = new RobinEngine(shardId, settings, new ThreadPool(), new IndexSettingsService(shardId.index(), settings), store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog")), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)),
new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index()));
engine.start();
SimpleEngineBenchmark benchmark = new SimpleEngineBenchmark(store, engine)
.numberOfContentItems(1000)
.searcherThreads(50).searcherIterations(10000)
.writerThreads(10).writerIterations(10000)
.refreshSchedule(new TimeValue(1, TimeUnit.SECONDS))
.flushSchedule(new TimeValue(1, TimeUnit.MINUTES))
.create(false)
.build();
benchmark.run();
engine.close();
store.close();
threadPool.shutdown();
}
}

View File

@ -1,295 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.index.store;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.fs.*;
import org.elasticsearch.index.store.memory.ByteBufferStore;
import org.elasticsearch.index.store.ram.RamStore;
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.concurrent.TimeUnit.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
/**
* @author kimchy
*/
public class SimpleStoreBenchmark {
private final AtomicLong dynamicFilesCounter = new AtomicLong();
private final Store store;
private String[] staticFiles = new String[10];
private ByteSizeValue staticFileSize = new ByteSizeValue(5, ByteSizeUnit.MB);
private ByteSizeValue dynamicFileSize = new ByteSizeValue(1, ByteSizeUnit.MB);
private int readerIterations = 10;
private int writerIterations = 10;
private Thread[] readerThreads = new Thread[1];
private Thread[] writerThreads = new Thread[1];
private CountDownLatch latch;
private CyclicBarrier barrier1;
private CyclicBarrier barrier2;
public SimpleStoreBenchmark(Store store) throws Exception {
this.store = store;
}
public SimpleStoreBenchmark numberStaticFiles(int numberStaticFiles) {
this.staticFiles = new String[numberStaticFiles];
return this;
}
public SimpleStoreBenchmark staticFileSize(ByteSizeValue staticFileSize) {
this.staticFileSize = staticFileSize;
return this;
}
public SimpleStoreBenchmark dynamicFileSize(ByteSizeValue dynamicFileSize) {
this.dynamicFileSize = dynamicFileSize;
return this;
}
public SimpleStoreBenchmark readerThreads(int readerThreads) {
this.readerThreads = new Thread[readerThreads];
return this;
}
public SimpleStoreBenchmark readerIterations(int readerIterations) {
this.readerIterations = readerIterations;
return this;
}
public SimpleStoreBenchmark writerIterations(int writerIterations) {
this.writerIterations = writerIterations;
return this;
}
public SimpleStoreBenchmark writerThreads(int writerThreads) {
this.writerThreads = new Thread[writerThreads];
return this;
}
public SimpleStoreBenchmark build() throws Exception {
System.out.println("Creating [" + staticFiles.length + "] static files with size [" + staticFileSize + "]");
for (int i = 0; i < staticFiles.length; i++) {
staticFiles[i] = "static" + i;
IndexOutput io = store.directory().createOutput(staticFiles[i]);
for (long sizeCounter = 0; sizeCounter < staticFileSize.bytes(); sizeCounter++) {
io.writeByte((byte) 1);
}
io.close();
}
System.out.println("Using [" + dynamicFileSize + "] size for dynamic files");
// warmp
StopWatch stopWatch = new StopWatch("warmup");
stopWatch.start();
for (String staticFile : staticFiles) {
IndexInput ii = store.directory().openInput(staticFile);
// do a full read
for (long counter = 0; counter < ii.length(); counter++) {
byte result = ii.readByte();
if (result != 1) {
System.out.println("Failure, read wrong value [" + result + "]");
}
}
// do a list of the files
store.directory().listAll();
}
stopWatch.stop();
System.out.println("Warmup Took: " + stopWatch.shortSummary());
for (int i = 0; i < readerThreads.length; i++) {
readerThreads[i] = new Thread(new ReaderThread(), "Reader[" + i + "]");
}
for (int i = 0; i < writerThreads.length; i++) {
writerThreads[i] = new Thread(new WriterThread(), "Writer[" + i + "]");
}
latch = new CountDownLatch(readerThreads.length + writerThreads.length);
barrier1 = new CyclicBarrier(readerThreads.length + writerThreads.length + 1);
barrier2 = new CyclicBarrier(readerThreads.length + writerThreads.length + 1);
return this;
}
public void run() throws Exception {
for (int i = 0; i < 3; i++) {
System.gc();
MILLISECONDS.sleep(100);
}
long emptyUsed = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
System.out.println("Running:");
System.out.println(" -- Readers [" + readerThreads.length + "] with [" + readerIterations + "] iterations");
System.out.println(" -- Writers [" + writerThreads.length + "] with [" + writerIterations + "] iterations");
for (Thread t : readerThreads) {
t.start();
}
for (Thread t : writerThreads) {
t.start();
}
barrier1.await();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
barrier2.await();
latch.await();
stopWatch.stop();
System.out.println("Took: " + stopWatch.shortSummary());
for (int i = 0; i < 3; i++) {
System.gc();
MILLISECONDS.sleep(100);
}
long bytesTaken = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() - emptyUsed;
System.out.println("Size of [" + staticFiles.length + "], each with size [" + staticFileSize + "], is " + new ByteSizeValue(bytesTaken, ByteSizeUnit.BYTES));
}
private class ReaderThread implements Runnable {
@Override public void run() {
try {
barrier1.await();
barrier2.await();
} catch (Exception e) {
e.printStackTrace();
}
try {
for (int i = 0; i < readerIterations; i++) {
for (String staticFile : staticFiles) {
// do a list of the files
store.directory().listAll();
IndexInput ii = store.directory().openInput(staticFile);
// do a full read
for (long counter = 0; counter < ii.length(); counter++) {
byte result = ii.readByte();
if (result != 1) {
System.out.println("Failure, read wrong value [" + result + "]");
}
}
// do a list of the files
store.directory().listAll();
// do a seek and read some byes
ii.seek(ii.length() / 2);
ii.readByte();
ii.readByte();
// do a list of the files
store.directory().listAll();
}
}
} catch (Exception e) {
System.out.println("Reader Thread failed: " + e.getMessage());
e.printStackTrace();
}
latch.countDown();
}
}
private class WriterThread implements Runnable {
@Override public void run() {
try {
barrier1.await();
barrier2.await();
} catch (Exception e) {
e.printStackTrace();
}
try {
for (int i = 0; i < writerIterations; i++) {
String dynamicFileName = "dynamic" + dynamicFilesCounter.incrementAndGet();
IndexOutput io = store.directory().createOutput(dynamicFileName);
for (long sizeCounter = 0; sizeCounter < dynamicFileSize.bytes(); sizeCounter++) {
io.writeByte((byte) 1);
}
io.close();
store.directory().deleteFile(dynamicFileName);
}
} catch (Exception e) {
System.out.println("Writer thread failed: " + e.getMessage());
e.printStackTrace();
}
latch.countDown();
}
}
public static void main(String[] args) throws Exception {
Environment environment = new Environment();
Settings settings = EMPTY_SETTINGS;
NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment);
ByteBufferCache byteBufferCache = new ByteBufferCache(settings);
ShardId shardId = new ShardId(new Index("index"), 1);
String type = args.length > 0 ? args[0] : "ram";
Store store;
if (type.equalsIgnoreCase("ram")) {
store = new RamStore(shardId, settings, null);
} else if (type.equalsIgnoreCase("simple-fs")) {
store = new SimpleFsStore(shardId, settings, new SimpleFsIndexStore(shardId.index(), settings, null, nodeEnvironment), byteBufferCache);
} else if (type.equalsIgnoreCase("mmap-fs")) {
store = new NioFsStore(shardId, settings, new NioFsIndexStore(shardId.index(), settings, null, nodeEnvironment), byteBufferCache);
} else if (type.equalsIgnoreCase("nio-fs")) {
store = new MmapFsStore(shardId, settings, new MmapFsIndexStore(shardId.index(), settings, null, nodeEnvironment), byteBufferCache);
} else if (type.equalsIgnoreCase("memory")) {
store = new ByteBufferStore(shardId, settings, null, byteBufferCache);
} else {
throw new IllegalArgumentException("No type store [" + type + "]");
}
System.out.println("Using Store [" + store + "]");
store.deleteContent();
SimpleStoreBenchmark simpleStoreBenchmark = new SimpleStoreBenchmark(store)
.numberStaticFiles(5).staticFileSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.dynamicFileSize(new ByteSizeValue(1, ByteSizeUnit.MB))
.readerThreads(5).readerIterations(10)
.writerThreads(2).writerIterations(10)
.build();
simpleStoreBenchmark.run();
store.close();
}
}

View File

@ -124,6 +124,10 @@ public class FileSystemUtils {
return deleteRecursively(root, true);
}
private static boolean innerDeleteRecursively(File root) {
return deleteRecursively(root, true);
}
/**
* Delete the supplied {@link java.io.File} - for directories,
* recursively delete any nested directories or files as well.
@ -139,7 +143,7 @@ public class FileSystemUtils {
File[] children = root.listFiles();
if (children != null) {
for (File aChildren : children) {
deleteRecursively(aChildren);
innerDeleteRecursively(aChildren);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import org.apache.lucene.store.Directory;
import java.io.IOException;
/**
*/
public interface DirectoryService {
Directory build() throws IOException;
void renameFile(Directory dir, String from, String to) throws IOException;
void fullDelete(Directory dir) throws IOException;
}

View File

@ -40,7 +40,7 @@ public interface IndexStore extends IndexComponent {
/**
* The shard store class that should be used for each shard.
*/
Class<? extends Store> shardStoreClass();
Class<? extends DirectoryService> shardDirectory();
/**
* Returns the backing store total space. Return <tt>-1</tt> if not available.

View File

@ -20,65 +20,516 @@
package org.elasticsearch.index.store;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockFactory;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
/**
* @author kimchy (shay.banon)
*/
public interface Store extends IndexShardComponent {
public class Store extends AbstractIndexShardComponent {
static final String CHECKSUMS_PREFIX = "_checksums-";
public static final boolean isChecksum(String name) {
return name.startsWith(CHECKSUMS_PREFIX);
}
private final IndexStore indexStore;
private final DirectoryService directoryService;
private final StoreDirectory directory;
private volatile ImmutableMap<String, StoreFileMetaData> filesMetadata = ImmutableMap.of();
private volatile String[] files = Strings.EMPTY_ARRAY;
private final Object mutex = new Object();
private final boolean sync;
@Inject public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService) throws IOException {
super(shardId, indexSettings);
this.indexStore = indexStore;
this.directoryService = directoryService;
this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway...
this.directory = new StoreDirectory(directoryService.build());
}
public Directory directory() {
return directory;
}
public ImmutableMap<String, StoreFileMetaData> list() throws IOException {
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
for (String name : files) {
StoreFileMetaData md = metaData(name);
if (md != null) {
builder.put(md.name(), md);
}
}
return builder.build();
}
public StoreFileMetaData metaData(String name) throws IOException {
StoreFileMetaData md = filesMetadata.get(name);
if (md == null) {
return null;
}
// IndexOutput not closed, does not exists
if (md.lastModified() == -1 || md.length() == -1) {
return null;
}
return md;
}
public void deleteContent() throws IOException {
String[] files = directory.listAll();
IOException lastException = null;
for (String file : files) {
if (isChecksum(file)) {
try {
directory.deleteFileChecksum(file);
} catch (IOException e) {
lastException = e;
}
} else {
try {
directory.deleteFile(file);
} catch (FileNotFoundException e) {
// ignore
} catch (IOException e) {
lastException = e;
}
}
}
if (lastException != null) {
throw lastException;
}
}
public void fullDelete() throws IOException {
deleteContent();
directoryService.fullDelete(directory.delegate());
}
public StoreStats stats() throws IOException {
return new StoreStats(Directories.estimateSize(directory));
}
public ByteSizeValue estimateSize() throws IOException {
return new ByteSizeValue(Directories.estimateSize(directory));
}
public void renameFile(String from, String to) throws IOException {
directoryService.renameFile(directory.delegate(), from, to);
synchronized (mutex) {
StoreFileMetaData fromMetaData = filesMetadata.get(from); // we should always find this one
StoreFileMetaData toMetaData = new StoreFileMetaData(to, fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum());
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(from).put(to, toMetaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
public static Map<String, String> readChecksums(Directory dir) throws IOException {
long lastFound = -1;
for (String name : dir.listAll()) {
if (!isChecksum(name)) {
continue;
}
long current = Long.parseLong(name.substring(CHECKSUMS_PREFIX.length()));
if (current > lastFound) {
lastFound = current;
}
}
if (lastFound == -1) {
return ImmutableMap.of();
}
IndexInput indexInput = dir.openInput(CHECKSUMS_PREFIX + lastFound);
try {
indexInput.readInt(); // version
return indexInput.readStringStringMap();
} catch (Exception e) {
// failed to load checksums, ignore and return an empty map
return new HashMap<String, String>();
} finally {
indexInput.close();
}
}
public void writeChecksums() throws IOException {
writeChecksums(directory);
}
private void writeChecksums(StoreDirectory dir) throws IOException {
String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
ImmutableMap<String, StoreFileMetaData> files = list();
synchronized (mutex) {
Map<String, String> checksums = new HashMap<String, String>();
for (StoreFileMetaData metaData : files.values()) {
if (metaData.checksum() != null) {
checksums.put(metaData.name(), metaData.checksum());
}
}
IndexOutput output = dir.createOutput(checksumName, false);
output.writeInt(0); // version
output.writeStringStringMap(checksums);
output.close();
}
for (StoreFileMetaData metaData : files.values()) {
if (metaData.name().startsWith(CHECKSUMS_PREFIX) && !checksumName.equals(metaData.name())) {
try {
dir.deleteFileChecksum(metaData.name());
} catch (Exception e) {
// ignore
}
}
}
}
/**
* The Lucene {@link Directory} this store is using.
* Returns <tt>true</tt> by default.
*/
Directory directory();
public boolean suggestUseCompoundFile() {
return false;
}
IndexOutput createOutputWithNoChecksum(String name) throws IOException;
public void close() throws IOException {
directory.close();
}
void writeChecksum(String name, String checksum) throws IOException;
public IndexOutput createOutputWithNoChecksum(String name) throws IOException {
return directory.createOutput(name, false);
}
void writeChecksums(Map<String, String> checksums) throws IOException;
public void writeChecksum(String name, String checksum) throws IOException {
// update the metadata to include the checksum and write a new checksums file
synchronized (mutex) {
StoreFileMetaData metaData = filesMetadata.get(name);
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
writeChecksums();
}
}
StoreFileMetaData metaData(String name) throws IOException;
ImmutableMap<String, StoreFileMetaData> list() throws IOException;
public void writeChecksums(Map<String, String> checksums) throws IOException {
// update the metadata to include the checksum and write a new checksums file
synchronized (mutex) {
for (Map.Entry<String, String> entry : checksums.entrySet()) {
StoreFileMetaData metaData = filesMetadata.get(entry.getKey());
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue());
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(entry.getKey(), metaData).immutableMap();
}
writeChecksums();
}
}
/**
* Just deletes the content of the store.
* The idea of the store directory is to cache file level meta data, as well as md5 of it
*/
void deleteContent() throws IOException;
protected class StoreDirectory extends Directory implements ForceSyncDirectory {
/**
* Renames, note, might not be atomic, and can fail "in the middle".
*/
void renameFile(String from, String to) throws IOException;
private final Directory delegate;
/**
* Deletes the store completely. For example, in FS ones, also deletes the parent
* directory.
*/
void fullDelete() throws IOException;
StoreDirectory(Directory delegate) throws IOException {
this.delegate = delegate;
synchronized (mutex) {
Map<String, String> checksums = readChecksums(delegate);
MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
for (String file : delegate.listAll()) {
// BACKWARD CKS SUPPORT
if (file.endsWith(".cks")) { // ignore checksum files here
continue;
}
String checksum = checksums.get(file);
StoreStats stats() throws IOException;
// BACKWARD CKS SUPPORT
if (checksum == null) {
if (delegate.fileExists(file + ".cks")) {
IndexInput indexInput = delegate.openInput(file + ".cks");
try {
if (indexInput.length() > 0) {
byte[] checksumBytes = new byte[(int) indexInput.length()];
indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false);
checksum = Unicode.fromBytes(checksumBytes);
}
} finally {
indexInput.close();
}
}
}
builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum));
}
filesMetadata = builder.immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
/**
* The estimated size this store is using.
*/
ByteSizeValue estimateSize() throws IOException;
public Directory delegate() {
return delegate;
}
/**
* The store can suggest the best setting for compound file the
* {@link org.apache.lucene.index.MergePolicy} will use.
*/
boolean suggestUseCompoundFile();
@Override public String[] listAll() throws IOException {
return files;
}
/**
* Close the store.
*/
void close() throws IOException;
@Override public boolean fileExists(String name) throws IOException {
return filesMetadata.containsKey(name);
}
@Override public long fileModified(String name) throws IOException {
StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData == null) {
throw new FileNotFoundException(name);
}
// not set yet (IndexOutput not closed)
if (metaData.lastModified() != -1) {
return metaData.lastModified();
}
return delegate.fileModified(name);
}
@Override public void touchFile(String name) throws IOException {
delegate.touchFile(name);
synchronized (mutex) {
StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData != null) {
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name), metaData.checksum());
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
}
}
}
public void deleteFileChecksum(String name) throws IOException {
try {
delegate.deleteFile(name);
} catch (IOException e) {
if (delegate.fileExists(name)) {
throw e;
}
}
synchronized (mutex) {
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
@Override public void deleteFile(String name) throws IOException {
// we don't allow to delete the checksums files, only using the deleteChecksum method
if (isChecksum(name)) {
return;
}
try {
delegate.deleteFile(name);
} catch (IOException e) {
if (delegate.fileExists(name)) {
throw e;
}
}
synchronized (mutex) {
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
@Override public long fileLength(String name) throws IOException {
StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData == null) {
throw new FileNotFoundException(name);
}
// not set yet (IndexOutput not closed)
if (metaData.length() != -1) {
return metaData.length();
}
return delegate.fileLength(name);
}
@Override public IndexOutput createOutput(String name) throws IOException {
return createOutput(name, true);
}
public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException {
IndexOutput out = delegate.createOutput(name);
synchronized (mutex) {
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
return new StoreIndexOutput(out, name, computeChecksum);
}
@Override public IndexInput openInput(String name) throws IOException {
return delegate.openInput(name);
}
@Override public void close() throws IOException {
delegate.close();
synchronized (mutex) {
filesMetadata = ImmutableMap.of();
files = Strings.EMPTY_ARRAY;
}
}
@Override public Lock makeLock(String name) {
return delegate.makeLock(name);
}
@Override public IndexInput openInput(String name, int bufferSize) throws IOException {
return delegate.openInput(name, bufferSize);
}
@Override public void clearLock(String name) throws IOException {
delegate.clearLock(name);
}
@Override public void setLockFactory(LockFactory lockFactory) throws IOException {
delegate.setLockFactory(lockFactory);
}
@Override public LockFactory getLockFactory() {
return delegate.getLockFactory();
}
@Override public String getLockID() {
return delegate.getLockID();
}
@Override public void sync(Collection<String> names) throws IOException {
if (sync) {
delegate.sync(names);
}
for (String name : names) {
// write the checksums file when we sync on the segments file (committed)
if (!name.equals("segments.gen") && name.startsWith("segments")) {
writeChecksums();
break;
}
}
}
@Override public void sync(String name) throws IOException {
if (sync) {
delegate.sync(name);
}
// write the checksums file when we sync on the segments file (committed)
if (!name.equals("segments.gen") && name.startsWith("segments")) {
writeChecksums();
}
}
@Override public void forceSync(String name) throws IOException {
delegate.sync(name);
}
}
class StoreIndexOutput extends IndexOutput {
private final IndexOutput delegate;
private final String name;
private final Checksum digest;
StoreIndexOutput(IndexOutput delegate, String name, boolean computeChecksum) {
this.delegate = delegate;
this.name = name;
if (computeChecksum) {
if ("segments.gen".equals(name)) {
// no need to create checksum for segments.gen since its not snapshot to recovery
this.digest = null;
} else if (name.startsWith("segments")) {
// don't compute checksum for segments files, so pure Lucene can open this directory
// and since we, in any case, always recover the segments files
this.digest = null;
} else {
// this.digest = new CRC32();
// adler is faster, and we compare on length as well, should be enough to check for difference
// between files
this.digest = new Adler32();
}
} else {
this.digest = null;
}
}
@Override public void close() throws IOException {
delegate.close();
String checksum = null;
if (digest != null) {
checksum = Long.toString(digest.getValue(), Character.MAX_RADIX);
}
synchronized (mutex) {
StoreFileMetaData md = new StoreFileMetaData(name, directory.delegate().fileLength(name), directory.delegate().fileModified(name), checksum);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
@Override public void writeByte(byte b) throws IOException {
delegate.writeByte(b);
if (digest != null) {
digest.update(b);
}
}
@Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length);
if (digest != null) {
digest.update(b, offset, length);
}
}
// don't override it, base class method simple reads from input and writes to this output
// @Override public void copyBytes(IndexInput input, long numBytes) throws IOException {
// delegate.copyBytes(input, numBytes);
// }
@Override public void flush() throws IOException {
delegate.flush();
}
@Override public long getFilePointer() {
return delegate.getFilePointer();
}
@Override public void seek(long pos) throws IOException {
// seek might be called on files, which means that the checksum is not file checksum
// but a checksum of the bytes written to this stream, which is the same for each
// type of file in lucene
delegate.seek(pos);
}
@Override public long length() throws IOException {
return delegate.length();
}
@Override public void setLength(long length) throws IOException {
delegate.setLength(length);
}
@Override public void writeStringStringMap(Map<String, String> map) throws IOException {
delegate.writeStringStringMap(map);
}
}
}

View File

@ -37,7 +37,8 @@ public class StoreModule extends AbstractModule {
}
@Override protected void configure() {
bind(Store.class).to(indexStore.shardStoreClass()).asEagerSingleton();
bind(DirectoryService.class).to(indexStore.shardDirectory()).asEagerSingleton();
bind(StoreManagement.class).asEagerSingleton();
bind(Store.class).asEagerSingleton();
}
}

View File

@ -25,17 +25,13 @@ import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.store.bytebuffer.ByteBufferDirectory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.support.AbstractStore;
import java.io.File;
import java.io.FileNotFoundException;
@ -43,27 +39,30 @@ import java.io.IOException;
import java.io.InterruptedIOException;
/**
* @author kimchy (shay.banon)
*/
public abstract class FsStore extends AbstractStore {
public abstract class FsDirectoryService extends AbstractIndexShardComponent implements DirectoryService {
public static final boolean DEFAULT_SUGGEST_USE_COMPOUND_FILE = false;
protected final FsIndexStore indexStore;
public FsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
super(shardId, indexSettings, indexStore);
public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
super(shardId, indexSettings);
this.indexStore = (FsIndexStore) indexStore;
}
@Override public void fullDelete() throws IOException {
FileSystemUtils.deleteRecursively(fsDirectory().getDirectory());
// if we are the last ones, delete also the actual index
String[] list = fsDirectory().getDirectory().getParentFile().list();
if (list == null || list.length == 0) {
FileSystemUtils.deleteRecursively(fsDirectory().getDirectory().getParentFile());
protected LockFactory buildLockFactory() throws IOException {
String fsLock = componentSettings.get("fs_lock", "native");
LockFactory lockFactory = NoLockFactory.getNoLockFactory();
if (fsLock.equals("native")) {
// TODO LUCENE MONITOR: this is not needed in next Lucene version
lockFactory = new NativeFSLockFactory();
} else if (fsLock.equals("simple")) {
lockFactory = new SimpleFSLockFactory();
}
return lockFactory;
}
@Override protected void doRenameFile(String from, String to) throws IOException {
File directory = fsDirectory().getDirectory();
@Override public void renameFile(Directory dir, String from, String to) throws IOException {
File directory = ((FSDirectory) dir).getDirectory();
File old = new File(directory, from);
File nu = new File(directory, to);
if (nu.exists())
@ -91,39 +90,13 @@ public abstract class FsStore extends AbstractStore {
}
}
public abstract FSDirectory fsDirectory();
protected LockFactory buildLockFactory() throws IOException {
String fsLock = componentSettings.get("fs_lock", "native");
LockFactory lockFactory = new NoLockFactory();
if (fsLock.equals("native")) {
// TODO LUCENE MONITOR: this is not needed in next Lucene version
lockFactory = new NativeFSLockFactory();
} else if (fsLock.equals("simple")) {
lockFactory = new SimpleFSLockFactory();
@Override public void fullDelete(Directory dir) throws IOException {
FSDirectory fsDirectory = (FSDirectory) dir;
FileSystemUtils.deleteRecursively(fsDirectory.getDirectory());
// if we are the last ones, delete also the actual index
String[] list = fsDirectory.getDirectory().getParentFile().list();
if (list == null || list.length == 0) {
FileSystemUtils.deleteRecursively(fsDirectory.getDirectory().getParentFile());
}
return lockFactory;
}
protected Tuple<SwitchDirectory, Boolean> buildSwitchDirectoryIfNeeded(Directory fsDirectory, ByteBufferCache byteBufferCache) {
boolean cache = componentSettings.getAsBoolean("memory.enabled", false);
if (!cache) {
return null;
}
Directory memDir = new ByteBufferDirectory(byteBufferCache);
// see http://lucene.apache.org/java/3_0_1/fileformats.html
String[] primaryExtensions = componentSettings.getAsArray("memory.extensions", new String[]{"", "del", "gen"});
if (primaryExtensions == null || primaryExtensions.length == 0) {
return null;
}
Boolean forceUseCompound = null;
for (String extension : primaryExtensions) {
if (!("".equals(extension) || "del".equals(extension) || "gen".equals(extension))) {
// caching internal CFS extension, don't use compound file extension
forceUseCompound = false;
}
}
return new Tuple<SwitchDirectory, Boolean>(new SwitchDirectory(ImmutableSet.copyOf(primaryExtensions), memDir, fsDirectory, true), forceUseCompound);
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MMapDirectory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import java.io.File;
import java.io.IOException;
/**
*/
public class MmapFsDirectoryService extends FsDirectoryService {
@Inject public MmapFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
super(shardId, indexSettings, indexStore);
}
@Override public Directory build() throws IOException {
File location = indexStore.shardIndexLocation(shardId);
FileSystemUtils.mkdirs(location);
return new MMapDirectory(location, buildLockFactory());
}
}

View File

@ -25,7 +25,7 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.DirectoryService;
/**
* @author kimchy (shay.banon)
@ -36,7 +36,7 @@ public class MmapFsIndexStore extends FsIndexStore {
super(index, indexSettings, indexService, nodeEnv);
}
@Override public Class<? extends Store> shardStoreClass() {
return MmapFsStore.class;
@Override public Class<? extends DirectoryService> shardDirectory() {
return MmapFsDirectoryService.class;
}
}

View File

@ -1,85 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MMapDirectory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import java.io.File;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class MmapFsStore extends FsStore {
private final MMapDirectory fsDirectory;
private final Directory directory;
private final boolean suggestUseCompoundFile;
@Inject public MmapFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException {
super(shardId, indexSettings, indexStore);
LockFactory lockFactory = buildLockFactory();
File location = ((FsIndexStore) indexStore).shardIndexLocation(shardId);
FileSystemUtils.mkdirs(location);
this.fsDirectory = new MMapDirectory(location, lockFactory);
boolean suggestUseCompoundFile;
Tuple<SwitchDirectory, Boolean> switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory, byteBufferCache);
if (switchDirectory != null) {
suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE;
if (switchDirectory.v2() != null) {
suggestUseCompoundFile = switchDirectory.v2();
}
logger.debug("using [mmap_fs] store with path [{}], cache [true] with extensions [{}]", fsDirectory.getDirectory(), switchDirectory.v1().primaryExtensions());
directory = wrapDirectory(switchDirectory.v1());
} else {
suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE;
directory = wrapDirectory(fsDirectory);
logger.debug("using [mmap_fs] store with path [{}]", fsDirectory.getDirectory());
}
this.suggestUseCompoundFile = suggestUseCompoundFile;
}
@Override public FSDirectory fsDirectory() {
return fsDirectory;
}
@Override public Directory directory() {
return directory;
}
@Override public boolean suggestUseCompoundFile() {
return suggestUseCompoundFile;
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import java.io.File;
import java.io.IOException;
/**
*/
public class NioFsDirectoryService extends FsDirectoryService {
@Inject public NioFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
super(shardId, indexSettings, indexStore);
}
@Override public Directory build() throws IOException {
File location = indexStore.shardIndexLocation(shardId);
FileSystemUtils.mkdirs(location);
return new NIOFSDirectory(location, buildLockFactory());
}
}

View File

@ -25,7 +25,7 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.DirectoryService;
/**
* @author kimchy (shay.banon)
@ -36,7 +36,7 @@ public class NioFsIndexStore extends FsIndexStore {
super(index, indexSettings, indexService, nodeEnv);
}
@Override public Class<? extends Store> shardStoreClass() {
return NioFsStore.class;
@Override public Class<? extends DirectoryService> shardDirectory() {
return NioFsDirectoryService.class;
}
}

View File

@ -1,85 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import java.io.File;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class NioFsStore extends FsStore {
private final NIOFSDirectory fsDirectory;
private final Directory directory;
private final boolean suggestUseCompoundFile;
@Inject public NioFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException {
super(shardId, indexSettings, indexStore);
LockFactory lockFactory = buildLockFactory();
File location = ((FsIndexStore) indexStore).shardIndexLocation(shardId);
FileSystemUtils.mkdirs(location);
this.fsDirectory = new NIOFSDirectory(location, lockFactory);
boolean suggestUseCompoundFile;
Tuple<SwitchDirectory, Boolean> switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory, byteBufferCache);
if (switchDirectory != null) {
suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE;
if (switchDirectory.v2() != null) {
suggestUseCompoundFile = switchDirectory.v2();
}
logger.debug("using [nio_fs] store with path [{}], cache [true] with extensions [{}]", fsDirectory.getDirectory(), switchDirectory.v1().primaryExtensions());
directory = wrapDirectory(switchDirectory.v1());
} else {
suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE;
directory = wrapDirectory(fsDirectory);
logger.debug("using [nio_fs] store with path [{}]", fsDirectory.getDirectory());
}
this.suggestUseCompoundFile = suggestUseCompoundFile;
}
@Override public FSDirectory fsDirectory() {
return fsDirectory;
}
@Override public Directory directory() {
return directory;
}
@Override public boolean suggestUseCompoundFile() {
return suggestUseCompoundFile;
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import java.io.File;
import java.io.IOException;
/**
*/
public class SimpleFsDirectoryService extends FsDirectoryService {
@Inject public SimpleFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
super(shardId, indexSettings, indexStore);
}
@Override public Directory build() throws IOException {
File location = indexStore.shardIndexLocation(shardId);
FileSystemUtils.mkdirs(location);
return new SimpleFSDirectory(location, buildLockFactory());
}
}

View File

@ -25,7 +25,7 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.DirectoryService;
/**
* @author kimchy (shay.banon)
@ -36,7 +36,7 @@ public class SimpleFsIndexStore extends FsIndexStore {
super(index, indexSettings, indexService, nodeEnv);
}
@Override public Class<? extends Store> shardStoreClass() {
return SimpleFsStore.class;
@Override public Class<? extends DirectoryService> shardDirectory() {
return SimpleFsDirectoryService.class;
}
}

View File

@ -1,85 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import java.io.File;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class SimpleFsStore extends FsStore {
private SimpleFSDirectory fsDirectory;
private final Directory directory;
private final boolean suggestUseCompoundFile;
@Inject public SimpleFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException {
super(shardId, indexSettings, indexStore);
LockFactory lockFactory = buildLockFactory();
File location = ((FsIndexStore) indexStore).shardIndexLocation(shardId);
FileSystemUtils.mkdirs(location);
this.fsDirectory = new SimpleFSDirectory(location, lockFactory);
boolean suggestUseCompoundFile;
Tuple<SwitchDirectory, Boolean> switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory, byteBufferCache);
if (switchDirectory != null) {
suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE;
if (switchDirectory.v2() != null) {
suggestUseCompoundFile = switchDirectory.v2();
}
logger.debug("using [simple_fs] store with path [{}], cache [true] with extensions [{}]", fsDirectory.getDirectory(), switchDirectory.v1().primaryExtensions());
directory = wrapDirectory(switchDirectory.v1());
} else {
suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE;
directory = wrapDirectory(fsDirectory);
logger.debug("using [simple_fs] store with path [{}]", fsDirectory.getDirectory());
}
this.suggestUseCompoundFile = suggestUseCompoundFile;
}
@Override public FSDirectory fsDirectory() {
return fsDirectory;
}
@Override public Directory directory() {
return directory;
}
@Override public boolean suggestUseCompoundFile() {
return suggestUseCompoundFile;
}
}

View File

@ -27,43 +27,34 @@ import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.support.AbstractStore;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class ByteBufferStore extends AbstractStore {
public class ByteBufferDirectoryService extends AbstractIndexShardComponent implements DirectoryService {
private final CustomByteBufferDirectory bbDirectory;
private final ByteBufferCache byteBufferCache;
private final Directory directory;
@Inject public ByteBufferStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException {
super(shardId, indexSettings, indexStore);
this.bbDirectory = new CustomByteBufferDirectory(byteBufferCache);
this.directory = wrapDirectory(bbDirectory);
logger.debug("Using [byte_buffer] store");
@Inject public ByteBufferDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) {
super(shardId, indexSettings);
this.byteBufferCache = byteBufferCache;
}
@Override public Directory directory() {
return directory;
@Override public Directory build() {
return new CustomByteBufferDirectory(byteBufferCache);
}
/**
* Its better to not use the compound format when using the Ram store.
*/
@Override public boolean suggestUseCompoundFile() {
return false;
@Override public void renameFile(Directory dir, String from, String to) throws IOException {
((CustomByteBufferDirectory) dir).renameTo(from, to);
}
@Override protected void doRenameFile(String from, String to) throws IOException {
bbDirectory.renameTo(from, to);
@Override public void fullDelete(Directory dir) {
}
static class CustomByteBufferDirectory extends ByteBufferDirectory {

View File

@ -27,7 +27,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.jvm.JvmStats;
@ -49,8 +49,8 @@ public class ByteBufferIndexStore extends AbstractIndexStore {
return false;
}
@Override public Class<? extends Store> shardStoreClass() {
return ByteBufferStore.class;
@Override public Class<? extends DirectoryService> shardDirectory() {
return ByteBufferDirectoryService.class;
}
@Override public ByteSizeValue backingStoreTotalSpace() {

View File

@ -25,42 +25,30 @@ import org.apache.lucene.store.RAMFile;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.support.AbstractStore;
import org.elasticsearch.index.store.DirectoryService;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
*/
public class RamStore extends AbstractStore {
public class RamDirectoryService extends AbstractIndexShardComponent implements DirectoryService {
private final CustomRAMDirectory ramDirectory;
private final Directory directory;
@Inject public RamStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) throws IOException {
super(shardId, indexSettings, indexStore);
this.ramDirectory = new CustomRAMDirectory();
this.directory = wrapDirectory(ramDirectory);
logger.debug("Using [ram] Store");
@Inject public RamDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
}
@Override public Directory directory() {
return directory;
@Override public Directory build() {
return new CustomRAMDirectory();
}
/**
* Its better to not use the compound format when using the Ram store.
*/
@Override public boolean suggestUseCompoundFile() {
return false;
@Override public void renameFile(Directory dir, String from, String to) throws IOException {
((CustomRAMDirectory) dir).renameTo(from, to);
}
@Override protected void doRenameFile(String from, String to) throws IOException {
ramDirectory.renameTo(from, to);
@Override public void fullDelete(Directory dir) {
}
static class CustomRAMDirectory extends RAMDirectory {

View File

@ -25,7 +25,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.jvm.JvmStats;
@ -43,8 +43,8 @@ public class RamIndexStore extends AbstractIndexStore {
return false;
}
@Override public Class<? extends Store> shardStoreClass() {
return RamStore.class;
@Override public Class<? extends DirectoryService> shardDirectory() {
return RamDirectoryService.class;
}
@Override public ByteSizeValue backingStoreTotalSpace() {

View File

@ -1,517 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.support;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockFactory;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.StoreStats;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
/**
* @author kimchy (shay.banon)
*/
public abstract class AbstractStore extends AbstractIndexShardComponent implements Store {
static final String CHECKSUMS_PREFIX = "_checksums-";
public static final boolean isChecksum(String name) {
return name.startsWith(CHECKSUMS_PREFIX);
}
protected final IndexStore indexStore;
private volatile ImmutableMap<String, StoreFileMetaData> filesMetadata = ImmutableMap.of();
private volatile String[] files = Strings.EMPTY_ARRAY;
private final Object mutex = new Object();
private final boolean sync;
protected AbstractStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
super(shardId, indexSettings);
this.indexStore = indexStore;
this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway...
}
protected Directory wrapDirectory(Directory dir) throws IOException {
return new StoreDirectory(dir);
}
@Override public ImmutableMap<String, StoreFileMetaData> list() throws IOException {
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
for (String name : files) {
StoreFileMetaData md = metaData(name);
if (md != null) {
builder.put(md.name(), md);
}
}
return builder.build();
}
public StoreFileMetaData metaData(String name) throws IOException {
StoreFileMetaData md = filesMetadata.get(name);
if (md == null) {
return null;
}
// IndexOutput not closed, does not exists
if (md.lastModified() == -1 || md.length() == -1) {
return null;
}
return md;
}
@Override public void deleteContent() throws IOException {
String[] files = directory().listAll();
IOException lastException = null;
for (String file : files) {
if (isChecksum(file)) {
((StoreDirectory) directory()).deleteFileChecksum(file);
} else {
try {
directory().deleteFile(file);
} catch (FileNotFoundException e) {
// ignore
} catch (IOException e) {
lastException = e;
}
}
}
if (lastException != null) {
throw lastException;
}
}
@Override public void fullDelete() throws IOException {
deleteContent();
}
@Override public StoreStats stats() throws IOException {
return new StoreStats(Directories.estimateSize(directory()));
}
@Override public ByteSizeValue estimateSize() throws IOException {
return new ByteSizeValue(Directories.estimateSize(directory()));
}
@Override public void renameFile(String from, String to) throws IOException {
doRenameFile(from, to);
synchronized (mutex) {
StoreFileMetaData fromMetaData = filesMetadata.get(from); // we should always find this one
StoreFileMetaData toMetaData = new StoreFileMetaData(to, fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum());
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(from).put(to, toMetaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
protected abstract void doRenameFile(String from, String to) throws IOException;
public static Map<String, String> readChecksums(Directory dir) throws IOException {
long lastFound = -1;
for (String name : dir.listAll()) {
if (!isChecksum(name)) {
continue;
}
long current = Long.parseLong(name.substring(CHECKSUMS_PREFIX.length()));
if (current > lastFound) {
lastFound = current;
}
}
if (lastFound == -1) {
return ImmutableMap.of();
}
IndexInput indexInput = dir.openInput(CHECKSUMS_PREFIX + lastFound);
try {
indexInput.readInt(); // version
return indexInput.readStringStringMap();
} catch (Exception e) {
// failed to load checksums, ignore and return an empty map
return new HashMap<String, String>();
} finally {
indexInput.close();
}
}
public void writeChecksums() throws IOException {
writeChecksums((StoreDirectory) directory());
}
private void writeChecksums(StoreDirectory dir) throws IOException {
String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
ImmutableMap<String, StoreFileMetaData> files = list();
synchronized (mutex) {
Map<String, String> checksums = new HashMap<String, String>();
for (StoreFileMetaData metaData : files.values()) {
if (metaData.checksum() != null) {
checksums.put(metaData.name(), metaData.checksum());
}
}
IndexOutput output = dir.createOutput(checksumName, false);
output.writeInt(0); // version
output.writeStringStringMap(checksums);
output.close();
}
for (StoreFileMetaData metaData : files.values()) {
if (metaData.name().startsWith(CHECKSUMS_PREFIX) && !checksumName.equals(metaData.name())) {
try {
dir.deleteFileChecksum(metaData.name());
} catch (Exception e) {
// ignore
}
}
}
}
/**
* Returns <tt>true</tt> by default.
*/
@Override public boolean suggestUseCompoundFile() {
return true;
}
@Override public void close() throws IOException {
directory().close();
}
@Override public IndexOutput createOutputWithNoChecksum(String name) throws IOException {
return ((StoreDirectory) directory()).createOutput(name, false);
}
@Override public void writeChecksum(String name, String checksum) throws IOException {
// update the metadata to include the checksum and write a new checksums file
synchronized (mutex) {
StoreFileMetaData metaData = filesMetadata.get(name);
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
writeChecksums();
}
}
@Override public void writeChecksums(Map<String, String> checksums) throws IOException {
// update the metadata to include the checksum and write a new checksums file
synchronized (mutex) {
for (Map.Entry<String, String> entry : checksums.entrySet()) {
StoreFileMetaData metaData = filesMetadata.get(entry.getKey());
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue());
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(entry.getKey(), metaData).immutableMap();
}
writeChecksums();
}
}
/**
* The idea of the store directory is to cache file level meta data, as well as md5 of it
*/
protected class StoreDirectory extends Directory implements ForceSyncDirectory {
private final Directory delegate;
StoreDirectory(Directory delegate) throws IOException {
this.delegate = delegate;
synchronized (mutex) {
Map<String, String> checksums = readChecksums(delegate);
MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
for (String file : delegate.listAll()) {
// BACKWARD CKS SUPPORT
if (file.endsWith(".cks")) { // ignore checksum files here
continue;
}
String checksum = checksums.get(file);
// BACKWARD CKS SUPPORT
if (checksum == null) {
if (delegate.fileExists(file + ".cks")) {
IndexInput indexInput = delegate.openInput(file + ".cks");
try {
if (indexInput.length() > 0) {
byte[] checksumBytes = new byte[(int) indexInput.length()];
indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false);
checksum = Unicode.fromBytes(checksumBytes);
}
} finally {
indexInput.close();
}
}
}
builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum));
}
filesMetadata = builder.immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
public Directory delegate() {
return delegate;
}
@Override public String[] listAll() throws IOException {
return files;
}
@Override public boolean fileExists(String name) throws IOException {
return filesMetadata.containsKey(name);
}
@Override public long fileModified(String name) throws IOException {
StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData == null) {
throw new FileNotFoundException(name);
}
// not set yet (IndexOutput not closed)
if (metaData.lastModified() != -1) {
return metaData.lastModified();
}
return delegate.fileModified(name);
}
@Override public void touchFile(String name) throws IOException {
delegate.touchFile(name);
synchronized (mutex) {
StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData != null) {
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name), metaData.checksum());
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
}
}
}
public void deleteFileChecksum(String name) throws IOException {
delegate.deleteFile(name);
synchronized (mutex) {
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
@Override public void deleteFile(String name) throws IOException {
// we don't allow to delete the checksums files, only using the deleteChecksum method
if (isChecksum(name)) {
return;
}
delegate.deleteFile(name);
synchronized (mutex) {
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
@Override public long fileLength(String name) throws IOException {
StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData == null) {
throw new FileNotFoundException(name);
}
// not set yet (IndexOutput not closed)
if (metaData.length() != -1) {
return metaData.length();
}
return delegate.fileLength(name);
}
@Override public IndexOutput createOutput(String name) throws IOException {
return createOutput(name, true);
}
public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException {
IndexOutput out = delegate.createOutput(name);
synchronized (mutex) {
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
return new StoreIndexOutput(out, name, computeChecksum);
}
@Override public IndexInput openInput(String name) throws IOException {
return delegate.openInput(name);
}
@Override public void close() throws IOException {
delegate.close();
synchronized (mutex) {
filesMetadata = ImmutableMap.of();
files = Strings.EMPTY_ARRAY;
}
}
@Override public Lock makeLock(String name) {
return delegate.makeLock(name);
}
@Override public IndexInput openInput(String name, int bufferSize) throws IOException {
return delegate.openInput(name, bufferSize);
}
@Override public void clearLock(String name) throws IOException {
delegate.clearLock(name);
}
@Override public void setLockFactory(LockFactory lockFactory) throws IOException {
delegate.setLockFactory(lockFactory);
}
@Override public LockFactory getLockFactory() {
return delegate.getLockFactory();
}
@Override public String getLockID() {
return delegate.getLockID();
}
@Override public void sync(Collection<String> names) throws IOException {
if (sync) {
delegate.sync(names);
}
for (String name : names) {
// write the checksums file when we sync on the segments file (committed)
if (!name.equals("segments.gen") && name.startsWith("segments")) {
writeChecksums();
break;
}
}
}
@Override public void sync(String name) throws IOException {
if (sync) {
delegate.sync(name);
}
// write the checksums file when we sync on the segments file (committed)
if (!name.equals("segments.gen") && name.startsWith("segments")) {
writeChecksums();
}
}
@Override public void forceSync(String name) throws IOException {
delegate.sync(name);
}
}
class StoreIndexOutput extends IndexOutput {
private final IndexOutput delegate;
private final String name;
private final Checksum digest;
StoreIndexOutput(IndexOutput delegate, String name, boolean computeChecksum) {
this.delegate = delegate;
this.name = name;
if (computeChecksum) {
if ("segments.gen".equals(name)) {
// no need to create checksum for segments.gen since its not snapshot to recovery
this.digest = null;
} else if (name.startsWith("segments")) {
// don't compute checksum for segments files, so pure Lucene can open this directory
// and since we, in any case, always recover the segments files
this.digest = null;
} else {
// this.digest = new CRC32();
// adler is faster, and we compare on length as well, should be enough to check for difference
// between files
this.digest = new Adler32();
}
} else {
this.digest = null;
}
}
@Override public void close() throws IOException {
delegate.close();
String checksum = null;
if (digest != null) {
checksum = Long.toString(digest.getValue(), Character.MAX_RADIX);
}
synchronized (mutex) {
StoreFileMetaData md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), checksum);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
@Override public void writeByte(byte b) throws IOException {
delegate.writeByte(b);
if (digest != null) {
digest.update(b);
}
}
@Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length);
if (digest != null) {
digest.update(b, offset, length);
}
}
// don't override it, base class method simple reads from input and writes to this output
// @Override public void copyBytes(IndexInput input, long numBytes) throws IOException {
// delegate.copyBytes(input, numBytes);
// }
@Override public void flush() throws IOException {
delegate.flush();
}
@Override public long getFilePointer() {
return delegate.getFilePointer();
}
@Override public void seek(long pos) throws IOException {
// seek might be called on files, which means that the checksum is not file checksum
// but a checksum of the bytes written to this stream, which is the same for each
// type of file in lucene
delegate.seek(pos);
}
@Override public long length() throws IOException {
return delegate.length();
}
@Override public void setLength(long length) throws IOException {
delegate.setLength(length);
}
@Override public void writeStringStringMap(Map<String, String> map) throws IOException {
delegate.writeStringStringMap(map);
}
}
}

View File

@ -42,7 +42,7 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.support.AbstractStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesLifecycle;
@ -452,7 +452,7 @@ public class RecoveryTarget extends AbstractComponent {
for (String existingFile : shard.store().directory().listAll()) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
if (!request.snapshotFiles().contains(existingFile) && !AbstractStore.isChecksum(existingFile)) {
if (!request.snapshotFiles().contains(existingFile) && !Store.isChecksum(existingFile)) {
try {
shard.store().directory().deleteFile(existingFile);
} catch (Exception e) {

View File

@ -48,8 +48,8 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.support.AbstractStore;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -172,13 +172,13 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
FSDirectory directory = FSDirectory.open(indexFile);
Map<String, String> checksums = null;
try {
checksums = AbstractStore.readChecksums(directory);
checksums = Store.readChecksums(directory);
for (File file : indexFile.listFiles()) {
// BACKWARD CKS SUPPORT
if (file.getName().endsWith(".cks")) {
continue;
}
if (AbstractStore.isChecksum(file.getName())) {
if (Store.isChecksum(file.getName())) {
continue;
}
files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksums.get(file.getName())));

View File

@ -39,7 +39,7 @@ import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.ram.RamStore;
import org.elasticsearch.index.store.ram.RamDirectoryService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.testng.annotations.AfterMethod;
@ -96,11 +96,11 @@ public abstract class AbstractSimpleEngineTests {
}
protected Store createStore() throws IOException {
return new RamStore(shardId, EMPTY_SETTINGS, null);
return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS));
}
protected Store createStoreReplica() throws IOException {
return new RamStore(shardId, EMPTY_SETTINGS, null);
return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS));
}
protected Translog createTranslog() {