Remove the in memory buffer Lucene store/directory

closes #4994
This commit is contained in:
Shay Banon 2014-02-03 09:52:13 -05:00
parent 56d3e98fff
commit d591972c18
21 changed files with 14 additions and 1544 deletions

View File

@ -1,97 +0,0 @@
package org.apache.lucene.store.bytebuffer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
/**
* A byte buffer allocator simple allocates byte buffers, and handles releasing
* them. Implementation can include special direct buffer cleaning when releasing
* a buffer, as well as caching of byte buffers.
* <p/>
* <p>There are two types of buffers that can be allocated, small and big. This
* comes in handy when knowing in advance (more or less) the size of the buffers
* needed (large files or small), as well as in caching implementations.
*/
public interface ByteBufferAllocator {
/**
* Helper class to allocator implementations allowing to clean direct buffers.
*/
public static class Cleaner {
public static final boolean CLEAN_SUPPORTED;
private static final Method directBufferCleaner;
private static final Method directBufferCleanerClean;
static {
Method directBufferCleanerX = null;
Method directBufferCleanerCleanX = null;
boolean v;
try {
directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
directBufferCleanerX.setAccessible(true);
directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");
directBufferCleanerCleanX.setAccessible(true);
v = true;
} catch (Exception e) {
v = false;
}
CLEAN_SUPPORTED = v;
directBufferCleaner = directBufferCleanerX;
directBufferCleanerClean = directBufferCleanerCleanX;
}
public static void clean(ByteBuffer buffer) {
if (CLEAN_SUPPORTED && buffer.isDirect()) {
try {
Object cleaner = directBufferCleaner.invoke(buffer);
directBufferCleanerClean.invoke(cleaner);
} catch (Exception e) {
// silently ignore exception
}
}
}
}
public static enum Type {
SMALL,
LARGE
}
/**
* The size (in bytes) that is allocated for the provided type.
*/
int sizeInBytes(Type type);
/**
* Allocate a byte buffer for the specific type.
*/
ByteBuffer allocate(Type type) throws IOException;
/**
* Release the buffer.
*/
void release(ByteBuffer buffer);
/**
* Close the allocator, releasing any cached buffers for example.
*/
void close();
}

View File

@ -1,182 +0,0 @@
package org.apache.lucene.store.bytebuffer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.google.common.collect.ImmutableSet;
import org.apache.lucene.store.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* A memory based directory that uses {@link java.nio.ByteBuffer} in order to store the directory content.
* <p/>
* <p>The benefit of using {@link java.nio.ByteBuffer} is the fact that it can be stored in "native" memory
* outside of the JVM heap, thus not incurring the GC overhead of large in memory index.
* <p/>
* <p>Each "file" is segmented into one or more byte buffers.
* <p/>
* <p>If constructed with {@link ByteBufferAllocator}, it allows to control the allocation and release of
* byte buffer. For example, custom implementations can include caching of byte buffers.
*/
public class ByteBufferDirectory extends BaseDirectory {
protected final Map<String, ByteBufferFile> files = new ConcurrentHashMap<String, ByteBufferFile>();
private final ByteBufferAllocator allocator;
private final boolean internalAllocator;
final AtomicLong sizeInBytes = new AtomicLong();
/**
* Constructs a new directory using {@link PlainByteBufferAllocator}.
*/
public ByteBufferDirectory() {
this.allocator = new PlainByteBufferAllocator(false, 1024, 1024 * 10);
this.internalAllocator = true;
try {
setLockFactory(new SingleInstanceLockFactory());
} catch (IOException e) {
// will not happen
}
}
/**
* Constructs a new byte buffer directory with a custom allocator.
*/
public ByteBufferDirectory(ByteBufferAllocator allocator) {
this.allocator = allocator;
this.internalAllocator = false;
try {
setLockFactory(new SingleInstanceLockFactory());
} catch (IOException e) {
// will not happen
}
}
/**
* Returns the size in bytes of the directory, chunk by buffer size.
*/
public long sizeInBytes() {
return sizeInBytes.get();
}
public void sync(Collection<String> names) throws IOException {
// nothing to do here
}
@Override
public String[] listAll() throws IOException {
return files.keySet().toArray(new String[0]);
}
@Override
public boolean fileExists(String name) throws IOException {
return files.containsKey(name);
}
@Override
public void deleteFile(String name) throws IOException {
ByteBufferFile file = files.remove(name);
if (file == null)
throw new FileNotFoundException(name);
sizeInBytes.addAndGet(-file.sizeInBytes());
file.delete();
}
@Override
public long fileLength(String name) throws IOException {
ByteBufferFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
return file.getLength();
}
private final static ImmutableSet<String> SMALL_FILES_SUFFIXES = ImmutableSet.of(
"del", // 1 bit per doc
"cfe", // compound file metadata
"si", // segment info
"fnm" // field info (metadata like omit norms etc)
);
private static boolean isSmallFile(String fileName) {
if (fileName.startsWith("segments")) {
return true;
}
if (fileName.lastIndexOf('.') > 0) {
String suffix = fileName.substring(fileName.lastIndexOf('.') + 1);
return SMALL_FILES_SUFFIXES.contains(suffix);
}
return false;
}
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ByteBufferAllocator.Type allocatorType = ByteBufferAllocator.Type.LARGE;
if (isSmallFile(name)) {
allocatorType = ByteBufferAllocator.Type.SMALL;
}
ByteBufferFileOutput file = new ByteBufferFileOutput(this, allocator.sizeInBytes(allocatorType));
ByteBufferFile existing = files.put(name, file);
if (existing != null) {
sizeInBytes.addAndGet(-existing.sizeInBytes());
existing.delete();
}
return new ByteBufferIndexOutput(this, name, allocator, allocatorType, file);
}
void closeOutput(String name, ByteBufferFileOutput file) {
// we replace the output file with a read only file, with no sync
files.put(name, new ByteBufferFile(file));
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ByteBufferFile file = files.get(name);
if (file == null)
throw new FileNotFoundException(name);
return new ByteBufferIndexInput(name, file);
}
@Override
public void close() throws IOException {
String[] files = listAll();
for (String file : files) {
deleteFile(file);
}
if (internalAllocator) {
allocator.close();
}
}
@Override
public String toString() {
return "byte_buffer";
}
void releaseBuffer(ByteBuffer byteBuffer) {
allocator.release(byteBuffer);
}
}

