LUCENE-2574: Optimize copies between IndexInput and Output

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@980656 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shai Erera 2010-07-30 06:13:13 +00:00
parent 66660307d9
commit f4919e14b8
9 changed files with 164 additions and 85 deletions

View File

@ -205,6 +205,10 @@ Optimizations
causing too many fallbacks to compare-by-value (instead of by-ord). causing too many fallbacks to compare-by-value (instead of by-ord).
(Mike McCandless) (Mike McCandless)
* LUCENE-2574: IndexInput exposes copyBytes(IndexOutput, long) to allow for
efficient copying by sub-classes. Optimized copy is implemented for RAM and FS
streams. (Shai Erera)
======================= Lucene 3.x (not yet released) ======================= ======================= Lucene 3.x (not yet released) =======================
Changes in backwards compatibility policy Changes in backwards compatibility policy

View File

@ -303,6 +303,17 @@ public class CompoundFileReader extends Directory {
return length; return length;
} }
@Override
public void copyBytes(IndexOutput out, long numBytes) throws IOException {
// Copy first whatever is in the buffer
numBytes -= flushBuffer(out, numBytes);
// If there are more bytes left to copy, delegate the copy task to the
// base IndexInput, in case it can do an optimized copy.
if (numBytes > 0) {
base.copyBytes(out, numBytes);
}
}
} }

View File

@ -205,4 +205,37 @@ public abstract class BufferedIndexInput extends IndexInput {
return clone; return clone;
} }
/**
* Flushes the in-memory bufer to the given output, copying at most
* <code>numBytes</code>.
* <p>
* <b>NOTE:</b> this method does not refill the buffer, however it does
* advance the buffer position.
*
* @return the number of bytes actually flushed from the in-memory buffer.
*/
protected int flushBuffer(IndexOutput out, long numBytes) throws IOException {
int toCopy = bufferLength - bufferPosition;
if (toCopy > numBytes) {
toCopy = (int) numBytes;
}
if (toCopy > 0) {
out.writeBytes(buffer, bufferPosition, toCopy);
bufferPosition += toCopy;
}
return toCopy;
}
@Override
public void copyBytes(IndexOutput out, long numBytes) throws IOException {
assert numBytes >= 0: "numBytes=" + numBytes;
while (numBytes > 0) {
if (bufferLength == bufferPosition) {
refill();
}
numBytes -= flushBuffer(out, numBytes);
}
}
} }

View File

@ -215,26 +215,11 @@ public abstract class Directory implements Closeable {
* overwrite it if it does. * overwrite it if it does.
*/ */
public void copy(Directory to, String src, String dest) throws IOException { public void copy(Directory to, String src, String dest) throws IOException {
IndexOutput os = null; IndexOutput os = to.createOutput(dest);
IndexInput is = null; IndexInput is = openInput(src);
IOException priorException = null; IOException priorException = null;
int bufSize = BufferedIndexOutput.BUFFER_SIZE;
byte[] buf = new byte[bufSize];
try { try {
// create file in dest directory is.copyBytes(os, is.length());
os = to.createOutput(dest);
// read current file
is = openInput(src);
// and copy to dest directory
long len = is.length();
long numRead = 0;
while (numRead < len) {
long left = len - numRead;
int toRead = (int) (bufSize < left ? bufSize : left);
is.readBytes(buf, 0, toRead);
os.writeBytes(buf, toRead);
numRead += toRead;
}
} catch (IOException ioe) { } catch (IOException ioe) {
priorException = ioe; priorException = ioe;
} finally { } finally {

View File

@ -18,9 +18,7 @@ package org.apache.lucene.store;
*/ */
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
@ -35,7 +33,6 @@ import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.apache.lucene.store.SimpleFSDirectory.SimpleFSIndexInput; import org.apache.lucene.store.SimpleFSDirectory.SimpleFSIndexInput;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
@ -438,47 +435,6 @@ public abstract class FSDirectory extends Directory {
return chunkSize; return chunkSize;
} }
@Override
public void copy(Directory to, String src, String dest) throws IOException {
if (to instanceof FSDirectory) {
FSDirectory target = (FSDirectory) to;
target.ensureCanWrite(dest);
FileChannel input = null;
FileChannel output = null;
IOException priorException = null;
try {
input = new FileInputStream(new File(directory, src)).getChannel();
output = new FileOutputStream(new File(target.directory, dest)).getChannel();
copy(input, output, input.size());
} catch (IOException ioe) {
priorException = ioe;
} finally {
IOUtils.closeSafely(priorException, input, output);
}
} else {
super.copy(to, src, dest);
}
}
/**
* Copies the content of a given {@link FileChannel} to a destination one. The
* copy is done in chunks of 2MB because if transferFrom is used without a
* limit when copying a very large file, then an OOM may be thrown (depends on
* the state of the RAM in the machine, as well as the OS used). Performance
* measurements showed that chunk sizes larger than 2MB do not result in much
* faster file copy, therefore we limit the size to be safe with different
* file sizes and systems.
*/
static void copy(FileChannel input, FileChannel output, long numBytes) throws IOException {
long pos = output.position();
long writeTo = numBytes + pos;
while (pos < writeTo) {
pos += output.transferFrom(input, pos, Math.min(CHANNEL_CHUNK_SIZE, writeTo - pos));
}
// transferFrom does not change the position of the channel. Need to change it manually
output.position(pos);
}
protected static class FSIndexOutput extends BufferedIndexOutput { protected static class FSIndexOutput extends BufferedIndexOutput {
private final FSDirectory parent; private final FSDirectory parent;
private final String name; private final String name;
@ -501,23 +457,37 @@ public abstract class FSDirectory extends Directory {
@Override @Override
public void copyBytes(DataInput input, long numBytes) throws IOException { public void copyBytes(DataInput input, long numBytes) throws IOException {
// Optimized copy only if the number of bytes to copy is larger than the // Optimized copy only if the number of bytes to copy is larger than the
// buffer size, and the given IndexInput supports FileChannel copying .. // buffer size, and the given IndexInput supports FileChannel copying.
// NOTE: the below check relies on NIOIndexInput extending Simple. If that // NOTE: the below check relies on NIOIndexInput extending Simple. If that
// changes in the future, we should change the check as well. // changes in the future, we should change the check as well.
if (numBytes > BUFFER_SIZE && input instanceof SimpleFSIndexInput) { if (numBytes <= BUFFER_SIZE || !(input instanceof SimpleFSIndexInput)) {
super.copyBytes(input, numBytes);
return;
}
SimpleFSIndexInput fsInput = (SimpleFSIndexInput) input;
// flush any bytes in the buffer // flush any bytes in the buffer
flush(); flush();
// flush any bytes in the input's buffer.
numBytes -= fsInput.flushBuffer(this, numBytes);
// do the optimized copy // do the optimized copy
FileChannel in = ((SimpleFSIndexInput) input).file.getChannel(); FileChannel in = fsInput.file.getChannel();
FileChannel out = file.getChannel(); FileChannel out = file.getChannel();
copy(in, out, numBytes); long pos = out.position();
long writeTo = numBytes + pos;
while (pos < writeTo) {
pos += out.transferFrom(in, pos, Math.min(CHANNEL_CHUNK_SIZE, writeTo - pos));
}
// transferFrom does not change the position of the channel. Need to change it manually
out.position(pos);
// corrects the position in super (BufferedIndexOutput), so that calls // corrects the position in super (BufferedIndexOutput), so that calls
// to getFilePointer will return the correct pointer. // to getFilePointer will return the correct pointer.
// Perhaps a specific method is better? // Perhaps a specific method is better?
super.seek(out.position()); super.seek(out.position());
} else {
super.copyBytes(input, numBytes);
}
} }
@Override @Override

View File

@ -25,6 +25,9 @@ import java.io.IOException;
* @see Directory * @see Directory
*/ */
public abstract class IndexInput extends DataInput implements Cloneable,Closeable { public abstract class IndexInput extends DataInput implements Cloneable,Closeable {
protected byte[] copyBuf = null;
/** Closes the stream to further operations. */ /** Closes the stream to further operations. */
public abstract void close() throws IOException; public abstract void close() throws IOException;
@ -41,4 +44,31 @@ public abstract class IndexInput extends DataInput implements Cloneable,Closeabl
/** The number of bytes in the file. */ /** The number of bytes in the file. */
public abstract long length(); public abstract long length();
/**
* Copies <code>numBytes</code> bytes to the given {@link IndexOutput}.
* <p>
* <b>NOTE:</b> this method uses an intermediate buffer to copy the bytes.
* Consider overriding it in your implementation, if you can make a better,
* optimized copy.
* <p>
* <b>NOTE</b> ensure that there are enough bytes in the input to copy to
* output. Otherwise, different exceptions may be thrown, depending on the
* implementation.
*/
public void copyBytes(IndexOutput out, long numBytes) throws IOException {
assert numBytes >= 0: "numBytes=" + numBytes;
if (copyBuf == null) {
copyBuf = new byte[BufferedIndexInput.BUFFER_SIZE];
}
while (numBytes > 0) {
final int toCopy = (int) (numBytes > copyBuf.length ? copyBuf.length : numBytes);
readBytes(copyBuf, 0, toCopy);
out.writeBytes(copyBuf, 0, toCopy);
numBytes -= toCopy;
}
}
} }

View File

@ -1,7 +1,5 @@
package org.apache.lucene.store; package org.apache.lucene.store;
import java.io.IOException;
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
@ -19,10 +17,9 @@ import java.io.IOException;
* limitations under the License. * limitations under the License.
*/ */
/** import java.io.IOException;
* A memory-resident {@link IndexInput} implementation.
*/
/** A memory-resident {@link IndexInput} implementation. */
class RAMInputStream extends IndexInput implements Cloneable { class RAMInputStream extends IndexInput implements Cloneable {
static final int BUFFER_SIZE = RAMOutputStream.BUFFER_SIZE; static final int BUFFER_SIZE = RAMOutputStream.BUFFER_SIZE;
@ -104,6 +101,27 @@ class RAMInputStream extends IndexInput implements Cloneable {
} }
} }
@Override
public void copyBytes(IndexOutput out, long numBytes) throws IOException {
assert numBytes >= 0: "numBytes=" + numBytes;
long left = numBytes;
while (left > 0) {
if (bufferPosition == bufferLength) {
++currentBufferIndex;
switchCurrentBuffer(true);
}
final int bytesInBuffer = bufferLength - bufferPosition;
final int toCopy = (int) (bytesInBuffer < left ? bytesInBuffer : left);
out.writeBytes(currentBuffer, bufferPosition, toCopy);
bufferPosition += toCopy;
left -= toCopy;
}
assert left == 0: "Insufficient bytes to copy: numBytes=" + numBytes + " copied=" + (numBytes - left);
}
@Override @Override
public long getFilePointer() { public long getFilePointer() {
return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition; return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition;

View File

@ -24,7 +24,6 @@ import java.io.IOException;
* *
* @lucene.internal * @lucene.internal
*/ */
public class RAMOutputStream extends IndexOutput { public class RAMOutputStream extends IndexOutput {
static final int BUFFER_SIZE = 1024; static final int BUFFER_SIZE = 1024;
@ -161,4 +160,26 @@ public class RAMOutputStream extends IndexOutput {
public long sizeInBytes() { public long sizeInBytes() {
return file.numBuffers() * BUFFER_SIZE; return file.numBuffers() * BUFFER_SIZE;
} }
@Override
public void copyBytes(DataInput input, long numBytes) throws IOException {
assert numBytes >= 0: "numBytes=" + numBytes;
while (numBytes > 0) {
if (bufferPosition == bufferLength) {
currentBufferIndex++;
switchCurrentBuffer();
}
int toCopy = currentBuffer.length - bufferPosition;
if (numBytes < toCopy) {
toCopy = (int) numBytes;
}
input.readBytes(currentBuffer, bufferPosition, toCopy, false);
numBytes -= toCopy;
bufferPosition += toCopy;
}
}
} }

View File

@ -160,5 +160,12 @@ public class SimpleFSDirectory extends FSDirectory {
boolean isFDValid() throws IOException { boolean isFDValid() throws IOException {
return file.getFD().valid(); return file.getFD().valid();
} }
@Override
public void copyBytes(IndexOutput out, long numBytes) throws IOException {
numBytes -= flushBuffer(out, numBytes);
// If out is FSIndexOutput, the copy will be optimized
out.copyBytes(this, numBytes);
}
} }
} }