Merge bytebuffer and memory stores into a single memory store options, closes #22.

This commit is contained in:
kimchy 2010-02-17 13:09:22 +02:00
parent 042d71073c
commit 872781536d
17 changed files with 77 additions and 103 deletions

View File

@ -36,7 +36,7 @@ import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.memory.MemoryStore;
import org.elasticsearch.index.store.memory.ByteBufferStore;
import org.elasticsearch.index.translog.memory.MemoryTranslog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.dynamic.DynamicThreadPool;
@ -277,7 +277,8 @@ public class SimpleEngineBenchmark {
Settings settings = EMPTY_SETTINGS;
// Store store = new RamStore(shardId, settings);
Store store = new MemoryStore(shardId, settings);
Store store = new ByteBufferStore(shardId, settings);
// Store store = new HeapStore(shardId, settings);
// Store store = new NioFsStore(shardId, settings);
store.deleteContent();

View File

@ -25,11 +25,11 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.bytebuffer.ByteBufferStore;
import org.elasticsearch.index.store.fs.MmapFsStore;
import org.elasticsearch.index.store.fs.NioFsStore;
import org.elasticsearch.index.store.fs.SimpleFsStore;
import org.elasticsearch.index.store.memory.MemoryStore;
import org.elasticsearch.index.store.memory.ByteBufferStore;
import org.elasticsearch.index.store.memory.HeapStore;
import org.elasticsearch.index.store.ram.RamStore;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
@ -276,23 +276,17 @@ public class SimpleStoreBenchmark {
store = new NioFsStore(shardId, settings, environment, localNodeId);
} else if (type.equalsIgnoreCase("nio-fs")) {
store = new MmapFsStore(shardId, settings, environment, localNodeId);
} else if (type.equalsIgnoreCase("bb")) {
Settings byteBufferSettings = settingsBuilder()
.putAll(settings)
.putBoolean("index.store.bytebuffer.direct", false)
.build();
store = new ByteBufferStore(shardId, byteBufferSettings);
} else if (type.equalsIgnoreCase("bb-direct")) {
} else if (type.equalsIgnoreCase("memory-direct")) {
Settings byteBufferSettings = settingsBuilder()
.putAll(settings)
.putBoolean("index.store.bytebuffer.direct", true)
.build();
store = new ByteBufferStore(shardId, byteBufferSettings);
} else if (type.equalsIgnoreCase("mem")) {
} else if (type.equalsIgnoreCase("memory-heap")) {
Settings memorySettings = settingsBuilder()
.putAll(settings)
.build();
store = new MemoryStore(shardId, memorySettings);
store = new HeapStore(shardId, memorySettings);
} else {
throw new IllegalArgumentException("No type store [" + type + "]");
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.store;
import com.google.inject.AbstractModule;
import com.google.inject.Module;
import org.elasticsearch.index.store.bytebuffer.ByteBufferStoreModule;
import org.elasticsearch.index.store.fs.MmapFsStoreModule;
import org.elasticsearch.index.store.fs.NioFsStoreModule;
import org.elasticsearch.index.store.fs.SimpleFsStoreModule;
@ -52,8 +51,6 @@ public class StoreModule extends AbstractModule {
storeModule = RamStoreModule.class;
} else if ("memory".equalsIgnoreCase(storeType)) {
storeModule = MemoryStoreModule.class;
} else if ("bytebuffer".equalsIgnoreCase(storeType)) {
storeModule = ByteBufferStoreModule.class;
} else if ("fs".equalsIgnoreCase(storeType)) {
// nothing to set here ... (we default to fs)
} else if ("simplefs".equalsIgnoreCase(storeType)) {

View File

@ -1,33 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.bytebuffer;
import com.google.inject.AbstractModule;
import org.elasticsearch.index.store.Store;
/**
* @author kimchy (Shay Banon)
*/
public class ByteBufferStoreModule extends AbstractModule {
@Override protected void configure() {
bind(Store.class).to(ByteBufferStore.class).asEagerSingleton();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.store.bytebuffer;
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
@ -197,14 +197,14 @@ public class ByteBufferDirectory extends Directory {
return byteBuffer;
}
ByteBuffer createBuffer() {
private ByteBuffer createBuffer() {
if (isDirect()) {
return ByteBuffer.allocateDirect(bufferSizeInBytes());
}
return ByteBuffer.allocate(bufferSizeInBytes());
}
void closeBuffer(ByteBuffer byteBuffer) {
private void closeBuffer(ByteBuffer byteBuffer) {
if (isDirect()) {
((DirectBuffer) byteBuffer).cleaner().clean();
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.store.bytebuffer;
package org.elasticsearch.index.store.memory;
import java.nio.ByteBuffer;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.store.bytebuffer;
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.IndexInput;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.store.bytebuffer;
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.IndexOutput;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.store.bytebuffer;
package org.elasticsearch.index.store.memory;
import com.google.inject.Inject;
import org.elasticsearch.index.settings.IndexSettings;

View File

@ -37,9 +37,9 @@ import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
/**
* @author kimchy (Shay Banon)
*/
public class MemoryDirectory extends Directory {
public class HeapDirectory extends Directory {
private final Map<String, MemoryFile> files = newConcurrentMap();
private final Map<String, HeapRamFile> files = newConcurrentMap();
private final Queue<byte[]> cache;
@ -51,11 +51,11 @@ public class MemoryDirectory extends Directory {
private final boolean disableCache;
public MemoryDirectory() {
public HeapDirectory() {
this(new SizeValue(1, SizeUnit.KB), new SizeValue(20, SizeUnit.MB), false);
}
public MemoryDirectory(SizeValue bufferSize, SizeValue cacheSize, boolean warmCache) {
public HeapDirectory(SizeValue bufferSize, SizeValue cacheSize, boolean warmCache) {
disableCache = cacheSize.bytes() == 0;
if (!disableCache && cacheSize.bytes() < bufferSize.bytes()) {
throw new IllegalArgumentException("Cache size [" + cacheSize + "] is smaller than buffer size [" + bufferSize + "]");
@ -94,14 +94,14 @@ public class MemoryDirectory extends Directory {
}
@Override public long fileModified(String name) throws IOException {
MemoryFile file = files.get(name);
HeapRamFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
return file.lastModified();
}
@Override public void touchFile(String name) throws IOException {
MemoryFile file = files.get(name);
HeapRamFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
@ -122,33 +122,33 @@ public class MemoryDirectory extends Directory {
}
@Override public void deleteFile(String name) throws IOException {
MemoryFile file = files.remove(name);
HeapRamFile file = files.remove(name);
if (file == null)
throw new FileNotFoundException(name);
file.clean();
}
@Override public long fileLength(String name) throws IOException {
MemoryFile file = files.get(name);
HeapRamFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
return file.length();
}
@Override public IndexOutput createOutput(String name) throws IOException {
MemoryFile file = new MemoryFile(this);
MemoryFile existing = files.put(name, file);
HeapRamFile file = new HeapRamFile(this);
HeapRamFile existing = files.put(name, file);
if (existing != null) {
existing.clean();
}
return new MemoryIndexOutput(this, file);
return new HeapIndexOutput(this, file);
}
@Override public IndexInput openInput(String name) throws IOException {
MemoryFile file = files.get(name);
HeapRamFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
return new MemoryIndexInput(this, file);
return new HeapIndexInput(this, file);
}
@Override public void close() throws IOException {

View File

@ -26,10 +26,10 @@ import java.io.IOException;
/**
* @author kimchy (Shay Banon)
*/
public class MemoryIndexInput extends IndexInput {
public class HeapIndexInput extends IndexInput {
private final int bufferSize;
private final MemoryFile file;
private final HeapRamFile file;
private long length;
@ -40,7 +40,7 @@ public class MemoryIndexInput extends IndexInput {
private long bufferStart;
private int bufferLength;
public MemoryIndexInput(MemoryDirectory dir, MemoryFile file) throws IOException {
public HeapIndexInput(HeapDirectory dir, HeapRamFile file) throws IOException {
this.bufferSize = dir.bufferSizeInBytes();
this.file = file;

View File

@ -27,10 +27,10 @@ import java.util.ArrayList;
/**
* @author kimchy (Shay Banon)
*/
public class MemoryIndexOutput extends IndexOutput {
public class HeapIndexOutput extends IndexOutput {
private final MemoryDirectory dir;
private final MemoryFile file;
private final HeapDirectory dir;
private final HeapRamFile file;
private ArrayList<byte[]> buffers = new ArrayList<byte[]>();
@ -41,7 +41,7 @@ public class MemoryIndexOutput extends IndexOutput {
private long bufferStart;
private int bufferLength;
public MemoryIndexOutput(MemoryDirectory dir, MemoryFile file) {
public HeapIndexOutput(HeapDirectory dir, HeapRamFile file) {
this.dir = dir;
this.file = file;

View File

@ -22,9 +22,9 @@ package org.elasticsearch.index.store.memory;
/**
* @author kimchy (Shay Banon)
*/
public class MemoryFile {
public class HeapRamFile {
private final MemoryDirectory dir;
private final HeapDirectory dir;
private volatile long lastModified = System.currentTimeMillis();
@ -32,7 +32,7 @@ public class MemoryFile {
private volatile byte[][] buffers;
public MemoryFile(MemoryDirectory dir) {
public HeapRamFile(HeapDirectory dir) {
this.dir = dir;
}

View File

@ -30,7 +30,7 @@ import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (Shay Banon)
*/
public class MemoryStore extends AbstractStore<MemoryDirectory> {
public class HeapStore extends AbstractStore<HeapDirectory> {
private final SizeValue bufferSize;
@ -38,21 +38,21 @@ public class MemoryStore extends AbstractStore<MemoryDirectory> {
private final boolean warmCache;
private MemoryDirectory directory;
private HeapDirectory directory;
@Inject public MemoryStore(ShardId shardId, @IndexSettings Settings indexSettings) {
@Inject public HeapStore(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
this.bufferSize = componentSettings.getAsSize("bufferSize", new SizeValue(1, SizeUnit.KB));
this.cacheSize = componentSettings.getAsSize("cacheSize", new SizeValue(20, SizeUnit.MB));
this.warmCache = componentSettings.getAsBoolean("warmCache", true);
this.directory = new MemoryDirectory(bufferSize, cacheSize, warmCache);
this.directory = new HeapDirectory(bufferSize, cacheSize, warmCache);
logger.debug("Using [Memory] Store with bufferSize[{}], cacheSize[{}], warmCache[{}]",
new Object[]{directory.bufferSize(), directory.cacheSize(), warmCache});
}
@Override public MemoryDirectory directory() {
@Override public HeapDirectory directory() {
return directory;
}

View File

@ -20,14 +20,29 @@
package org.elasticsearch.index.store.memory;
import com.google.inject.AbstractModule;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (Shay Banon)
*/
public class MemoryStoreModule extends AbstractModule {
private final Settings settings;
public MemoryStoreModule(Settings settings) {
this.settings = settings;
}
@Override protected void configure() {
bind(Store.class).to(MemoryStore.class).asEagerSingleton();
String location = settings.get("index.store.memory.location", "direct");
if ("direct".equalsIgnoreCase(location)) {
bind(Store.class).to(ByteBufferStore.class).asEagerSingleton();
} else if ("heap".equalsIgnoreCase(location)) {
bind(Store.class).to(HeapStore.class).asEagerSingleton();
} else {
throw new ElasticSearchIllegalArgumentException("Memory location [" + location + "] is invalid, can be one of [direct,heap]");
}
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.store.bytebuffer;
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
@ -38,49 +38,49 @@ import static org.hamcrest.Matchers.*;
public class SimpleByteBufferStoreTests {
@Test public void test1BufferNoCache() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(1, SizeUnit.BYTES), new SizeValue(0, SizeUnit.BYTES), false, false);
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(1, SizeUnit.BYTES), new SizeValue(0, SizeUnit.BYTES), true, false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test1Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(1, SizeUnit.BYTES), new SizeValue(10, SizeUnit.BYTES), false, false);
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(1, SizeUnit.BYTES), new SizeValue(10, SizeUnit.BYTES), true, false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test3Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(3, SizeUnit.BYTES), new SizeValue(10, SizeUnit.BYTES), false, false);
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(3, SizeUnit.BYTES), new SizeValue(10, SizeUnit.BYTES), true, false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test10Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(10, SizeUnit.BYTES), new SizeValue(20, SizeUnit.BYTES), false, false);
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(10, SizeUnit.BYTES), new SizeValue(20, SizeUnit.BYTES), true, false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test15Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(15, SizeUnit.BYTES), new SizeValue(30, SizeUnit.BYTES), false, false);
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(15, SizeUnit.BYTES), new SizeValue(30, SizeUnit.BYTES), true, false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test40Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(40, SizeUnit.BYTES), new SizeValue(80, SizeUnit.BYTES), false, false);
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(40, SizeUnit.BYTES), new SizeValue(80, SizeUnit.BYTES), true, false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void testSimpleLocking() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(40, SizeUnit.BYTES), new SizeValue(80, SizeUnit.BYTES), false, false);
ByteBufferDirectory dir = new ByteBufferDirectory(new SizeValue(40, SizeUnit.BYTES), new SizeValue(80, SizeUnit.BYTES), true, false);
Lock lock = dir.makeLock("testlock");

View File

@ -35,52 +35,52 @@ import static org.hamcrest.Matchers.*;
/**
* @author kimchy (Shay Banon)
*/
public class SimpleMemoryStoreTests {
public class SimpleHeapStoreTests {
@Test public void test1BufferNoCache() throws Exception {
MemoryDirectory dir = new MemoryDirectory(new SizeValue(1, SizeUnit.BYTES), new SizeValue(0, SizeUnit.BYTES), false);
HeapDirectory dir = new HeapDirectory(new SizeValue(1, SizeUnit.BYTES), new SizeValue(0, SizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test1Buffer() throws Exception {
MemoryDirectory dir = new MemoryDirectory(new SizeValue(1, SizeUnit.BYTES), new SizeValue(10, SizeUnit.BYTES), false);
HeapDirectory dir = new HeapDirectory(new SizeValue(1, SizeUnit.BYTES), new SizeValue(10, SizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test3Buffer() throws Exception {
MemoryDirectory dir = new MemoryDirectory(new SizeValue(3, SizeUnit.BYTES), new SizeValue(10, SizeUnit.BYTES), false);
HeapDirectory dir = new HeapDirectory(new SizeValue(3, SizeUnit.BYTES), new SizeValue(10, SizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test10Buffer() throws Exception {
MemoryDirectory dir = new MemoryDirectory(new SizeValue(10, SizeUnit.BYTES), new SizeValue(20, SizeUnit.BYTES), false);
HeapDirectory dir = new HeapDirectory(new SizeValue(10, SizeUnit.BYTES), new SizeValue(20, SizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test15Buffer() throws Exception {
MemoryDirectory dir = new MemoryDirectory(new SizeValue(15, SizeUnit.BYTES), new SizeValue(30, SizeUnit.BYTES), false);
HeapDirectory dir = new HeapDirectory(new SizeValue(15, SizeUnit.BYTES), new SizeValue(30, SizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test40Buffer() throws Exception {
MemoryDirectory dir = new MemoryDirectory(new SizeValue(40, SizeUnit.BYTES), new SizeValue(80, SizeUnit.BYTES), false);
HeapDirectory dir = new HeapDirectory(new SizeValue(40, SizeUnit.BYTES), new SizeValue(80, SizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void testSimpeLocking() throws Exception {
MemoryDirectory dir = new MemoryDirectory(new SizeValue(40, SizeUnit.BYTES), new SizeValue(80, SizeUnit.BYTES), false);
HeapDirectory dir = new HeapDirectory(new SizeValue(40, SizeUnit.BYTES), new SizeValue(80, SizeUnit.BYTES), false);
Lock lock = dir.makeLock("testlock");
@ -98,7 +98,7 @@ public class SimpleMemoryStoreTests {
dir.close();
}
private void insertData(MemoryDirectory dir) throws IOException {
private void insertData(HeapDirectory dir) throws IOException {
byte[] test = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
IndexOutput indexOutput = dir.createOutput("value1");
indexOutput.writeBytes(new byte[]{2, 4, 6, 7, 8}, 5);
@ -119,7 +119,7 @@ public class SimpleMemoryStoreTests {
indexOutput.close();
}
private void verifyData(MemoryDirectory dir) throws IOException {
private void verifyData(HeapDirectory dir) throws IOException {
byte[] test = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
assertThat(dir.fileExists("value1"), equalTo(true));
assertThat(dir.fileLength("value1"), equalTo(38l));