Memory Index Store: Separate to two buffer size types, and fix bugs, closes #577.
This commit is contained in:
parent
22fa91efa0
commit
f2eae5b605
|
@ -0,0 +1,99 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elastic Search and Shay Banon under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Elastic Search licenses this
|
||||||
|
* file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.lucene.store.bytebuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A byte buffer allocator simple allocates byte buffers, and handles releasing
|
||||||
|
* them. Implementation can include special direct buffer cleaning when releasing
|
||||||
|
* a buffer, as well as caching of byte buffers.
|
||||||
|
*
|
||||||
|
* <p>There are two types of buffers that can be allocated, small and big. This
|
||||||
|
* comes in handy when knowing in advance (more or less) the size of the buffers
|
||||||
|
* needed (large files or small), as well as in caching implementations.
|
||||||
|
*/
|
||||||
|
public interface ByteBufferAllocator {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class to allocator implementations allowing to clean direct buffers.
|
||||||
|
*/
|
||||||
|
public static class Cleaner {
|
||||||
|
public static final boolean CLEAN_SUPPORTED;
|
||||||
|
private static final Method directBufferCleaner;
|
||||||
|
private static final Method directBufferCleanerClean;
|
||||||
|
|
||||||
|
static {
|
||||||
|
Method directBufferCleanerX = null;
|
||||||
|
Method directBufferCleanerCleanX = null;
|
||||||
|
boolean v;
|
||||||
|
try {
|
||||||
|
directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
|
||||||
|
directBufferCleanerX.setAccessible(true);
|
||||||
|
directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");
|
||||||
|
directBufferCleanerCleanX.setAccessible(true);
|
||||||
|
v = true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
v = false;
|
||||||
|
}
|
||||||
|
CLEAN_SUPPORTED = v;
|
||||||
|
directBufferCleaner = directBufferCleanerX;
|
||||||
|
directBufferCleanerClean = directBufferCleanerCleanX;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void clean(ByteBuffer buffer) {
|
||||||
|
if (CLEAN_SUPPORTED && buffer.isDirect()) {
|
||||||
|
try {
|
||||||
|
Object cleaner = directBufferCleaner.invoke(buffer);
|
||||||
|
directBufferCleanerClean.invoke(cleaner);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// silently ignore exception
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static enum Type {
|
||||||
|
SMALL,
|
||||||
|
LARGE
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The size (in bytes) that is allocated for the provided type.
|
||||||
|
*/
|
||||||
|
int sizeInBytes(Type type);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a byte buffer for the specific type.
|
||||||
|
*/
|
||||||
|
ByteBuffer allocate(Type type) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release the buffer.
|
||||||
|
*/
|
||||||
|
void release(ByteBuffer buffer);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the allocator, releasing any cached buffers for example.
|
||||||
|
*/
|
||||||
|
void close();
|
||||||
|
}
|
|
@ -17,16 +17,17 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.index.store.memory;
|
package org.apache.lucene.store.bytebuffer;
|
||||||
|
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.IndexInput;
|
import org.apache.lucene.store.IndexInput;
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
import org.apache.lucene.store.SingleInstanceLockFactory;
|
import org.apache.lucene.store.SingleInstanceLockFactory;
|
||||||
import org.elasticsearch.cache.memory.ByteBufferCache;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@ -38,45 +39,69 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
*
|
*
|
||||||
* <p>Each "file" is segmented into one or more byte buffers.
|
* <p>Each "file" is segmented into one or more byte buffers.
|
||||||
*
|
*
|
||||||
* <p>Since its good practice to cache byte buffers, it also provide a simple mechanism to define a cache
|
* <p>If constructed with {@link ByteBufferAllocator}, it allows to control the allocation and release of
|
||||||
* of byte buffers that are reused when possible.
|
* byte buffer. For example, custom implementations can include caching of byte buffers.
|
||||||
*
|
*
|
||||||
* @author kimchy (shay.banon)
|
* @author kimchy (shay.banon)
|
||||||
*/
|
*/
|
||||||
public class ByteBufferDirectory extends Directory {
|
public class ByteBufferDirectory extends Directory {
|
||||||
|
|
||||||
final ByteBufferCache byteBufferCache;
|
|
||||||
|
|
||||||
private final Map<String, ByteBufferFile> files = new ConcurrentHashMap<String, ByteBufferFile>();
|
private final Map<String, ByteBufferFile> files = new ConcurrentHashMap<String, ByteBufferFile>();
|
||||||
|
|
||||||
|
private final ByteBufferAllocator allocator;
|
||||||
|
|
||||||
|
private final boolean internalAllocator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a new byte buffer directory.
|
* Constructs a new directory using {@link PlainByteBufferAllocator}.
|
||||||
*/
|
*/
|
||||||
public ByteBufferDirectory(ByteBufferCache byteBufferCache) {
|
public ByteBufferDirectory() {
|
||||||
this.byteBufferCache = byteBufferCache;
|
this.allocator = new PlainByteBufferAllocator(false, 1024, 1024 * 10);
|
||||||
|
this.internalAllocator = true;
|
||||||
|
// try {
|
||||||
setLockFactory(new SingleInstanceLockFactory());
|
setLockFactory(new SingleInstanceLockFactory());
|
||||||
|
// } catch (IOException e) {
|
||||||
|
// // will not happen
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
public int bufferSizeInBytes() {
|
/**
|
||||||
return byteBufferCache.bufferSizeInBytes();
|
* Constructs a new byte buffer directory with a custom allocator.
|
||||||
|
*/
|
||||||
|
public ByteBufferDirectory(ByteBufferAllocator allocator) {
|
||||||
|
this.allocator = allocator;
|
||||||
|
this.internalAllocator = false;
|
||||||
|
// try {
|
||||||
|
setLockFactory(new SingleInstanceLockFactory());
|
||||||
|
// } catch (IOException e) {
|
||||||
|
// // will not happen
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public String[] listAll() throws IOException {
|
public void sync(Collection<String> names) throws IOException {
|
||||||
|
// nothing to do here
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] listAll() throws IOException {
|
||||||
return files.keySet().toArray(new String[0]);
|
return files.keySet().toArray(new String[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public boolean fileExists(String name) throws IOException {
|
@Override
|
||||||
|
public boolean fileExists(String name) throws IOException {
|
||||||
return files.containsKey(name);
|
return files.containsKey(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public long fileModified(String name) throws IOException {
|
@Override
|
||||||
|
public long fileModified(String name) throws IOException {
|
||||||
ByteBufferFile file = files.get(name);
|
ByteBufferFile file = files.get(name);
|
||||||
if (file == null)
|
if (file == null)
|
||||||
throw new FileNotFoundException(name);
|
throw new FileNotFoundException(name);
|
||||||
return file.lastModified();
|
return file.getLastModified();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void touchFile(String name) throws IOException {
|
@Override
|
||||||
|
public void touchFile(String name) throws IOException {
|
||||||
ByteBufferFile file = files.get(name);
|
ByteBufferFile file = files.get(name);
|
||||||
if (file == null)
|
if (file == null)
|
||||||
throw new FileNotFoundException(name);
|
throw new FileNotFoundException(name);
|
||||||
|
@ -94,43 +119,59 @@ public class ByteBufferDirectory extends Directory {
|
||||||
ts2 = System.currentTimeMillis();
|
ts2 = System.currentTimeMillis();
|
||||||
} while (ts1 == ts2);
|
} while (ts1 == ts2);
|
||||||
|
|
||||||
file.lastModified(ts2);
|
file.setLastModified(ts2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void deleteFile(String name) throws IOException {
|
@Override
|
||||||
|
public void deleteFile(String name) throws IOException {
|
||||||
ByteBufferFile file = files.remove(name);
|
ByteBufferFile file = files.remove(name);
|
||||||
if (file == null)
|
if (file == null)
|
||||||
throw new FileNotFoundException(name);
|
throw new FileNotFoundException(name);
|
||||||
file.clean();
|
file.clean();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public long fileLength(String name) throws IOException {
|
@Override
|
||||||
|
public long fileLength(String name) throws IOException {
|
||||||
ByteBufferFile file = files.get(name);
|
ByteBufferFile file = files.get(name);
|
||||||
if (file == null)
|
if (file == null)
|
||||||
throw new FileNotFoundException(name);
|
throw new FileNotFoundException(name);
|
||||||
return file.length();
|
return file.getLength();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public IndexOutput createOutput(String name) throws IOException {
|
@Override
|
||||||
ByteBufferFile file = new ByteBufferFile(this);
|
public IndexOutput createOutput(String name) throws IOException {
|
||||||
|
ByteBufferAllocator.Type allocatorType = ByteBufferAllocator.Type.LARGE;
|
||||||
|
if (name.contains("segments") || name.endsWith(".del")) {
|
||||||
|
allocatorType = ByteBufferAllocator.Type.SMALL;
|
||||||
|
}
|
||||||
|
ByteBufferFile file = new ByteBufferFile(this, allocator.sizeInBytes(allocatorType));
|
||||||
ByteBufferFile existing = files.put(name, file);
|
ByteBufferFile existing = files.put(name, file);
|
||||||
if (existing != null) {
|
if (existing != null) {
|
||||||
existing.clean();
|
existing.clean();
|
||||||
}
|
}
|
||||||
return new ByteBufferIndexOutput(this, file);
|
return new ByteBufferIndexOutput(allocator, allocatorType, file);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public IndexInput openInput(String name) throws IOException {
|
@Override
|
||||||
|
public IndexInput openInput(String name) throws IOException {
|
||||||
ByteBufferFile file = files.get(name);
|
ByteBufferFile file = files.get(name);
|
||||||
if (file == null)
|
if (file == null)
|
||||||
throw new FileNotFoundException(name);
|
throw new FileNotFoundException(name);
|
||||||
return new ByteBufferIndexInput(this, file);
|
return new ByteBufferIndexInput(file);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void close() throws IOException {
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
String[] files = listAll();
|
String[] files = listAll();
|
||||||
for (String file : files) {
|
for (String file : files) {
|
||||||
deleteFile(file);
|
deleteFile(file);
|
||||||
}
|
}
|
||||||
|
if (internalAllocator) {
|
||||||
|
allocator.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void releaseBuffer(ByteBuffer byteBuffer) {
|
||||||
|
allocator.release(byteBuffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -17,61 +17,65 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.index.store.memory;
|
package org.apache.lucene.store.bytebuffer;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (shay.banon)
|
* @author kimchy (shay.banon)
|
||||||
*/
|
*/
|
||||||
public class ByteBufferFile {
|
public class ByteBufferFile {
|
||||||
|
|
||||||
|
private final CopyOnWriteArrayList<ByteBuffer> buffers = new CopyOnWriteArrayList<ByteBuffer>();
|
||||||
private final ByteBufferDirectory dir;
|
private final ByteBufferDirectory dir;
|
||||||
|
final int bufferSize;
|
||||||
private volatile long lastModified = System.currentTimeMillis();
|
|
||||||
|
|
||||||
private volatile long length;
|
private volatile long length;
|
||||||
|
// This is publicly modifiable via Directory.touchFile(), so direct access not supported
|
||||||
|
private volatile long lastModified = System.currentTimeMillis();
|
||||||
|
|
||||||
private volatile ByteBuffer[] buffers;
|
public ByteBufferFile(ByteBufferDirectory dir, int bufferSize) {
|
||||||
|
|
||||||
public ByteBufferFile(ByteBufferDirectory dir) {
|
|
||||||
this.dir = dir;
|
this.dir = dir;
|
||||||
|
this.bufferSize = bufferSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
long lastModified() {
|
// For non-stream access from thread that might be concurrent with writing
|
||||||
return lastModified;
|
public long getLength() {
|
||||||
}
|
|
||||||
|
|
||||||
void lastModified(long lastModified) {
|
|
||||||
this.lastModified = lastModified;
|
|
||||||
}
|
|
||||||
|
|
||||||
long length() {
|
|
||||||
return length;
|
return length;
|
||||||
}
|
}
|
||||||
|
|
||||||
void length(long length) {
|
protected void setLength(long length) {
|
||||||
this.length = length;
|
this.length = length;
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteBuffer buffer(int i) {
|
// For non-stream access from thread that might be concurrent with writing
|
||||||
return this.buffers[i];
|
public long getLastModified() {
|
||||||
|
return lastModified;
|
||||||
}
|
}
|
||||||
|
|
||||||
int numberOfBuffers() {
|
protected void setLastModified(long lastModified) {
|
||||||
return this.buffers.length;
|
this.lastModified = lastModified;
|
||||||
}
|
}
|
||||||
|
|
||||||
void buffers(ByteBuffer[] buffers) {
|
protected final void addBuffer(ByteBuffer buffer) {
|
||||||
this.buffers = buffers;
|
buffers.add(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final ByteBuffer getBuffer(int index) {
|
||||||
|
return buffers.get(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final int numBuffers() {
|
||||||
|
return buffers.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
void clean() {
|
void clean() {
|
||||||
if (buffers != null) {
|
if (buffers != null) {
|
||||||
for (ByteBuffer buffer : buffers) {
|
for (ByteBuffer buffer : buffers) {
|
||||||
dir.byteBufferCache.releaseBuffer(buffer);
|
dir.releaseBuffer(buffer);
|
||||||
}
|
}
|
||||||
buffers = null;
|
buffers.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,171 @@
|
||||||
|
package org.apache.lucene.store.bytebuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import org.apache.lucene.store.IndexInput;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author kimchy (shay.banon)
|
||||||
|
*/
|
||||||
|
public class ByteBufferIndexInput extends IndexInput {
|
||||||
|
|
||||||
|
private final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer();
|
||||||
|
|
||||||
|
private final ByteBufferFile file;
|
||||||
|
private final long length;
|
||||||
|
|
||||||
|
private ByteBuffer currentBuffer;
|
||||||
|
private int currentBufferIndex;
|
||||||
|
|
||||||
|
private long bufferStart;
|
||||||
|
private final int BUFFER_SIZE;
|
||||||
|
|
||||||
|
public ByteBufferIndexInput(ByteBufferFile file) throws IOException {
|
||||||
|
this.file = file;
|
||||||
|
this.length = file.getLength();
|
||||||
|
this.BUFFER_SIZE = file.bufferSize;
|
||||||
|
|
||||||
|
// make sure that we switch to the
|
||||||
|
// first needed buffer lazily
|
||||||
|
currentBufferIndex = -1;
|
||||||
|
currentBuffer = EMPTY_BUFFER;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
// nothing to do here
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long length() {
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Override
|
||||||
|
// public short readShort() throws IOException {
|
||||||
|
// try {
|
||||||
|
// return currentBuffer.getShort();
|
||||||
|
// } catch (BufferUnderflowException e) {
|
||||||
|
// return super.readShort();
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Override
|
||||||
|
// public int readInt() throws IOException {
|
||||||
|
// try {
|
||||||
|
// return currentBuffer.getInt();
|
||||||
|
// } catch (BufferUnderflowException e) {
|
||||||
|
// return super.readInt();
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Override
|
||||||
|
// public long readLong() throws IOException {
|
||||||
|
// try {
|
||||||
|
// return currentBuffer.getLong();
|
||||||
|
// } catch (BufferUnderflowException e) {
|
||||||
|
// return super.readLong();
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte readByte() throws IOException {
|
||||||
|
if (!currentBuffer.hasRemaining()) {
|
||||||
|
currentBufferIndex++;
|
||||||
|
switchCurrentBuffer(true);
|
||||||
|
}
|
||||||
|
return currentBuffer.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readBytes(byte[] b, int offset, int len) throws IOException {
|
||||||
|
while (len > 0) {
|
||||||
|
if (!currentBuffer.hasRemaining()) {
|
||||||
|
currentBufferIndex++;
|
||||||
|
switchCurrentBuffer(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
int remainInBuffer = currentBuffer.remaining();
|
||||||
|
int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
|
||||||
|
currentBuffer.get(b, offset, bytesToCopy);
|
||||||
|
offset += bytesToCopy;
|
||||||
|
len -= bytesToCopy;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getFilePointer() {
|
||||||
|
return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void seek(long pos) throws IOException {
|
||||||
|
if (currentBuffer == EMPTY_BUFFER || pos < bufferStart || pos >= bufferStart + BUFFER_SIZE) {
|
||||||
|
currentBufferIndex = (int) (pos / BUFFER_SIZE);
|
||||||
|
if (currentBufferIndex >= file.numBuffers()) {
|
||||||
|
// if we are past EOF, don't throw one here, instead, move it to the last position in the last buffer
|
||||||
|
currentBufferIndex = file.numBuffers() - 1;
|
||||||
|
currentBuffer = currentBufferIndex == -1 ? EMPTY_BUFFER : file.getBuffer(currentBufferIndex);
|
||||||
|
currentBuffer.position(currentBuffer.limit());
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
switchCurrentBuffer(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
currentBuffer.position((int) (pos % BUFFER_SIZE));
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
currentBuffer.position(currentBuffer.limit());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void switchCurrentBuffer(boolean enforceEOF) throws IOException {
|
||||||
|
if (currentBufferIndex >= file.numBuffers()) {
|
||||||
|
if (enforceEOF) {
|
||||||
|
throw new IOException("Read past EOF");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ByteBuffer buffer = file.getBuffer(currentBufferIndex);
|
||||||
|
// we must duplicate (and make it read only while we are at it) since we need position and such to be independent
|
||||||
|
currentBuffer = buffer.asReadOnlyBuffer();
|
||||||
|
currentBuffer.position(0);
|
||||||
|
bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex;
|
||||||
|
// if we are at the tip, limit the current buffer to only whats available to read
|
||||||
|
long buflen = length - bufferStart;
|
||||||
|
if (buflen < BUFFER_SIZE) {
|
||||||
|
currentBuffer.limit((int) buflen);
|
||||||
|
if (enforceEOF && buflen == 0) {
|
||||||
|
throw new IOException("Read past EOF");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object clone() {
|
||||||
|
ByteBufferIndexInput cloned = (ByteBufferIndexInput) super.clone();
|
||||||
|
if (currentBuffer != EMPTY_BUFFER) {
|
||||||
|
cloned.currentBuffer = currentBuffer.asReadOnlyBuffer();
|
||||||
|
cloned.currentBuffer.position(currentBuffer.position());
|
||||||
|
}
|
||||||
|
return cloned;
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,37 +17,60 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.index.store.memory;
|
package org.apache.lucene.store.bytebuffer;
|
||||||
|
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (shay.banon)
|
|
||||||
*/
|
*/
|
||||||
public class ByteBufferIndexOutput extends IndexOutput {
|
public class ByteBufferIndexOutput extends IndexOutput {
|
||||||
|
|
||||||
private final ByteBufferDirectory dir;
|
private final ByteBufferAllocator allocator;
|
||||||
|
private final ByteBufferAllocator.Type allocatorType;
|
||||||
|
private final int BUFFER_SIZE;
|
||||||
private final ByteBufferFile file;
|
private final ByteBufferFile file;
|
||||||
|
|
||||||
private ByteBuffer currentBuffer;
|
private ByteBuffer currentBuffer;
|
||||||
private int currentBufferIndex;
|
private int currentBufferIndex;
|
||||||
|
|
||||||
private long bufferStart;
|
private long bufferStart;
|
||||||
private int bufferLength;
|
|
||||||
|
|
||||||
private ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
|
public ByteBufferIndexOutput(ByteBufferAllocator allocator, ByteBufferAllocator.Type allocatorType, ByteBufferFile file) throws IOException {
|
||||||
|
this.allocator = allocator;
|
||||||
public ByteBufferIndexOutput(ByteBufferDirectory dir, ByteBufferFile file) throws IOException {
|
this.allocatorType = allocatorType;
|
||||||
this.dir = dir;
|
this.BUFFER_SIZE = file.bufferSize;
|
||||||
this.file = file;
|
this.file = file;
|
||||||
|
// create the first buffer we write to
|
||||||
switchCurrentBuffer();
|
switchCurrentBuffer();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void writeByte(byte b) throws IOException {
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void seek(long pos) throws IOException {
|
||||||
|
// set the file length in case we seek back
|
||||||
|
// and flush() has not been called yet
|
||||||
|
setFileLength();
|
||||||
|
if (pos < bufferStart || pos >= bufferStart + BUFFER_SIZE) {
|
||||||
|
currentBufferIndex = (int) (pos / BUFFER_SIZE);
|
||||||
|
switchCurrentBuffer();
|
||||||
|
}
|
||||||
|
currentBuffer.position((int) (pos % BUFFER_SIZE));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long length() {
|
||||||
|
return file.getLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeByte(byte b) throws IOException {
|
||||||
if (!currentBuffer.hasRemaining()) {
|
if (!currentBuffer.hasRemaining()) {
|
||||||
currentBufferIndex++;
|
currentBufferIndex++;
|
||||||
switchCurrentBuffer();
|
switchCurrentBuffer();
|
||||||
|
@ -55,7 +78,8 @@ public class ByteBufferIndexOutput extends IndexOutput {
|
||||||
currentBuffer.put(b);
|
currentBuffer.put(b);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void writeBytes(byte[] b, int offset, int len) throws IOException {
|
@Override
|
||||||
|
public void writeBytes(byte[] b, int offset, int len) throws IOException {
|
||||||
while (len > 0) {
|
while (len > 0) {
|
||||||
if (!currentBuffer.hasRemaining()) {
|
if (!currentBuffer.hasRemaining()) {
|
||||||
currentBufferIndex++;
|
currentBufferIndex++;
|
||||||
|
@ -70,51 +94,32 @@ public class ByteBufferIndexOutput extends IndexOutput {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void flush() throws IOException {
|
|
||||||
file.lastModified(System.currentTimeMillis());
|
|
||||||
setFileLength();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void close() throws IOException {
|
|
||||||
flush();
|
|
||||||
file.buffers(buffers.toArray(new ByteBuffer[buffers.size()]));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public long getFilePointer() {
|
|
||||||
return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void seek(long pos) throws IOException {
|
|
||||||
// set the file length in case we seek back
|
|
||||||
// and flush() has not been called yet
|
|
||||||
setFileLength();
|
|
||||||
if (pos < bufferStart || pos >= bufferStart + bufferLength) {
|
|
||||||
currentBufferIndex = (int) (pos / dir.byteBufferCache.bufferSizeInBytes());
|
|
||||||
switchCurrentBuffer();
|
|
||||||
}
|
|
||||||
currentBuffer.position((int) (pos % dir.byteBufferCache.bufferSizeInBytes()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public long length() throws IOException {
|
|
||||||
return file.length();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void switchCurrentBuffer() throws IOException {
|
private void switchCurrentBuffer() throws IOException {
|
||||||
if (currentBufferIndex == buffers.size()) {
|
if (currentBufferIndex == file.numBuffers()) {
|
||||||
currentBuffer = dir.byteBufferCache.acquireBuffer();
|
currentBuffer = allocator.allocate(allocatorType);
|
||||||
buffers.add(currentBuffer);
|
file.addBuffer(currentBuffer);
|
||||||
} else {
|
} else {
|
||||||
currentBuffer = buffers.get(currentBufferIndex);
|
currentBuffer = file.getBuffer(currentBufferIndex);
|
||||||
}
|
}
|
||||||
currentBuffer.position(0);
|
currentBuffer.position(0);
|
||||||
bufferStart = (long) dir.byteBufferCache.bufferSizeInBytes() * (long) currentBufferIndex;
|
bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex;
|
||||||
bufferLength = currentBuffer.capacity();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setFileLength() {
|
private void setFileLength() {
|
||||||
long pointer = bufferStart + currentBuffer.position();
|
long pointer = bufferStart + currentBuffer.position();
|
||||||
if (pointer > file.length()) {
|
if (pointer > file.getLength()) {
|
||||||
file.length(pointer);
|
file.setLength(pointer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush() throws IOException {
|
||||||
|
file.setLastModified(System.currentTimeMillis());
|
||||||
|
setFileLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getFilePointer() {
|
||||||
|
return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position();
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elastic Search and Shay Banon under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Elastic Search licenses this
|
||||||
|
* file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.lucene.store.bytebuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The caching byte buffer allocator allows to define a global size for both the small and large buffers
|
||||||
|
* allocated. Those will be reused when possible.
|
||||||
|
*/
|
||||||
|
public class CachingByteBufferAllocator extends PlainByteBufferAllocator {
|
||||||
|
|
||||||
|
private final ArrayBlockingQueue<ByteBuffer> smallCache;
|
||||||
|
private final ArrayBlockingQueue<ByteBuffer> largeCache;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param direct If set to true, will allocate direct buffers (off heap).
|
||||||
|
* @param smallBufferSizeInBytes The size (in bytes) of the small buffer allocation.
|
||||||
|
* @param largeBufferSizeInBytes The size (in bytes) of the large buffer allocation.
|
||||||
|
* @param smallCacheSizeInBytes The size of the small cache buffer in bytes.
|
||||||
|
* @param largeCacheSizeInBytes The size of the large cache buffer in bytes.
|
||||||
|
*/
|
||||||
|
public CachingByteBufferAllocator(boolean direct, int smallBufferSizeInBytes, int largeBufferSizeInBytes,
|
||||||
|
int smallCacheSizeInBytes, int largeCacheSizeInBytes) {
|
||||||
|
super(direct, smallBufferSizeInBytes, largeBufferSizeInBytes);
|
||||||
|
this.smallCache = new ArrayBlockingQueue<ByteBuffer>(smallCacheSizeInBytes / smallBufferSizeInBytes);
|
||||||
|
this.largeCache = new ArrayBlockingQueue<ByteBuffer>(largeCacheSizeInBytes / largeBufferSizeInBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public ByteBuffer allocate(Type type) throws IOException {
|
||||||
|
ByteBuffer buffer = type == Type.SMALL ? smallCache.poll() : largeCache.poll();
|
||||||
|
if (buffer == null) {
|
||||||
|
buffer = super.allocate(type);
|
||||||
|
}
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void release(ByteBuffer buffer) {
|
||||||
|
if (buffer.capacity() == smallBufferSizeInBytes) {
|
||||||
|
boolean success = smallCache.offer(buffer);
|
||||||
|
if (!success) {
|
||||||
|
super.release(buffer);
|
||||||
|
}
|
||||||
|
} else if (buffer.capacity() == largeBufferSizeInBytes) {
|
||||||
|
boolean success = largeCache.offer(buffer);
|
||||||
|
if (!success) {
|
||||||
|
super.release(buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// otherwise, just ignore it? not our allocation...
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
for (ByteBuffer buffer : smallCache) {
|
||||||
|
super.release(buffer);
|
||||||
|
}
|
||||||
|
smallCache.clear();
|
||||||
|
for (ByteBuffer buffer : largeCache) {
|
||||||
|
super.release(buffer);
|
||||||
|
}
|
||||||
|
largeCache.clear();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elastic Search and Shay Banon under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Elastic Search licenses this
|
||||||
|
* file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.lucene.store.bytebuffer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A simple byte buffer allocator that does not caching. The direct flag
|
||||||
|
* allows to control if the byte buffer will be allocated off heap or not.
|
||||||
|
*/
|
||||||
|
public class PlainByteBufferAllocator implements ByteBufferAllocator {
|
||||||
|
|
||||||
|
protected final boolean direct;
|
||||||
|
|
||||||
|
protected final int smallBufferSizeInBytes;
|
||||||
|
|
||||||
|
protected final int largeBufferSizeInBytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a new plain byte buffer allocator that does no caching.
|
||||||
|
*
|
||||||
|
* @param direct If set to true, will allocate direct buffers (off heap).
|
||||||
|
* @param smallBufferSizeInBytes The size (in bytes) of the small buffer allocation.
|
||||||
|
* @param largeBufferSizeInBytes The size (in bytes) of the large buffer allocation.
|
||||||
|
*/
|
||||||
|
public PlainByteBufferAllocator(boolean direct, int smallBufferSizeInBytes, int largeBufferSizeInBytes) {
|
||||||
|
this.direct = direct;
|
||||||
|
this.smallBufferSizeInBytes = smallBufferSizeInBytes;
|
||||||
|
this.largeBufferSizeInBytes = largeBufferSizeInBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int sizeInBytes(Type type) {
|
||||||
|
return type == Type.SMALL ? smallBufferSizeInBytes : largeBufferSizeInBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ByteBuffer allocate(Type type) throws IOException {
|
||||||
|
int sizeToAllocate = type == Type.SMALL ? smallBufferSizeInBytes : largeBufferSizeInBytes;
|
||||||
|
if (direct) {
|
||||||
|
return ByteBuffer.allocateDirect(sizeToAllocate);
|
||||||
|
}
|
||||||
|
return ByteBuffer.allocate(sizeToAllocate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void release(ByteBuffer buffer) {
|
||||||
|
Cleaner.clean(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
// nothing to do here...
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,6 +19,9 @@
|
||||||
|
|
||||||
package org.elasticsearch.cache.memory;
|
package org.elasticsearch.cache.memory;
|
||||||
|
|
||||||
|
import org.apache.lucene.store.bytebuffer.ByteBufferAllocator;
|
||||||
|
import org.apache.lucene.store.bytebuffer.CachingByteBufferAllocator;
|
||||||
|
import org.apache.lucene.store.bytebuffer.PlainByteBufferAllocator;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
|
@ -26,156 +29,76 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (shay.banon)
|
* @author kimchy (shay.banon)
|
||||||
*/
|
*/
|
||||||
public class ByteBufferCache extends AbstractComponent {
|
public class ByteBufferCache extends AbstractComponent implements ByteBufferAllocator {
|
||||||
|
|
||||||
public static final boolean CLEAN_SUPPORTED;
|
|
||||||
|
|
||||||
private static final Method directBufferCleaner;
|
|
||||||
private static final Method directBufferCleanerClean;
|
|
||||||
|
|
||||||
static {
|
|
||||||
Method directBufferCleanerX = null;
|
|
||||||
Method directBufferCleanerCleanX = null;
|
|
||||||
boolean v;
|
|
||||||
try {
|
|
||||||
directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
|
|
||||||
directBufferCleanerX.setAccessible(true);
|
|
||||||
directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");
|
|
||||||
directBufferCleanerCleanX.setAccessible(true);
|
|
||||||
v = true;
|
|
||||||
} catch (Exception e) {
|
|
||||||
v = false;
|
|
||||||
}
|
|
||||||
CLEAN_SUPPORTED = v;
|
|
||||||
directBufferCleaner = directBufferCleanerX;
|
|
||||||
directBufferCleanerClean = directBufferCleanerCleanX;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private final Queue<ByteBuffer> cache;
|
|
||||||
|
|
||||||
private final boolean disableCache;
|
|
||||||
|
|
||||||
private final int bufferSizeInBytes;
|
|
||||||
|
|
||||||
private final long cacheSizeInBytes;
|
|
||||||
|
|
||||||
private final boolean direct;
|
private final boolean direct;
|
||||||
|
|
||||||
private final AtomicLong acquiredBuffers = new AtomicLong();
|
private final ByteSizeValue smallBufferSize;
|
||||||
|
private final ByteSizeValue largeBufferSize;
|
||||||
|
|
||||||
|
private final ByteSizeValue smallCacheSize;
|
||||||
|
private final ByteSizeValue largeCacheSize;
|
||||||
|
|
||||||
|
private final ByteBufferAllocator allocator;
|
||||||
|
|
||||||
public ByteBufferCache() {
|
public ByteBufferCache() {
|
||||||
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
|
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBufferCache(int bufferSizeInBytes, int cacheSizeInBytes, boolean direct, boolean warmCache) {
|
// really, for testing...
|
||||||
this(ImmutableSettings.settingsBuilder().put("buffer_size", bufferSizeInBytes).put("cache_size", cacheSizeInBytes).put("direct", direct).put("warm_cache", warmCache).build());
|
public ByteBufferCache(int bufferSizeInBytes, int cacheSizeInBytes, boolean direct) {
|
||||||
|
this(ImmutableSettings.settingsBuilder()
|
||||||
|
.put("cache.memory.small_buffer_size", bufferSizeInBytes)
|
||||||
|
.put("cache.memory.small_cache_size", cacheSizeInBytes)
|
||||||
|
.put("cache.memory.large_buffer_size", bufferSizeInBytes)
|
||||||
|
.put("cache.memory.large_cache_size", cacheSizeInBytes)
|
||||||
|
.put("cache.memory.direct", direct).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Inject public ByteBufferCache(Settings settings) {
|
@Inject public ByteBufferCache(Settings settings) {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
|
||||||
this.bufferSizeInBytes = (int) componentSettings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
|
|
||||||
long cacheSizeInBytes = componentSettings.getAsBytesSize("cache_size", new ByteSizeValue(200, ByteSizeUnit.MB)).bytes();
|
|
||||||
this.direct = componentSettings.getAsBoolean("direct", true);
|
this.direct = componentSettings.getAsBoolean("direct", true);
|
||||||
boolean warmCache = componentSettings.getAsBoolean("warm_cache", false);
|
this.smallBufferSize = componentSettings.getAsBytesSize("small_buffer_size", new ByteSizeValue(1, ByteSizeUnit.KB));
|
||||||
|
this.largeBufferSize = componentSettings.getAsBytesSize("large_buffer_size", new ByteSizeValue(1, ByteSizeUnit.MB));
|
||||||
|
this.smallCacheSize = componentSettings.getAsBytesSize("small_cache_size", new ByteSizeValue(10, ByteSizeUnit.MB));
|
||||||
|
this.largeCacheSize = componentSettings.getAsBytesSize("large_cache_size", new ByteSizeValue(500, ByteSizeUnit.MB));
|
||||||
|
|
||||||
disableCache = cacheSizeInBytes == 0;
|
if (smallCacheSize.bytes() == 0 || largeCacheSize.bytes() == 0) {
|
||||||
if (!disableCache && cacheSizeInBytes < bufferSizeInBytes) {
|
this.allocator = new PlainByteBufferAllocator(direct, (int) smallBufferSize.bytes(), (int) largeBufferSize.bytes());
|
||||||
throw new IllegalArgumentException("Cache size [" + cacheSizeInBytes + "] is smaller than buffer size [" + bufferSizeInBytes + "]");
|
} else {
|
||||||
|
this.allocator = new CachingByteBufferAllocator(direct, (int) smallBufferSize.bytes(), (int) largeBufferSize.bytes(), (int) smallCacheSize.bytes(), (int) largeCacheSize.bytes());
|
||||||
}
|
}
|
||||||
int numberOfCacheEntries = (int) (cacheSizeInBytes / bufferSizeInBytes);
|
|
||||||
this.cache = disableCache ? null : new ArrayBlockingQueue<ByteBuffer>(numberOfCacheEntries);
|
|
||||||
this.cacheSizeInBytes = disableCache ? 0 : numberOfCacheEntries * bufferSizeInBytes;
|
|
||||||
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("using bytebuffer cache with buffer_size [{}], cache_size [{}], direct [{}], warm_cache [{}]",
|
logger.debug("using bytebuffer cache with small_buffer_size [{}], large_buffer_size [{}], small_cache_size [{}], large_cache_size [{}], direct [{}]",
|
||||||
new ByteSizeValue(bufferSizeInBytes), new ByteSizeValue(cacheSizeInBytes), direct, warmCache);
|
smallBufferSize, largeBufferSize, smallCacheSize, largeCacheSize, direct);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteSizeValue bufferSize() {
|
|
||||||
return new ByteSizeValue(bufferSizeInBytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ByteSizeValue cacheSize() {
|
|
||||||
return new ByteSizeValue(cacheSizeInBytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ByteSizeValue allocatedMemory() {
|
|
||||||
return new ByteSizeValue(acquiredBuffers.get() * bufferSizeInBytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int bufferSizeInBytes() {
|
|
||||||
return bufferSizeInBytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean direct() {
|
public boolean direct() {
|
||||||
return direct;
|
return this.direct;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
if (!disableCache) {
|
allocator.close();
|
||||||
ByteBuffer buffer = cache.poll();
|
|
||||||
while (buffer != null) {
|
|
||||||
closeBuffer(buffer);
|
|
||||||
buffer = cache.poll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
acquiredBuffers.set(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer acquireBuffer() {
|
@Override public int sizeInBytes(Type type) {
|
||||||
acquiredBuffers.incrementAndGet();
|
return allocator.sizeInBytes(type);
|
||||||
if (disableCache) {
|
|
||||||
return createBuffer();
|
|
||||||
}
|
|
||||||
ByteBuffer byteBuffer = cache.poll();
|
|
||||||
if (byteBuffer == null) {
|
|
||||||
// everything is taken, return a new one
|
|
||||||
return createBuffer();
|
|
||||||
}
|
|
||||||
byteBuffer.position(0);
|
|
||||||
return byteBuffer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void releaseBuffer(ByteBuffer byteBuffer) {
|
@Override public ByteBuffer allocate(Type type) throws IOException {
|
||||||
acquiredBuffers.decrementAndGet();
|
return allocator.allocate(type);
|
||||||
if (disableCache) {
|
|
||||||
closeBuffer(byteBuffer);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
boolean success = cache.offer(byteBuffer);
|
|
||||||
if (!success) {
|
|
||||||
closeBuffer(byteBuffer);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuffer createBuffer() {
|
@Override public void release(ByteBuffer buffer) {
|
||||||
if (direct) {
|
allocator.release(buffer);
|
||||||
return ByteBuffer.allocateDirect(bufferSizeInBytes);
|
|
||||||
}
|
|
||||||
return ByteBuffer.allocate(bufferSizeInBytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void closeBuffer(ByteBuffer byteBuffer) {
|
|
||||||
if (direct && CLEAN_SUPPORTED) {
|
|
||||||
try {
|
|
||||||
Object cleaner = directBufferCleaner.invoke(byteBuffer);
|
|
||||||
directBufferCleanerClean.invoke(cleaner);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.debug("Failed to clean memory");
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.index.store.fs;
|
package org.elasticsearch.index.store.fs;
|
||||||
|
|
||||||
import org.apache.lucene.store.*;
|
import org.apache.lucene.store.*;
|
||||||
|
import org.apache.lucene.store.bytebuffer.ByteBufferDirectory;
|
||||||
import org.elasticsearch.cache.memory.ByteBufferCache;
|
import org.elasticsearch.cache.memory.ByteBufferCache;
|
||||||
import org.elasticsearch.common.collect.ImmutableSet;
|
import org.elasticsearch.common.collect.ImmutableSet;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
|
@ -29,7 +30,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.store.IndexStore;
|
import org.elasticsearch.index.store.IndexStore;
|
||||||
import org.elasticsearch.index.store.memory.ByteBufferDirectory;
|
|
||||||
import org.elasticsearch.index.store.support.AbstractStore;
|
import org.elasticsearch.index.store.support.AbstractStore;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
|
@ -1,114 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to Elastic Search and Shay Banon under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. Elastic Search licenses this
|
|
||||||
* file to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.index.store.memory;
|
|
||||||
|
|
||||||
import org.apache.lucene.store.IndexInput;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author kimchy (shay.banon)
|
|
||||||
*/
|
|
||||||
public class ByteBufferIndexInput extends IndexInput {
|
|
||||||
|
|
||||||
private final ByteBufferFile file;
|
|
||||||
private final int bufferSize;
|
|
||||||
private final long length;
|
|
||||||
|
|
||||||
private ByteBuffer currentBuffer;
|
|
||||||
private int currentBufferIndex;
|
|
||||||
|
|
||||||
private long bufferStart;
|
|
||||||
|
|
||||||
|
|
||||||
public ByteBufferIndexInput(ByteBufferDirectory dir, ByteBufferFile file) throws IOException {
|
|
||||||
this.file = file;
|
|
||||||
this.bufferSize = dir.byteBufferCache.bufferSizeInBytes();
|
|
||||||
this.length = file.length();
|
|
||||||
switchCurrentBuffer(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public byte readByte() throws IOException {
|
|
||||||
if (!currentBuffer.hasRemaining()) {
|
|
||||||
currentBufferIndex++;
|
|
||||||
switchCurrentBuffer(true);
|
|
||||||
}
|
|
||||||
return currentBuffer.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void readBytes(byte[] b, int offset, int len) throws IOException {
|
|
||||||
while (len > 0) {
|
|
||||||
if (!currentBuffer.hasRemaining()) {
|
|
||||||
currentBufferIndex++;
|
|
||||||
switchCurrentBuffer(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
int remainInBuffer = currentBuffer.remaining();
|
|
||||||
int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
|
|
||||||
currentBuffer.get(b, offset, bytesToCopy);
|
|
||||||
offset += bytesToCopy;
|
|
||||||
len -= bytesToCopy;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void close() throws IOException {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public long getFilePointer() {
|
|
||||||
return currentBufferIndex < 0 ? 0 : bufferStart + currentBuffer.position();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void seek(long pos) throws IOException {
|
|
||||||
if (currentBuffer == null || pos < bufferStart || pos >= bufferStart + bufferSize) {
|
|
||||||
currentBufferIndex = (int) (pos / bufferSize);
|
|
||||||
switchCurrentBuffer(false);
|
|
||||||
}
|
|
||||||
currentBuffer.position((int) (pos % bufferSize));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public long length() {
|
|
||||||
return length;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void switchCurrentBuffer(boolean enforceEOF) throws IOException {
|
|
||||||
if (currentBufferIndex >= file.numberOfBuffers()) {
|
|
||||||
// end of file reached, no more buffers left
|
|
||||||
if (enforceEOF)
|
|
||||||
throw new IOException("Read past EOF");
|
|
||||||
else {
|
|
||||||
// Force EOF if a read takes place at this position
|
|
||||||
currentBufferIndex--;
|
|
||||||
currentBuffer.position(bufferSize);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// we must duplicate (and make it read only while we are at it) since we need position and such to be independant
|
|
||||||
currentBuffer = file.buffer(currentBufferIndex).asReadOnlyBuffer();
|
|
||||||
currentBuffer.position(0);
|
|
||||||
bufferStart = (long) bufferSize * (long) currentBufferIndex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public Object clone() {
|
|
||||||
ByteBufferIndexInput cloned = (ByteBufferIndexInput) super.clone();
|
|
||||||
cloned.currentBuffer = currentBuffer.asReadOnlyBuffer();
|
|
||||||
return cloned;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.index.store.memory;
|
package org.elasticsearch.index.store.memory;
|
||||||
|
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
|
import org.apache.lucene.store.bytebuffer.ByteBufferDirectory;
|
||||||
import org.elasticsearch.cache.memory.ByteBufferCache;
|
import org.elasticsearch.cache.memory.ByteBufferCache;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.lucene.store.IndexInput;
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
import org.apache.lucene.store.Lock;
|
import org.apache.lucene.store.Lock;
|
||||||
import org.apache.lucene.store.LockObtainFailedException;
|
import org.apache.lucene.store.LockObtainFailedException;
|
||||||
|
import org.apache.lucene.store.bytebuffer.ByteBufferDirectory;
|
||||||
import org.elasticsearch.cache.memory.ByteBufferCache;
|
import org.elasticsearch.cache.memory.ByteBufferCache;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
@ -37,61 +38,61 @@ import static org.hamcrest.Matchers.*;
|
||||||
public class SimpleByteBufferStoreTests {
|
public class SimpleByteBufferStoreTests {
|
||||||
|
|
||||||
@Test public void test1BufferNoCache() throws Exception {
|
@Test public void test1BufferNoCache() throws Exception {
|
||||||
ByteBufferCache cache = new ByteBufferCache(1, 0, true, false);
|
ByteBufferCache cache = new ByteBufferCache(1, 0, true);
|
||||||
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
||||||
insertData(dir);
|
insertData(dir, 1);
|
||||||
verifyData(dir);
|
verifyData(dir);
|
||||||
dir.close();
|
dir.close();
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void test1Buffer() throws Exception {
|
@Test public void test1Buffer() throws Exception {
|
||||||
ByteBufferCache cache = new ByteBufferCache(1, 10, true, false);
|
ByteBufferCache cache = new ByteBufferCache(1, 10, true);
|
||||||
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
||||||
insertData(dir);
|
insertData(dir, 1);
|
||||||
verifyData(dir);
|
verifyData(dir);
|
||||||
dir.close();
|
dir.close();
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void test3Buffer() throws Exception {
|
@Test public void test3Buffer() throws Exception {
|
||||||
ByteBufferCache cache = new ByteBufferCache(3, 10, true, false);
|
ByteBufferCache cache = new ByteBufferCache(3, 10, true);
|
||||||
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
||||||
insertData(dir);
|
insertData(dir, 3);
|
||||||
verifyData(dir);
|
verifyData(dir);
|
||||||
dir.close();
|
dir.close();
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void test10Buffer() throws Exception {
|
@Test public void test10Buffer() throws Exception {
|
||||||
ByteBufferCache cache = new ByteBufferCache(10, 20, true, false);
|
ByteBufferCache cache = new ByteBufferCache(10, 20, true);
|
||||||
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
||||||
insertData(dir);
|
insertData(dir, 10);
|
||||||
verifyData(dir);
|
verifyData(dir);
|
||||||
dir.close();
|
dir.close();
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void test15Buffer() throws Exception {
|
@Test public void test15Buffer() throws Exception {
|
||||||
ByteBufferCache cache = new ByteBufferCache(15, 30, true, false);
|
ByteBufferCache cache = new ByteBufferCache(15, 30, true);
|
||||||
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
||||||
insertData(dir);
|
insertData(dir, 15);
|
||||||
verifyData(dir);
|
verifyData(dir);
|
||||||
dir.close();
|
dir.close();
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void test40Buffer() throws Exception {
|
@Test public void test40Buffer() throws Exception {
|
||||||
ByteBufferCache cache = new ByteBufferCache(40, 80, true, false);
|
ByteBufferCache cache = new ByteBufferCache(40, 80, true);
|
||||||
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
||||||
insertData(dir);
|
insertData(dir, 40);
|
||||||
verifyData(dir);
|
verifyData(dir);
|
||||||
dir.close();
|
dir.close();
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void testSimpleLocking() throws Exception {
|
@Test public void testSimpleLocking() throws Exception {
|
||||||
ByteBufferCache cache = new ByteBufferCache(40, 80, true, false);
|
ByteBufferCache cache = new ByteBufferCache(40, 80, true);
|
||||||
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
ByteBufferDirectory dir = new ByteBufferDirectory(cache);
|
||||||
|
|
||||||
Lock lock = dir.makeLock("testlock");
|
Lock lock = dir.makeLock("testlock");
|
||||||
|
@ -111,7 +112,7 @@ public class SimpleByteBufferStoreTests {
|
||||||
cache.close();
|
cache.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void insertData(ByteBufferDirectory dir) throws IOException {
|
private void insertData(ByteBufferDirectory dir, int bufferSizeInBytes) throws IOException {
|
||||||
byte[] test = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
|
byte[] test = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
|
||||||
IndexOutput indexOutput = dir.createOutput("value1");
|
IndexOutput indexOutput = dir.createOutput("value1");
|
||||||
indexOutput.writeBytes(new byte[]{2, 4, 6, 7, 8}, 5);
|
indexOutput.writeBytes(new byte[]{2, 4, 6, 7, 8}, 5);
|
||||||
|
@ -124,7 +125,7 @@ public class SimpleByteBufferStoreTests {
|
||||||
|
|
||||||
indexOutput.seek(0);
|
indexOutput.seek(0);
|
||||||
indexOutput.writeByte((byte) 8);
|
indexOutput.writeByte((byte) 8);
|
||||||
if (dir.bufferSizeInBytes() > 4) {
|
if (bufferSizeInBytes > 4) {
|
||||||
indexOutput.seek(2);
|
indexOutput.seek(2);
|
||||||
indexOutput.writeBytes(new byte[]{1, 2}, 2);
|
indexOutput.writeBytes(new byte[]{1, 2}, 2);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue