mirror of https://github.com/apache/lucene.git
LUCENE-753: Add new NIOFSDirectory implementation to use java.nio.* for access the same file concurrently with no locking
git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@690539 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4b9a300730
commit
bfff632dbc
|
@ -130,6 +130,14 @@ API Changes
|
||||||
subclasses can create their own subclasses of each Query type.
|
subclasses can create their own subclasses of each Query type.
|
||||||
(John Wang via Mike McCandless)
|
(John Wang via Mike McCandless)
|
||||||
|
|
||||||
|
20. LUCENE-753: Added new Directory implementation
|
||||||
|
org.apache.lucene.store.NIOFSDirectory, which uses java.nio's
|
||||||
|
FileChannel to do file reads. On most non-Windows platforms, with
|
||||||
|
many threads sharing a single searcher, this may yield sizable
|
||||||
|
improvement to query throughput when compared to FSDirectory,
|
||||||
|
which only allows a single thread to read from an open file at a
|
||||||
|
time. (Jason Rutherglen via Mike McCandless)
|
||||||
|
|
||||||
Bug fixes
|
Bug fixes
|
||||||
|
|
||||||
1. LUCENE-1134: Fixed BooleanQuery.rewrite to only optimize a single
|
1. LUCENE-1134: Fixed BooleanQuery.rewrite to only optimize a single
|
||||||
|
|
|
@ -225,7 +225,7 @@ final class CompoundFileWriter {
|
||||||
|
|
||||||
while(remainder > 0) {
|
while(remainder > 0) {
|
||||||
int len = (int) Math.min(chunk, remainder);
|
int len = (int) Math.min(chunk, remainder);
|
||||||
is.readBytes(buffer, 0, len);
|
is.readBytes(buffer, 0, len, false);
|
||||||
os.writeBytes(buffer, len);
|
os.writeBytes(buffer, len);
|
||||||
remainder -= len;
|
remainder -= len;
|
||||||
if (checkAbort != null)
|
if (checkAbort != null)
|
||||||
|
|
|
@ -27,7 +27,7 @@ public abstract class BufferedIndexInput extends IndexInput {
|
||||||
|
|
||||||
private int bufferSize = BUFFER_SIZE;
|
private int bufferSize = BUFFER_SIZE;
|
||||||
|
|
||||||
private byte[] buffer;
|
protected byte[] buffer;
|
||||||
|
|
||||||
private long bufferStart = 0; // position in file of buffer
|
private long bufferStart = 0; // position in file of buffer
|
||||||
private int bufferLength = 0; // end of valid bytes
|
private int bufferLength = 0; // end of valid bytes
|
||||||
|
@ -49,7 +49,7 @@ public abstract class BufferedIndexInput extends IndexInput {
|
||||||
|
|
||||||
/** Change the buffer size used by this IndexInput */
|
/** Change the buffer size used by this IndexInput */
|
||||||
public void setBufferSize(int newSize) {
|
public void setBufferSize(int newSize) {
|
||||||
assert buffer == null || bufferSize == buffer.length;
|
assert buffer == null || bufferSize == buffer.length: "buffer=" + buffer + " bufferSize=" + bufferSize + " buffer.length=" + (buffer != null ? buffer.length : 0);
|
||||||
if (newSize != bufferSize) {
|
if (newSize != bufferSize) {
|
||||||
checkBufferSize(newSize);
|
checkBufferSize(newSize);
|
||||||
bufferSize = newSize;
|
bufferSize = newSize;
|
||||||
|
@ -68,11 +68,16 @@ public abstract class BufferedIndexInput extends IndexInput {
|
||||||
bufferStart += bufferPosition;
|
bufferStart += bufferPosition;
|
||||||
bufferPosition = 0;
|
bufferPosition = 0;
|
||||||
bufferLength = numToCopy;
|
bufferLength = numToCopy;
|
||||||
buffer = newBuffer;
|
newBuffer(newBuffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void newBuffer(byte[] newBuffer) {
|
||||||
|
// Subclasses can do something here
|
||||||
|
buffer = newBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
/** Returns buffer size. @see #setBufferSize */
|
/** Returns buffer size. @see #setBufferSize */
|
||||||
public int getBufferSize() {
|
public int getBufferSize() {
|
||||||
return bufferSize;
|
return bufferSize;
|
||||||
|
@ -146,7 +151,7 @@ public abstract class BufferedIndexInput extends IndexInput {
|
||||||
throw new IOException("read past EOF");
|
throw new IOException("read past EOF");
|
||||||
|
|
||||||
if (buffer == null) {
|
if (buffer == null) {
|
||||||
buffer = new byte[bufferSize]; // allocate buffer lazily
|
newBuffer(new byte[bufferSize]); // allocate buffer lazily
|
||||||
seekInternal(bufferStart);
|
seekInternal(bufferStart);
|
||||||
}
|
}
|
||||||
readInternal(buffer, 0, newLength);
|
readInternal(buffer, 0, newLength);
|
||||||
|
|
|
@ -541,10 +541,10 @@ public class FSDirectory extends Directory {
|
||||||
|
|
||||||
protected static class FSIndexInput extends BufferedIndexInput {
|
protected static class FSIndexInput extends BufferedIndexInput {
|
||||||
|
|
||||||
private static class Descriptor extends RandomAccessFile {
|
protected static class Descriptor extends RandomAccessFile {
|
||||||
// remember if the file is open, so that we don't try to close it
|
// remember if the file is open, so that we don't try to close it
|
||||||
// more than once
|
// more than once
|
||||||
private boolean isOpen;
|
protected volatile boolean isOpen;
|
||||||
long position;
|
long position;
|
||||||
final long length;
|
final long length;
|
||||||
|
|
||||||
|
@ -570,7 +570,7 @@ public class FSDirectory extends Directory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Descriptor file;
|
protected final Descriptor file;
|
||||||
boolean isClone;
|
boolean isClone;
|
||||||
|
|
||||||
public FSIndexInput(File path) throws IOException {
|
public FSIndexInput(File path) throws IOException {
|
||||||
|
@ -633,7 +633,7 @@ public class FSDirectory extends Directory {
|
||||||
|
|
||||||
// remember if the file is open, so that we don't try to close it
|
// remember if the file is open, so that we don't try to close it
|
||||||
// more than once
|
// more than once
|
||||||
private boolean isOpen;
|
private volatile boolean isOpen;
|
||||||
|
|
||||||
public FSIndexOutput(File path) throws IOException {
|
public FSIndexOutput(File path) throws IOException {
|
||||||
file = new RandomAccessFile(path, "rw");
|
file = new RandomAccessFile(path, "rw");
|
||||||
|
|
|
@ -0,0 +1,120 @@
|
||||||
|
package org.apache.lucene.store;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to You under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NIO version of FSDirectory. Uses FileChannel.read(ByteBuffer dst, long position) method
|
||||||
|
* which allows multiple threads to read from the file without synchronizing. FSDirectory
|
||||||
|
* synchronizes in the FSIndexInput.readInternal method which can cause pileups when there
|
||||||
|
* are many threads accessing the Directory concurrently.
|
||||||
|
*
|
||||||
|
* This class only uses FileChannel when reading; writing
|
||||||
|
* with an IndexOutput is inherited from FSDirectory.
|
||||||
|
*
|
||||||
|
* Note: NIOFSDirectory is not recommended on Windows because of a bug
|
||||||
|
* in how FileChannel.read is implemented in Sun's JRE.
|
||||||
|
* Inside of the implementation the position is apparently
|
||||||
|
* synchronized. See here for details:
|
||||||
|
|
||||||
|
* http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6265734
|
||||||
|
*
|
||||||
|
* @see FSDirectory
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class NIOFSDirectory extends FSDirectory {
|
||||||
|
|
||||||
|
// Inherit javadoc
|
||||||
|
public IndexInput openInput(String name, int bufferSize) throws IOException {
|
||||||
|
ensureOpen();
|
||||||
|
return new NIOFSIndexInput(new File(getFile(), name), bufferSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class NIOFSIndexInput extends FSDirectory.FSIndexInput {
|
||||||
|
|
||||||
|
private ByteBuffer byteBuf; // wraps the buffer for NIO
|
||||||
|
|
||||||
|
private byte[] otherBuffer;
|
||||||
|
private ByteBuffer otherByteBuf;
|
||||||
|
|
||||||
|
final FileChannel channel;
|
||||||
|
|
||||||
|
public NIOFSIndexInput(File path, int bufferSize) throws IOException {
|
||||||
|
super(path, bufferSize);
|
||||||
|
channel = file.getChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void newBuffer(byte[] newBuffer) {
|
||||||
|
super.newBuffer(newBuffer);
|
||||||
|
byteBuf = ByteBuffer.wrap(newBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (!isClone && file.isOpen) {
|
||||||
|
// Close the channel & file
|
||||||
|
try {
|
||||||
|
channel.close();
|
||||||
|
} finally {
|
||||||
|
file.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void readInternal(byte[] b, int offset, int len) throws IOException {
|
||||||
|
|
||||||
|
final ByteBuffer bb;
|
||||||
|
|
||||||
|
// Determine the ByteBuffer we should use
|
||||||
|
if (b == buffer && 0 == offset) {
|
||||||
|
// Use our own pre-wrapped byteBuf:
|
||||||
|
assert byteBuf != null;
|
||||||
|
byteBuf.clear();
|
||||||
|
byteBuf.limit(len);
|
||||||
|
bb = byteBuf;
|
||||||
|
} else {
|
||||||
|
if (offset == 0) {
|
||||||
|
if (otherBuffer != b) {
|
||||||
|
// Now wrap this other buffer; with compound
|
||||||
|
// file, we are repeatedly called with its
|
||||||
|
// buffer, so we wrap it once and then re-use it
|
||||||
|
// on subsequent calls
|
||||||
|
otherBuffer = b;
|
||||||
|
otherByteBuf = ByteBuffer.wrap(b);
|
||||||
|
} else
|
||||||
|
otherByteBuf.clear();
|
||||||
|
otherByteBuf.limit(len);
|
||||||
|
bb = otherByteBuf;
|
||||||
|
} else
|
||||||
|
// Always wrap when offset != 0
|
||||||
|
bb = ByteBuffer.wrap(b, offset, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
long pos = getFilePointer();
|
||||||
|
while (bb.hasRemaining()) {
|
||||||
|
int i = channel.read(bb, pos);
|
||||||
|
if (i == -1)
|
||||||
|
throw new IOException("read past EOF");
|
||||||
|
pos += i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -175,9 +175,9 @@ public class TestBufferedIndexInput extends LuceneTestCase {
|
||||||
Term aaa = new Term("content", "aaa");
|
Term aaa = new Term("content", "aaa");
|
||||||
Term bbb = new Term("content", "bbb");
|
Term bbb = new Term("content", "bbb");
|
||||||
Term ccc = new Term("content", "ccc");
|
Term ccc = new Term("content", "ccc");
|
||||||
assertEquals(reader.docFreq(ccc), 37);
|
assertEquals(37, reader.docFreq(ccc));
|
||||||
reader.deleteDocument(0);
|
reader.deleteDocument(0);
|
||||||
assertEquals(reader.docFreq(aaa), 37);
|
assertEquals(37, reader.docFreq(aaa));
|
||||||
dir.tweakBufferSizes();
|
dir.tweakBufferSizes();
|
||||||
reader.deleteDocument(4);
|
reader.deleteDocument(4);
|
||||||
assertEquals(reader.docFreq(bbb), 37);
|
assertEquals(reader.docFreq(bbb), 37);
|
||||||
|
@ -205,7 +205,7 @@ public class TestBufferedIndexInput extends LuceneTestCase {
|
||||||
|
|
||||||
List allIndexInputs = new ArrayList();
|
List allIndexInputs = new ArrayList();
|
||||||
|
|
||||||
Random rand = new Random();
|
Random rand = new Random(788);
|
||||||
|
|
||||||
private Directory dir;
|
private Directory dir;
|
||||||
|
|
||||||
|
@ -220,12 +220,12 @@ public class TestBufferedIndexInput extends LuceneTestCase {
|
||||||
|
|
||||||
public void tweakBufferSizes() {
|
public void tweakBufferSizes() {
|
||||||
Iterator it = allIndexInputs.iterator();
|
Iterator it = allIndexInputs.iterator();
|
||||||
int count = 0;
|
//int count = 0;
|
||||||
while(it.hasNext()) {
|
while(it.hasNext()) {
|
||||||
BufferedIndexInput bii = (BufferedIndexInput) it.next();
|
BufferedIndexInput bii = (BufferedIndexInput) it.next();
|
||||||
int bufferSize = 1024+(int) Math.abs(rand.nextInt() % 32768);
|
int bufferSize = 1024+(int) Math.abs(rand.nextInt() % 32768);
|
||||||
bii.setBufferSize(bufferSize);
|
bii.setBufferSize(bufferSize);
|
||||||
count++;
|
//count++;
|
||||||
}
|
}
|
||||||
//System.out.println("tweak'd " + count + " buffer sizes");
|
//System.out.println("tweak'd " + count + " buffer sizes");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue