Memory Store / FS Memory: Create a node level memory store cache and allocator, closes #235.

This commit is contained in:
kimchy 2010-06-26 22:34:30 +03:00
parent 00d2abef40
commit 33d357dbb4
26 changed files with 264 additions and 1013 deletions

View File

@ -25,6 +25,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -294,8 +295,7 @@ public class SimpleEngineBenchmark {
Settings settings = EMPTY_SETTINGS; Settings settings = EMPTY_SETTINGS;
// Store store = new RamStore(shardId, settings); // Store store = new RamStore(shardId, settings);
Store store = new ByteBufferStore(shardId, settings); Store store = new ByteBufferStore(shardId, settings, new ByteBufferCache(settings));
// Store store = new HeapStore(shardId, settings);
// Store store = new NioFsStore(shardId, settings); // Store store = new NioFsStore(shardId, settings);
store.deleteContent(); store.deleteContent();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.benchmark.index.store;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
@ -32,7 +33,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.fs.*; import org.elasticsearch.index.store.fs.*;
import org.elasticsearch.index.store.memory.ByteBufferStore; import org.elasticsearch.index.store.memory.ByteBufferStore;
import org.elasticsearch.index.store.memory.HeapStore;
import org.elasticsearch.index.store.ram.RamStore; import org.elasticsearch.index.store.ram.RamStore;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
import static java.util.concurrent.TimeUnit.*; import static java.util.concurrent.TimeUnit.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
/** /**
* @author kimchy * @author kimchy
@ -264,6 +263,7 @@ public class SimpleStoreBenchmark {
Environment environment = new Environment(); Environment environment = new Environment();
Settings settings = EMPTY_SETTINGS; Settings settings = EMPTY_SETTINGS;
NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment); NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment);
ByteBufferCache byteBufferCache = new ByteBufferCache(settings);
ShardId shardId = new ShardId(new Index("index"), 1); ShardId shardId = new ShardId(new Index("index"), 1);
String type = args.length > 0 ? args[0] : "ram"; String type = args.length > 0 ? args[0] : "ram";
@ -271,22 +271,13 @@ public class SimpleStoreBenchmark {
if (type.equalsIgnoreCase("ram")) { if (type.equalsIgnoreCase("ram")) {
store = new RamStore(shardId, settings); store = new RamStore(shardId, settings);
} else if (type.equalsIgnoreCase("simple-fs")) { } else if (type.equalsIgnoreCase("simple-fs")) {
store = new SimpleFsStore(shardId, settings, new SimpleFsIndexStore(shardId.index(), settings, null, nodeEnvironment)); store = new SimpleFsStore(shardId, settings, new SimpleFsIndexStore(shardId.index(), settings, null, nodeEnvironment), byteBufferCache);
} else if (type.equalsIgnoreCase("mmap-fs")) { } else if (type.equalsIgnoreCase("mmap-fs")) {
store = new NioFsStore(shardId, settings, new NioFsIndexStore(shardId.index(), settings, null, nodeEnvironment)); store = new NioFsStore(shardId, settings, new NioFsIndexStore(shardId.index(), settings, null, nodeEnvironment), byteBufferCache);
} else if (type.equalsIgnoreCase("nio-fs")) { } else if (type.equalsIgnoreCase("nio-fs")) {
store = new MmapFsStore(shardId, settings, new MmapFsIndexStore(shardId.index(), settings, null, nodeEnvironment)); store = new MmapFsStore(shardId, settings, new MmapFsIndexStore(shardId.index(), settings, null, nodeEnvironment), byteBufferCache);
} else if (type.equalsIgnoreCase("memory-direct")) { } else if (type.equalsIgnoreCase("memory")) {
Settings byteBufferSettings = settingsBuilder() store = new ByteBufferStore(shardId, settings, byteBufferCache);
.put(settings)
.put("index.store.bytebuffer.direct", true)
.build();
store = new ByteBufferStore(shardId, byteBufferSettings);
} else if (type.equalsIgnoreCase("memory-heap")) {
Settings memorySettings = settingsBuilder()
.put(settings)
.build();
store = new HeapStore(shardId, memorySettings);
} else { } else {
throw new IllegalArgumentException("No type store [" + type + "]"); throw new IllegalArgumentException("No type store [" + type + "]");
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cache; package org.elasticsearch.cache;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -28,7 +29,18 @@ import org.elasticsearch.common.settings.Settings;
*/ */
public class NodeCache extends AbstractComponent { public class NodeCache extends AbstractComponent {
@Inject public NodeCache(Settings settings) { private final ByteBufferCache byteBufferCache;
@Inject public NodeCache(Settings settings, ByteBufferCache byteBufferCache) {
super(settings); super(settings);
this.byteBufferCache = byteBufferCache;
}
public void close() {
byteBufferCache.close();
}
public ByteBufferCache byteBuffer() {
return byteBufferCache;
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cache; package org.elasticsearch.cache;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
/** /**
@ -28,5 +29,6 @@ public class NodeCacheModule extends AbstractModule {
@Override protected void configure() { @Override protected void configure() {
bind(NodeCache.class).asEagerSingleton(); bind(NodeCache.class).asEagerSingleton();
bind(ByteBufferCache.class).asEagerSingleton();
} }
} }

View File

@ -0,0 +1,181 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cache.memory;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
*/
public class ByteBufferCache extends AbstractComponent {
public static final boolean CLEAN_SUPPORTED;
private static final Method directBufferCleaner;
private static final Method directBufferCleanerClean;
static {
Method directBufferCleanerX = null;
Method directBufferCleanerCleanX = null;
boolean v;
try {
directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
directBufferCleanerX.setAccessible(true);
directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");
directBufferCleanerCleanX.setAccessible(true);
v = true;
} catch (Exception e) {
v = false;
}
CLEAN_SUPPORTED = v;
directBufferCleaner = directBufferCleanerX;
directBufferCleanerClean = directBufferCleanerCleanX;
}
private final Queue<ByteBuffer> cache;
private final boolean disableCache;
private final int bufferSizeInBytes;
private final long cacheSizeInBytes;
private final boolean direct;
private final AtomicLong acquiredBuffers = new AtomicLong();
public ByteBufferCache() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
}
public ByteBufferCache(int bufferSizeInBytes, int cacheSizeInBytes, boolean direct, boolean warmCache) {
this(ImmutableSettings.settingsBuilder().put("buffer_size", bufferSizeInBytes).put("cache_size", cacheSizeInBytes).put("direct", direct).put("warm_cache", warmCache).build());
}
@Inject public ByteBufferCache(Settings settings) {
super(settings);
this.bufferSizeInBytes = (int) componentSettings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
long cacheSizeInBytes = componentSettings.getAsBytesSize("cache_size", new ByteSizeValue(20, ByteSizeUnit.MB)).bytes();
this.direct = componentSettings.getAsBoolean("direct", true);
boolean warmCache = componentSettings.getAsBoolean("warm_cache", false);
disableCache = cacheSizeInBytes == 0;
if (!disableCache && cacheSizeInBytes < bufferSizeInBytes) {
throw new IllegalArgumentException("Cache size [" + cacheSizeInBytes + "] is smaller than buffer size [" + bufferSizeInBytes + "]");
}
int numberOfCacheEntries = (int) (cacheSizeInBytes / bufferSizeInBytes);
this.cache = disableCache ? null : new ArrayBlockingQueue<ByteBuffer>(numberOfCacheEntries);
this.cacheSizeInBytes = disableCache ? 0 : numberOfCacheEntries * bufferSizeInBytes;
if (logger.isDebugEnabled()) {
logger.debug("using bytebuffer cache with buffer_size [{}], cache_size [{}], direct [{}], warm_cache [{}]",
new ByteSizeValue(bufferSizeInBytes), new ByteSizeValue(cacheSizeInBytes), direct, warmCache);
}
}
public ByteSizeValue bufferSize() {
return new ByteSizeValue(bufferSizeInBytes);
}
public ByteSizeValue cacheSize() {
return new ByteSizeValue(cacheSizeInBytes);
}
public ByteSizeValue allocatedMemory() {
return new ByteSizeValue(acquiredBuffers.get() * bufferSizeInBytes);
}
public int bufferSizeInBytes() {
return bufferSizeInBytes;
}
public boolean direct() {
return direct;
}
public void close() {
if (!disableCache) {
ByteBuffer buffer = cache.poll();
while (buffer != null) {
closeBuffer(buffer);
buffer = cache.poll();
}
}
acquiredBuffers.set(0);
}
public ByteBuffer acquireBuffer() {
acquiredBuffers.incrementAndGet();
if (disableCache) {
return createBuffer();
}
ByteBuffer byteBuffer = cache.poll();
if (byteBuffer == null) {
// everything is taken, return a new one
return createBuffer();
}
byteBuffer.position(0);
return byteBuffer;
}
public void releaseBuffer(ByteBuffer byteBuffer) {
acquiredBuffers.decrementAndGet();
if (disableCache) {
closeBuffer(byteBuffer);
return;
}
boolean success = cache.offer(byteBuffer);
if (!success) {
closeBuffer(byteBuffer);
}
}
private ByteBuffer createBuffer() {
if (direct) {
return ByteBuffer.allocateDirect(bufferSizeInBytes);
}
return ByteBuffer.allocate(bufferSizeInBytes);
}
private void closeBuffer(ByteBuffer byteBuffer) {
if (direct && CLEAN_SUPPORTED) {
try {
Object cleaner = directBufferCleaner.invoke(byteBuffer);
directBufferCleanerClean.invoke(cleaner);
} catch (Exception e) {
logger.debug("Failed to clean memory");
// ignore
}
}
}
}

View File

@ -20,16 +20,14 @@
package org.elasticsearch.index.store.fs; package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.store.SwitchDirectory; import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.memory.ByteBufferDirectory; import org.elasticsearch.index.store.memory.ByteBufferDirectory;
import org.elasticsearch.index.store.memory.HeapDirectory;
import org.elasticsearch.index.store.support.AbstractStore; import org.elasticsearch.index.store.support.AbstractStore;
import java.io.IOException; import java.io.IOException;
@ -65,22 +63,12 @@ public abstract class FsStore<T extends Directory> extends AbstractStore<T> {
return lockFactory; return lockFactory;
} }
protected SwitchDirectory buildSwitchDirectoryIfNeeded(Directory fsDirectory) { protected SwitchDirectory buildSwitchDirectoryIfNeeded(Directory fsDirectory, ByteBufferCache byteBufferCache) {
boolean cache = componentSettings.getAsBoolean("memory.enabled", false); boolean cache = componentSettings.getAsBoolean("memory.enabled", false);
if (!cache) { if (!cache) {
return null; return null;
} }
ByteSizeValue bufferSize = componentSettings.getAsBytesSize("memory.buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)); Directory memDir = new ByteBufferDirectory(byteBufferCache);
ByteSizeValue cacheSize = componentSettings.getAsBytesSize("memory.cache_size", new ByteSizeValue(20, ByteSizeUnit.MB));
boolean direct = componentSettings.getAsBoolean("memory.direct", true);
boolean warmCache = componentSettings.getAsBoolean("memory.warm_cache", true);
Directory memDir;
if (direct) {
memDir = new ByteBufferDirectory((int) bufferSize.bytes(), (int) cacheSize.bytes(), true, warmCache);
} else {
memDir = new HeapDirectory(bufferSize, cacheSize, warmCache);
}
// see http://lucene.apache.org/java/3_0_1/fileformats.html // see http://lucene.apache.org/java/3_0_1/fileformats.html
String[] primaryExtensions = componentSettings.getAsArray("memory.extensions", new String[]{"", "del", "gen"}); String[] primaryExtensions = componentSettings.getAsArray("memory.extensions", new String[]{"", "del", "gen"});
return new SwitchDirectory(ImmutableSet.copyOf(primaryExtensions), memDir, fsDirectory, true); return new SwitchDirectory(ImmutableSet.copyOf(primaryExtensions), memDir, fsDirectory, true);

View File

@ -23,6 +23,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory; import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.MMapDirectory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.store.SwitchDirectory; import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -47,7 +48,7 @@ public class MmapFsStore extends FsStore<Directory> {
private final boolean suggestUseCompoundFile; private final boolean suggestUseCompoundFile;
@Inject public MmapFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) throws IOException { @Inject public MmapFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException {
super(shardId, indexSettings); super(shardId, indexSettings);
// by default, we don't need to sync to disk, since we use the gateway // by default, we don't need to sync to disk, since we use the gateway
this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false); this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false);
@ -56,7 +57,7 @@ public class MmapFsStore extends FsStore<Directory> {
location.mkdirs(); location.mkdirs();
this.fsDirectory = new CustomMMapDirectory(location, lockFactory, syncToDisk); this.fsDirectory = new CustomMMapDirectory(location, lockFactory, syncToDisk);
SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory); SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory, byteBufferCache);
if (switchDirectory != null) { if (switchDirectory != null) {
suggestUseCompoundFile = false; suggestUseCompoundFile = false;
logger.debug("Using [mmap_fs] Store with path [{}], cache [true] with extensions [{}]", fsDirectory.getFile(), switchDirectory.primaryExtensions()); logger.debug("Using [mmap_fs] Store with path [{}], cache [true] with extensions [{}]", fsDirectory.getFile(), switchDirectory.primaryExtensions());

View File

@ -23,6 +23,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory; import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.store.SwitchDirectory; import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -47,7 +48,7 @@ public class NioFsStore extends FsStore<Directory> {
private final boolean suggestUseCompoundFile; private final boolean suggestUseCompoundFile;
@Inject public NioFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) throws IOException { @Inject public NioFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException {
super(shardId, indexSettings); super(shardId, indexSettings);
// by default, we don't need to sync to disk, since we use the gateway // by default, we don't need to sync to disk, since we use the gateway
this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false); this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false);
@ -56,7 +57,7 @@ public class NioFsStore extends FsStore<Directory> {
location.mkdirs(); location.mkdirs();
this.fsDirectory = new CustomNioFSDirectory(location, lockFactory, syncToDisk); this.fsDirectory = new CustomNioFSDirectory(location, lockFactory, syncToDisk);
SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory); SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory, byteBufferCache);
if (switchDirectory != null) { if (switchDirectory != null) {
suggestUseCompoundFile = false; suggestUseCompoundFile = false;
logger.debug("Using [nio_fs] Store with path [{}], cache [true] with extensions [{}]", fsDirectory.getFile(), switchDirectory.primaryExtensions()); logger.debug("Using [nio_fs] Store with path [{}], cache [true] with extensions [{}]", fsDirectory.getFile(), switchDirectory.primaryExtensions());

View File

@ -23,6 +23,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory; import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.store.SwitchDirectory; import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -47,7 +48,7 @@ public class SimpleFsStore extends FsStore<Directory> {
private final boolean suggestUseCompoundFile; private final boolean suggestUseCompoundFile;
@Inject public SimpleFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) throws IOException { @Inject public SimpleFsStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException {
super(shardId, indexSettings); super(shardId, indexSettings);
// by default, we don't need to sync to disk, since we use the gateway // by default, we don't need to sync to disk, since we use the gateway
this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false); this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false);
@ -56,7 +57,7 @@ public class SimpleFsStore extends FsStore<Directory> {
location.mkdirs(); location.mkdirs();
this.fsDirectory = new CustomSimpleFSDirectory(location, lockFactory, syncToDisk); this.fsDirectory = new CustomSimpleFSDirectory(location, lockFactory, syncToDisk);
SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory); SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory, byteBufferCache);
if (switchDirectory != null) { if (switchDirectory != null) {
suggestUseCompoundFile = false; suggestUseCompoundFile = false;
logger.debug("Using [simple_fs] Store with path [{}], cache [true] with extensions [{}]", fsDirectory.getFile(), switchDirectory.primaryExtensions()); logger.debug("Using [simple_fs] Store with path [{}], cache [true] with extensions [{}]", fsDirectory.getFile(), switchDirectory.primaryExtensions());

View File

@ -23,14 +23,11 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.SingleInstanceLockFactory; import org.apache.lucene.store.SingleInstanceLockFactory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -48,94 +45,20 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class ByteBufferDirectory extends Directory { public class ByteBufferDirectory extends Directory {
final ByteBufferCache byteBufferCache;
private final Map<String, ByteBufferFile> files = new ConcurrentHashMap<String, ByteBufferFile>(); private final Map<String, ByteBufferFile> files = new ConcurrentHashMap<String, ByteBufferFile>();
private final Queue<ByteBuffer> cache;
private final int bufferSizeInBytes;
private final int cacheSizeInBytes;
private final boolean disableCache;
private final boolean direct;
private boolean useCleanHack = true;
public static final boolean CLEAN_SUPPORTED;
private static final Method directBufferCleaner;
private static final Method directBufferCleanerClean;
static {
Method directBufferCleanerX = null;
Method directBufferCleanerCleanX = null;
boolean v;
try {
directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
directBufferCleanerX.setAccessible(true);
directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");
directBufferCleanerCleanX.setAccessible(true);
v = true;
} catch (Exception e) {
v = false;
}
CLEAN_SUPPORTED = v;
directBufferCleaner = directBufferCleanerX;
directBufferCleanerClean = directBufferCleanerCleanX;
}
/** /**
* Constructs a new byte buffer directory. * Constructs a new byte buffer directory.
*
* @param bufferSizeInBytes The size of a byte buffer
* @param cacheSizeInBytes The size of the cache, set to <code>0</code> to disable caching
* @param direct Should the byte buffers be stored outside the heap (<code>true</code) or in head (<code>false</code>)
* @param warmCache Should the cache be warmed
*/ */
public ByteBufferDirectory(int bufferSizeInBytes, int cacheSizeInBytes, boolean direct, boolean warmCache) { public ByteBufferDirectory(ByteBufferCache byteBufferCache) {
disableCache = cacheSizeInBytes == 0; this.byteBufferCache = byteBufferCache;
if (!disableCache && cacheSizeInBytes < bufferSizeInBytes) {
throw new IllegalArgumentException("Cache size [" + cacheSizeInBytes + "] is smaller than buffer size [" + bufferSizeInBytes + "]");
}
this.bufferSizeInBytes = bufferSizeInBytes;
int numberOfCacheEntries = cacheSizeInBytes / bufferSizeInBytes;
this.cache = disableCache ? null : new ArrayBlockingQueue<ByteBuffer>(numberOfCacheEntries);
this.cacheSizeInBytes = disableCache ? 0 : numberOfCacheEntries * bufferSizeInBytes;
this.direct = direct;
setLockFactory(new SingleInstanceLockFactory()); setLockFactory(new SingleInstanceLockFactory());
if (!disableCache && warmCache) {
for (int i = 0; i < numberOfCacheEntries; i++) {
cache.add(createBuffer());
}
}
}
public void setUseClean(final boolean useCleanHack) {
if (useCleanHack && !CLEAN_SUPPORTED)
throw new IllegalArgumentException("Clean hack not supported on this platform!");
this.useCleanHack = useCleanHack;
}
public boolean getUseClean() {
return useCleanHack;
}
public int cacheSizeInBytes() {
return this.cacheSizeInBytes;
} }
public int bufferSizeInBytes() { public int bufferSizeInBytes() {
return bufferSizeInBytes; return byteBufferCache.bufferSizeInBytes();
}
public boolean isDirect() {
return direct;
}
public boolean isCacheEnabled() {
return !disableCache;
} }
@Override public String[] listAll() throws IOException { @Override public String[] listAll() throws IOException {
@ -209,55 +132,5 @@ public class ByteBufferDirectory extends Directory {
for (String file : files) { for (String file : files) {
deleteFile(file); deleteFile(file);
} }
if (!disableCache) {
ByteBuffer buffer = cache.poll();
while (buffer != null) {
closeBuffer(buffer);
buffer = cache.poll();
}
}
}
void releaseBuffer(ByteBuffer byteBuffer) {
if (disableCache) {
closeBuffer(byteBuffer);
return;
}
boolean success = cache.offer(byteBuffer);
if (!success) {
closeBuffer(byteBuffer);
}
}
ByteBuffer acquireBuffer() {
if (disableCache) {
return createBuffer();
}
ByteBuffer byteBuffer = cache.poll();
if (byteBuffer == null) {
// everything is taken, return a new one
return createBuffer();
}
byteBuffer.position(0);
return byteBuffer;
}
private ByteBuffer createBuffer() {
if (isDirect()) {
return ByteBuffer.allocateDirect(bufferSizeInBytes());
}
return ByteBuffer.allocate(bufferSizeInBytes());
}
private void closeBuffer(ByteBuffer byteBuffer) {
if (useCleanHack && isDirect()) {
try {
Object cleaner = directBufferCleaner.invoke(byteBuffer);
directBufferCleanerClean.invoke(cleaner);
} catch (Exception e) {
e.printStackTrace();
// silently ignore exception
}
}
} }
} }

View File

@ -69,7 +69,7 @@ public class ByteBufferFile {
void clean() { void clean() {
if (buffers != null) { if (buffers != null) {
for (ByteBuffer buffer : buffers) { for (ByteBuffer buffer : buffers) {
dir.releaseBuffer(buffer); dir.byteBufferCache.releaseBuffer(buffer);
} }
buffers = null; buffers = null;
} }

View File

@ -41,7 +41,7 @@ public class ByteBufferIndexInput extends IndexInput {
public ByteBufferIndexInput(ByteBufferDirectory dir, ByteBufferFile file) throws IOException { public ByteBufferIndexInput(ByteBufferDirectory dir, ByteBufferFile file) throws IOException {
this.file = file; this.file = file;
this.bufferSize = dir.bufferSizeInBytes(); this.bufferSize = dir.byteBufferCache.bufferSizeInBytes();
this.length = file.length(); this.length = file.length();
switchCurrentBuffer(true); switchCurrentBuffer(true);
} }

View File

@ -89,10 +89,10 @@ public class ByteBufferIndexOutput extends IndexOutput {
// and flush() has not been called yet // and flush() has not been called yet
setFileLength(); setFileLength();
if (pos < bufferStart || pos >= bufferStart + bufferLength) { if (pos < bufferStart || pos >= bufferStart + bufferLength) {
currentBufferIndex = (int) (pos / dir.bufferSizeInBytes()); currentBufferIndex = (int) (pos / dir.byteBufferCache.bufferSizeInBytes());
switchCurrentBuffer(); switchCurrentBuffer();
} }
currentBuffer.position((int) (pos % dir.bufferSizeInBytes())); currentBuffer.position((int) (pos % dir.byteBufferCache.bufferSizeInBytes()));
} }
@Override public long length() throws IOException { @Override public long length() throws IOException {
@ -101,13 +101,13 @@ public class ByteBufferIndexOutput extends IndexOutput {
private void switchCurrentBuffer() throws IOException { private void switchCurrentBuffer() throws IOException {
if (currentBufferIndex == buffers.size()) { if (currentBufferIndex == buffers.size()) {
currentBuffer = dir.acquireBuffer(); currentBuffer = dir.byteBufferCache.acquireBuffer();
buffers.add(currentBuffer); buffers.add(currentBuffer);
} else { } else {
currentBuffer = buffers.get(currentBufferIndex); currentBuffer = buffers.get(currentBufferIndex);
} }
currentBuffer.position(0); currentBuffer.position(0);
bufferStart = (long) dir.bufferSizeInBytes() * (long) currentBufferIndex; bufferStart = (long) dir.byteBufferCache.bufferSizeInBytes() * (long) currentBufferIndex;
bufferLength = currentBuffer.capacity(); bufferLength = currentBuffer.capacity();
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.store.memory; package org.elasticsearch.index.store.memory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
@ -38,9 +39,10 @@ public class ByteBufferIndexStore extends AbstractIndexStore {
private final boolean direct; private final boolean direct;
@Inject public ByteBufferIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService) { @Inject public ByteBufferIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService,
ByteBufferCache byteBufferCache) {
super(index, indexSettings, indexService); super(index, indexSettings, indexService);
this.direct = componentSettings.getAsBoolean("direct", true); this.direct = byteBufferCache.direct();
} }
@Override public boolean persistent() { @Override public boolean persistent() {

View File

@ -19,10 +19,9 @@
package org.elasticsearch.index.store.memory; package org.elasticsearch.index.store.memory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.AbstractStore; import org.elasticsearch.index.store.support.AbstractStore;
@ -32,25 +31,13 @@ import org.elasticsearch.index.store.support.AbstractStore;
*/ */
public class ByteBufferStore extends AbstractStore<ByteBufferDirectory> { public class ByteBufferStore extends AbstractStore<ByteBufferDirectory> {
private final ByteSizeValue bufferSize;
private final ByteSizeValue cacheSize;
private final boolean direct;
private final boolean warmCache;
private final ByteBufferDirectory directory; private final ByteBufferDirectory directory;
@Inject public ByteBufferStore(ShardId shardId, @IndexSettings Settings indexSettings) { @Inject public ByteBufferStore(ShardId shardId, @IndexSettings Settings indexSettings, ByteBufferCache byteBufferCache) {
super(shardId, indexSettings); super(shardId, indexSettings);
this.bufferSize = componentSettings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)); this.directory = new ByteBufferDirectory(byteBufferCache);
this.cacheSize = componentSettings.getAsBytesSize("cache_size", new ByteSizeValue(20, ByteSizeUnit.MB)); logger.debug("Using [byte_buffer] store");
this.direct = componentSettings.getAsBoolean("direct", true);
this.warmCache = componentSettings.getAsBoolean("warm_cache", true);
this.directory = new ByteBufferDirectory((int) bufferSize.bytes(), (int) cacheSize.bytes(), direct, warmCache);
logger.debug("Using [byte_buffer] store with buffer_size[{}], cache_size[{}], direct[{}], warm_cache[{}]", bufferSize, cacheSize, directory.isDirect(), warmCache);
} }
@Override public ByteBufferDirectory directory() { @Override public ByteBufferDirectory directory() {

View File

@ -1,197 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.SingleInstanceLockFactory;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
/**
* @author kimchy (Shay Banon)
*/
public class HeapDirectory extends Directory {
private final Map<String, HeapRamFile> files = newConcurrentMap();
private final Queue<byte[]> cache;
private final int bufferSizeInBytes;
private final ByteSizeValue bufferSize;
private final ByteSizeValue cacheSize;
private final boolean disableCache;
public HeapDirectory() {
this(new ByteSizeValue(1, ByteSizeUnit.KB), new ByteSizeValue(20, ByteSizeUnit.MB), false);
}
public HeapDirectory(ByteSizeValue bufferSize, ByteSizeValue cacheSize, boolean warmCache) {
disableCache = cacheSize.bytes() == 0;
if (!disableCache && cacheSize.bytes() < bufferSize.bytes()) {
throw new IllegalArgumentException("Cache size [" + cacheSize + "] is smaller than buffer size [" + bufferSize + "]");
}
this.bufferSize = bufferSize;
this.bufferSizeInBytes = (int) bufferSize.bytes();
int numberOfCacheEntries = (int) (cacheSize.bytes() / bufferSize.bytes());
this.cache = disableCache ? null : new ArrayBlockingQueue<byte[]>(numberOfCacheEntries);
this.cacheSize = disableCache ? new ByteSizeValue(0, ByteSizeUnit.BYTES) : new ByteSizeValue(numberOfCacheEntries * bufferSize.bytes(), ByteSizeUnit.BYTES);
setLockFactory(new SingleInstanceLockFactory());
if (!disableCache && warmCache) {
for (int i = 0; i < numberOfCacheEntries; i++) {
cache.add(createBuffer());
}
}
}
public ByteSizeValue bufferSize() {
return this.bufferSize;
}
public ByteSizeValue cacheSize() {
return this.cacheSize;
}
int bufferSizeInBytes() {
return bufferSizeInBytes;
}
@Override public String[] listAll() throws IOException {
return files.keySet().toArray(new String[0]);
}
@Override public boolean fileExists(String name) throws IOException {
return files.containsKey(name);
}
@Override public long fileModified(String name) throws IOException {
HeapRamFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
return file.lastModified();
}
@Override public void touchFile(String name) throws IOException {
HeapRamFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
long ts2, ts1 = System.currentTimeMillis();
do {
try {
Thread.sleep(0, 1);
} catch (InterruptedException ie) {
// In 3.0 we will change this to throw
// InterruptedException instead
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
ts2 = System.currentTimeMillis();
} while (ts1 == ts2);
file.lastModified(ts2);
}
@Override public void deleteFile(String name) throws IOException {
HeapRamFile file = files.remove(name);
if (file == null)
throw new FileNotFoundException(name);
file.clean();
}
@Override public long fileLength(String name) throws IOException {
HeapRamFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
return file.length();
}
@Override public IndexOutput createOutput(String name) throws IOException {
HeapRamFile file = new HeapRamFile(this);
HeapRamFile existing = files.put(name, file);
if (existing != null) {
existing.clean();
}
return new HeapIndexOutput(this, file);
}
@Override public IndexInput openInput(String name) throws IOException {
HeapRamFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
return new HeapIndexInput(this, file);
}
@Override public void close() throws IOException {
String[] files = listAll();
for (String file : files) {
deleteFile(file);
}
if (!disableCache) {
byte[] buffer = cache.poll();
while (buffer != null) {
closeBuffer(buffer);
buffer = cache.poll();
}
}
}
void releaseBuffer(byte[] buffer) {
if (disableCache) {
closeBuffer(buffer);
return;
}
boolean success = cache.offer(buffer);
if (!success) {
closeBuffer(buffer);
}
}
byte[] acquireBuffer() {
if (disableCache) {
return createBuffer();
}
byte[] buffer = cache.poll();
if (buffer == null) {
// everything is taken, return a new one
return createBuffer();
}
return buffer;
}
byte[] createBuffer() {
return new byte[bufferSizeInBytes];
}
void closeBuffer(byte[] buffer) {
}
}

View File

@ -1,119 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.IndexInput;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
*/
public class HeapIndexInput extends IndexInput {
private final int bufferSize;
private final HeapRamFile file;
private long length;
private byte[] currentBuffer;
private int currentBufferIndex;
private int bufferPosition;
private long bufferStart;
private int bufferLength;
public HeapIndexInput(HeapDirectory dir, HeapRamFile file) throws IOException {
this.bufferSize = dir.bufferSizeInBytes();
this.file = file;
length = file.length();
if (length / dir.bufferSizeInBytes() >= Integer.MAX_VALUE) {
throw new IOException("Too large RAMFile! " + length);
}
// make sure that we switch to the
// first needed buffer lazily
currentBufferIndex = -1;
currentBuffer = null;
}
@Override public byte readByte() throws IOException {
if (bufferPosition >= bufferLength) {
currentBufferIndex++;
switchCurrentBuffer(true);
}
return currentBuffer[bufferPosition++];
}
@Override public void readBytes(byte[] b, int offset, int len) throws IOException {
while (len > 0) {
if (bufferPosition >= bufferLength) {
currentBufferIndex++;
switchCurrentBuffer(true);
}
int remainInBuffer = bufferLength - bufferPosition;
int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
System.arraycopy(currentBuffer, bufferPosition, b, offset, bytesToCopy);
offset += bytesToCopy;
len -= bytesToCopy;
bufferPosition += bytesToCopy;
}
}
@Override public void close() throws IOException {
}
@Override public long getFilePointer() {
return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition;
}
@Override public void seek(long pos) throws IOException {
if (currentBuffer == null || pos < bufferStart || pos >= bufferStart + bufferSize) {
currentBufferIndex = (int) (pos / bufferSize);
switchCurrentBuffer(false);
}
bufferPosition = (int) (pos % bufferSize);
}
@Override public long length() {
return length;
}
private void switchCurrentBuffer(boolean enforceEOF) throws IOException {
if (currentBufferIndex >= file.numberOfBuffers()) {
// end of file reached, no more buffers left
if (enforceEOF)
throw new IOException("Read past EOF");
else {
// Force EOF if a read takes place at this position
currentBufferIndex--;
bufferPosition = bufferSize;
}
} else {
currentBuffer = file.buffer(currentBufferIndex);
bufferPosition = 0;
bufferStart = (long) bufferSize * (long) currentBufferIndex;
long buflen = length - bufferStart;
bufferLength = buflen > bufferSize ? bufferSize : (int) buflen;
}
}
}

View File

@ -1,126 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.IndexOutput;
import java.io.IOException;
import java.util.ArrayList;
/**
* @author kimchy (Shay Banon)
*/
public class HeapIndexOutput extends IndexOutput {
private final HeapDirectory dir;
private final HeapRamFile file;
private ArrayList<byte[]> buffers = new ArrayList<byte[]>();
private byte[] currentBuffer;
private int currentBufferIndex;
private int bufferPosition;
private long bufferStart;
private int bufferLength;
public HeapIndexOutput(HeapDirectory dir, HeapRamFile file) {
this.dir = dir;
this.file = file;
// make sure that we switch to the
// first needed buffer lazily
currentBufferIndex = -1;
currentBuffer = null;
}
@Override public void writeByte(byte b) throws IOException {
if (bufferPosition == bufferLength) {
currentBufferIndex++;
switchCurrentBuffer();
}
currentBuffer[bufferPosition++] = b;
}
@Override public void writeBytes(byte[] b, int offset, int len) throws IOException {
while (len > 0) {
if (bufferPosition == bufferLength) {
currentBufferIndex++;
switchCurrentBuffer();
}
int remainInBuffer = currentBuffer.length - bufferPosition;
int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
System.arraycopy(b, offset, currentBuffer, bufferPosition, bytesToCopy);
offset += bytesToCopy;
len -= bytesToCopy;
bufferPosition += bytesToCopy;
}
}
@Override public void flush() throws IOException {
file.lastModified(System.currentTimeMillis());
setFileLength();
}
@Override public void close() throws IOException {
flush();
file.buffers(buffers.toArray(new byte[buffers.size()][]));
}
@Override public long getFilePointer() {
return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition;
}
@Override public void seek(long pos) throws IOException {
// set the file length in case we seek back
// and flush() has not been called yet
setFileLength();
if (pos < bufferStart || pos >= bufferStart + bufferLength) {
currentBufferIndex = (int) (pos / dir.bufferSizeInBytes());
switchCurrentBuffer();
}
bufferPosition = (int) (pos % dir.bufferSizeInBytes());
}
@Override public long length() throws IOException {
return file.length();
}
private void switchCurrentBuffer() throws IOException {
if (currentBufferIndex == buffers.size()) {
currentBuffer = dir.acquireBuffer();
buffers.add(currentBuffer);
} else {
currentBuffer = buffers.get(currentBufferIndex);
}
bufferPosition = 0;
bufferStart = (long) dir.bufferSizeInBytes() * (long) currentBufferIndex;
bufferLength = currentBuffer.length;
}
private void setFileLength() {
long pointer = bufferStart + bufferPosition;
if (pointer > file.length()) {
file.length(pointer);
}
}
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.jvm.JvmStats;
/**
* @author kimchy (shay.banon)
*/
public class HeapIndexStore extends AbstractIndexStore {
@Inject public HeapIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService) {
super(index, indexSettings, indexService);
}
@Override public boolean persistent() {
return false;
}
@Override public Class<? extends Store> shardStoreClass() {
return HeapStore.class;
}
@Override public ByteSizeValue backingStoreTotalSpace() {
return JvmInfo.jvmInfo().getMem().heapMax();
}
@Override public ByteSizeValue backingStoreFreeSpace() {
return JvmStats.jvmStats().getMem().heapUsed();
}
}

View File

@ -1,75 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
/**
* @author kimchy (Shay Banon)
*/
public class HeapRamFile {
private final HeapDirectory dir;
private volatile long lastModified = System.currentTimeMillis();
private volatile long length;
private volatile byte[][] buffers;
public HeapRamFile(HeapDirectory dir) {
this.dir = dir;
}
long lastModified() {
return lastModified;
}
void lastModified(long lastModified) {
this.lastModified = lastModified;
}
long length() {
return length;
}
void length(long length) {
this.length = length;
}
byte[] buffer(int i) {
return this.buffers[i];
}
int numberOfBuffers() {
return this.buffers.length;
}
void buffers(byte[][] buffers) {
this.buffers = buffers;
}
void clean() {
if (buffers != null) {
for (byte[] buffer : buffers) {
dir.releaseBuffer(buffer);
}
buffers = null;
}
}
}

View File

@ -1,64 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.AbstractStore;
/**
* @author kimchy (Shay Banon)
*/
public class HeapStore extends AbstractStore<HeapDirectory> {
private final ByteSizeValue bufferSize;
private final ByteSizeValue cacheSize;
private final boolean warmCache;
private HeapDirectory directory;
@Inject public HeapStore(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
this.bufferSize = componentSettings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB));
this.cacheSize = componentSettings.getAsBytesSize("cache_size", new ByteSizeValue(20, ByteSizeUnit.MB));
this.warmCache = componentSettings.getAsBoolean("warm_cache", true);
this.directory = new HeapDirectory(bufferSize, cacheSize, warmCache);
logger.debug("Using [heap] Store with buffer_size[{}], cache_size[{}], warm_cache[{}]", directory.bufferSize(), directory.cacheSize(), warmCache);
}
@Override public HeapDirectory directory() {
return directory;
}
/**
* Its better to not use the compound format when using the Ram store.
*/
@Override public boolean suggestUseCompoundFile() {
return false;
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.store.memory; package org.elasticsearch.index.store.memory;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
@ -36,13 +35,6 @@ public class MemoryIndexStoreModule extends AbstractModule {
} }
@Override protected void configure() { @Override protected void configure() {
String location = settings.get("index.store.memory.location", "direct");
if ("direct".equalsIgnoreCase(location)) {
bind(IndexStore.class).to(ByteBufferIndexStore.class).asEagerSingleton(); bind(IndexStore.class).to(ByteBufferIndexStore.class).asEagerSingleton();
} else if ("heap".equalsIgnoreCase(location)) {
bind(IndexStore.class).to(HeapIndexStore.class).asEagerSingleton();
} else {
throw new ElasticSearchIllegalArgumentException("Memory location [" + location + "] is invalid, can be one of [direct,heap]");
}
} }
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.node.internal;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.TransportActionModule; import org.elasticsearch.action.TransportActionModule;
import org.elasticsearch.cache.NodeCache;
import org.elasticsearch.cache.NodeCacheModule; import org.elasticsearch.cache.NodeCacheModule;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClientModule; import org.elasticsearch.client.node.NodeClientModule;
@ -256,6 +257,8 @@ public final class InternalNode implements Node {
injector.getInstance(plugin).close(); injector.getInstance(plugin).close();
} }
injector.getInstance(NodeCache.class).close();
injector.getInstance(TimerService.class).close(); injector.getInstance(TimerService.class).close();
injector.getInstance(ThreadPool.class).shutdown(); injector.getInstance(ThreadPool.class).shutdown();
try { try {

View File

@ -23,6 +23,7 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock; import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.io.IOException; import java.io.IOException;
@ -36,49 +37,62 @@ import static org.hamcrest.Matchers.*;
public class SimpleByteBufferStoreTests { public class SimpleByteBufferStoreTests {
@Test public void test1BufferNoCache() throws Exception { @Test public void test1BufferNoCache() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(1, 0, true, false); ByteBufferCache cache = new ByteBufferCache(1, 0, true, false);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir); insertData(dir);
verifyData(dir); verifyData(dir);
dir.close(); dir.close();
cache.close();
} }
@Test public void test1Buffer() throws Exception { @Test public void test1Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(1, 10, true, false); ByteBufferCache cache = new ByteBufferCache(1, 10, true, false);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir); insertData(dir);
verifyData(dir); verifyData(dir);
dir.close(); dir.close();
cache.close();
} }
@Test public void test3Buffer() throws Exception { @Test public void test3Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(3, 10, true, false); ByteBufferCache cache = new ByteBufferCache(3, 10, true, false);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir); insertData(dir);
verifyData(dir); verifyData(dir);
dir.close(); dir.close();
cache.close();
} }
@Test public void test10Buffer() throws Exception { @Test public void test10Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(10, 20, true, false); ByteBufferCache cache = new ByteBufferCache(10, 20, true, false);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir); insertData(dir);
verifyData(dir); verifyData(dir);
dir.close(); dir.close();
cache.close();
} }
@Test public void test15Buffer() throws Exception { @Test public void test15Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(15, 30, true, false); ByteBufferCache cache = new ByteBufferCache(15, 30, true, false);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir); insertData(dir);
verifyData(dir); verifyData(dir);
dir.close(); dir.close();
cache.close();
} }
@Test public void test40Buffer() throws Exception { @Test public void test40Buffer() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(40, 80, true, false); ByteBufferCache cache = new ByteBufferCache(40, 80, true, false);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir); insertData(dir);
verifyData(dir); verifyData(dir);
dir.close(); dir.close();
cache.close();
} }
@Test public void testSimpleLocking() throws Exception { @Test public void testSimpleLocking() throws Exception {
ByteBufferDirectory dir = new ByteBufferDirectory(40, 80, true, false); ByteBufferCache cache = new ByteBufferCache(40, 80, true, false);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
Lock lock = dir.makeLock("testlock"); Lock lock = dir.makeLock("testlock");
@ -94,6 +108,7 @@ public class SimpleByteBufferStoreTests {
lock.release(); lock.release();
assertThat(lock.isLocked(), equalTo(false)); assertThat(lock.isLocked(), equalTo(false));
dir.close(); dir.close();
cache.close();
} }
private void insertData(ByteBufferDirectory dir) throws IOException { private void insertData(ByteBufferDirectory dir) throws IOException {

View File

@ -1,160 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (Shay Banon)
*/
public class SimpleHeapStoreTests {
@Test public void test1BufferNoCache() throws Exception {
HeapDirectory dir = new HeapDirectory(new ByteSizeValue(1, ByteSizeUnit.BYTES), new ByteSizeValue(0, ByteSizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test1Buffer() throws Exception {
HeapDirectory dir = new HeapDirectory(new ByteSizeValue(1, ByteSizeUnit.BYTES), new ByteSizeValue(10, ByteSizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test3Buffer() throws Exception {
HeapDirectory dir = new HeapDirectory(new ByteSizeValue(3, ByteSizeUnit.BYTES), new ByteSizeValue(10, ByteSizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test10Buffer() throws Exception {
HeapDirectory dir = new HeapDirectory(new ByteSizeValue(10, ByteSizeUnit.BYTES), new ByteSizeValue(20, ByteSizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test15Buffer() throws Exception {
HeapDirectory dir = new HeapDirectory(new ByteSizeValue(15, ByteSizeUnit.BYTES), new ByteSizeValue(30, ByteSizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void test40Buffer() throws Exception {
HeapDirectory dir = new HeapDirectory(new ByteSizeValue(40, ByteSizeUnit.BYTES), new ByteSizeValue(80, ByteSizeUnit.BYTES), false);
insertData(dir);
verifyData(dir);
dir.close();
}
@Test public void testSimpeLocking() throws Exception {
HeapDirectory dir = new HeapDirectory(new ByteSizeValue(40, ByteSizeUnit.BYTES), new ByteSizeValue(80, ByteSizeUnit.BYTES), false);
Lock lock = dir.makeLock("testlock");
assertThat(lock.isLocked(), equalTo(false));
assertThat(lock.obtain(200), equalTo(true));
assertThat(lock.isLocked(), equalTo(true));
try {
assertThat(lock.obtain(200), equalTo(false));
assertThat("lock should be thrown", false, equalTo(true));
} catch (LockObtainFailedException e) {
// all is well
}
lock.release();
assertThat(lock.isLocked(), equalTo(false));
dir.close();
}
private void insertData(HeapDirectory dir) throws IOException {
byte[] test = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
IndexOutput indexOutput = dir.createOutput("value1");
indexOutput.writeBytes(new byte[]{2, 4, 6, 7, 8}, 5);
indexOutput.writeInt(-1);
indexOutput.writeLong(10);
indexOutput.writeInt(0);
indexOutput.writeInt(0);
indexOutput.writeBytes(test, 8);
indexOutput.writeBytes(test, 5);
indexOutput.seek(0);
indexOutput.writeByte((byte) 8);
if (dir.bufferSizeInBytes() > 4) {
indexOutput.seek(2);
indexOutput.writeBytes(new byte[]{1, 2}, 2);
}
indexOutput.close();
}
private void verifyData(HeapDirectory dir) throws IOException {
byte[] test = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
assertThat(dir.fileExists("value1"), equalTo(true));
assertThat(dir.fileLength("value1"), equalTo(38l));
IndexInput indexInput = dir.openInput("value1");
indexInput.readBytes(test, 0, 5);
assertThat(test[0], equalTo((byte) 8));
assertThat(indexInput.readInt(), equalTo(-1));
assertThat(indexInput.readLong(), equalTo((long) 10));
assertThat(indexInput.readInt(), equalTo(0));
assertThat(indexInput.readInt(), equalTo(0));
indexInput.readBytes(test, 0, 8);
assertThat(test[0], equalTo((byte) 1));
assertThat(test[7], equalTo((byte) 8));
indexInput.readBytes(test, 0, 5);
assertThat(test[0], equalTo((byte) 1));
assertThat(test[4], equalTo((byte) 5));
indexInput.seek(28);
assertThat(indexInput.readByte(), equalTo((byte) 4));
indexInput.seek(30);
assertThat(indexInput.readByte(), equalTo((byte) 6));
indexInput.seek(0);
indexInput.readBytes(test, 0, 5);
assertThat(test[0], equalTo((byte) 8));
indexInput.close();
indexInput = dir.openInput("value1");
// iterate over all the data
for (int i = 0; i < 38; i++) {
indexInput.readByte();
}
indexInput.close();
}
}

View File

@ -31,7 +31,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class SimpleNodesInfoTests extends AbstractNodesTests { public class SimpleNodesInfoTests extends AbstractNodesTests {