View File

@ -1,102 +0,0 @@
package org.apache.lucene.store.bytebuffer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class ByteBufferFile {
final ByteBufferDirectory dir;
final int bufferSize;
final List<ByteBuffer> buffers;
long length;
volatile long lastModified = System.currentTimeMillis();
final AtomicInteger refCount;
long sizeInBytes;
public ByteBufferFile(ByteBufferDirectory dir, int bufferSize) {
this.dir = dir;
this.bufferSize = bufferSize;
this.buffers = new ArrayList<ByteBuffer>();
this.refCount = new AtomicInteger(1);
}
ByteBufferFile(ByteBufferFile file) {
this.dir = file.dir;
this.bufferSize = file.bufferSize;
this.buffers = file.buffers;
this.length = file.length;
this.lastModified = file.lastModified;
this.refCount = file.refCount;
this.sizeInBytes = file.sizeInBytes;
}
public long getLength() {
return length;
}
public long getLastModified() {
return lastModified;
}
void setLastModified(long lastModified) {
this.lastModified = lastModified;
}
long sizeInBytes() {
return sizeInBytes;
}
ByteBuffer getBuffer(int index) {
return buffers.get(index);
}
int numBuffers() {
return buffers.size();
}
void delete() {
decRef();
}
void incRef() {
refCount.incrementAndGet();
}
void decRef() {
if (refCount.decrementAndGet() == 0) {
length = 0;
for (ByteBuffer buffer : buffers) {
dir.releaseBuffer(buffer);
}
buffers.clear();
sizeInBytes = 0;
}
}
}

View File

@ -1,65 +0,0 @@
package org.apache.lucene.store.bytebuffer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.nio.ByteBuffer;
/**
*/
public class ByteBufferFileOutput extends ByteBufferFile {
public ByteBufferFileOutput(ByteBufferDirectory dir, int bufferSize) {
super(dir, bufferSize);
}
@Override
public synchronized long getLength() {
return super.getLength();
}
@Override
public synchronized long getLastModified() {
return super.getLastModified();
}
synchronized void setLength(long length) {
this.length = length;
}
synchronized final void addBuffer(ByteBuffer buffer) {
buffers.add(buffer);
sizeInBytes += buffer.remaining();
dir.sizeInBytes.addAndGet(buffer.remaining());
}
@Override
synchronized ByteBuffer getBuffer(int index) {
return super.getBuffer(index);
}
@Override
synchronized int numBuffers() {
return super.numBuffers();
}
@Override
synchronized long sizeInBytes() {
return super.sizeInBytes();
}
}

View File

