mirror of https://github.com/apache/lucene.git
LUCENE-8982: Make NativeUnixDirectory pure java with FileChannel direct IO flag, and rename to DirectIODirectory (#2052)
LUCENE-8982: Make NativeUnixDirectory pure java with FileChannel direct IO flag, and rename to DirectIODirectory
This commit is contained in:
parent
eb24e95731
commit
a7747b63b4
|
@ -15,6 +15,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.tools.ant.taskdefs.condition.Os
|
||||
|
||||
// This is the master switch to disable all tasks that compile
|
||||
// native (cpp) code.
|
||||
rootProject.ext {
|
||||
|
@ -52,7 +54,7 @@ configure(javaProjectsWithNativeDeps, {
|
|||
}
|
||||
|
||||
// Only copy and attach native deps if native build is enabled.
|
||||
if (buildNative) {
|
||||
if (buildNative && Os.isFamily(Os.FAMILY_WINDOWS)) {
|
||||
task copyNativeDeps(type: Sync) {
|
||||
from configurations.nativeDeps
|
||||
into nativeDepsDir
|
||||
|
|
|
@ -63,7 +63,7 @@ grant {
|
|||
permission java.lang.RuntimePermission "setContextClassLoader";
|
||||
|
||||
// Needed for loading native library (lucene:misc:native) in lucene:misc
|
||||
permission java.lang.RuntimePermission "loadLibrary.LuceneNativeIO";
|
||||
permission java.lang.RuntimePermission "getFileStoreAttributes";
|
||||
permission java.lang.RuntimePermission "writeFileDescriptor";
|
||||
|
||||
// TestLockFactoriesMultiJVM opens a random port on 127.0.0.1 (port 0 = ephemeral port range):
|
||||
|
|
|
@ -174,6 +174,9 @@ Improvements
|
|||
|
||||
* LUCENE-9605: Update snowball to d8cf01ddf37a, adds Yiddish stemmer. (Robert Muir)
|
||||
|
||||
* LUCENE-8982: Make NativeUnixDirectory pure java with FileChannel direct IO flag,
|
||||
and rename to DirectIODirectory (Zach Chen, Uwe Schindler, Mike McCandless, Dawid Weiss).
|
||||
|
||||
Bug fixes
|
||||
|
||||
* LUCENE-8663: NRTCachingDirectory.slowFileExists may open a file while
|
||||
|
|
|
@ -30,8 +30,6 @@ library {
|
|||
|
||||
// Native build for Windows platform will be added in later stage
|
||||
targetMachines = [
|
||||
machines.linux.x86_64,
|
||||
machines.macOS.x86_64,
|
||||
machines.windows.x86_64
|
||||
]
|
||||
|
||||
|
@ -39,8 +37,6 @@ library {
|
|||
// (plugin won't find the toolchain).
|
||||
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
|
||||
source.from file("${projectDir}/src/main/windows")
|
||||
} else if (Os.isFamily(Os.FAMILY_UNIX) || Os.isFamily(Os.FAMILY_MAC)) {
|
||||
source.from file("${projectDir}/src/main/posix")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -51,11 +47,7 @@ tasks.withType(CppCompile).configureEach {
|
|||
// is present.
|
||||
systemIncludes.from file("${javaHome}/include")
|
||||
|
||||
for (def path : [
|
||||
file("${javaHome}/include/win32"),
|
||||
file("${javaHome}/include/darwin"),
|
||||
file("${javaHome}/include/linux"),
|
||||
file("${javaHome}/include/solaris")]) {
|
||||
for (def path : [file("${javaHome}/include/win32")]) {
|
||||
if (path.exists()) {
|
||||
systemIncludes.from path
|
||||
}
|
||||
|
|
|
@ -1,346 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to You under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
#ifdef LINUX
|
||||
#define DIRECT_FLAG O_DIRECT | O_NOATIME
|
||||
#define LINUX
|
||||
#elif __APPLE__
|
||||
#define DIRECT_FLAG 0
|
||||
#define OSX
|
||||
#else
|
||||
#define DIRECT_FLAG O_DIRECT // __unix__ is not used as even Linux falls under it.
|
||||
#endif
|
||||
|
||||
#include <jni.h>
|
||||
#include <fcntl.h> // posix_fadvise, constants for open
|
||||
#include <string.h> // strerror
|
||||
#include <errno.h> // errno
|
||||
#include <unistd.h> // pread
|
||||
#include <sys/mman.h> // posix_madvise, madvise
|
||||
#include <sys/types.h> // constants for open
|
||||
#include <sys/stat.h> // constants for open
|
||||
|
||||
// java -cp .:lib/junit-4.10.jar:./build/classes/test:./build/classes/java:./build/classes/demo -Dlucene.version=2.9-dev -DtempDir=build -ea org.junit.runner.JUnitCore org.apache.lucene.index.TestDoc
|
||||
|
||||
#ifdef LINUX
|
||||
/*
|
||||
* Class: org_apache_lucene_misc_store_NativePosixUtil
|
||||
* Method: posix_fadvise
|
||||
* Signature: (Ljava/io/FileDescriptor;JJI)V
|
||||
*/
|
||||
extern "C"
|
||||
JNIEXPORT jint JNICALL Java_org_apache_lucene_misc_store_NativePosixUtil_posix_1fadvise(JNIEnv *env, jclass _ignore, jobject fileDescriptor, jlong offset, jlong len, jint advice)
|
||||
{
|
||||
jfieldID field_fd;
|
||||
jmethodID const_fdesc;
|
||||
|
||||
jclass ioex = env->FindClass("java/io/IOException");
|
||||
if (ioex == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
jclass fdesc = env->FindClass("java/io/FileDescriptor");
|
||||
if (fdesc == NULL) {
|
||||
return -2;
|
||||
}
|
||||
|
||||
// read the int fd field
|
||||
jfieldID fdField = env->GetFieldID(fdesc, "fd", "I");
|
||||
if (fdField == NULL) {
|
||||
return -3;
|
||||
}
|
||||
|
||||
int fd = env->GetIntField(fileDescriptor, fdField);
|
||||
//printf("fd=%d\n", fd); fflush(stdout);
|
||||
|
||||
int osAdvice;
|
||||
switch(advice) {
|
||||
|
||||
case 0:
|
||||
osAdvice = POSIX_FADV_NORMAL;
|
||||
break;
|
||||
case 1:
|
||||
osAdvice = POSIX_FADV_SEQUENTIAL;
|
||||
break;
|
||||
case 2:
|
||||
osAdvice = POSIX_FADV_RANDOM;
|
||||
break;
|
||||
case 3:
|
||||
osAdvice = POSIX_FADV_WILLNEED;
|
||||
break;
|
||||
case 4:
|
||||
osAdvice = POSIX_FADV_DONTNEED;
|
||||
break;
|
||||
case 5:
|
||||
osAdvice = POSIX_FADV_NOREUSE;
|
||||
break;
|
||||
}
|
||||
|
||||
int result = posix_fadvise(fd, (off_t) offset, (off_t) len, osAdvice);
|
||||
if (result == 0) {
|
||||
// ok
|
||||
} else {
|
||||
env->ThrowNew(ioex, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Class: org_apache_lucene_misc_store_NativePosixUtil
|
||||
* Method: open_direct
|
||||
* Signature: (Ljava/lang/String;Z)Ljava/io/FileDescriptor;
|
||||
*/
|
||||
extern "C"
|
||||
JNIEXPORT jobject JNICALL Java_org_apache_lucene_misc_store_NativePosixUtil_open_1direct(JNIEnv *env, jclass _ignore, jstring filename, jboolean readOnly)
|
||||
{
|
||||
jfieldID field_fd;
|
||||
jmethodID const_fdesc;
|
||||
jclass class_fdesc, class_ioex;
|
||||
jobject ret;
|
||||
int fd;
|
||||
char *fname;
|
||||
|
||||
class_ioex = env->FindClass("java/io/IOException");
|
||||
if (class_ioex == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
class_fdesc = env->FindClass("java/io/FileDescriptor");
|
||||
if (class_fdesc == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
fname = (char *) env->GetStringUTFChars(filename, NULL);
|
||||
|
||||
if (readOnly) {
|
||||
fd = open(fname, O_RDONLY | DIRECT_FLAG);
|
||||
#ifdef OSX
|
||||
fcntl(fd, F_NOCACHE, 1);
|
||||
#endif
|
||||
} else {
|
||||
fd = open(fname, O_RDWR | O_CREAT | DIRECT_FLAG, 0666);
|
||||
#ifdef OSX
|
||||
fcntl(fd, F_NOCACHE, 1);
|
||||
#endif
|
||||
}
|
||||
|
||||
//printf("open %s -> %d; ro %d\n", fname, fd, readOnly); fflush(stdout);
|
||||
|
||||
env->ReleaseStringUTFChars(filename, fname);
|
||||
|
||||
if (fd < 0) {
|
||||
// open returned an error. Throw an IOException with the error string
|
||||
env->ThrowNew(class_ioex, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// construct a new FileDescriptor
|
||||
const_fdesc = env->GetMethodID(class_fdesc, "<init>", "()V");
|
||||
if (const_fdesc == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
ret = env->NewObject(class_fdesc, const_fdesc);
|
||||
|
||||
// poke the "fd" field with the file descriptor
|
||||
field_fd = env->GetFieldID(class_fdesc, "fd", "I");
|
||||
if (field_fd == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
env->SetIntField(ret, field_fd, fd);
|
||||
|
||||
// and return it
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_apache_lucene_misc_store_NativePosixUtil
|
||||
* Method: pread
|
||||
* Signature: (Ljava/io/FileDescriptor;JLjava/nio/ByteBuffer;)I
|
||||
*/
|
||||
extern "C"
|
||||
JNIEXPORT jlong JNICALL Java_org_apache_lucene_misc_store_NativePosixUtil_pread(JNIEnv *env, jclass _ignore, jobject jfd, jlong pos, jobject byteBuf)
|
||||
{
|
||||
// get int fd:
|
||||
jclass class_fdesc = env->FindClass("java/io/FileDescriptor");
|
||||
if (class_fdesc == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
jfieldID field_fd = env->GetFieldID(class_fdesc, "fd", "I");
|
||||
if (field_fd == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
const int fd = env->GetIntField(jfd, field_fd);
|
||||
|
||||
void *p = env->GetDirectBufferAddress(byteBuf);
|
||||
if (p == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t size = (size_t) env->GetDirectBufferCapacity(byteBuf);
|
||||
if (size <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t numBytesRead = pread(fd, p, (size_t) size, (off_t) pos);
|
||||
if (numBytesRead == -1) {
|
||||
jclass class_ioex = env->FindClass("java/io/IOException");
|
||||
if (class_ioex == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
env->ThrowNew(class_ioex, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return (jlong) numBytesRead;
|
||||
}
|
||||
|
||||
/*
|
||||
* Class: org_apache_lucene_misc_store_NativePosixUtil
|
||||
* Method: posix_madvise
|
||||
* Signature: (Ljava/nio/ByteBuffer;I)I
|
||||
*/
|
||||
extern "C"
|
||||
JNIEXPORT jint JNICALL Java_org_apache_lucene_misc_store_NativePosixUtil_posix_1madvise(JNIEnv *env, jclass _ignore, jobject buffer, jint advice) {
|
||||
void *p = env->GetDirectBufferAddress(buffer);
|
||||
if (p == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t size = (size_t) env->GetDirectBufferCapacity(buffer);
|
||||
if (size <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int page = getpagesize();
|
||||
|
||||
// round start down to start of page
|
||||
long long start = (long long) p;
|
||||
start = start & (~(page-1));
|
||||
|
||||
// round end up to start of page
|
||||
long long end = start + size;
|
||||
end = (end + page-1)&(~(page-1));
|
||||
size = (end-start);
|
||||
|
||||
int osAdvice;
|
||||
switch(advice) {
|
||||
case 0:
|
||||
osAdvice = POSIX_MADV_NORMAL;
|
||||
break;
|
||||
case 1:
|
||||
osAdvice = POSIX_MADV_SEQUENTIAL;
|
||||
break;
|
||||
case 2:
|
||||
osAdvice = POSIX_MADV_RANDOM;
|
||||
break;
|
||||
case 3:
|
||||
osAdvice = POSIX_MADV_WILLNEED;
|
||||
break;
|
||||
case 4:
|
||||
osAdvice = POSIX_MADV_DONTNEED;
|
||||
break;
|
||||
case 5:
|
||||
return -1;
|
||||
break;
|
||||
}
|
||||
|
||||
//printf("DO posix_madvise: %lx %d\n", p, size);fflush(stdout);
|
||||
|
||||
if (posix_madvise((void *) start, size, osAdvice) != 0) {
|
||||
jclass class_ioex = env->FindClass("java/io/IOException");
|
||||
if (class_ioex == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
env->ThrowNew(class_ioex, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Class: org_apache_lucene_misc_store_NativePosixUtil
|
||||
* Method: madvise
|
||||
* Signature: (Ljava/nio/ByteBuffer;I)I
|
||||
*/
|
||||
extern "C"
|
||||
JNIEXPORT jint JNICALL Java_org_apache_lucene_misc_store_NativePosixUtil_madvise(JNIEnv *env, jclass _ignore, jobject buffer, jint advice) {
|
||||
void *p = env->GetDirectBufferAddress(buffer);
|
||||
if (p == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t size = (size_t) env->GetDirectBufferCapacity(buffer);
|
||||
if (size <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int page = getpagesize();
|
||||
|
||||
// round start down to start of page
|
||||
long long start = (long long) p;
|
||||
start = start & (~(page-1));
|
||||
|
||||
// round end up to start of page
|
||||
long long end = start + size;
|
||||
end = (end + page-1)&(~(page-1));
|
||||
size = (end-start);
|
||||
|
||||
int osAdvice;
|
||||
switch(advice) {
|
||||
case 0:
|
||||
osAdvice = MADV_NORMAL;
|
||||
break;
|
||||
case 1:
|
||||
osAdvice = MADV_SEQUENTIAL;
|
||||
break;
|
||||
case 2:
|
||||
osAdvice = MADV_RANDOM;
|
||||
break;
|
||||
case 3:
|
||||
osAdvice = MADV_WILLNEED;
|
||||
break;
|
||||
case 4:
|
||||
osAdvice = MADV_DONTNEED;
|
||||
break;
|
||||
case 5:
|
||||
return -1;
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
//printf("DO madvise: page=%d p=0x%lx 0x%lx size=0x%lx\n", page, p, start, size);fflush(stdout);
|
||||
|
||||
if (madvise((void *) start, size, osAdvice) != 0) {
|
||||
jclass class_ioex = env->FindClass("java/io/IOException");
|
||||
if (class_ioex == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
env->ThrowNew(class_ioex, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,443 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.lucene.misc.store;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.OpenOption;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Arrays;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.zip.CRC32;
|
||||
import java.util.zip.Checksum;
|
||||
import org.apache.lucene.store.*;
|
||||
import org.apache.lucene.store.IOContext.Context;
|
||||
|
||||
/**
|
||||
* A {@link Directory} implementation for all Unixes and Windows that uses DIRECT I/O to bypass OS
|
||||
* level IO caching during merging. For all other cases (searching, writing) we delegate to the
|
||||
* provided Directory instance.
|
||||
*
|
||||
* <p>See <a href="{@docRoot}/overview-summary.html#DirectIODirectory">Overview</a> for more
|
||||
* details.
|
||||
*
|
||||
* <p><b>WARNING</b>: this code is very new and quite easily could contain horrible bugs.
|
||||
*
|
||||
* <p>This directory passes Solr and Lucene tests on Linux, OS X, and Windows; other systems should
|
||||
* work but have not been tested! Use at your own risk.
|
||||
*
|
||||
* <p>@throws UnsupportedOperationException if the operating system, file system or JDK does not
|
||||
* support Direct I/O or a sufficient equivalent.
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public class DirectIODirectory extends FilterDirectory {
|
||||
|
||||
/**
|
||||
* Default buffer size before writing to disk (256 KB); larger means less IO load but more RAM and
|
||||
* direct buffer storage space consumed during merging.
|
||||
*/
|
||||
public static final int DEFAULT_MERGE_BUFFER_SIZE = 256 * 1024;
|
||||
|
||||
/** Default min expected merge size before direct IO is used (10 MB): */
|
||||
public static final long DEFAULT_MIN_BYTES_DIRECT = 10 * 1024 * 1024;
|
||||
|
||||
private final int blockSize, mergeBufferSize;
|
||||
private final long minBytesDirect;
|
||||
|
||||
volatile boolean isOpen = true;
|
||||
|
||||
/**
|
||||
* Reference to {@code com.sun.nio.file.ExtendedOpenOption.DIRECT} by reflective class and enum
|
||||
* lookup. There are two reasons for using this instead of directly referencing
|
||||
* ExtendedOpenOption.DIRECT:
|
||||
*
|
||||
* <ol>
|
||||
* <li>ExtendedOpenOption.DIRECT is OpenJDK's internal proprietary API. This API causes
|
||||
* un-suppressible(?) warning to be emitted when compiling with --release flag and value N,
|
||||
* where N is smaller than the the version of javac used for compilation. For details,
|
||||
* please refer to https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8259039.
|
||||
* <li>It is possible that Lucene is run using JDK that does not support
|
||||
* ExtendedOpenOption.DIRECT. In such a case, dynamic lookup allows us to bail out with
|
||||
* UnsupportedOperationException with meaningful error message.
|
||||
* </ol>
|
||||
*
|
||||
* <p>This reference is {@code null}, if the JDK does not support direct I/O.
|
||||
*/
|
||||
static final OpenOption ExtendedOpenOption_DIRECT; // visible for test
|
||||
|
||||
static {
|
||||
OpenOption option;
|
||||
try {
|
||||
final Class<? extends OpenOption> clazz =
|
||||
Class.forName("com.sun.nio.file.ExtendedOpenOption").asSubclass(OpenOption.class);
|
||||
option =
|
||||
Arrays.stream(clazz.getEnumConstants())
|
||||
.filter(e -> e.toString().equalsIgnoreCase("DIRECT"))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
} catch (Exception e) {
|
||||
option = null;
|
||||
}
|
||||
ExtendedOpenOption_DIRECT = option;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new DirectIODirectory for the named location.
|
||||
*
|
||||
* @param delegate Directory for non-merges, also used as reference to file system path.
|
||||
* @param mergeBufferSize Size of buffer to use for merging.
|
||||
* @param minBytesDirect Merges, or files to be opened for reading, smaller than this will not use
|
||||
* direct IO. See {@link #DEFAULT_MIN_BYTES_DIRECT} and {@link #useDirectIO}.
|
||||
* @throws IOException If there is a low-level I/O error
|
||||
*/
|
||||
public DirectIODirectory(FSDirectory delegate, int mergeBufferSize, long minBytesDirect)
|
||||
throws IOException {
|
||||
super(delegate);
|
||||
this.blockSize = Math.toIntExact(Files.getFileStore(delegate.getDirectory()).getBlockSize());
|
||||
this.mergeBufferSize = mergeBufferSize;
|
||||
this.minBytesDirect = minBytesDirect;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new DirectIODirectory for the named location.
|
||||
*
|
||||
* @param delegate Directory for non-merges, also used as reference to file system path.
|
||||
* @throws IOException If there is a low-level I/O error
|
||||
*/
|
||||
public DirectIODirectory(FSDirectory delegate) throws IOException {
|
||||
this(delegate, DEFAULT_MERGE_BUFFER_SIZE, DEFAULT_MIN_BYTES_DIRECT);
|
||||
}
|
||||
|
||||
/** @return the underlying file system directory */
|
||||
public Path getDirectory() {
|
||||
return ((FSDirectory) in).getDirectory();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void ensureOpen() throws AlreadyClosedException {
|
||||
if (!isOpen) {
|
||||
throw new AlreadyClosedException("this Directory is closed");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if direct IO should be used for a file. By default this tests if it is a merge
|
||||
* context and if the merge or file length extends the minimum size (see {@link
|
||||
* #DEFAULT_MIN_BYTES_DIRECT}). Subclasses may override method to enforce direct IO for specific
|
||||
* file types.
|
||||
*
|
||||
* @param name file name (unused by default implementation)
|
||||
* @param context information about merge size
|
||||
* @param fileLength if available, gives the file length. Will be empty when requesting an {@link
|
||||
* IndexOutput}.
|
||||
* @return {@code true} if direct IO should be used; {@code false} if input/output should be
|
||||
* requested from delegate directory.
|
||||
*/
|
||||
protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
|
||||
return context.context == Context.MERGE
|
||||
&& context.mergeInfo.estimatedMergeBytes >= minBytesDirect
|
||||
&& fileLength.orElse(minBytesDirect) >= minBytesDirect;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput openInput(String name, IOContext context) throws IOException {
|
||||
ensureOpen();
|
||||
if (useDirectIO(name, context, OptionalLong.of(fileLength(name)))) {
|
||||
return new DirectIOIndexInput(getDirectory().resolve(name), blockSize, mergeBufferSize);
|
||||
} else {
|
||||
return in.openInput(name, context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
ensureOpen();
|
||||
if (useDirectIO(name, context, OptionalLong.empty())) {
|
||||
return new DirectIOIndexOutput(
|
||||
getDirectory().resolve(name), name, blockSize, mergeBufferSize);
|
||||
} else {
|
||||
return in.createOutput(name, context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
isOpen = false;
|
||||
super.close();
|
||||
}
|
||||
|
||||
private static OpenOption getDirectOpenOption() {
|
||||
if (ExtendedOpenOption_DIRECT == null) {
|
||||
throw new UnsupportedOperationException(
|
||||
"com.sun.nio.file.ExtendedOpenOption.DIRECT is not available in the current JDK version.");
|
||||
}
|
||||
return ExtendedOpenOption_DIRECT;
|
||||
}
|
||||
|
||||
private static final class DirectIOIndexOutput extends IndexOutput {
|
||||
private final ByteBuffer buffer;
|
||||
private final FileChannel channel;
|
||||
private final Checksum digest;
|
||||
|
||||
private long filePos;
|
||||
private boolean isOpen;
|
||||
|
||||
/**
|
||||
* Creates a new instance of DirectIOIndexOutput for writing index output with direct IO
|
||||
* bypassing OS buffer
|
||||
*
|
||||
* @throws UnsupportedOperationException if the operating system, file system or JDK does not
|
||||
* support Direct I/O or a sufficient equivalent.
|
||||
*/
|
||||
public DirectIOIndexOutput(Path path, String name, int blockSize, int bufferSize)
|
||||
throws IOException {
|
||||
super("DirectIOIndexOutput(path=\"" + path.toString() + "\")", name);
|
||||
|
||||
channel =
|
||||
FileChannel.open(
|
||||
path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, getDirectOpenOption());
|
||||
buffer = ByteBuffer.allocateDirect(bufferSize + blockSize - 1).alignedSlice(blockSize);
|
||||
digest = new BufferedChecksum(new CRC32());
|
||||
|
||||
isOpen = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeByte(byte b) throws IOException {
|
||||
buffer.put(b);
|
||||
digest.update(b);
|
||||
if (!buffer.hasRemaining()) {
|
||||
dump();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(byte[] src, int offset, int len) throws IOException {
|
||||
int toWrite = len;
|
||||
while (true) {
|
||||
final int left = buffer.remaining();
|
||||
if (left <= toWrite) {
|
||||
buffer.put(src, offset, left);
|
||||
digest.update(src, offset, left);
|
||||
toWrite -= left;
|
||||
offset += left;
|
||||
dump();
|
||||
} else {
|
||||
buffer.put(src, offset, toWrite);
|
||||
digest.update(src, offset, toWrite);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void dump() throws IOException {
|
||||
final int size = buffer.position();
|
||||
|
||||
// we need to rewind, as we have to write full blocks (we truncate file later):
|
||||
buffer.rewind();
|
||||
|
||||
channel.write(buffer, filePos);
|
||||
filePos += size;
|
||||
|
||||
buffer.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilePointer() {
|
||||
return filePos + buffer.position();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getChecksum() {
|
||||
return digest.getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (isOpen) {
|
||||
isOpen = false;
|
||||
try {
|
||||
dump();
|
||||
} finally {
|
||||
try (FileChannel ch = channel) {
|
||||
ch.truncate(getFilePointer());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class DirectIOIndexInput extends IndexInput {
|
||||
private final ByteBuffer buffer;
|
||||
private final FileChannel channel;
|
||||
private final int blockSize;
|
||||
|
||||
private boolean isOpen;
|
||||
private boolean isClone;
|
||||
private long filePos;
|
||||
|
||||
/**
|
||||
* Creates a new instance of DirectIOIndexInput for reading index input with direct IO bypassing
|
||||
* OS buffer
|
||||
*
|
||||
* @throws UnsupportedOperationException if the operating system, file system or JDK does not
|
||||
* support Direct I/O or a sufficient equivalent.
|
||||
*/
|
||||
public DirectIOIndexInput(Path path, int blockSize, int bufferSize) throws IOException {
|
||||
super("DirectIOIndexInput(path=\"" + path + "\")");
|
||||
this.blockSize = blockSize;
|
||||
|
||||
this.channel = FileChannel.open(path, StandardOpenOption.READ, getDirectOpenOption());
|
||||
this.buffer = ByteBuffer.allocateDirect(bufferSize + blockSize - 1).alignedSlice(blockSize);
|
||||
|
||||
isOpen = true;
|
||||
isClone = false;
|
||||
filePos = -bufferSize;
|
||||
buffer.limit(0);
|
||||
}
|
||||
|
||||
// for clone
|
||||
private DirectIOIndexInput(DirectIOIndexInput other) throws IOException {
|
||||
super(other.toString());
|
||||
this.channel = other.channel;
|
||||
this.blockSize = other.blockSize;
|
||||
|
||||
final int bufferSize = other.buffer.capacity();
|
||||
this.buffer = ByteBuffer.allocateDirect(bufferSize + blockSize - 1).alignedSlice(blockSize);
|
||||
|
||||
isOpen = true;
|
||||
isClone = true;
|
||||
filePos = -bufferSize;
|
||||
buffer.limit(0);
|
||||
seek(other.getFilePointer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (isOpen && !isClone) {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilePointer() {
|
||||
long filePointer = filePos + buffer.position();
|
||||
|
||||
// opening the input and immediately calling getFilePointer without calling readX (and thus
|
||||
// refill) first,
|
||||
// will result in negative value equal to bufferSize being returned,
|
||||
// due to the initialization method filePos = -bufferSize used in constructor.
|
||||
assert filePointer == -buffer.capacity() || filePointer >= 0
|
||||
: "filePointer should either be initial value equal to negative buffer capacity, or larger than or equal to 0";
|
||||
return Math.max(filePointer, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seek(long pos) throws IOException {
|
||||
if (pos != getFilePointer()) {
|
||||
final long alignedPos = pos - (pos % blockSize);
|
||||
filePos = alignedPos - buffer.capacity();
|
||||
|
||||
final int delta = (int) (pos - alignedPos);
|
||||
refill();
|
||||
try {
|
||||
buffer.position(delta);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
throw new EOFException("read past EOF: " + this);
|
||||
}
|
||||
}
|
||||
assert pos == getFilePointer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long length() {
|
||||
try {
|
||||
return channel.size();
|
||||
} catch (IOException ioe) {
|
||||
throw new UncheckedIOException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() throws IOException {
|
||||
if (!buffer.hasRemaining()) {
|
||||
refill();
|
||||
}
|
||||
return buffer.get();
|
||||
}
|
||||
|
||||
private void refill() throws IOException {
|
||||
filePos += buffer.capacity();
|
||||
|
||||
// BaseDirectoryTestCase#testSeekPastEOF test for consecutive read past EOF,
|
||||
// hence throwing EOFException early to maintain buffer state (position in particular)
|
||||
if (filePos > channel.size()) {
|
||||
throw new EOFException("read past EOF: " + this);
|
||||
}
|
||||
|
||||
buffer.clear();
|
||||
try {
|
||||
// read may return -1 here iff filePos == channel.size(), but that's ok as it just reaches
|
||||
// EOF
|
||||
// when filePos > channel.size(), an EOFException will be thrown from above
|
||||
channel.read(buffer, filePos);
|
||||
} catch (IOException ioe) {
|
||||
throw new IOException(ioe.getMessage() + ": " + this, ioe);
|
||||
}
|
||||
|
||||
buffer.flip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readBytes(byte[] dst, int offset, int len) throws IOException {
|
||||
int toRead = len;
|
||||
while (true) {
|
||||
final int left = buffer.remaining();
|
||||
if (left < toRead) {
|
||||
buffer.get(dst, offset, left);
|
||||
toRead -= left;
|
||||
offset += left;
|
||||
refill();
|
||||
} else {
|
||||
buffer.get(dst, offset, toRead);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DirectIOIndexInput clone() {
|
||||
try {
|
||||
return new DirectIOIndexInput(this);
|
||||
} catch (IOException ioe) {
|
||||
throw new UncheckedIOException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput slice(String sliceDescription, long offset, long length) {
|
||||
// TODO: is this the right thing to do?
|
||||
return BufferedIndexInput.wrap(sliceDescription, this, offset, length);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,55 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.lucene.misc.store;
|
||||
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/** Provides JNI access to native methods such as madvise() for {@link NativeUnixDirectory} */
|
||||
public final class NativePosixUtil {
|
||||
public static final int NORMAL = 0;
|
||||
public static final int SEQUENTIAL = 1;
|
||||
public static final int RANDOM = 2;
|
||||
public static final int WILLNEED = 3;
|
||||
public static final int DONTNEED = 4;
|
||||
public static final int NOREUSE = 5;
|
||||
|
||||
static {
|
||||
System.loadLibrary("LuceneNativeIO");
|
||||
}
|
||||
|
||||
private static native int posix_fadvise(FileDescriptor fd, long offset, long len, int advise)
|
||||
throws IOException;
|
||||
|
||||
public static native int posix_madvise(ByteBuffer buf, int advise) throws IOException;
|
||||
|
||||
public static native int madvise(ByteBuffer buf, int advise) throws IOException;
|
||||
|
||||
public static native FileDescriptor open_direct(String filename, boolean read) throws IOException;
|
||||
|
||||
public static native long pread(FileDescriptor fd, long pos, ByteBuffer byteBuf)
|
||||
throws IOException;
|
||||
|
||||
public static void advise(FileDescriptor fd, long offset, long len, int advise)
|
||||
throws IOException {
|
||||
final int code = posix_fadvise(fd, offset, len, advise);
|
||||
if (code != 0) {
|
||||
throw new RuntimeException("posix_fadvise failed code=" + code);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,451 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.lucene.misc.store;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Path;
|
||||
import org.apache.lucene.store.BufferedIndexInput;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.apache.lucene.store.FSLockFactory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IOContext.Context;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.store.LockFactory;
|
||||
import org.apache.lucene.util.SuppressForbidden;
|
||||
|
||||
// TODO
|
||||
// - newer Linux kernel versions (after 2.6.29) have
|
||||
// improved MADV_SEQUENTIAL (and hopefully also
|
||||
// FADV_SEQUENTIAL) interaction with the buffer
|
||||
// cache; we should explore using that instead of direct
|
||||
// IO when context is merge
|
||||
|
||||
/**
|
||||
* A {@link Directory} implementation for all Unixes that uses DIRECT I/O to bypass OS level IO
|
||||
* caching during merging. For all other cases (searching, writing) we delegate to the provided
|
||||
* Directory instance.
|
||||
*
|
||||
* <p>See <a href="{@docRoot}/overview-summary.html#NativeUnixDirectory">Overview</a> for more
|
||||
* details.
|
||||
*
|
||||
* <p>To use this you must compile NativePosixUtil.cpp (exposes Linux-specific APIs through JNI) for
|
||||
* your platform, by running <code>./gradlew build</code>, and then putting the resulting <code>
|
||||
* libLuceneNativeIO.so</code> or <code>libLuceneNativeIO.dylib</code> (from <code>
|
||||
* lucene/misc/native/build/lib/release/platform/</code>) onto your dynamic linker search path.
|
||||
*
|
||||
* <p><b>WARNING</b>: this code is very new and quite easily could contain horrible bugs. For
|
||||
* example, here's one known issue: if you use seek in <code>IndexOutput</code>, and then write more
|
||||
* than one buffer's worth of bytes, then the file will be wrong. Lucene does not do this today
|
||||
* (only writes small number of bytes after seek), but that may change.
|
||||
*
|
||||
* <p>This directory passes Solr and Lucene tests on Linux and OS X; other Unixes should work but
|
||||
* have not been tested! Use at your own risk.
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public class NativeUnixDirectory extends FSDirectory {
|
||||
|
||||
// TODO: this is OS dependent, but likely 512 is the LCD
|
||||
private static final long ALIGN = 512;
|
||||
private static final long ALIGN_NOT_MASK = ~(ALIGN - 1);
|
||||
|
||||
/**
|
||||
* Default buffer size before writing to disk (256 KB); larger means less IO load but more RAM and
|
||||
* direct buffer storage space consumed during merging.
|
||||
*/
|
||||
public static final int DEFAULT_MERGE_BUFFER_SIZE = 262144;
|
||||
|
||||
/** Default min expected merge size before direct IO is used (10 MB): */
|
||||
public static final long DEFAULT_MIN_BYTES_DIRECT = 10 * 1024 * 1024;
|
||||
|
||||
private final int mergeBufferSize;
|
||||
private final long minBytesDirect;
|
||||
private final Directory delegate;
|
||||
|
||||
/**
|
||||
* Create a new NIOFSDirectory for the named location.
|
||||
*
|
||||
* @param path the path of the directory
|
||||
* @param lockFactory to use
|
||||
* @param mergeBufferSize Size of buffer to use for merging. See {@link
|
||||
* #DEFAULT_MERGE_BUFFER_SIZE}.
|
||||
* @param minBytesDirect Merges, or files to be opened for reading, smaller than this will not use
|
||||
* direct IO. See {@link #DEFAULT_MIN_BYTES_DIRECT}
|
||||
* @param delegate fallback Directory for non-merges
|
||||
* @throws IOException If there is a low-level I/O error
|
||||
*/
|
||||
public NativeUnixDirectory(
|
||||
Path path,
|
||||
int mergeBufferSize,
|
||||
long minBytesDirect,
|
||||
LockFactory lockFactory,
|
||||
Directory delegate)
|
||||
throws IOException {
|
||||
super(path, lockFactory);
|
||||
if ((mergeBufferSize & ALIGN) != 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"mergeBufferSize must be 0 mod " + ALIGN + " (got: " + mergeBufferSize + ")");
|
||||
}
|
||||
this.mergeBufferSize = mergeBufferSize;
|
||||
this.minBytesDirect = minBytesDirect;
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new NIOFSDirectory for the named location.
|
||||
*
|
||||
* @param path the path of the directory
|
||||
* @param lockFactory the lock factory to use
|
||||
* @param delegate fallback Directory for non-merges
|
||||
* @throws IOException If there is a low-level I/O error
|
||||
*/
|
||||
public NativeUnixDirectory(Path path, LockFactory lockFactory, Directory delegate)
|
||||
throws IOException {
|
||||
this(path, DEFAULT_MERGE_BUFFER_SIZE, DEFAULT_MIN_BYTES_DIRECT, lockFactory, delegate);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new NIOFSDirectory for the named location with {@link FSLockFactory#getDefault()}.
|
||||
*
|
||||
* @param path the path of the directory
|
||||
* @param delegate fallback Directory for non-merges
|
||||
* @throws IOException If there is a low-level I/O error
|
||||
*/
|
||||
public NativeUnixDirectory(Path path, Directory delegate) throws IOException {
|
||||
this(
|
||||
path,
|
||||
DEFAULT_MERGE_BUFFER_SIZE,
|
||||
DEFAULT_MIN_BYTES_DIRECT,
|
||||
FSLockFactory.getDefault(),
|
||||
delegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput openInput(String name, IOContext context) throws IOException {
|
||||
ensureOpen();
|
||||
if (context.context != Context.MERGE
|
||||
|| context.mergeInfo.estimatedMergeBytes < minBytesDirect
|
||||
|| fileLength(name) < minBytesDirect) {
|
||||
return delegate.openInput(name, context);
|
||||
} else {
|
||||
return new NativeUnixIndexInput(getDirectory().resolve(name), mergeBufferSize);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
ensureOpen();
|
||||
if (context.context != Context.MERGE
|
||||
|| context.mergeInfo.estimatedMergeBytes < minBytesDirect) {
|
||||
return delegate.createOutput(name, context);
|
||||
} else {
|
||||
return new NativeUnixIndexOutput(getDirectory().resolve(name), name, mergeBufferSize);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "java.io.File: native API requires old-style FileDescriptor")
|
||||
private static final class NativeUnixIndexOutput extends IndexOutput {
|
||||
private final ByteBuffer buffer;
|
||||
private final FileOutputStream fos;
|
||||
private final FileChannel channel;
|
||||
private final int bufferSize;
|
||||
|
||||
// private final File path;
|
||||
|
||||
private int bufferPos;
|
||||
private long filePos;
|
||||
private long fileLength;
|
||||
private boolean isOpen;
|
||||
|
||||
public NativeUnixIndexOutput(Path path, String name, int bufferSize) throws IOException {
|
||||
super("NativeUnixIndexOutput(path=\"" + path.toString() + "\")", name);
|
||||
// this.path = path;
|
||||
final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
|
||||
fos = new FileOutputStream(fd);
|
||||
// fos = new FileOutputStream(path);
|
||||
channel = fos.getChannel();
|
||||
buffer = ByteBuffer.allocateDirect(bufferSize);
|
||||
this.bufferSize = bufferSize;
|
||||
isOpen = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeByte(byte b) throws IOException {
|
||||
assert bufferPos == buffer.position()
|
||||
: "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
|
||||
buffer.put(b);
|
||||
if (++bufferPos == bufferSize) {
|
||||
dump();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(byte[] src, int offset, int len) throws IOException {
|
||||
int toWrite = len;
|
||||
while (true) {
|
||||
final int left = bufferSize - bufferPos;
|
||||
if (left <= toWrite) {
|
||||
buffer.put(src, offset, left);
|
||||
toWrite -= left;
|
||||
offset += left;
|
||||
bufferPos = bufferSize;
|
||||
dump();
|
||||
} else {
|
||||
buffer.put(src, offset, toWrite);
|
||||
bufferPos += toWrite;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public void setLength() throws IOException {
|
||||
// TODO -- how to impl this? neither FOS nor
|
||||
// FileChannel provides an API?
|
||||
// }
|
||||
|
||||
private void dump() throws IOException {
|
||||
buffer.flip();
|
||||
final long limit = filePos + buffer.limit();
|
||||
if (limit > fileLength) {
|
||||
// this dump extends the file
|
||||
fileLength = limit;
|
||||
} else {
|
||||
// we had seek'd back & wrote some changes
|
||||
}
|
||||
|
||||
// must always round to next block
|
||||
buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
|
||||
|
||||
assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit()
|
||||
: "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
|
||||
assert (filePos & ALIGN_NOT_MASK) == filePos;
|
||||
// System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" +
|
||||
// buffer.limit() + " fos=" + fos);
|
||||
channel.write(buffer, filePos);
|
||||
filePos += bufferPos;
|
||||
bufferPos = 0;
|
||||
buffer.clear();
|
||||
// System.out.println("dump: done");
|
||||
|
||||
// TODO: the case where we'd seek'd back, wrote an
|
||||
// entire buffer, we must here read the next buffer;
|
||||
// likely Lucene won't trip on this since we only
|
||||
// write smallish amounts on seeking back
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilePointer() {
|
||||
return filePos + bufferPos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getChecksum() throws IOException {
|
||||
throw new UnsupportedOperationException("this directory currently does not work at all!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (isOpen) {
|
||||
isOpen = false;
|
||||
try {
|
||||
dump();
|
||||
} finally {
|
||||
try {
|
||||
// System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + "
|
||||
// path=" + path);
|
||||
channel.truncate(fileLength);
|
||||
// System.out.println(" now: " + channel.size());
|
||||
} finally {
|
||||
try {
|
||||
channel.close();
|
||||
} finally {
|
||||
fos.close();
|
||||
// System.out.println(" final len=" + path.length());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "java.io.File: native API requires old-style FileDescriptor")
|
||||
private static final class NativeUnixIndexInput extends IndexInput {
|
||||
private final ByteBuffer buffer;
|
||||
private final FileInputStream fis;
|
||||
private final FileChannel channel;
|
||||
private final int bufferSize;
|
||||
|
||||
private boolean isOpen;
|
||||
private boolean isClone;
|
||||
private long filePos;
|
||||
private int bufferPos;
|
||||
|
||||
public NativeUnixIndexInput(Path path, int bufferSize) throws IOException {
|
||||
super("NativeUnixIndexInput(path=\"" + path + "\")");
|
||||
final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
|
||||
fis = new FileInputStream(fd);
|
||||
channel = fis.getChannel();
|
||||
this.bufferSize = bufferSize;
|
||||
buffer = ByteBuffer.allocateDirect(bufferSize);
|
||||
isOpen = true;
|
||||
isClone = false;
|
||||
filePos = -bufferSize;
|
||||
bufferPos = bufferSize;
|
||||
// System.out.println("D open " + path + " this=" + this);
|
||||
}
|
||||
|
||||
// for clone
|
||||
public NativeUnixIndexInput(NativeUnixIndexInput other) throws IOException {
|
||||
super(other.toString());
|
||||
this.fis = null;
|
||||
channel = other.channel;
|
||||
this.bufferSize = other.bufferSize;
|
||||
buffer = ByteBuffer.allocateDirect(bufferSize);
|
||||
filePos = -bufferSize;
|
||||
bufferPos = bufferSize;
|
||||
isOpen = true;
|
||||
isClone = true;
|
||||
// System.out.println("D clone this=" + this);
|
||||
seek(other.getFilePointer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (isOpen && !isClone) {
|
||||
try {
|
||||
channel.close();
|
||||
} finally {
|
||||
if (!isClone) {
|
||||
fis.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilePointer() {
|
||||
return filePos + bufferPos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seek(long pos) throws IOException {
|
||||
if (pos != getFilePointer()) {
|
||||
final long alignedPos = pos & ALIGN_NOT_MASK;
|
||||
filePos = alignedPos - bufferSize;
|
||||
|
||||
final int delta = (int) (pos - alignedPos);
|
||||
if (delta != 0) {
|
||||
refill();
|
||||
buffer.position(delta);
|
||||
bufferPos = delta;
|
||||
} else {
|
||||
// force refill on next read
|
||||
bufferPos = bufferSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long length() {
|
||||
try {
|
||||
return channel.size();
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException("IOException during length(): " + this, ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() throws IOException {
|
||||
// NOTE: we don't guard against EOF here... ie the
|
||||
// "final" buffer will typically be filled to less
|
||||
// than bufferSize
|
||||
if (bufferPos == bufferSize) {
|
||||
refill();
|
||||
}
|
||||
assert bufferPos == buffer.position()
|
||||
: "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
|
||||
bufferPos++;
|
||||
return buffer.get();
|
||||
}
|
||||
|
||||
private void refill() throws IOException {
|
||||
buffer.clear();
|
||||
filePos += bufferSize;
|
||||
bufferPos = 0;
|
||||
assert (filePos & ALIGN_NOT_MASK) == filePos
|
||||
: "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
|
||||
// System.out.println("X refill filePos=" + filePos);
|
||||
int n;
|
||||
try {
|
||||
n = channel.read(buffer, filePos);
|
||||
} catch (IOException ioe) {
|
||||
throw new IOException(ioe.getMessage() + ": " + this, ioe);
|
||||
}
|
||||
if (n < 0) {
|
||||
throw new EOFException("read past EOF: " + this);
|
||||
}
|
||||
buffer.rewind();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readBytes(byte[] dst, int offset, int len) throws IOException {
|
||||
int toRead = len;
|
||||
// System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" +
|
||||
// length() + " this=" + this);
|
||||
while (true) {
|
||||
final int left = bufferSize - bufferPos;
|
||||
if (left < toRead) {
|
||||
// System.out.println(" copy " + left);
|
||||
buffer.get(dst, offset, left);
|
||||
toRead -= left;
|
||||
offset += left;
|
||||
refill();
|
||||
} else {
|
||||
// System.out.println(" copy " + toRead);
|
||||
buffer.get(dst, offset, toRead);
|
||||
bufferPos += toRead;
|
||||
// System.out.println(" readBytes done");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NativeUnixIndexInput clone() {
|
||||
try {
|
||||
return new NativeUnixIndexInput(this);
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException("IOException during clone: " + this, ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
|
||||
// TODO: is this the right thing to do?
|
||||
return BufferedIndexInput.wrap(sliceDescription, this, offset, length);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,15 +27,11 @@
|
|||
The misc package has various tools for splitting/merging indices,
|
||||
changing norms, finding high freq terms, and others.
|
||||
|
||||
<a id="NativeUnixDirectory"></a>
|
||||
<h2>NativeUnixDirectory</h2>
|
||||
<a id="DirectIODirectory"></a>
|
||||
<h2>DirectIODirectory</h2>
|
||||
|
||||
<p>
|
||||
<b>NOTE</b>: This uses C++ sources (accessible via JNI), which you'll
|
||||
have to compile on your platform.
|
||||
|
||||
<p>
|
||||
{@link org.apache.lucene.misc.store.NativeUnixDirectory} is a Directory implementation that bypasses the
|
||||
{@link org.apache.lucene.misc.store.DirectIODirectory} is a Directory implementation that bypasses the
|
||||
OS's buffer cache (using direct IO) for any IndexInput and IndexOutput
|
||||
used during merging of segments larger than a specified size (default
|
||||
10 MB). This avoids evicting hot pages that are still in-use for
|
||||
|
@ -45,23 +41,6 @@ searching, keeping search more responsive while large merges run.
|
|||
See <a target="_top" href="http://blog.mikemccandless.com/2010/06/lucene-and-fadvisemadvise.html">this blog post</a>
|
||||
for details.
|
||||
|
||||
<p>Steps to build (from the project's root directory):
|
||||
<ul>
|
||||
<li>Compile both the native library part (<code>libLuceneNativeIO</code>) and Java sources with:
|
||||
<code>./gradlew -p lucene/misc build</code>.</li>
|
||||
|
||||
<li>The native library will be located in the <code>lucene/misc/native/build/lib/main/release/<i>your-platform</i></code> folder.</li>
|
||||
|
||||
<li>On Unix-ish systems, make sure <code>libNativePosixUtil.so</code> is on your
|
||||
<code>LD_LIBRARY_PATH</code> so java can find it (something like <code>export LD_LIBRARY_PATH=/path/to/dir:$LD_LIBRARY_PATH</code>,
|
||||
where /path/to/dir contains <code>libLuceneNativeIO.so</code>).</li>
|
||||
</ul>
|
||||
|
||||
<p>
|
||||
The native library exposes access to the <code>posix_madvise</code>,
|
||||
<code>madvise</code>, <code>posix_fadvise</code> functions, which are somewhat more cross
|
||||
platform than <code>O_DIRECT</code>, however, in testing (see above link), these
|
||||
APIs did not seem to help prevent buffer cache eviction.
|
||||
</body>
|
||||
|
||||
</html>
|
||||
|
|
|
@ -1,48 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.lucene.misc.store;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.LifecycleScope;
|
||||
import com.carrotsearch.randomizedtesting.RandomizedTest;
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import org.apache.lucene.store.ByteBuffersDirectory;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.MergeInfo;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.TestRule;
|
||||
|
||||
public class NativeUnixDirectoryTest extends LuceneTestCase {
|
||||
@Rule
|
||||
public static TestRule requiresNative =
|
||||
new NativeLibEnableRule(
|
||||
EnumSet.of(
|
||||
NativeLibEnableRule.OperatingSystem.MAC,
|
||||
NativeLibEnableRule.OperatingSystem.FREE_BSD,
|
||||
NativeLibEnableRule.OperatingSystem.LINUX));
|
||||
|
||||
public void testLibraryLoaded() throws IOException {
|
||||
try (ByteBuffersDirectory ramDir = new ByteBuffersDirectory();
|
||||
Directory dir =
|
||||
new NativeUnixDirectory(RandomizedTest.newTempDir(LifecycleScope.TEST), ramDir)) {
|
||||
MergeInfo mergeInfo = new MergeInfo(1000, Integer.MAX_VALUE, true, 1);
|
||||
dir.createOutput("test", new IOContext(mergeInfo)).close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.lucene.misc.store;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.OptionalLong;
|
||||
import org.apache.lucene.document.*;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.RandomIndexWriter;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.PhraseQuery;
|
||||
import org.apache.lucene.store.*;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
public class TestDirectIODirectory extends BaseDirectoryTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void checkSupported() {
|
||||
assumeTrue(
|
||||
"This test required a JDK version that has support for ExtendedOpenOption.DIRECT",
|
||||
DirectIODirectory.ExtendedOpenOption_DIRECT != null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DirectIODirectory getDirectory(Path path) throws IOException {
|
||||
return new DirectIODirectory(FSDirectory.open(path)) {
|
||||
@Override
|
||||
protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void testIndexWriteRead() throws IOException {
|
||||
try (Directory dir = getDirectory(createTempDir("testDirectIODirectory"))) {
|
||||
try (RandomIndexWriter iw = new RandomIndexWriter(random(), dir)) {
|
||||
Document doc = new Document();
|
||||
Field field = newField("field", "foo bar", TextField.TYPE_STORED);
|
||||
doc.add(field);
|
||||
|
||||
iw.addDocument(doc);
|
||||
iw.commit();
|
||||
}
|
||||
|
||||
try (IndexReader ir = DirectoryReader.open(dir)) {
|
||||
IndexSearcher s = newSearcher(ir);
|
||||
assertEquals(1, s.count(new PhraseQuery("field", "foo", "bar")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testIllegalEOFWithFileSizeMultipleOfBlockSize() throws Exception {
|
||||
Path path = createTempDir("testIllegalEOF");
|
||||
final int fileSize = Math.toIntExact(Files.getFileStore(path).getBlockSize()) * 2;
|
||||
|
||||
try (Directory dir = getDirectory(path)) {
|
||||
IndexOutput o = dir.createOutput("out", newIOContext(random()));
|
||||
byte[] b = new byte[fileSize];
|
||||
o.writeBytes(b, 0, fileSize);
|
||||
o.close();
|
||||
IndexInput i = dir.openInput("out", newIOContext(random()));
|
||||
i.seek(fileSize);
|
||||
i.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void testUseDirectIODefaults() throws Exception {
|
||||
Path path = createTempDir("testUseDirectIODefaults");
|
||||
try (DirectIODirectory dir = new DirectIODirectory(FSDirectory.open(path))) {
|
||||
long largeSize = DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT + random().nextInt(10_000);
|
||||
long smallSize =
|
||||
random().nextInt(Math.toIntExact(DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT));
|
||||
int numDocs = random().nextInt(1000);
|
||||
|
||||
assertFalse(dir.useDirectIO("dummy", IOContext.DEFAULT, OptionalLong.empty()));
|
||||
|
||||
assertTrue(
|
||||
dir.useDirectIO(
|
||||
"dummy",
|
||||
new IOContext(new MergeInfo(numDocs, largeSize, true, -1)),
|
||||
OptionalLong.empty()));
|
||||
assertFalse(
|
||||
dir.useDirectIO(
|
||||
"dummy",
|
||||
new IOContext(new MergeInfo(numDocs, smallSize, true, -1)),
|
||||
OptionalLong.empty()));
|
||||
|
||||
assertTrue(
|
||||
dir.useDirectIO(
|
||||
"dummy",
|
||||
new IOContext(new MergeInfo(numDocs, largeSize, true, -1)),
|
||||
OptionalLong.of(largeSize)));
|
||||
assertFalse(
|
||||
dir.useDirectIO(
|
||||
"dummy",
|
||||
new IOContext(new MergeInfo(numDocs, smallSize, true, -1)),
|
||||
OptionalLong.of(smallSize)));
|
||||
assertFalse(
|
||||
dir.useDirectIO(
|
||||
"dummy",
|
||||
new IOContext(new MergeInfo(numDocs, smallSize, true, -1)),
|
||||
OptionalLong.of(largeSize)));
|
||||
assertFalse(
|
||||
dir.useDirectIO(
|
||||
"dummy",
|
||||
new IOContext(new MergeInfo(numDocs, largeSize, true, -1)),
|
||||
OptionalLong.of(smallSize)));
|
||||
|
||||
assertFalse(
|
||||
dir.useDirectIO(
|
||||
"dummy", new IOContext(new FlushInfo(numDocs, largeSize)), OptionalLong.empty()));
|
||||
assertFalse(
|
||||
dir.useDirectIO(
|
||||
"dummy", new IOContext(new FlushInfo(numDocs, smallSize)), OptionalLong.empty()));
|
||||
assertFalse(
|
||||
dir.useDirectIO(
|
||||
"dummy",
|
||||
new IOContext(new FlushInfo(numDocs, largeSize)),
|
||||
OptionalLong.of(largeSize)));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1305,8 +1305,8 @@ public abstract class BaseDirectoryTestCase extends LuceneTestCase {
|
|||
});
|
||||
|
||||
// Make sure we cannot open it for reading:
|
||||
expectThrows(
|
||||
NoSuchFileException.class,
|
||||
expectThrowsAnyOf(
|
||||
Arrays.asList(NoSuchFileException.class, FileNotFoundException.class),
|
||||
() -> {
|
||||
fsDir.openInput(fileName, IOContext.DEFAULT);
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue