LUCENE-2500: add DirectIOLinuxDirectory, a Linux-only (requires compiling .cpp JNI code per platform) which bypasses the buffer cache so segment merging does not evict cached files

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@957520 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2010-06-24 12:14:21 +00:00
parent 50ab75baa3
commit f289f22125
5 changed files with 779 additions and 3 deletions

View File

@ -7,6 +7,14 @@ Build
* LUCENE-2413: Moved the demo out of lucene core and into contrib/demo.
(Robert Muir)
New Features
* LUCENE-2500: Added DirectIOLinuxDirectory, a Linux-specific
Directory impl that uses the O_DIRECT flag to bypass the buffer
cache. This is useful to prevent segment merging from evicting
pages from the buffer cache, since fadvise/madvise do not seem.
(Michael McCandless)
======================= Lucene 3.x (not yet released) =======================
Changes in backwards compatibility policy

View File

@ -0,0 +1,365 @@
package org.apache.lucene.store;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.io.File;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* An {@link Directory} implementation that uses the
* Linux-specific O_DIRECT flag to bypass all OS level
* caching. To use this you must compile
* NativePosixUtil.cpp (exposes Linux-specific APIs through
* JNI) for your platform.
*
* <p><b>WARNING</b>: this code is very new and quite easily
* could contain horrible bugs. For example, here's one
* known issue: if you use seek in IndexOutput, and then
* write more than one buffer's worth of bytes, then the
* file will be wrong. Lucene does not do this (only writes
* small number of bytes after seek).
* @lucene.experimental
*/
public class DirectIOLinuxDirectory extends FSDirectory {
private final static long ALIGN = 512;
private final static long ALIGN_NOT_MASK = ~(ALIGN-1);
/** Create a new NIOFSDirectory for the named location.
*
* @param path the path of the directory
* @param lockFactory the lock factory to use, or null for the default
* ({@link NativeFSLockFactory});
* @throws IOException
*/
public DirectIOLinuxDirectory(File path, LockFactory lockFactory) throws IOException {
super(path, lockFactory);
}
/** Create a new NIOFSDirectory for the named location and {@link NativeFSLockFactory}.
*
* @param path the path of the directory
* @throws IOException
*/
public DirectIOLinuxDirectory(File path) throws IOException {
super(path, null);
}
@Override
public IndexInput openInput(String name, int bufferSize) throws IOException {
ensureOpen();
return new DirectIOLinuxIndexInput(new File(getDirectory(), name), bufferSize);
}
@Override
public IndexOutput createOutput(String name) throws IOException {
ensureOpen();
ensureCanWrite(name);
return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), 4096);
}
private final static class DirectIOLinuxIndexOutput extends IndexOutput {
private final ByteBuffer buffer;
private final FileOutputStream fos;
private final FileChannel channel;
private final int bufferSize;
//private final File path;
private int bufferPos;
private long filePos;
private long fileLength;
private boolean isOpen;
public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException {
//this.path = path;
bufferSize = 1024*1024;
FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
fos = new FileOutputStream(fd);
//fos = new FileOutputStream(path);
channel = fos.getChannel();
buffer = ByteBuffer.allocateDirect(bufferSize);
this.bufferSize = bufferSize;
isOpen = true;
}
@Override
public void writeByte(byte b) throws IOException {
assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
buffer.put(b);
if (++bufferPos == bufferSize) {
dump();
}
}
@Override
public void writeBytes(byte[] src, int offset, int len) throws IOException {
int toWrite = len;
while(true) {
final int left = bufferSize - bufferPos;
if (left <= toWrite) {
buffer.put(src, offset, left);
toWrite -= left;
offset += left;
bufferPos = bufferSize;
dump();
} else {
buffer.put(src, offset, toWrite);
bufferPos += toWrite;
break;
}
}
}
//@Override
//public void setLength() throws IOException {
// TODO -- how to impl this? neither FOS nor
// FileChannel provides an API?
//}
@Override
public void flush() throws IOException {
// TODO -- I don't think this method is necessary?
}
private void dump() throws IOException {
buffer.flip();
final long limit = filePos + buffer.limit();
if (limit > fileLength) {
// this dump extends the file
fileLength = limit;
} else {
// we had seek'd back & wrote some changes
}
// must always round to next block
buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
assert (filePos & ALIGN_NOT_MASK) == filePos;
//System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos);
channel.write(buffer, filePos);
filePos += bufferPos;
bufferPos = 0;
buffer.clear();
//System.out.println("dump: done");
// TODO: the case where we'd seek'd back, wrote an
// entire buffer, we must here read the next buffer;
// likely Lucene won't trip on this since we only
// write smallish amounts on seeking back
}
@Override
public long getFilePointer() {
return filePos + bufferPos;
}
// TODO: seek is fragile at best; it can only properly
// handle seek & then change bytes that fit entirely
// within one buffer
@Override
public void seek(long pos) throws IOException {
if (pos != getFilePointer()) {
dump();
final long alignedPos = pos & ALIGN_NOT_MASK;
filePos = alignedPos;
int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer);
if (n < bufferSize) {
buffer.limit(n);
}
//System.out.println("seek refill=" + n);
final int delta = (int) (pos - alignedPos);
buffer.position(delta);
bufferPos = delta;
}
}
@Override
public long length() throws IOException {
return fileLength;
}
@Override
public void close() throws IOException {
if (isOpen) {
isOpen = false;
try {
dump();
} finally {
try {
//System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path);
channel.truncate(fileLength);
//System.out.println(" now: " + channel.size());
} finally {
try {
channel.close();
} finally {
fos.close();
//System.out.println(" final len=" + path.length());
}
}
}
}
}
}
private final static class DirectIOLinuxIndexInput extends IndexInput {
private final ByteBuffer buffer;
private final FileInputStream fis;
private final FileChannel channel;
private final int bufferSize;
private boolean isOpen;
private boolean isClone;
private long filePos;
private int bufferPos;
public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException {
bufferSize = 1024*1024;
FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
fis = new FileInputStream(fd);
channel = fis.getChannel();
this.bufferSize = bufferSize;
buffer = ByteBuffer.allocateDirect(bufferSize);
isOpen = true;
isClone = false;
filePos = -bufferSize;
bufferPos = bufferSize;
//System.out.println("D open " + path + " this=" + this);
}
// for clone
public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException {
this.fis = null;
channel = other.channel;
this.bufferSize = other.bufferSize;
buffer = ByteBuffer.allocateDirect(bufferSize);
filePos = -bufferSize;
bufferPos = bufferSize;
isOpen = true;
isClone = true;
//System.out.println("D clone this=" + this);
seek(other.getFilePointer());
}
@Override
public void close() throws IOException {
if (isOpen && !isClone) {
try {
channel.close();
} finally {
if (!isClone) {
fis.close();
}
}
}
}
@Override
public long getFilePointer() {
return filePos + bufferPos;
}
@Override
public void seek(long pos) throws IOException {
if (pos != getFilePointer()) {
final long alignedPos = pos & ALIGN_NOT_MASK;
//System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this);
filePos = alignedPos-bufferSize;
refill();
final int delta = (int) (pos - alignedPos);
buffer.position(delta);
bufferPos = delta;
}
}
@Override
public long length() {
try {
return channel.size();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override
public byte readByte() throws IOException {
// NOTE: we don't guard against EOF here... ie the
// "final" buffer will typically be filled to less
// than bufferSize
if (bufferPos == bufferSize) {
refill();
}
assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
bufferPos++;
return buffer.get();
}
private void refill() throws IOException {
buffer.clear();
filePos += bufferSize;
bufferPos = 0;
assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
//System.out.println("X refill filePos=" + filePos);
int n = channel.read(buffer, filePos);
if (n < 0) {
throw new IOException("eof");
}
buffer.rewind();
}
@Override
public void readBytes(byte[] dst, int offset, int len) throws IOException {
int toRead = len;
//System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this);
while(true) {
final int left = bufferSize - bufferPos;
if (left < toRead) {
//System.out.println(" copy " + left);
buffer.get(dst, offset, left);
toRead -= left;
offset += left;
refill();
} else {
//System.out.println(" copy " + toRead);
buffer.get(dst, offset, toRead);
bufferPos += toRead;
//System.out.println(" readBytes done");
break;
}
}
}
@Override
public Object clone() {
try {
return new DirectIOLinuxIndexInput(this);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}
}

View File

@ -0,0 +1,303 @@
#include <jni.h>
#include <fcntl.h> // posix_fadvise
#include <string.h> // strerror
#include <errno.h> // errno
#include <unistd.h> // pread
#include <sys/mman.h> // posix_madvise, madvise
// java -cp .:lib/junit-4.7.jar:./build/classes/test:./build/classes/java:./build/classes/demo -Dlucene.version=2.9-dev -DtempDir=build -ea org.junit.runner.JUnitCore org.apache.lucene.index.TestDoc
/*
* Class: org_apache_lucene_store_NativePosixUtil
* Method: posix_fadvise
* Signature: (Ljava/io/FileDescriptor;JJI)V
*/
extern "C"
JNIEXPORT jint JNICALL Java_org_apache_lucene_store_NativePosixUtil_posix_1fadvise(JNIEnv *env, jclass _ignore, jobject fileDescriptor, jlong offset, jlong len, jint advice)
{
jfieldID field_fd;
jmethodID const_fdesc;
jclass ioex = env->FindClass("java/io/IOException");
if (ioex == NULL) {
return -1;
}
jclass fdesc = env->FindClass("java/io/FileDescriptor");
if (fdesc == NULL) {
return -2;
}
// read the int fd field
jfieldID fdField = env->GetFieldID(fdesc, "fd", "I");
if (fdField == NULL) {
return -3;
}
int fd = env->GetIntField(fileDescriptor, fdField);
//printf("fd=%d\n", fd); fflush(stdout);
int osAdvice;
switch(advice) {
case 0:
osAdvice = POSIX_FADV_NORMAL;
break;
case 1:
osAdvice = POSIX_FADV_SEQUENTIAL;
break;
case 2:
osAdvice = POSIX_FADV_RANDOM;
break;
case 3:
osAdvice = POSIX_FADV_WILLNEED;
break;
case 4:
osAdvice = POSIX_FADV_DONTNEED;
break;
case 5:
osAdvice = POSIX_FADV_NOREUSE;
break;
}
int result = posix_fadvise(fd, (off_t) offset, (off_t) len, osAdvice);
if (result == 0) {
// ok
} else {
env->ThrowNew(ioex, strerror(errno));
return -1;
}
return 0;
}
/*
* Class: org_apache_lucene_store_NativePosixUtil
* Method: open_direct
* Signature: (Ljava/lang/String;Z)Ljava/io/FileDescriptor;
*/
extern "C"
JNIEXPORT jobject JNICALL Java_org_apache_lucene_store_NativePosixUtil_open_1direct(JNIEnv *env, jclass _ignore, jstring filename, jboolean readOnly)
{
jfieldID field_fd;
jmethodID const_fdesc;
jclass class_fdesc, class_ioex;
jobject ret;
int fd;
char *fname;
class_ioex = env->FindClass("java/io/IOException");
if (class_ioex == NULL) return NULL;
class_fdesc = env->FindClass("java/io/FileDescriptor");
if (class_fdesc == NULL) return NULL;
fname = (char *) env->GetStringUTFChars(filename, NULL);
if (readOnly) {
fd = open(fname, O_RDONLY | O_DIRECT);
} else {
fd = open(fname, O_RDWR | O_CREAT | O_DIRECT);
}
//printf("open %s -> %d; ro %d\n", fname, fd, readOnly); fflush(stdout);
env->ReleaseStringUTFChars(filename, fname);
if (fd < 0) {
// open returned an error. Throw an IOException with the error string
env->ThrowNew(class_ioex, strerror(errno));
return NULL;
}
// construct a new FileDescriptor
const_fdesc = env->GetMethodID(class_fdesc, "<init>", "()V");
if (const_fdesc == NULL) return NULL;
ret = env->NewObject(class_fdesc, const_fdesc);
// poke the "fd" field with the file descriptor
field_fd = env->GetFieldID(class_fdesc, "fd", "I");
if (field_fd == NULL) return NULL;
env->SetIntField(ret, field_fd, fd);
// and return it
return ret;
}
/*
* Class: org_apache_lucene_store_NativePosixUtil
* Method: pread
* Signature: (Ljava/io/FileDescriptor;JLjava/nio/ByteBuffer;)I
*/
extern "C"
JNIEXPORT jlong JNICALL Java_org_apache_lucene_store_NativePosixUtil_pread(JNIEnv *env, jclass _ignore, jobject jfd, jlong pos, jobject byteBuf)
{
// get int fd:
jclass class_fdesc = env->FindClass("java/io/FileDescriptor");
if (class_fdesc == NULL) {
return -1;
}
jfieldID field_fd = env->GetFieldID(class_fdesc, "fd", "I");
if (field_fd == NULL) {
return -1;
}
const int fd = env->GetIntField(jfd, field_fd);
void *p = env->GetDirectBufferAddress(byteBuf);
if (p == NULL) {
return -1;
}
size_t size = (size_t) env->GetDirectBufferCapacity(byteBuf);
if (size <= 0) {
return -1;
}
size_t numBytesRead = pread(fd, p, (size_t) size, (off_t) pos);
if (numBytesRead == -1) {
jclass class_ioex = env->FindClass("java/io/IOException");
if (class_ioex == NULL) {
return -1;
}
env->ThrowNew(class_ioex, strerror(errno));
return -1;
}
return (jlong) numBytesRead;
}
/*
* Class: org_apache_lucene_store_NativePosixUtil
* Method: posix_madvise
* Signature: (Ljava/nio/ByteBuffer;I)I
*/
extern "C"
JNIEXPORT jint JNICALL Java_org_apache_lucene_store_NativePosixUtil_posix_1madvise(JNIEnv *env, jclass _ignore, jobject buffer, jint advice) {
void *p = env->GetDirectBufferAddress(buffer);
if (p == NULL) {
return -1;
}
size_t size = (size_t) env->GetDirectBufferCapacity(buffer);
if (size <= 0) {
return -1;
}
int page = getpagesize();
// round start down to start of page
long long start = (long long) p;
start = start & (~(page-1));
// round end up to start of page
long long end = start + size;
end = (end + page-1)&(~(page-1));
size = (end-start);
int osAdvice;
switch(advice) {
case 0:
osAdvice = POSIX_MADV_NORMAL;
break;
case 1:
osAdvice = POSIX_MADV_SEQUENTIAL;
break;
case 2:
osAdvice = POSIX_MADV_RANDOM;
break;
case 3:
osAdvice = POSIX_MADV_WILLNEED;
break;
case 4:
osAdvice = POSIX_MADV_DONTNEED;
break;
case 5:
return -1;
break;
}
//printf("DO posix_madvise: %lx %d\n", p, size);fflush(stdout);
if (posix_madvise((void *) start, size, osAdvice) != 0) {
jclass class_ioex = env->FindClass("java/io/IOException");
if (class_ioex == NULL) {
return -1;
}
env->ThrowNew(class_ioex, strerror(errno));
return -1;
}
return 0;
}
/*
* Class: org_apache_lucene_store_NativePosixUtil
* Method: madvise
* Signature: (Ljava/nio/ByteBuffer;I)I
*/
extern "C"
JNIEXPORT jint JNICALL Java_org_apache_lucene_store_NativePosixUtil_madvise(JNIEnv *env, jclass _ignore, jobject buffer, jint advice) {
void *p = env->GetDirectBufferAddress(buffer);
if (p == NULL) {
return -1;
}
size_t size = (size_t) env->GetDirectBufferCapacity(buffer);
if (size <= 0) {
return -1;
}
int page = getpagesize();
// round start down to start of page
long long start = (long long) p;
start = start & (~(page-1));
// round end up to start of page
long long end = start + size;
end = (end + page-1)&(~(page-1));
size = (end-start);
int osAdvice;
switch(advice) {
case 0:
osAdvice = MADV_NORMAL;
break;
case 1:
osAdvice = MADV_SEQUENTIAL;
break;
case 2:
osAdvice = MADV_RANDOM;
break;
case 3:
osAdvice = MADV_WILLNEED;
break;
case 4:
osAdvice = MADV_DONTNEED;
break;
case 5:
return -1;
break;
}
//printf("DO madvise: page=%d p=0x%lx 0x%lx size=0x%lx\n", page, p, start, size);fflush(stdout);
if (madvise((void *) start, size, osAdvice) != 0) {
jclass class_ioex = env->FindClass("java/io/IOException");
if (class_ioex == NULL) {
return -1;
}
env->ThrowNew(class_ioex, strerror(errno));
return -1;
}
return 0;
}

View File

@ -0,0 +1,49 @@
package org.apache.lucene.store;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.FileDescriptor;
import java.nio.ByteBuffer;
public final class NativePosixUtil {
public final static int NORMAL = 0;
public final static int SEQUENTIAL = 1;
public final static int RANDOM = 2;
public final static int WILLNEED = 3;
public final static int DONTNEED = 4;
public final static int NOREUSE = 5;
static {
System.loadLibrary("NativePosixUtil");
}
private static native int posix_fadvise(FileDescriptor fd, long offset, long len, int advise) throws IOException;
public static native int posix_madvise(ByteBuffer buf, int advise) throws IOException;
public static native int madvise(ByteBuffer buf, int advise) throws IOException;
public static native FileDescriptor open_direct(String filename, boolean read) throws IOException;
public static native long pread(FileDescriptor fd, long pos, ByteBuffer byteBuf) throws IOException;
public static void advise(FileDescriptor fd, long offset, long len, int advise) throws IOException {
final int code = posix_fadvise(fd, offset, len, advise);
if (code != 0) {
throw new RuntimeException("posix_fadvise failed code=" + code);
}
}
}

View File

@ -21,6 +21,57 @@
</title>
</head>
<body>
miscellaneous
</body>
</html>
<h2>Misc Tools</h2>
The misc package has various tools for splitting/merging indices,
changing norms, finding high freq terms, and others.
<h2>DirectIOLinuxDirectory</h2>
<p>
<b>NOTE</b>: This uses C++ sources (accessible via JNI), which you'll
have to compile on your platform. Further, this is a very
platform-specific extensions (runs only on Linux, and likely only on
2.6.x kernels).
<p>
DirectIOLinuxDirectory is a Directory implementation that bypasses the
OS's buffer cache for any IndexInput and IndexOutput opened through it
(using the linux-specific O_DIRECT flag).
<p>
Note that doing so typically results in bad performance loss! You
should not use this for searching, but rather for indexing (or maybe
just merging during indexing), to avoid evicting useful pages from the
buffer cache.
See <a target=_top href="http://chbits.blogspot.com/2010/06/lucene-and-fadvisemadvise.html">here</a>
for details.
Steps to build:
<ul>
<li> <tt>cd lucene/contrib/misc/src/java/org/apache/lucene/store</tt>
<li> Compile NativePosixUtil.cpp -> libNativePosixUtil.so. On linux, something like <tt>gcc -fPIC -o libNativePosixUtil.so -shared -Wl,-soname,libNativePosixUtil.so -I$JAVA_HOME/include -I$JAVA_HOME/include/linux NativePosixUtil.cpp -lc -lstdc++</tt>. Add <tt>-m64</tt> if you want to compile 64bit (and java must be run with -d64 so it knows to load a 64bit dynamic lib).
<li> Make sure libNativePosixUtil.so is on your LD_LIBRARY_PATH so java can find it (something like <tt>export LD_LIBRARY_PATH=/path/to/dir:$LD_LIBRARY_PATH</tt>, where /path/to/dir contains libNativePosixUtil.so)
<li> <tt>ant jar</tt> to compile the java source and put that JAR on your CLASSPATH
</ul>
<p>
To use this, you'll likely want to make a custom subclass of
FSDirectory that only opens direct IndexInput/Output for merging. One
hackish way to do this is to check if the current thread's name starts
with "Lucene Merge Thread". Alternatively, you could use this Dir as
is for all indexing ops, but not for searching.
<p>
NativePosixUtil.cpp/java also expose access to the posix_madvise,
madvise, posix_fadvise functions, which are somewhat more cross
platform than O_DIRECT, however, in testing (see above link), these
APIs did not seem to help prevent buffer cache eviction.
</body>
</html>