@ -1,198 +0,0 @@
package org.apache.lucene.store.bytebuffer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.store.IndexInput;
import java.io.EOFException;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
/**
*/
public class ByteBufferIndexInput extends IndexInput {
private final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer();
private final ByteBufferFile file;
private final long length;
private ByteBuffer currentBuffer;
private int currentBufferIndex;
private long bufferStart;
private final int BUFFER_SIZE;
private volatile boolean closed = false;
public ByteBufferIndexInput(String name, ByteBufferFile file) throws IOException {
super("BBIndexInput(name=" + name + ")");
this.file = file;
this.file.incRef();
this.length = file.getLength();
this.BUFFER_SIZE = file.bufferSize;
// make sure that we switch to the
// first needed buffer lazily
currentBufferIndex = -1;
currentBuffer = EMPTY_BUFFER;
}
@Override
public void close() {
// we protected from double closing the index input since
// some tests do that...
if (closed) {
return;
}
closed = true;
file.decRef();
}
@Override
public long length() {
return length;
}
@Override
public short readShort() throws IOException {
try {
currentBuffer.mark();
return currentBuffer.getShort();
} catch (BufferUnderflowException e) {
currentBuffer.reset();
return super.readShort();
}
}
@Override
public int readInt() throws IOException {
try {
currentBuffer.mark();
return currentBuffer.getInt();
} catch (BufferUnderflowException e) {
currentBuffer.reset();
return super.readInt();
}
}
@Override
public long readLong() throws IOException {
try {
currentBuffer.mark();
return currentBuffer.getLong();
} catch (BufferUnderflowException e) {
currentBuffer.reset();
return super.readLong();
}
}
@Override
public byte readByte() throws IOException {
if (!currentBuffer.hasRemaining()) {
currentBufferIndex++;
switchCurrentBuffer(true);
}
return currentBuffer.get();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
while (len > 0) {
if (!currentBuffer.hasRemaining()) {
currentBufferIndex++;
switchCurrentBuffer(true);
}
int remainInBuffer = currentBuffer.remaining();
int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
currentBuffer.get(b, offset, bytesToCopy);
offset += bytesToCopy;
len -= bytesToCopy;
}
}
@Override
public long getFilePointer() {
return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position();
}
@Override
public void seek(long pos) throws IOException {
if (currentBuffer == EMPTY_BUFFER || pos < bufferStart || pos >= bufferStart + BUFFER_SIZE) {
currentBufferIndex = (int) (pos / BUFFER_SIZE);
switchCurrentBuffer(false);
}
try {
currentBuffer.position((int) (pos % BUFFER_SIZE));
// Grrr, need to wrap in IllegalArgumentException since tests (if not other places)
// expect an IOException...
} catch (IllegalArgumentException e) {
IOException ioException = new IOException("seeking past position");
ioException.initCause(e);
throw ioException;
}
}
private void switchCurrentBuffer(boolean enforceEOF) throws IOException {
if (currentBufferIndex >= file.numBuffers()) {
// end of file reached, no more buffers left
if (enforceEOF) {
throw new EOFException("Read past EOF (resource: " + this + ")");
} else {
// Force EOF if a read takes place at this position
currentBufferIndex--;
currentBuffer.position(currentBuffer.limit());
}
} else {
ByteBuffer buffer = file.getBuffer(currentBufferIndex);
// we must duplicate (and make it read only while we are at it) since we need position and such to be independent
currentBuffer = buffer.asReadOnlyBuffer();
currentBuffer.position(0);
bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex;
// if we are at the tip, limit the current buffer to only whats available to read
long buflen = length - bufferStart;
if (buflen < BUFFER_SIZE) {
currentBuffer.limit((int) buflen);
}
// we need to enforce EOF here as well...
if (!currentBuffer.hasRemaining()) {
if (enforceEOF) {
throw new EOFException("Read past EOF (resource: " + this + ")");
} else {
// Force EOF if a read takes place at this position
currentBufferIndex--;
currentBuffer.position(currentBuffer.limit());
}
}
}
}
@Override
public IndexInput clone() {
ByteBufferIndexInput cloned = (ByteBufferIndexInput) super.clone();
cloned.file.incRef(); // inc ref on cloned one
if (currentBuffer != EMPTY_BUFFER) {
cloned.currentBuffer = currentBuffer.asReadOnlyBuffer();
cloned.currentBuffer.position(currentBuffer.position());
}
return cloned;
}
}

View File

