diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java deleted file mode 100644 index 91296cd0f73..00000000000 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java +++ /dev/null @@ -1,333 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.benchmark.index.engine; - -import org.apache.lucene.document.Document; -import org.apache.lucene.document.LoadFirstFieldSelector; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.TopDocs; -import org.elasticsearch.cache.memory.ByteBufferCache; -import org.elasticsearch.common.StopWatch; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.analysis.AnalysisService; -import org.elasticsearch.index.cache.bloom.none.NoneBloomCache; -import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.robin.RobinEngine; -import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider; -import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; -import org.elasticsearch.index.settings.IndexSettingsService; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.similarity.SimilarityService; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.memory.ByteBufferStore; -import org.elasticsearch.index.translog.fs.FsTranslog; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.File; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.common.lucene.DocumentBuilder.*; -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; - -/** - * @author kimchy (Shay Banon) - */ -public class SimpleEngineBenchmark { - - private final Store store; - - private final Engine engine; - - - private final AtomicInteger idGenerator = new AtomicInteger(); - - private String[] contentItems = new String[]{"test1", "test2", "test3"}; - - private static byte[] TRANSLOG_PAYLOAD = new byte[12]; - - private volatile int lastRefreshedId = 0; - - - private boolean create = false; - - private int searcherIterations = 10; - - private Thread[] searcherThreads = new Thread[1]; - - private int writerIterations = 10; - - private Thread[] writerThreads = new Thread[1]; - - private TimeValue refreshSchedule = new TimeValue(1, TimeUnit.SECONDS); - - private TimeValue flushSchedule = new TimeValue(1, TimeUnit.MINUTES); - - - private CountDownLatch latch; - private CyclicBarrier barrier1; - private CyclicBarrier barrier2; - - - // scheduled thread pool for both refresh and flush operations - private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); - - public SimpleEngineBenchmark(Store store, Engine engine) { - this.store = store; - this.engine = engine; - } - - public SimpleEngineBenchmark numberOfContentItems(int numberOfContentItems) { - contentItems = new String[numberOfContentItems]; - for (int i = 0; i < contentItems.length; i++) { - contentItems[i] = "content" + i; - } - return this; - } - - public SimpleEngineBenchmark searcherThreads(int numberOfSearcherThreads) { - searcherThreads = new Thread[numberOfSearcherThreads]; - return this; - } - - public SimpleEngineBenchmark searcherIterations(int searcherIterations) { - this.searcherIterations = searcherIterations; - return this; - } - - public SimpleEngineBenchmark writerThreads(int numberOfWriterThreads) { - writerThreads = new Thread[numberOfWriterThreads]; - return this; - } - - public SimpleEngineBenchmark writerIterations(int writerIterations) { - this.writerIterations = writerIterations; - return this; - } - - public SimpleEngineBenchmark refreshSchedule(TimeValue refreshSchedule) { - this.refreshSchedule = refreshSchedule; - return this; - } - - public SimpleEngineBenchmark flushSchedule(TimeValue flushSchedule) { - this.flushSchedule = flushSchedule; - return this; - } - - public SimpleEngineBenchmark create(boolean create) { - this.create = create; - return this; - } - - public SimpleEngineBenchmark build() { - for (int i = 0; i < searcherThreads.length; i++) { - searcherThreads[i] = new Thread(new SearcherThread(), "Searcher[" + i + "]"); - } - for (int i = 0; i < writerThreads.length; i++) { - writerThreads[i] = new Thread(new WriterThread(), "Writer[" + i + "]"); - } - - latch = new CountDownLatch(searcherThreads.length + writerThreads.length); - barrier1 = new CyclicBarrier(searcherThreads.length + writerThreads.length + 1); - barrier2 = new CyclicBarrier(searcherThreads.length + writerThreads.length + 1); - - // warmup by indexing all content items - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - for (String contentItem : contentItems) { - int id = idGenerator.incrementAndGet(); - String sId = Integer.toString(id); - Document doc = doc().add(field("_id", sId)) - .add(field("content", contentItem)).build(); - ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); - if (create) { - engine.create(new Engine.Create(null, new Term("_id", sId), pDoc)); - } else { - engine.index(new Engine.Index(null, new Term("_id", sId), pDoc)); - } - } - engine.refresh(new Engine.Refresh(true)); - stopWatch.stop(); - System.out.println("Warmup of [" + contentItems.length + "] content items, took " + stopWatch.totalTime()); - - return this; - } - - public void run() throws Exception { - for (Thread t : searcherThreads) { - t.start(); - } - for (Thread t : writerThreads) { - t.start(); - } - barrier1.await(); - - Refresher refresher = new Refresher(); - scheduledExecutorService.scheduleWithFixedDelay(refresher, refreshSchedule.millis(), refreshSchedule.millis(), TimeUnit.MILLISECONDS); - Flusher flusher = new Flusher(); - scheduledExecutorService.scheduleWithFixedDelay(flusher, flushSchedule.millis(), flushSchedule.millis(), TimeUnit.MILLISECONDS); - - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - barrier2.await(); - - latch.await(); - stopWatch.stop(); - - System.out.println("Summary"); - System.out.println(" -- Readers [" + searcherThreads.length + "] with [" + searcherIterations + "] iterations"); - System.out.println(" -- Writers [" + writerThreads.length + "] with [" + writerIterations + "] iterations"); - System.out.println(" -- Took: " + stopWatch.totalTime()); - System.out.println(" -- Refresh [" + refresher.id + "] took: " + refresher.stopWatch.totalTime()); - System.out.println(" -- Flush [" + flusher.id + "] took: " + flusher.stopWatch.totalTime()); - System.out.println(" -- Store size " + store.estimateSize()); - - scheduledExecutorService.shutdown(); - - engine.refresh(new Engine.Refresh(true)); - stopWatch = new StopWatch(); - stopWatch.start(); - Engine.Searcher searcher = engine.searcher(); - TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), idGenerator.get() + 1); - stopWatch.stop(); - System.out.println(" -- Indexed [" + idGenerator.get() + "] docs, found [" + topDocs.totalHits + "] hits, took " + stopWatch.totalTime()); - searcher.release(); - } - - private String content(long number) { - return contentItems[((int) (number % contentItems.length))]; - } - - private class Flusher implements Runnable { - StopWatch stopWatch = new StopWatch(); - private int id; - - @Override public void run() { - stopWatch.start("" + ++id); - engine.flush(new Engine.Flush()); - stopWatch.stop(); - } - } - - private class Refresher implements Runnable { - StopWatch stopWatch = new StopWatch(); - private int id; - - @Override public synchronized void run() { - stopWatch.start("" + ++id); - int lastId = idGenerator.get(); - engine.refresh(new Engine.Refresh(true)); - lastRefreshedId = lastId; - stopWatch.stop(); - } - } - - private class SearcherThread implements Runnable { - @Override public void run() { - try { - barrier1.await(); - barrier2.await(); - for (int i = 0; i < searcherIterations; i++) { - Engine.Searcher searcher = engine.searcher(); - TopDocs topDocs = searcher.searcher().search(new TermQuery(new Term("content", content(i))), 10); - // read one - searcher.searcher().doc(topDocs.scoreDocs[0].doc, new LoadFirstFieldSelector()); - searcher.release(); - } - } catch (Exception e) { - System.out.println("Searcher thread failed"); - e.printStackTrace(); - } finally { - latch.countDown(); - } - } - } - - private class WriterThread implements Runnable { - @Override public void run() { - try { - barrier1.await(); - barrier2.await(); - for (int i = 0; i < writerIterations; i++) { - int id = idGenerator.incrementAndGet(); - String sId = Integer.toString(id); - Document doc = doc().add(field("_id", sId)) - .add(field("content", content(id))).build(); - ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); - if (create) { - engine.create(new Engine.Create(null, new Term("_id", sId), pDoc)); - } else { - engine.index(new Engine.Index(null, new Term("_id", sId), pDoc)); - } - } - } catch (Exception e) { - System.out.println("Writer thread failed"); - e.printStackTrace(); - } finally { - latch.countDown(); - } - } - } - - public static void main(String[] args) throws Exception { - ShardId shardId = new ShardId(new Index("index"), 1); - Settings settings = EMPTY_SETTINGS; - -// Store store = new RamStore(shardId, settings); - Store store = new ByteBufferStore(shardId, settings, null, new ByteBufferCache(settings)); -// Store store = new NioFsStore(shardId, settings); - - store.deleteContent(); - - ThreadPool threadPool = new ThreadPool(); - SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings)); - Engine engine = new RobinEngine(shardId, settings, new ThreadPool(), new IndexSettingsService(shardId.index(), settings), store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog")), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)), - new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index())); - engine.start(); - - SimpleEngineBenchmark benchmark = new SimpleEngineBenchmark(store, engine) - .numberOfContentItems(1000) - .searcherThreads(50).searcherIterations(10000) - .writerThreads(10).writerIterations(10000) - .refreshSchedule(new TimeValue(1, TimeUnit.SECONDS)) - .flushSchedule(new TimeValue(1, TimeUnit.MINUTES)) - .create(false) - .build(); - - benchmark.run(); - - engine.close(); - store.close(); - threadPool.shutdown(); - } -} diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/store/SimpleStoreBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/store/SimpleStoreBenchmark.java deleted file mode 100644 index e34d379cb0a..00000000000 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/store/SimpleStoreBenchmark.java +++ /dev/null @@ -1,295 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.benchmark.index.store; - -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.elasticsearch.cache.memory.ByteBufferCache; -import org.elasticsearch.common.StopWatch; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.env.Environment; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.fs.*; -import org.elasticsearch.index.store.memory.ByteBufferStore; -import org.elasticsearch.index.store.ram.RamStore; - -import java.lang.management.ManagementFactory; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.atomic.AtomicLong; - -import static java.util.concurrent.TimeUnit.*; -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; - -/** - * @author kimchy - */ -public class SimpleStoreBenchmark { - - private final AtomicLong dynamicFilesCounter = new AtomicLong(); - - private final Store store; - - private String[] staticFiles = new String[10]; - - private ByteSizeValue staticFileSize = new ByteSizeValue(5, ByteSizeUnit.MB); - - private ByteSizeValue dynamicFileSize = new ByteSizeValue(1, ByteSizeUnit.MB); - - - private int readerIterations = 10; - - private int writerIterations = 10; - - private Thread[] readerThreads = new Thread[1]; - - private Thread[] writerThreads = new Thread[1]; - - private CountDownLatch latch; - private CyclicBarrier barrier1; - private CyclicBarrier barrier2; - - public SimpleStoreBenchmark(Store store) throws Exception { - this.store = store; - } - - public SimpleStoreBenchmark numberStaticFiles(int numberStaticFiles) { - this.staticFiles = new String[numberStaticFiles]; - return this; - } - - public SimpleStoreBenchmark staticFileSize(ByteSizeValue staticFileSize) { - this.staticFileSize = staticFileSize; - return this; - } - - public SimpleStoreBenchmark dynamicFileSize(ByteSizeValue dynamicFileSize) { - this.dynamicFileSize = dynamicFileSize; - return this; - } - - public SimpleStoreBenchmark readerThreads(int readerThreads) { - this.readerThreads = new Thread[readerThreads]; - return this; - } - - public SimpleStoreBenchmark readerIterations(int readerIterations) { - this.readerIterations = readerIterations; - return this; - } - - public SimpleStoreBenchmark writerIterations(int writerIterations) { - this.writerIterations = writerIterations; - return this; - } - - public SimpleStoreBenchmark writerThreads(int writerThreads) { - this.writerThreads = new Thread[writerThreads]; - return this; - } - - public SimpleStoreBenchmark build() throws Exception { - System.out.println("Creating [" + staticFiles.length + "] static files with size [" + staticFileSize + "]"); - for (int i = 0; i < staticFiles.length; i++) { - staticFiles[i] = "static" + i; - IndexOutput io = store.directory().createOutput(staticFiles[i]); - for (long sizeCounter = 0; sizeCounter < staticFileSize.bytes(); sizeCounter++) { - io.writeByte((byte) 1); - } - io.close(); - } - System.out.println("Using [" + dynamicFileSize + "] size for dynamic files"); - - // warmp - StopWatch stopWatch = new StopWatch("warmup"); - stopWatch.start(); - for (String staticFile : staticFiles) { - IndexInput ii = store.directory().openInput(staticFile); - // do a full read - for (long counter = 0; counter < ii.length(); counter++) { - byte result = ii.readByte(); - if (result != 1) { - System.out.println("Failure, read wrong value [" + result + "]"); - } - } - // do a list of the files - store.directory().listAll(); - } - stopWatch.stop(); - System.out.println("Warmup Took: " + stopWatch.shortSummary()); - - for (int i = 0; i < readerThreads.length; i++) { - readerThreads[i] = new Thread(new ReaderThread(), "Reader[" + i + "]"); - } - for (int i = 0; i < writerThreads.length; i++) { - writerThreads[i] = new Thread(new WriterThread(), "Writer[" + i + "]"); - } - - latch = new CountDownLatch(readerThreads.length + writerThreads.length); - barrier1 = new CyclicBarrier(readerThreads.length + writerThreads.length + 1); - barrier2 = new CyclicBarrier(readerThreads.length + writerThreads.length + 1); - - return this; - } - - public void run() throws Exception { - for (int i = 0; i < 3; i++) { - System.gc(); - MILLISECONDS.sleep(100); - } - - long emptyUsed = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); - - System.out.println("Running:"); - System.out.println(" -- Readers [" + readerThreads.length + "] with [" + readerIterations + "] iterations"); - System.out.println(" -- Writers [" + writerThreads.length + "] with [" + writerIterations + "] iterations"); - for (Thread t : readerThreads) { - t.start(); - } - for (Thread t : writerThreads) { - t.start(); - } - barrier1.await(); - - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - barrier2.await(); - - latch.await(); - stopWatch.stop(); - - System.out.println("Took: " + stopWatch.shortSummary()); - - for (int i = 0; i < 3; i++) { - System.gc(); - MILLISECONDS.sleep(100); - } - long bytesTaken = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() - emptyUsed; - System.out.println("Size of [" + staticFiles.length + "], each with size [" + staticFileSize + "], is " + new ByteSizeValue(bytesTaken, ByteSizeUnit.BYTES)); - } - - private class ReaderThread implements Runnable { - @Override public void run() { - try { - barrier1.await(); - barrier2.await(); - } catch (Exception e) { - e.printStackTrace(); - } - try { - for (int i = 0; i < readerIterations; i++) { - for (String staticFile : staticFiles) { - // do a list of the files - store.directory().listAll(); - - IndexInput ii = store.directory().openInput(staticFile); - // do a full read - for (long counter = 0; counter < ii.length(); counter++) { - byte result = ii.readByte(); - if (result != 1) { - System.out.println("Failure, read wrong value [" + result + "]"); - } - } - // do a list of the files - store.directory().listAll(); - - // do a seek and read some byes - ii.seek(ii.length() / 2); - ii.readByte(); - ii.readByte(); - - // do a list of the files - store.directory().listAll(); - } - } - } catch (Exception e) { - System.out.println("Reader Thread failed: " + e.getMessage()); - e.printStackTrace(); - } - latch.countDown(); - } - } - - private class WriterThread implements Runnable { - @Override public void run() { - try { - barrier1.await(); - barrier2.await(); - } catch (Exception e) { - e.printStackTrace(); - } - try { - for (int i = 0; i < writerIterations; i++) { - String dynamicFileName = "dynamic" + dynamicFilesCounter.incrementAndGet(); - IndexOutput io = store.directory().createOutput(dynamicFileName); - for (long sizeCounter = 0; sizeCounter < dynamicFileSize.bytes(); sizeCounter++) { - io.writeByte((byte) 1); - } - io.close(); - - store.directory().deleteFile(dynamicFileName); - } - } catch (Exception e) { - System.out.println("Writer thread failed: " + e.getMessage()); - e.printStackTrace(); - } - latch.countDown(); - } - } - - public static void main(String[] args) throws Exception { - Environment environment = new Environment(); - Settings settings = EMPTY_SETTINGS; - NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment); - ByteBufferCache byteBufferCache = new ByteBufferCache(settings); - - ShardId shardId = new ShardId(new Index("index"), 1); - String type = args.length > 0 ? args[0] : "ram"; - Store store; - if (type.equalsIgnoreCase("ram")) { - store = new RamStore(shardId, settings, null); - } else if (type.equalsIgnoreCase("simple-fs")) { - store = new SimpleFsStore(shardId, settings, new SimpleFsIndexStore(shardId.index(), settings, null, nodeEnvironment), byteBufferCache); - } else if (type.equalsIgnoreCase("mmap-fs")) { - store = new NioFsStore(shardId, settings, new NioFsIndexStore(shardId.index(), settings, null, nodeEnvironment), byteBufferCache); - } else if (type.equalsIgnoreCase("nio-fs")) { - store = new MmapFsStore(shardId, settings, new MmapFsIndexStore(shardId.index(), settings, null, nodeEnvironment), byteBufferCache); - } else if (type.equalsIgnoreCase("memory")) { - store = new ByteBufferStore(shardId, settings, null, byteBufferCache); - } else { - throw new IllegalArgumentException("No type store [" + type + "]"); - } - System.out.println("Using Store [" + store + "]"); - store.deleteContent(); - SimpleStoreBenchmark simpleStoreBenchmark = new SimpleStoreBenchmark(store) - .numberStaticFiles(5).staticFileSize(new ByteSizeValue(5, ByteSizeUnit.MB)) - .dynamicFileSize(new ByteSizeValue(1, ByteSizeUnit.MB)) - .readerThreads(5).readerIterations(10) - .writerThreads(2).writerIterations(10) - .build(); - simpleStoreBenchmark.run(); - store.close(); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java index 50eeb0c4f6d..a5efd0e9113 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java @@ -124,6 +124,10 @@ public class FileSystemUtils { return deleteRecursively(root, true); } + private static boolean innerDeleteRecursively(File root) { + return deleteRecursively(root, true); + } + /** * Delete the supplied {@link java.io.File} - for directories, * recursively delete any nested directories or files as well. @@ -139,7 +143,7 @@ public class FileSystemUtils { File[] children = root.listFiles(); if (children != null) { for (File aChildren : children) { - deleteRecursively(aChildren); + innerDeleteRecursively(aChildren); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/DirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/DirectoryService.java new file mode 100644 index 00000000000..f85ed364168 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/DirectoryService.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store; + +import org.apache.lucene.store.Directory; + +import java.io.IOException; + +/** + */ +public interface DirectoryService { + + Directory build() throws IOException; + + void renameFile(Directory dir, String from, String to) throws IOException; + + void fullDelete(Directory dir) throws IOException; +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/IndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/IndexStore.java index ccf82c37452..27c3eedf26a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/IndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/IndexStore.java @@ -40,7 +40,7 @@ public interface IndexStore extends IndexComponent { /** * The shard store class that should be used for each shard. */ - Class shardStoreClass(); + Class shardDirectory(); /** * Returns the backing store total space. Return -1 if not available. diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java index 3977393b73f..04fbafa37b2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java @@ -20,65 +20,516 @@ package org.elasticsearch.index.store; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockFactory; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.Unicode; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.Directories; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.shard.IndexShardComponent; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.support.ForceSyncDirectory; +import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; import java.util.Map; +import java.util.zip.Adler32; +import java.util.zip.Checksum; /** - * @author kimchy (shay.banon) */ -public interface Store extends IndexShardComponent { +public class Store extends AbstractIndexShardComponent { + + static final String CHECKSUMS_PREFIX = "_checksums-"; + + public static final boolean isChecksum(String name) { + return name.startsWith(CHECKSUMS_PREFIX); + } + + private final IndexStore indexStore; + + private final DirectoryService directoryService; + + private final StoreDirectory directory; + + private volatile ImmutableMap filesMetadata = ImmutableMap.of(); + + private volatile String[] files = Strings.EMPTY_ARRAY; + + private final Object mutex = new Object(); + + private final boolean sync; + + @Inject public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService) throws IOException { + super(shardId, indexSettings); + this.indexStore = indexStore; + this.directoryService = directoryService; + this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway... + this.directory = new StoreDirectory(directoryService.build()); + } + + public Directory directory() { + return directory; + } + + public ImmutableMap list() throws IOException { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (String name : files) { + StoreFileMetaData md = metaData(name); + if (md != null) { + builder.put(md.name(), md); + } + } + return builder.build(); + } + + public StoreFileMetaData metaData(String name) throws IOException { + StoreFileMetaData md = filesMetadata.get(name); + if (md == null) { + return null; + } + // IndexOutput not closed, does not exists + if (md.lastModified() == -1 || md.length() == -1) { + return null; + } + return md; + } + + public void deleteContent() throws IOException { + String[] files = directory.listAll(); + IOException lastException = null; + for (String file : files) { + if (isChecksum(file)) { + try { + directory.deleteFileChecksum(file); + } catch (IOException e) { + lastException = e; + } + } else { + try { + directory.deleteFile(file); + } catch (FileNotFoundException e) { + // ignore + } catch (IOException e) { + lastException = e; + } + } + } + if (lastException != null) { + throw lastException; + } + } + + public void fullDelete() throws IOException { + deleteContent(); + directoryService.fullDelete(directory.delegate()); + } + + public StoreStats stats() throws IOException { + return new StoreStats(Directories.estimateSize(directory)); + } + + public ByteSizeValue estimateSize() throws IOException { + return new ByteSizeValue(Directories.estimateSize(directory)); + } + + public void renameFile(String from, String to) throws IOException { + directoryService.renameFile(directory.delegate(), from, to); + synchronized (mutex) { + StoreFileMetaData fromMetaData = filesMetadata.get(from); // we should always find this one + StoreFileMetaData toMetaData = new StoreFileMetaData(to, fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum()); + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(from).put(to, toMetaData).immutableMap(); + files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); + } + } + + public static Map readChecksums(Directory dir) throws IOException { + long lastFound = -1; + for (String name : dir.listAll()) { + if (!isChecksum(name)) { + continue; + } + long current = Long.parseLong(name.substring(CHECKSUMS_PREFIX.length())); + if (current > lastFound) { + lastFound = current; + } + } + if (lastFound == -1) { + return ImmutableMap.of(); + } + IndexInput indexInput = dir.openInput(CHECKSUMS_PREFIX + lastFound); + try { + indexInput.readInt(); // version + return indexInput.readStringStringMap(); + } catch (Exception e) { + // failed to load checksums, ignore and return an empty map + return new HashMap(); + } finally { + indexInput.close(); + } + } + + public void writeChecksums() throws IOException { + writeChecksums(directory); + } + + private void writeChecksums(StoreDirectory dir) throws IOException { + String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis(); + ImmutableMap files = list(); + synchronized (mutex) { + Map checksums = new HashMap(); + for (StoreFileMetaData metaData : files.values()) { + if (metaData.checksum() != null) { + checksums.put(metaData.name(), metaData.checksum()); + } + } + IndexOutput output = dir.createOutput(checksumName, false); + output.writeInt(0); // version + output.writeStringStringMap(checksums); + output.close(); + } + for (StoreFileMetaData metaData : files.values()) { + if (metaData.name().startsWith(CHECKSUMS_PREFIX) && !checksumName.equals(metaData.name())) { + try { + dir.deleteFileChecksum(metaData.name()); + } catch (Exception e) { + // ignore + } + } + } + } /** - * The Lucene {@link Directory} this store is using. + * Returns true by default. */ - Directory directory(); + public boolean suggestUseCompoundFile() { + return false; + } - IndexOutput createOutputWithNoChecksum(String name) throws IOException; + public void close() throws IOException { + directory.close(); + } - void writeChecksum(String name, String checksum) throws IOException; + public IndexOutput createOutputWithNoChecksum(String name) throws IOException { + return directory.createOutput(name, false); + } - void writeChecksums(Map checksums) throws IOException; + public void writeChecksum(String name, String checksum) throws IOException { + // update the metadata to include the checksum and write a new checksums file + synchronized (mutex) { + StoreFileMetaData metaData = filesMetadata.get(name); + metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum); + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); + writeChecksums(); + } + } - StoreFileMetaData metaData(String name) throws IOException; - - ImmutableMap list() throws IOException; + public void writeChecksums(Map checksums) throws IOException { + // update the metadata to include the checksum and write a new checksums file + synchronized (mutex) { + for (Map.Entry entry : checksums.entrySet()) { + StoreFileMetaData metaData = filesMetadata.get(entry.getKey()); + metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue()); + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(entry.getKey(), metaData).immutableMap(); + } + writeChecksums(); + } + } /** - * Just deletes the content of the store. + * The idea of the store directory is to cache file level meta data, as well as md5 of it */ - void deleteContent() throws IOException; + protected class StoreDirectory extends Directory implements ForceSyncDirectory { - /** - * Renames, note, might not be atomic, and can fail "in the middle". - */ - void renameFile(String from, String to) throws IOException; + private final Directory delegate; - /** - * Deletes the store completely. For example, in FS ones, also deletes the parent - * directory. - */ - void fullDelete() throws IOException; + StoreDirectory(Directory delegate) throws IOException { + this.delegate = delegate; + synchronized (mutex) { + Map checksums = readChecksums(delegate); + MapBuilder builder = MapBuilder.newMapBuilder(); + for (String file : delegate.listAll()) { + // BACKWARD CKS SUPPORT + if (file.endsWith(".cks")) { // ignore checksum files here + continue; + } + String checksum = checksums.get(file); - StoreStats stats() throws IOException; + // BACKWARD CKS SUPPORT + if (checksum == null) { + if (delegate.fileExists(file + ".cks")) { + IndexInput indexInput = delegate.openInput(file + ".cks"); + try { + if (indexInput.length() > 0) { + byte[] checksumBytes = new byte[(int) indexInput.length()]; + indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false); + checksum = Unicode.fromBytes(checksumBytes); + } + } finally { + indexInput.close(); + } + } + } + builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum)); + } + filesMetadata = builder.immutableMap(); + files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); + } + } - /** - * The estimated size this store is using. - */ - ByteSizeValue estimateSize() throws IOException; + public Directory delegate() { + return delegate; + } - /** - * The store can suggest the best setting for compound file the - * {@link org.apache.lucene.index.MergePolicy} will use. - */ - boolean suggestUseCompoundFile(); + @Override public String[] listAll() throws IOException { + return files; + } - /** - * Close the store. - */ - void close() throws IOException; + @Override public boolean fileExists(String name) throws IOException { + return filesMetadata.containsKey(name); + } + + @Override public long fileModified(String name) throws IOException { + StoreFileMetaData metaData = filesMetadata.get(name); + if (metaData == null) { + throw new FileNotFoundException(name); + } + // not set yet (IndexOutput not closed) + if (metaData.lastModified() != -1) { + return metaData.lastModified(); + } + return delegate.fileModified(name); + } + + @Override public void touchFile(String name) throws IOException { + delegate.touchFile(name); + synchronized (mutex) { + StoreFileMetaData metaData = filesMetadata.get(name); + if (metaData != null) { + metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name), metaData.checksum()); + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); + } + } + } + + public void deleteFileChecksum(String name) throws IOException { + try { + delegate.deleteFile(name); + } catch (IOException e) { + if (delegate.fileExists(name)) { + throw e; + } + } + synchronized (mutex) { + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap(); + files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); + } + } + + @Override public void deleteFile(String name) throws IOException { + // we don't allow to delete the checksums files, only using the deleteChecksum method + if (isChecksum(name)) { + return; + } + try { + delegate.deleteFile(name); + } catch (IOException e) { + if (delegate.fileExists(name)) { + throw e; + } + } + synchronized (mutex) { + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap(); + files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); + } + } + + @Override public long fileLength(String name) throws IOException { + StoreFileMetaData metaData = filesMetadata.get(name); + if (metaData == null) { + throw new FileNotFoundException(name); + } + // not set yet (IndexOutput not closed) + if (metaData.length() != -1) { + return metaData.length(); + } + return delegate.fileLength(name); + } + + @Override public IndexOutput createOutput(String name) throws IOException { + return createOutput(name, true); + } + + public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException { + IndexOutput out = delegate.createOutput(name); + synchronized (mutex) { + StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null); + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); + files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); + } + return new StoreIndexOutput(out, name, computeChecksum); + } + + @Override public IndexInput openInput(String name) throws IOException { + return delegate.openInput(name); + } + + @Override public void close() throws IOException { + delegate.close(); + synchronized (mutex) { + filesMetadata = ImmutableMap.of(); + files = Strings.EMPTY_ARRAY; + } + } + + @Override public Lock makeLock(String name) { + return delegate.makeLock(name); + } + + @Override public IndexInput openInput(String name, int bufferSize) throws IOException { + return delegate.openInput(name, bufferSize); + } + + @Override public void clearLock(String name) throws IOException { + delegate.clearLock(name); + } + + @Override public void setLockFactory(LockFactory lockFactory) throws IOException { + delegate.setLockFactory(lockFactory); + } + + @Override public LockFactory getLockFactory() { + return delegate.getLockFactory(); + } + + @Override public String getLockID() { + return delegate.getLockID(); + } + + @Override public void sync(Collection names) throws IOException { + if (sync) { + delegate.sync(names); + } + for (String name : names) { + // write the checksums file when we sync on the segments file (committed) + if (!name.equals("segments.gen") && name.startsWith("segments")) { + writeChecksums(); + break; + } + } + } + + @Override public void sync(String name) throws IOException { + if (sync) { + delegate.sync(name); + } + // write the checksums file when we sync on the segments file (committed) + if (!name.equals("segments.gen") && name.startsWith("segments")) { + writeChecksums(); + } + } + + @Override public void forceSync(String name) throws IOException { + delegate.sync(name); + } + } + + class StoreIndexOutput extends IndexOutput { + + private final IndexOutput delegate; + + private final String name; + + private final Checksum digest; + + StoreIndexOutput(IndexOutput delegate, String name, boolean computeChecksum) { + this.delegate = delegate; + this.name = name; + if (computeChecksum) { + if ("segments.gen".equals(name)) { + // no need to create checksum for segments.gen since its not snapshot to recovery + this.digest = null; + } else if (name.startsWith("segments")) { + // don't compute checksum for segments files, so pure Lucene can open this directory + // and since we, in any case, always recover the segments files + this.digest = null; + } else { +// this.digest = new CRC32(); + // adler is faster, and we compare on length as well, should be enough to check for difference + // between files + this.digest = new Adler32(); + } + } else { + this.digest = null; + } + } + + @Override public void close() throws IOException { + delegate.close(); + String checksum = null; + if (digest != null) { + checksum = Long.toString(digest.getValue(), Character.MAX_RADIX); + } + synchronized (mutex) { + StoreFileMetaData md = new StoreFileMetaData(name, directory.delegate().fileLength(name), directory.delegate().fileModified(name), checksum); + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap(); + files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); + } + } + + @Override public void writeByte(byte b) throws IOException { + delegate.writeByte(b); + if (digest != null) { + digest.update(b); + } + } + + @Override public void writeBytes(byte[] b, int offset, int length) throws IOException { + delegate.writeBytes(b, offset, length); + if (digest != null) { + digest.update(b, offset, length); + } + } + + // don't override it, base class method simple reads from input and writes to this output +// @Override public void copyBytes(IndexInput input, long numBytes) throws IOException { +// delegate.copyBytes(input, numBytes); +// } + + @Override public void flush() throws IOException { + delegate.flush(); + } + + @Override public long getFilePointer() { + return delegate.getFilePointer(); + } + + @Override public void seek(long pos) throws IOException { + // seek might be called on files, which means that the checksum is not file checksum + // but a checksum of the bytes written to this stream, which is the same for each + // type of file in lucene + delegate.seek(pos); + } + + @Override public long length() throws IOException { + return delegate.length(); + } + + @Override public void setLength(long length) throws IOException { + delegate.setLength(length); + } + + @Override public void writeStringStringMap(Map map) throws IOException { + delegate.writeStringStringMap(map); + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreModule.java index ec036805e13..296f71529a7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreModule.java @@ -37,7 +37,8 @@ public class StoreModule extends AbstractModule { } @Override protected void configure() { - bind(Store.class).to(indexStore.shardStoreClass()).asEagerSingleton(); + bind(DirectoryService.class).to(indexStore.shardDirectory()).asEagerSingleton(); bind(StoreManagement.class).asEagerSingleton(); + bind(Store.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java similarity index 55% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java index 8edc0fe0be0..0de110e3965 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java @@ -25,17 +25,13 @@ import org.apache.lucene.store.LockFactory; import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.store.SimpleFSLockFactory; -import org.apache.lucene.store.bytebuffer.ByteBufferDirectory; -import org.elasticsearch.cache.memory.ByteBufferCache; -import org.elasticsearch.common.collect.ImmutableSet; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.lucene.store.SwitchDirectory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.IndexStore; -import org.elasticsearch.index.store.support.AbstractStore; import java.io.File; import java.io.FileNotFoundException; @@ -43,27 +39,30 @@ import java.io.IOException; import java.io.InterruptedIOException; /** - * @author kimchy (shay.banon) */ -public abstract class FsStore extends AbstractStore { +public abstract class FsDirectoryService extends AbstractIndexShardComponent implements DirectoryService { - public static final boolean DEFAULT_SUGGEST_USE_COMPOUND_FILE = false; + protected final FsIndexStore indexStore; - public FsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { - super(shardId, indexSettings, indexStore); + public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { + super(shardId, indexSettings); + this.indexStore = (FsIndexStore) indexStore; } - @Override public void fullDelete() throws IOException { - FileSystemUtils.deleteRecursively(fsDirectory().getDirectory()); - // if we are the last ones, delete also the actual index - String[] list = fsDirectory().getDirectory().getParentFile().list(); - if (list == null || list.length == 0) { - FileSystemUtils.deleteRecursively(fsDirectory().getDirectory().getParentFile()); + protected LockFactory buildLockFactory() throws IOException { + String fsLock = componentSettings.get("fs_lock", "native"); + LockFactory lockFactory = NoLockFactory.getNoLockFactory(); + if (fsLock.equals("native")) { + // TODO LUCENE MONITOR: this is not needed in next Lucene version + lockFactory = new NativeFSLockFactory(); + } else if (fsLock.equals("simple")) { + lockFactory = new SimpleFSLockFactory(); } + return lockFactory; } - @Override protected void doRenameFile(String from, String to) throws IOException { - File directory = fsDirectory().getDirectory(); + @Override public void renameFile(Directory dir, String from, String to) throws IOException { + File directory = ((FSDirectory) dir).getDirectory(); File old = new File(directory, from); File nu = new File(directory, to); if (nu.exists()) @@ -91,39 +90,13 @@ public abstract class FsStore extends AbstractStore { } } - public abstract FSDirectory fsDirectory(); - - protected LockFactory buildLockFactory() throws IOException { - String fsLock = componentSettings.get("fs_lock", "native"); - LockFactory lockFactory = new NoLockFactory(); - if (fsLock.equals("native")) { - // TODO LUCENE MONITOR: this is not needed in next Lucene version - lockFactory = new NativeFSLockFactory(); - } else if (fsLock.equals("simple")) { - lockFactory = new SimpleFSLockFactory(); + @Override public void fullDelete(Directory dir) throws IOException { + FSDirectory fsDirectory = (FSDirectory) dir; + FileSystemUtils.deleteRecursively(fsDirectory.getDirectory()); + // if we are the last ones, delete also the actual index + String[] list = fsDirectory.getDirectory().getParentFile().list(); + if (list == null || list.length == 0) { + FileSystemUtils.deleteRecursively(fsDirectory.getDirectory().getParentFile()); } - return lockFactory; - } - - protected Tuple buildSwitchDirectoryIfNeeded(Directory fsDirectory, ByteBufferCache byteBufferCache) { - boolean cache = componentSettings.getAsBoolean("memory.enabled", false); - if (!cache) { - return null; - } - Directory memDir = new ByteBufferDirectory(byteBufferCache); - // see http://lucene.apache.org/java/3_0_1/fileformats.html - String[] primaryExtensions = componentSettings.getAsArray("memory.extensions", new String[]{"", "del", "gen"}); - if (primaryExtensions == null || primaryExtensions.length == 0) { - return null; - } - Boolean forceUseCompound = null; - for (String extension : primaryExtensions) { - if (!("".equals(extension) || "del".equals(extension) || "gen".equals(extension))) { - // caching internal CFS extension, don't use compound file extension - forceUseCompound = false; - } - } - - return new Tuple(new SwitchDirectory(ImmutableSet.copyOf(primaryExtensions), memDir, fsDirectory, true), forceUseCompound); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java new file mode 100644 index 00000000000..336e82d0ca0 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store.fs; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MMapDirectory; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.IndexStore; + +import java.io.File; +import java.io.IOException; + +/** + */ +public class MmapFsDirectoryService extends FsDirectoryService { + + @Inject public MmapFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { + super(shardId, indexSettings, indexStore); + } + + @Override public Directory build() throws IOException { + File location = indexStore.shardIndexLocation(shardId); + FileSystemUtils.mkdirs(location); + return new MMapDirectory(location, buildLockFactory()); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java index 5e8864251b9..ef779767500 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java @@ -25,7 +25,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.DirectoryService; /** * @author kimchy (shay.banon) @@ -36,7 +36,7 @@ public class MmapFsIndexStore extends FsIndexStore { super(index, indexSettings, indexService, nodeEnv); } - @Override public Class shardStoreClass() { - return MmapFsStore.class; + @Override public Class shardDirectory() { + return MmapFsDirectoryService.class; } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsStore.java deleted file mode 100644 index 2b56139ee0b..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsStore.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.store.fs; - -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.LockFactory; -import org.apache.lucene.store.MMapDirectory; -import org.elasticsearch.cache.memory.ByteBufferCache; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.lucene.store.SwitchDirectory; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.IndexStore; - -import java.io.File; -import java.io.IOException; - -/** - * @author kimchy (shay.banon) - */ -public class MmapFsStore extends FsStore { - - private final MMapDirectory fsDirectory; - - private final Directory directory; - - private final boolean suggestUseCompoundFile; - - @Inject public MmapFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException { - super(shardId, indexSettings, indexStore); - LockFactory lockFactory = buildLockFactory(); - File location = ((FsIndexStore) indexStore).shardIndexLocation(shardId); - FileSystemUtils.mkdirs(location); - this.fsDirectory = new MMapDirectory(location, lockFactory); - - boolean suggestUseCompoundFile; - Tuple switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory, byteBufferCache); - if (switchDirectory != null) { - suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE; - if (switchDirectory.v2() != null) { - suggestUseCompoundFile = switchDirectory.v2(); - } - logger.debug("using [mmap_fs] store with path [{}], cache [true] with extensions [{}]", fsDirectory.getDirectory(), switchDirectory.v1().primaryExtensions()); - directory = wrapDirectory(switchDirectory.v1()); - } else { - suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE; - directory = wrapDirectory(fsDirectory); - logger.debug("using [mmap_fs] store with path [{}]", fsDirectory.getDirectory()); - } - this.suggestUseCompoundFile = suggestUseCompoundFile; - } - - @Override public FSDirectory fsDirectory() { - return fsDirectory; - } - - @Override public Directory directory() { - return directory; - } - - @Override public boolean suggestUseCompoundFile() { - return suggestUseCompoundFile; - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java new file mode 100644 index 00000000000..5d148347c75 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store.fs; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.NIOFSDirectory; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.IndexStore; + +import java.io.File; +import java.io.IOException; + +/** + */ +public class NioFsDirectoryService extends FsDirectoryService { + + @Inject public NioFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { + super(shardId, indexSettings, indexStore); + } + + @Override public Directory build() throws IOException { + File location = indexStore.shardIndexLocation(shardId); + FileSystemUtils.mkdirs(location); + return new NIOFSDirectory(location, buildLockFactory()); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java index ef5645874ce..c19ae47854d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java @@ -25,7 +25,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.DirectoryService; /** * @author kimchy (shay.banon) @@ -36,7 +36,7 @@ public class NioFsIndexStore extends FsIndexStore { super(index, indexSettings, indexService, nodeEnv); } - @Override public Class shardStoreClass() { - return NioFsStore.class; + @Override public Class shardDirectory() { + return NioFsDirectoryService.class; } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsStore.java deleted file mode 100644 index 420caa6286d..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsStore.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.store.fs; - -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.LockFactory; -import org.apache.lucene.store.NIOFSDirectory; -import org.elasticsearch.cache.memory.ByteBufferCache; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.lucene.store.SwitchDirectory; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.IndexStore; - -import java.io.File; -import java.io.IOException; - -/** - * @author kimchy (shay.banon) - */ -public class NioFsStore extends FsStore { - - private final NIOFSDirectory fsDirectory; - - private final Directory directory; - - private final boolean suggestUseCompoundFile; - - @Inject public NioFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException { - super(shardId, indexSettings, indexStore); - LockFactory lockFactory = buildLockFactory(); - File location = ((FsIndexStore) indexStore).shardIndexLocation(shardId); - FileSystemUtils.mkdirs(location); - this.fsDirectory = new NIOFSDirectory(location, lockFactory); - - boolean suggestUseCompoundFile; - Tuple switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory, byteBufferCache); - if (switchDirectory != null) { - suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE; - if (switchDirectory.v2() != null) { - suggestUseCompoundFile = switchDirectory.v2(); - } - logger.debug("using [nio_fs] store with path [{}], cache [true] with extensions [{}]", fsDirectory.getDirectory(), switchDirectory.v1().primaryExtensions()); - directory = wrapDirectory(switchDirectory.v1()); - } else { - suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE; - directory = wrapDirectory(fsDirectory); - logger.debug("using [nio_fs] store with path [{}]", fsDirectory.getDirectory()); - } - this.suggestUseCompoundFile = suggestUseCompoundFile; - } - - @Override public FSDirectory fsDirectory() { - return fsDirectory; - } - - @Override public Directory directory() { - return directory; - } - - @Override public boolean suggestUseCompoundFile() { - return suggestUseCompoundFile; - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java new file mode 100644 index 00000000000..7fec5e31e2e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store.fs; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.SimpleFSDirectory; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.IndexStore; + +import java.io.File; +import java.io.IOException; + +/** + */ +public class SimpleFsDirectoryService extends FsDirectoryService { + + @Inject public SimpleFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { + super(shardId, indexSettings, indexStore); + } + + @Override public Directory build() throws IOException { + File location = indexStore.shardIndexLocation(shardId); + FileSystemUtils.mkdirs(location); + return new SimpleFSDirectory(location, buildLockFactory()); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java index 2ede53e08d4..bc95287aedd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java @@ -25,7 +25,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.DirectoryService; /** * @author kimchy (shay.banon) @@ -36,7 +36,7 @@ public class SimpleFsIndexStore extends FsIndexStore { super(index, indexSettings, indexService, nodeEnv); } - @Override public Class shardStoreClass() { - return SimpleFsStore.class; + @Override public Class shardDirectory() { + return SimpleFsDirectoryService.class; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsStore.java deleted file mode 100644 index 9a9511314b8..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsStore.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.store.fs; - -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.LockFactory; -import org.apache.lucene.store.SimpleFSDirectory; -import org.elasticsearch.cache.memory.ByteBufferCache; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.lucene.store.SwitchDirectory; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.IndexStore; - -import java.io.File; -import java.io.IOException; - -/** - * @author kimchy (shay.banon) - */ -public class SimpleFsStore extends FsStore { - - private SimpleFSDirectory fsDirectory; - - private final Directory directory; - - private final boolean suggestUseCompoundFile; - - @Inject public SimpleFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException { - super(shardId, indexSettings, indexStore); - LockFactory lockFactory = buildLockFactory(); - File location = ((FsIndexStore) indexStore).shardIndexLocation(shardId); - FileSystemUtils.mkdirs(location); - this.fsDirectory = new SimpleFSDirectory(location, lockFactory); - - boolean suggestUseCompoundFile; - Tuple switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory, byteBufferCache); - if (switchDirectory != null) { - suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE; - if (switchDirectory.v2() != null) { - suggestUseCompoundFile = switchDirectory.v2(); - } - logger.debug("using [simple_fs] store with path [{}], cache [true] with extensions [{}]", fsDirectory.getDirectory(), switchDirectory.v1().primaryExtensions()); - directory = wrapDirectory(switchDirectory.v1()); - } else { - suggestUseCompoundFile = DEFAULT_SUGGEST_USE_COMPOUND_FILE; - directory = wrapDirectory(fsDirectory); - logger.debug("using [simple_fs] store with path [{}]", fsDirectory.getDirectory()); - } - this.suggestUseCompoundFile = suggestUseCompoundFile; - } - - @Override public FSDirectory fsDirectory() { - return fsDirectory; - } - - @Override public Directory directory() { - return directory; - } - - @Override public boolean suggestUseCompoundFile() { - return suggestUseCompoundFile; - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java similarity index 67% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferStore.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java index d1a8ef381e0..964527a5cc0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java @@ -27,43 +27,34 @@ import org.elasticsearch.cache.memory.ByteBufferCache; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.IndexStore; -import org.elasticsearch.index.store.support.AbstractStore; import java.io.FileNotFoundException; import java.io.IOException; /** - * @author kimchy (shay.banon) */ -public class ByteBufferStore extends AbstractStore { +public class ByteBufferDirectoryService extends AbstractIndexShardComponent implements DirectoryService { - private final CustomByteBufferDirectory bbDirectory; + private final ByteBufferCache byteBufferCache; - private final Directory directory; - - @Inject public ByteBufferStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException { - super(shardId, indexSettings, indexStore); - - this.bbDirectory = new CustomByteBufferDirectory(byteBufferCache); - this.directory = wrapDirectory(bbDirectory); - logger.debug("Using [byte_buffer] store"); + @Inject public ByteBufferDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) { + super(shardId, indexSettings); + this.byteBufferCache = byteBufferCache; } - @Override public Directory directory() { - return directory; + @Override public Directory build() { + return new CustomByteBufferDirectory(byteBufferCache); } - /** - * Its better to not use the compound format when using the Ram store. - */ - @Override public boolean suggestUseCompoundFile() { - return false; + @Override public void renameFile(Directory dir, String from, String to) throws IOException { + ((CustomByteBufferDirectory) dir).renameTo(from, to); } - @Override protected void doRenameFile(String from, String to) throws IOException { - bbDirectory.renameTo(from, to); + @Override public void fullDelete(Directory dir) { } static class CustomByteBufferDirectory extends ByteBufferDirectory { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferIndexStore.java index 505590697f6..c1b9aa9c57e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferIndexStore.java @@ -27,7 +27,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.support.AbstractIndexStore; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.jvm.JvmStats; @@ -49,8 +49,8 @@ public class ByteBufferIndexStore extends AbstractIndexStore { return false; } - @Override public Class shardStoreClass() { - return ByteBufferStore.class; + @Override public Class shardDirectory() { + return ByteBufferDirectoryService.class; } @Override public ByteSizeValue backingStoreTotalSpace() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java similarity index 65% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamStore.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java index 21d9e48fad9..5b8dea1d99e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java @@ -25,42 +25,30 @@ import org.apache.lucene.store.RAMFile; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.IndexStore; -import org.elasticsearch.index.store.support.AbstractStore; +import org.elasticsearch.index.store.DirectoryService; import java.io.FileNotFoundException; import java.io.IOException; /** - * @author kimchy (Shay Banon) */ -public class RamStore extends AbstractStore { +public class RamDirectoryService extends AbstractIndexShardComponent implements DirectoryService { - private final CustomRAMDirectory ramDirectory; - - private final Directory directory; - - @Inject public RamStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) throws IOException { - super(shardId, indexSettings, indexStore); - this.ramDirectory = new CustomRAMDirectory(); - this.directory = wrapDirectory(ramDirectory); - logger.debug("Using [ram] Store"); + @Inject public RamDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings) { + super(shardId, indexSettings); } - @Override public Directory directory() { - return directory; + @Override public Directory build() { + return new CustomRAMDirectory(); } - /** - * Its better to not use the compound format when using the Ram store. - */ - @Override public boolean suggestUseCompoundFile() { - return false; + @Override public void renameFile(Directory dir, String from, String to) throws IOException { + ((CustomRAMDirectory) dir).renameTo(from, to); } - @Override protected void doRenameFile(String from, String to) throws IOException { - ramDirectory.renameTo(from, to); + @Override public void fullDelete(Directory dir) { } static class CustomRAMDirectory extends RAMDirectory { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamIndexStore.java index 5eb5e80f8c0..abd9b19034c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamIndexStore.java @@ -25,7 +25,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.support.AbstractIndexStore; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.jvm.JvmStats; @@ -43,8 +43,8 @@ public class RamIndexStore extends AbstractIndexStore { return false; } - @Override public Class shardStoreClass() { - return RamStore.class; + @Override public Class shardDirectory() { + return RamDirectoryService.class; } @Override public ByteSizeValue backingStoreTotalSpace() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java deleted file mode 100644 index 5e355ca8321..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java +++ /dev/null @@ -1,517 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.store.support; - -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.Lock; -import org.apache.lucene.store.LockFactory; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.Unicode; -import org.elasticsearch.common.collect.ImmutableMap; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.lucene.Directories; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.IndexStore; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.StoreFileMetaData; -import org.elasticsearch.index.store.StoreStats; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.zip.Adler32; -import java.util.zip.Checksum; - -/** - * @author kimchy (shay.banon) - */ -public abstract class AbstractStore extends AbstractIndexShardComponent implements Store { - - static final String CHECKSUMS_PREFIX = "_checksums-"; - - public static final boolean isChecksum(String name) { - return name.startsWith(CHECKSUMS_PREFIX); - } - - protected final IndexStore indexStore; - - private volatile ImmutableMap filesMetadata = ImmutableMap.of(); - - private volatile String[] files = Strings.EMPTY_ARRAY; - - private final Object mutex = new Object(); - - private final boolean sync; - - protected AbstractStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { - super(shardId, indexSettings); - this.indexStore = indexStore; - this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway... - } - - protected Directory wrapDirectory(Directory dir) throws IOException { - return new StoreDirectory(dir); - } - - @Override public ImmutableMap list() throws IOException { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (String name : files) { - StoreFileMetaData md = metaData(name); - if (md != null) { - builder.put(md.name(), md); - } - } - return builder.build(); - } - - public StoreFileMetaData metaData(String name) throws IOException { - StoreFileMetaData md = filesMetadata.get(name); - if (md == null) { - return null; - } - // IndexOutput not closed, does not exists - if (md.lastModified() == -1 || md.length() == -1) { - return null; - } - return md; - } - - @Override public void deleteContent() throws IOException { - String[] files = directory().listAll(); - IOException lastException = null; - for (String file : files) { - if (isChecksum(file)) { - ((StoreDirectory) directory()).deleteFileChecksum(file); - } else { - try { - directory().deleteFile(file); - } catch (FileNotFoundException e) { - // ignore - } catch (IOException e) { - lastException = e; - } - } - } - if (lastException != null) { - throw lastException; - } - } - - @Override public void fullDelete() throws IOException { - deleteContent(); - } - - @Override public StoreStats stats() throws IOException { - return new StoreStats(Directories.estimateSize(directory())); - } - - @Override public ByteSizeValue estimateSize() throws IOException { - return new ByteSizeValue(Directories.estimateSize(directory())); - } - - @Override public void renameFile(String from, String to) throws IOException { - doRenameFile(from, to); - synchronized (mutex) { - StoreFileMetaData fromMetaData = filesMetadata.get(from); // we should always find this one - StoreFileMetaData toMetaData = new StoreFileMetaData(to, fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum()); - filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(from).put(to, toMetaData).immutableMap(); - files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); - } - } - - protected abstract void doRenameFile(String from, String to) throws IOException; - - public static Map readChecksums(Directory dir) throws IOException { - long lastFound = -1; - for (String name : dir.listAll()) { - if (!isChecksum(name)) { - continue; - } - long current = Long.parseLong(name.substring(CHECKSUMS_PREFIX.length())); - if (current > lastFound) { - lastFound = current; - } - } - if (lastFound == -1) { - return ImmutableMap.of(); - } - IndexInput indexInput = dir.openInput(CHECKSUMS_PREFIX + lastFound); - try { - indexInput.readInt(); // version - return indexInput.readStringStringMap(); - } catch (Exception e) { - // failed to load checksums, ignore and return an empty map - return new HashMap(); - } finally { - indexInput.close(); - } - } - - public void writeChecksums() throws IOException { - writeChecksums((StoreDirectory) directory()); - } - - private void writeChecksums(StoreDirectory dir) throws IOException { - String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis(); - ImmutableMap files = list(); - synchronized (mutex) { - Map checksums = new HashMap(); - for (StoreFileMetaData metaData : files.values()) { - if (metaData.checksum() != null) { - checksums.put(metaData.name(), metaData.checksum()); - } - } - IndexOutput output = dir.createOutput(checksumName, false); - output.writeInt(0); // version - output.writeStringStringMap(checksums); - output.close(); - } - for (StoreFileMetaData metaData : files.values()) { - if (metaData.name().startsWith(CHECKSUMS_PREFIX) && !checksumName.equals(metaData.name())) { - try { - dir.deleteFileChecksum(metaData.name()); - } catch (Exception e) { - // ignore - } - } - } - } - - /** - * Returns true by default. - */ - @Override public boolean suggestUseCompoundFile() { - return true; - } - - @Override public void close() throws IOException { - directory().close(); - } - - @Override public IndexOutput createOutputWithNoChecksum(String name) throws IOException { - return ((StoreDirectory) directory()).createOutput(name, false); - } - - @Override public void writeChecksum(String name, String checksum) throws IOException { - // update the metadata to include the checksum and write a new checksums file - synchronized (mutex) { - StoreFileMetaData metaData = filesMetadata.get(name); - metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum); - filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); - writeChecksums(); - } - } - - @Override public void writeChecksums(Map checksums) throws IOException { - // update the metadata to include the checksum and write a new checksums file - synchronized (mutex) { - for (Map.Entry entry : checksums.entrySet()) { - StoreFileMetaData metaData = filesMetadata.get(entry.getKey()); - metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue()); - filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(entry.getKey(), metaData).immutableMap(); - } - writeChecksums(); - } - } - - /** - * The idea of the store directory is to cache file level meta data, as well as md5 of it - */ - protected class StoreDirectory extends Directory implements ForceSyncDirectory { - - private final Directory delegate; - - StoreDirectory(Directory delegate) throws IOException { - this.delegate = delegate; - synchronized (mutex) { - Map checksums = readChecksums(delegate); - MapBuilder builder = MapBuilder.newMapBuilder(); - for (String file : delegate.listAll()) { - // BACKWARD CKS SUPPORT - if (file.endsWith(".cks")) { // ignore checksum files here - continue; - } - String checksum = checksums.get(file); - - // BACKWARD CKS SUPPORT - if (checksum == null) { - if (delegate.fileExists(file + ".cks")) { - IndexInput indexInput = delegate.openInput(file + ".cks"); - try { - if (indexInput.length() > 0) { - byte[] checksumBytes = new byte[(int) indexInput.length()]; - indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false); - checksum = Unicode.fromBytes(checksumBytes); - } - } finally { - indexInput.close(); - } - } - } - builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum)); - } - filesMetadata = builder.immutableMap(); - files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); - } - } - - public Directory delegate() { - return delegate; - } - - @Override public String[] listAll() throws IOException { - return files; - } - - @Override public boolean fileExists(String name) throws IOException { - return filesMetadata.containsKey(name); - } - - @Override public long fileModified(String name) throws IOException { - StoreFileMetaData metaData = filesMetadata.get(name); - if (metaData == null) { - throw new FileNotFoundException(name); - } - // not set yet (IndexOutput not closed) - if (metaData.lastModified() != -1) { - return metaData.lastModified(); - } - return delegate.fileModified(name); - } - - @Override public void touchFile(String name) throws IOException { - delegate.touchFile(name); - synchronized (mutex) { - StoreFileMetaData metaData = filesMetadata.get(name); - if (metaData != null) { - metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name), metaData.checksum()); - filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); - } - } - } - - public void deleteFileChecksum(String name) throws IOException { - delegate.deleteFile(name); - synchronized (mutex) { - filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap(); - files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); - } - } - - @Override public void deleteFile(String name) throws IOException { - // we don't allow to delete the checksums files, only using the deleteChecksum method - if (isChecksum(name)) { - return; - } - delegate.deleteFile(name); - synchronized (mutex) { - filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap(); - files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); - } - } - - @Override public long fileLength(String name) throws IOException { - StoreFileMetaData metaData = filesMetadata.get(name); - if (metaData == null) { - throw new FileNotFoundException(name); - } - // not set yet (IndexOutput not closed) - if (metaData.length() != -1) { - return metaData.length(); - } - return delegate.fileLength(name); - } - - @Override public IndexOutput createOutput(String name) throws IOException { - return createOutput(name, true); - } - - public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException { - IndexOutput out = delegate.createOutput(name); - synchronized (mutex) { - StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null); - filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); - files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); - } - return new StoreIndexOutput(out, name, computeChecksum); - } - - @Override public IndexInput openInput(String name) throws IOException { - return delegate.openInput(name); - } - - @Override public void close() throws IOException { - delegate.close(); - synchronized (mutex) { - filesMetadata = ImmutableMap.of(); - files = Strings.EMPTY_ARRAY; - } - } - - @Override public Lock makeLock(String name) { - return delegate.makeLock(name); - } - - @Override public IndexInput openInput(String name, int bufferSize) throws IOException { - return delegate.openInput(name, bufferSize); - } - - @Override public void clearLock(String name) throws IOException { - delegate.clearLock(name); - } - - @Override public void setLockFactory(LockFactory lockFactory) throws IOException { - delegate.setLockFactory(lockFactory); - } - - @Override public LockFactory getLockFactory() { - return delegate.getLockFactory(); - } - - @Override public String getLockID() { - return delegate.getLockID(); - } - - @Override public void sync(Collection names) throws IOException { - if (sync) { - delegate.sync(names); - } - for (String name : names) { - // write the checksums file when we sync on the segments file (committed) - if (!name.equals("segments.gen") && name.startsWith("segments")) { - writeChecksums(); - break; - } - } - } - - @Override public void sync(String name) throws IOException { - if (sync) { - delegate.sync(name); - } - // write the checksums file when we sync on the segments file (committed) - if (!name.equals("segments.gen") && name.startsWith("segments")) { - writeChecksums(); - } - } - - @Override public void forceSync(String name) throws IOException { - delegate.sync(name); - } - } - - class StoreIndexOutput extends IndexOutput { - - private final IndexOutput delegate; - - private final String name; - - private final Checksum digest; - - StoreIndexOutput(IndexOutput delegate, String name, boolean computeChecksum) { - this.delegate = delegate; - this.name = name; - if (computeChecksum) { - if ("segments.gen".equals(name)) { - // no need to create checksum for segments.gen since its not snapshot to recovery - this.digest = null; - } else if (name.startsWith("segments")) { - // don't compute checksum for segments files, so pure Lucene can open this directory - // and since we, in any case, always recover the segments files - this.digest = null; - } else { -// this.digest = new CRC32(); - // adler is faster, and we compare on length as well, should be enough to check for difference - // between files - this.digest = new Adler32(); - } - } else { - this.digest = null; - } - } - - @Override public void close() throws IOException { - delegate.close(); - String checksum = null; - if (digest != null) { - checksum = Long.toString(digest.getValue(), Character.MAX_RADIX); - } - synchronized (mutex) { - StoreFileMetaData md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), checksum); - filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap(); - files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); - } - } - - @Override public void writeByte(byte b) throws IOException { - delegate.writeByte(b); - if (digest != null) { - digest.update(b); - } - } - - @Override public void writeBytes(byte[] b, int offset, int length) throws IOException { - delegate.writeBytes(b, offset, length); - if (digest != null) { - digest.update(b, offset, length); - } - } - - // don't override it, base class method simple reads from input and writes to this output -// @Override public void copyBytes(IndexInput input, long numBytes) throws IOException { -// delegate.copyBytes(input, numBytes); -// } - - @Override public void flush() throws IOException { - delegate.flush(); - } - - @Override public long getFilePointer() { - return delegate.getFilePointer(); - } - - @Override public void seek(long pos) throws IOException { - // seek might be called on files, which means that the checksum is not file checksum - // but a checksum of the bytes written to this stream, which is the same for each - // type of file in lucene - delegate.seek(pos); - } - - @Override public long length() throws IOException { - return delegate.length(); - } - - @Override public void setLength(long length) throws IOException { - delegate.setLength(length); - } - - @Override public void writeStringStringMap(Map map) throws IOException { - delegate.writeStringStringMap(map); - } - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index b26b24c82b7..8bbff0d078a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -42,7 +42,7 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; -import org.elasticsearch.index.store.support.AbstractStore; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndicesLifecycle; @@ -452,7 +452,7 @@ public class RecoveryTarget extends AbstractComponent { for (String existingFile : shard.store().directory().listAll()) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum) - if (!request.snapshotFiles().contains(existingFile) && !AbstractStore.isChecksum(existingFile)) { + if (!request.snapshotFiles().contains(existingFile) && !Store.isChecksum(existingFile)) { try { shard.store().directory().deleteFile(existingFile); } catch (Exception e) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index ca9ea8cbc10..a25624cf184 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -48,8 +48,8 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; -import org.elasticsearch.index.store.support.AbstractStore; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -172,13 +172,13 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio FSDirectory directory = FSDirectory.open(indexFile); Map checksums = null; try { - checksums = AbstractStore.readChecksums(directory); + checksums = Store.readChecksums(directory); for (File file : indexFile.listFiles()) { // BACKWARD CKS SUPPORT if (file.getName().endsWith(".cks")) { continue; } - if (AbstractStore.isChecksum(file.getName())) { + if (Store.isChecksum(file.getName())) { continue; } files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksums.get(file.getName()))); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java index 9c4db2add2f..9f1046be1b8 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java @@ -39,7 +39,7 @@ import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.ram.RamStore; +import org.elasticsearch.index.store.ram.RamDirectoryService; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.fs.FsTranslog; import org.testng.annotations.AfterMethod; @@ -96,11 +96,11 @@ public abstract class AbstractSimpleEngineTests { } protected Store createStore() throws IOException { - return new RamStore(shardId, EMPTY_SETTINGS, null); + return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS)); } protected Store createStoreReplica() throws IOException { - return new RamStore(shardId, EMPTY_SETTINGS, null); + return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS)); } protected Translog createTranslog() {