@ -1,132 +0,0 @@
package org.apache.lucene.store.bytebuffer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.store.IndexOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
*/
public class ByteBufferIndexOutput extends IndexOutput {
private final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer();
private final ByteBufferDirectory dir;
private final String name;
private final ByteBufferAllocator allocator;
private final ByteBufferAllocator.Type allocatorType;
private final int BUFFER_SIZE;
private final ByteBufferFileOutput file;
private ByteBuffer currentBuffer;
private int currentBufferIndex;
private long bufferStart;
public ByteBufferIndexOutput(ByteBufferDirectory dir, String name, ByteBufferAllocator allocator, ByteBufferAllocator.Type allocatorType, ByteBufferFileOutput file) throws IOException {
this.dir = dir;
this.name = name;
this.allocator = allocator;
this.allocatorType = allocatorType;
this.BUFFER_SIZE = file.bufferSize;
this.file = file;
currentBufferIndex = -1;
currentBuffer = EMPTY_BUFFER;
}
@Override
public void close() throws IOException {
flush();
dir.closeOutput(name, file);
}
@Override
public void seek(long pos) throws IOException {
// set the file length in case we seek back
// and flush() has not been called yet
setFileLength();
if (pos < bufferStart || pos >= bufferStart + BUFFER_SIZE) {
currentBufferIndex = (int) (pos / BUFFER_SIZE);
switchCurrentBuffer();
}
currentBuffer.position((int) (pos % BUFFER_SIZE));
}
@Override
public long length() {
return file.getLength();
}
@Override
public void writeByte(byte b) throws IOException {
if (!currentBuffer.hasRemaining()) {
currentBufferIndex++;
switchCurrentBuffer();
}
currentBuffer.put(b);
}
@Override
public void writeBytes(byte[] b, int offset, int len) throws IOException {
while (len > 0) {
if (!currentBuffer.hasRemaining()) {
currentBufferIndex++;
switchCurrentBuffer();
}
int remainInBuffer = currentBuffer.remaining();
int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
currentBuffer.put(b, offset, bytesToCopy);
offset += bytesToCopy;
len -= bytesToCopy;
}
}
private void switchCurrentBuffer() throws IOException {
if (currentBufferIndex == file.numBuffers()) {
currentBuffer = allocator.allocate(allocatorType);
file.addBuffer(currentBuffer);
} else {
currentBuffer = file.getBuffer(currentBufferIndex);
}
currentBuffer.position(0);
bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex;
}
private void setFileLength() {
long pointer = bufferStart + currentBuffer.position();
if (pointer > file.getLength()) {
file.setLength(pointer);
}
}
@Override
public void flush() throws IOException {
file.setLastModified(System.currentTimeMillis());
setFileLength();
}
@Override
public long getFilePointer() {
return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position();
}
}

View File

@ -1,82 +0,0 @@
package org.apache.lucene.store.bytebuffer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* The caching byte buffer allocator allows to define a global size for both the small and large buffers
* allocated. Those will be reused when possible.
*/
public class CachingByteBufferAllocator extends PlainByteBufferAllocator {
private final BlockingQueue<ByteBuffer> smallCache;
private final BlockingQueue<ByteBuffer> largeCache;
/**
* @param direct If set to true, will allocate direct buffers (off heap).
* @param smallBufferSizeInBytes The size (in bytes) of the small buffer allocation.
* @param largeBufferSizeInBytes The size (in bytes) of the large buffer allocation.
* @param smallCacheSizeInBytes The size of the small cache buffer in bytes.
* @param largeCacheSizeInBytes The size of the large cache buffer in bytes.
*/
public CachingByteBufferAllocator(boolean direct, int smallBufferSizeInBytes, int largeBufferSizeInBytes,
int smallCacheSizeInBytes, int largeCacheSizeInBytes) {
super(direct, smallBufferSizeInBytes, largeBufferSizeInBytes);
this.smallCache = new LinkedBlockingQueue<ByteBuffer>(smallCacheSizeInBytes / smallBufferSizeInBytes);
this.largeCache = new LinkedBlockingQueue<ByteBuffer>(largeCacheSizeInBytes / largeBufferSizeInBytes);
}
public ByteBuffer allocate(Type type) throws IOException {
ByteBuffer buffer = type == Type.SMALL ? smallCache.poll() : largeCache.poll();
if (buffer == null) {
buffer = super.allocate(type);
}
return buffer;
}
public void release(ByteBuffer buffer) {
if (buffer.capacity() == smallBufferSizeInBytes) {
boolean success = smallCache.offer(buffer);
if (!success) {
super.release(buffer);
}
} else if (buffer.capacity() == largeBufferSizeInBytes) {
boolean success = largeCache.offer(buffer);
if (!success) {
super.release(buffer);
}
}
// otherwise, just ignore it? not our allocation...
}
public void close() {
for (ByteBuffer buffer : smallCache) {
super.release(buffer);
}
smallCache.clear();
for (ByteBuffer buffer : largeCache) {
super.release(buffer);
}
largeCache.clear();
}
}

View File

@ -1,67 +0,0 @@
package org.apache.lucene.store.bytebuffer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* A simple byte buffer allocator that does not caching. The direct flag
* allows to control if the byte buffer will be allocated off heap or not.
*/
public class PlainByteBufferAllocator implements ByteBufferAllocator {
protected final boolean direct;
protected final int smallBufferSizeInBytes;
protected final int largeBufferSizeInBytes;
/**
* Constructs a new plain byte buffer allocator that does no caching.
*
* @param direct If set to true, will allocate direct buffers (off heap).
* @param smallBufferSizeInBytes The size (in bytes) of the small buffer allocation.
* @param largeBufferSizeInBytes The size (in bytes) of the large buffer allocation.
*/
public PlainByteBufferAllocator(boolean direct, int smallBufferSizeInBytes, int largeBufferSizeInBytes) {
this.direct = direct;
this.smallBufferSizeInBytes = smallBufferSizeInBytes;
this.largeBufferSizeInBytes = largeBufferSizeInBytes;
}
public int sizeInBytes(Type type) {
return type == Type.SMALL ? smallBufferSizeInBytes : largeBufferSizeInBytes;
}
public ByteBuffer allocate(Type type) throws IOException {
int sizeToAllocate = type == Type.SMALL ? smallBufferSizeInBytes : largeBufferSizeInBytes;
if (direct) {
return ByteBuffer.allocateDirect(sizeToAllocate);
}
return ByteBuffer.allocate(sizeToAllocate);
}
public void release(ByteBuffer buffer) {
Cleaner.clean(buffer);
}
public void close() {
// nothing to do here...
}
}

View File

@ -1,59 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cache;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
/**
*
*/
public class NodeCache extends AbstractComponent implements ClusterStateListener {
private final ClusterService clusterService;
private final ByteBufferCache byteBufferCache;
@Inject
public NodeCache(Settings settings, ByteBufferCache byteBufferCache, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
this.byteBufferCache = byteBufferCache;
clusterService.add(this);
}
public void close() {
clusterService.remove(this);
byteBufferCache.close();
}
public ByteBufferCache byteBuffer() {
return byteBufferCache;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cache;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
/**
*
*/
public class NodeCacheModule extends AbstractModule {
private final Settings settings;
public NodeCacheModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(NodeCache.class).asEagerSingleton();
bind(ByteBufferCache.class).asEagerSingleton();
}
}

View File

@ -1,108 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cache.memory;
import org.apache.lucene.store.bytebuffer.ByteBufferAllocator;
import org.apache.lucene.store.bytebuffer.CachingByteBufferAllocator;
import org.apache.lucene.store.bytebuffer.PlainByteBufferAllocator;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
*
*/
public class ByteBufferCache extends AbstractComponent implements ByteBufferAllocator {
private final boolean direct;
private final ByteSizeValue smallBufferSize;
private final ByteSizeValue largeBufferSize;
private final ByteSizeValue smallCacheSize;
private final ByteSizeValue largeCacheSize;
private final ByteBufferAllocator allocator;
public ByteBufferCache() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
}
// really, for testing...
public ByteBufferCache(int bufferSizeInBytes, int cacheSizeInBytes, boolean direct) {
this(ImmutableSettings.settingsBuilder()
.put("cache.memory.small_buffer_size", bufferSizeInBytes)
.put("cache.memory.small_cache_size", cacheSizeInBytes)
.put("cache.memory.large_buffer_size", bufferSizeInBytes)
.put("cache.memory.large_cache_size", cacheSizeInBytes)
.put("cache.memory.direct", direct).build());
}
@Inject
public ByteBufferCache(Settings settings) {
super(settings);
this.direct = componentSettings.getAsBoolean("direct", true);
this.smallBufferSize = componentSettings.getAsBytesSize("small_buffer_size", new ByteSizeValue(1, ByteSizeUnit.KB));
this.largeBufferSize = componentSettings.getAsBytesSize("large_buffer_size", new ByteSizeValue(1, ByteSizeUnit.MB));
this.smallCacheSize = componentSettings.getAsBytesSize("small_cache_size", new ByteSizeValue(10, ByteSizeUnit.MB));
this.largeCacheSize = componentSettings.getAsBytesSize("large_cache_size", new ByteSizeValue(500, ByteSizeUnit.MB));
if (smallCacheSize.bytes() == 0 || largeCacheSize.bytes() == 0) {
this.allocator = new PlainByteBufferAllocator(direct, (int) smallBufferSize.bytes(), (int) largeBufferSize.bytes());
} else {
this.allocator = new CachingByteBufferAllocator(direct, (int) smallBufferSize.bytes(), (int) largeBufferSize.bytes(), (int) smallCacheSize.bytes(), (int) largeCacheSize.bytes());
}
if (logger.isDebugEnabled()) {
logger.debug("using bytebuffer cache with small_buffer_size [{}], large_buffer_size [{}], small_cache_size [{}], large_cache_size [{}], direct [{}]",
smallBufferSize, largeBufferSize, smallCacheSize, largeCacheSize, direct);
}
}
public boolean direct() {
return this.direct;
}
public void close() {
allocator.close();
}
@Override
public int sizeInBytes(Type type) {
return allocator.sizeInBytes(type);
}
@Override
public ByteBuffer allocate(Type type) throws IOException {
return allocator.allocate(type);
}
@Override
public void release(ByteBuffer buffer) {
allocator.release(buffer);
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.store.fs.MmapFsIndexStoreModule; import org.elasticsearch.index.store.fs.MmapFsIndexStoreModule;
import org.elasticsearch.index.store.fs.NioFsIndexStoreModule; import org.elasticsearch.index.store.fs.NioFsIndexStoreModule;
import org.elasticsearch.index.store.fs.SimpleFsIndexStoreModule; import org.elasticsearch.index.store.fs.SimpleFsIndexStoreModule;
import org.elasticsearch.index.store.memory.MemoryIndexStoreModule;
import org.elasticsearch.index.store.ram.RamIndexStoreModule; import org.elasticsearch.index.store.ram.RamIndexStoreModule;
/** /**
@ -58,7 +57,7 @@ public class IndexStoreModule extends AbstractModule implements SpawnModules {
if ("ram".equalsIgnoreCase(storeType)) { if ("ram".equalsIgnoreCase(storeType)) {
indexStoreModule = RamIndexStoreModule.class; indexStoreModule = RamIndexStoreModule.class;
} else if ("memory".equalsIgnoreCase(storeType)) { } else if ("memory".equalsIgnoreCase(storeType)) {
indexStoreModule = MemoryIndexStoreModule.class; indexStoreModule = RamIndexStoreModule.class;
} else if ("fs".equalsIgnoreCase(storeType)) { } else if ("fs".equalsIgnoreCase(storeType)) {
// nothing to set here ... (we default to fs) // nothing to set here ... (we default to fs)
} else if ("simplefs".equalsIgnoreCase(storeType) || "simple_fs".equals(storeType)) { } else if ("simplefs".equalsIgnoreCase(storeType) || "simple_fs".equals(storeType)) {

View File

@ -1,91 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.store.DirectoryUtils;
import org.apache.lucene.store.bytebuffer.ByteBufferAllocator;
import org.apache.lucene.store.bytebuffer.ByteBufferDirectory;
import org.apache.lucene.store.bytebuffer.ByteBufferFile;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
*/
public final class ByteBufferDirectoryService extends AbstractIndexShardComponent implements DirectoryService {
private final ByteBufferCache byteBufferCache;
@Inject
public ByteBufferDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, ByteBufferCache byteBufferCache) {
super(shardId, indexSettings);
this.byteBufferCache = byteBufferCache;
}
@Override
public long throttleTimeInNanos() {
return 0;
}
@Override
public Directory[] build() {
return new Directory[]{new CustomByteBufferDirectory(byteBufferCache)};
}
@Override
public void renameFile(Directory dir, String from, String to) throws IOException {
CustomByteBufferDirectory leaf = DirectoryUtils.getLeaf(dir, CustomByteBufferDirectory.class);
assert leaf != null;
leaf.renameTo(from, to);
}
@Override
public void fullDelete(Directory dir) {
}
static class CustomByteBufferDirectory extends ByteBufferDirectory {
CustomByteBufferDirectory() {
}
CustomByteBufferDirectory(ByteBufferAllocator allocator) {
super(allocator);
}
public void renameTo(String from, String to) throws IOException {
ByteBufferFile fromFile = files.get(from);
if (fromFile == null)
throw new FileNotFoundException(from);
ByteBufferFile toFile = files.get(to);
if (toFile != null) {
files.remove(from);
}
files.put(to, fromFile);
}
}
}

View File

@ -1,76 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.jvm.JvmStats;
/**
*
*/
public class ByteBufferIndexStore extends AbstractIndexStore {
private final boolean direct;
@Inject
public ByteBufferIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService,
ByteBufferCache byteBufferCache, IndicesStore indicesStore) {
super(index, indexSettings, indexService, indicesStore);
this.direct = byteBufferCache.direct();
}
@Override
public boolean persistent() {
return false;
}
@Override
public Class<? extends DirectoryService> shardDirectory() {
return ByteBufferDirectoryService.class;
}
@Override
public ByteSizeValue backingStoreTotalSpace() {
if (direct) {
// TODO, we can use sigar...
return new ByteSizeValue(-1, ByteSizeUnit.BYTES);
}
return JvmInfo.jvmInfo().mem().heapMax();
}
@Override
public ByteSizeValue backingStoreFreeSpace() {
if (direct) {
return new ByteSizeValue(-1, ByteSizeUnit.BYTES);
}
return JvmStats.jvmStats().mem().heapUsed();
}
}

View File

@ -1,41 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.store.IndexStore;
/**
*
*/
public class MemoryIndexStoreModule extends AbstractModule {
private final Settings settings;
public MemoryIndexStoreModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(IndexStore.class).to(ByteBufferIndexStore.class).asEagerSingleton();
}
}

View File

@ -19,7 +19,9 @@
package org.elasticsearch.index.store.ram; package org.elasticsearch.index.store.ram;
import org.apache.lucene.store.*; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.RAMFile;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
@ -74,5 +76,10 @@ public final class RamDirectoryService extends AbstractIndexShardComponent imple
} }
fileMap.put(to, fromFile); fileMap.put(to, fromFile);
} }
@Override
public String toString() {
return "ram";
}
} }
} }

View File

@ -25,8 +25,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionModule;
import org.elasticsearch.bulk.udp.BulkUdpModule; import org.elasticsearch.bulk.udp.BulkUdpModule;
import org.elasticsearch.bulk.udp.BulkUdpService; import org.elasticsearch.bulk.udp.BulkUdpService;
import org.elasticsearch.cache.NodeCache;
import org.elasticsearch.cache.NodeCacheModule;
import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.CacheRecyclerModule; import org.elasticsearch.cache.recycler.CacheRecyclerModule;
import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler;
@ -157,7 +155,6 @@ public final class InternalNode implements Node {
modules.add(new SettingsModule(settings)); modules.add(new SettingsModule(settings));
modules.add(new NodeModule(this)); modules.add(new NodeModule(this));
modules.add(new NetworkModule()); modules.add(new NetworkModule());
modules.add(new NodeCacheModule(settings));
modules.add(new ScriptModule(settings)); modules.add(new ScriptModule(settings));
modules.add(new EnvironmentModule(environment)); modules.add(new EnvironmentModule(environment));
modules.add(new NodeEnvironmentModule(nodeEnvironment)); modules.add(new NodeEnvironmentModule(nodeEnvironment));
@ -349,9 +346,6 @@ public final class InternalNode implements Node {
injector.getInstance(plugin).close(); injector.getInstance(plugin).close();
} }
stopWatch.stop().start("node_cache");
injector.getInstance(NodeCache.class).close();
stopWatch.stop().start("script"); stopWatch.stop().start("script");
injector.getInstance(ScriptService.class).close(); injector.getInstance(ScriptService.class).close();

View File

@ -1,179 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.*;
import org.apache.lucene.store.bytebuffer.ByteBufferDirectory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*
*/
public class SimpleByteBufferStoreTests extends ElasticsearchTestCase {
@Test
public void test1BufferNoCache() throws Exception {
ByteBufferCache cache = new ByteBufferCache(1, 0, true);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir, 1);
verifyData(dir);
dir.close();
cache.close();
}
@Test
public void test1Buffer() throws Exception {
ByteBufferCache cache = new ByteBufferCache(1, 10, true);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir, 1);
verifyData(dir);
dir.close();
cache.close();
}
@Test
public void test3Buffer() throws Exception {
ByteBufferCache cache = new ByteBufferCache(3, 10, true);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir, 3);
verifyData(dir);
dir.close();
cache.close();
}
@Test
public void test10Buffer() throws Exception {
ByteBufferCache cache = new ByteBufferCache(10, 20, true);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir, 10);
verifyData(dir);
dir.close();
cache.close();
}
@Test
public void test15Buffer() throws Exception {
ByteBufferCache cache = new ByteBufferCache(15, 30, true);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir, 15);
verifyData(dir);
dir.close();
cache.close();
}
@Test
public void test40Buffer() throws Exception {
ByteBufferCache cache = new ByteBufferCache(40, 80, true);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
insertData(dir, 40);
verifyData(dir);
dir.close();
cache.close();
}
@Test
public void testSimpleLocking() throws Exception {
ByteBufferCache cache = new ByteBufferCache(40, 80, true);
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
Lock lock = dir.makeLock("testlock");
assertThat(lock.isLocked(), equalTo(false));
assertThat(lock.obtain(200), equalTo(true));
assertThat(lock.isLocked(), equalTo(true));
try {
assertThat(lock.obtain(200), equalTo(false));
assertThat("lock should be thrown", false, equalTo(true));
} catch (LockObtainFailedException e) {
// all is well
}
lock.release();
assertThat(lock.isLocked(), equalTo(false));
dir.close();
cache.close();
}
private void insertData(ByteBufferDirectory dir, int bufferSizeInBytes) throws IOException {
byte[] test = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
IndexOutput indexOutput = dir.createOutput("value1", IOContext.DEFAULT);
indexOutput.writeBytes(new byte[]{2, 4, 6, 7, 8}, 5);
indexOutput.writeInt(-1);
indexOutput.writeLong(10);
indexOutput.writeInt(0);
indexOutput.writeInt(0);
indexOutput.writeBytes(test, 8);
indexOutput.writeBytes(test, 5);
indexOutput.seek(0);
indexOutput.writeByte((byte) 8);
if (bufferSizeInBytes > 4) {
indexOutput.seek(2);
indexOutput.writeBytes(new byte[]{1, 2}, 2);
}
indexOutput.close();
}
private void verifyData(ByteBufferDirectory dir) throws IOException {
byte[] test = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
assertThat(dir.fileExists("value1"), equalTo(true));
assertThat(dir.fileLength("value1"), equalTo(38l));
IndexInput indexInput = dir.openInput("value1", IOContext.DEFAULT);
indexInput.readBytes(test, 0, 5);
assertThat(test[0], equalTo((byte) 8));
assertThat(indexInput.readInt(), equalTo(-1));
assertThat(indexInput.readLong(), equalTo((long) 10));
assertThat(indexInput.readInt(), equalTo(0));
assertThat(indexInput.readInt(), equalTo(0));
indexInput.readBytes(test, 0, 8);
assertThat(test[0], equalTo((byte) 1));
assertThat(test[7], equalTo((byte) 8));
indexInput.readBytes(test, 0, 5);
assertThat(test[0], equalTo((byte) 1));
assertThat(test[4], equalTo((byte) 5));
indexInput.seek(28);
assertThat(indexInput.readByte(), equalTo((byte) 4));
indexInput.seek(30);
assertThat(indexInput.readByte(), equalTo((byte) 6));
indexInput.seek(0);
indexInput.readBytes(test, 0, 5);
assertThat(test[0], equalTo((byte) 8));
indexInput.close();
indexInput = dir.openInput("value1", IOContext.DEFAULT);
// iterate over all the data
for (int i = 0; i < 38; i++) {
indexInput.readByte();
}
indexInput.close();
}
}

View File

@ -94,7 +94,7 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
storeString = getStoreDirectory("test", 0).toString(); storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString); logger.info(storeString);
dataPaths = dataPaths(); dataPaths = dataPaths();
assertThat(storeString, equalTo("store(least_used[byte_buffer])")); assertThat(storeString, equalTo("store(least_used[ram])"));
createIndexWithoutRateLimitingStoreType("test", "niofs", "least_used"); createIndexWithoutRateLimitingStoreType("test", "niofs", "least_used");
storeString = getStoreDirectory("test", 0).toString(); storeString = getStoreDirectory("test", 0).toString();

View File

@ -25,7 +25,6 @@ import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.MockDirectoryWrapper.Throttling; import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -36,7 +35,6 @@ import org.elasticsearch.index.store.fs.FsDirectoryService;
import org.elasticsearch.index.store.fs.MmapFsDirectoryService; import org.elasticsearch.index.store.fs.MmapFsDirectoryService;
import org.elasticsearch.index.store.fs.NioFsDirectoryService; import org.elasticsearch.index.store.fs.NioFsDirectoryService;
import org.elasticsearch.index.store.fs.SimpleFsDirectoryService; import org.elasticsearch.index.store.fs.SimpleFsDirectoryService;
import org.elasticsearch.index.store.memory.ByteBufferDirectoryService;
import org.elasticsearch.index.store.ram.RamDirectoryService; import org.elasticsearch.index.store.ram.RamDirectoryService;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -123,14 +121,8 @@ public class MockDirectoryHelper {
} }
} }
public DirectoryService randomRamDirecoryService(ByteBufferCache byteBufferCache) { public DirectoryService randomRamDirectoryService() {
switch (random.nextInt(2)) {
case 0:
return new RamDirectoryService(shardId, indexSettings); return new RamDirectoryService(shardId, indexSettings);
default:
return new ByteBufferDirectoryService(shardId, indexSettings, byteBufferCache);
}
} }
public static final class ElasticsearchMockDirectoryWrapper extends MockDirectoryWrapper { public static final class ElasticsearchMockDirectoryWrapper extends MockDirectoryWrapper {

View File

@ -20,7 +20,6 @@
package org.elasticsearch.test.store; package org.elasticsearch.test.store;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.AbstractIndexShardComponent;
@ -35,10 +34,10 @@ public class MockRamDirectoryService extends AbstractIndexShardComponent impleme
private final DirectoryService delegateService; private final DirectoryService delegateService;
@Inject @Inject
public MockRamDirectoryService(ShardId shardId, Settings indexSettings, ByteBufferCache byteBufferCache) { public MockRamDirectoryService(ShardId shardId, Settings indexSettings) {
super(shardId, indexSettings); super(shardId, indexSettings);
helper = new MockDirectoryHelper(shardId, indexSettings, logger); helper = new MockDirectoryHelper(shardId, indexSettings, logger);
delegateService = helper.randomRamDirecoryService(byteBufferCache); delegateService = helper.randomRamDirectoryService();
} }
@Override @Override