From f4919e14b84a5f4a51e2a096f6f38669f65ff7d7 Mon Sep 17 00:00:00 2001 From: Shai Erera Date: Fri, 30 Jul 2010 06:13:13 +0000 Subject: [PATCH] LUCENE-2574: Optimize copies between IndexInput and Output git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@980656 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/CHANGES.txt | 6 +- .../lucene/index/CompoundFileReader.java | 11 +++ .../lucene/store/BufferedIndexInput.java | 37 +++++++- .../org/apache/lucene/store/Directory.java | 21 +---- .../org/apache/lucene/store/FSDirectory.java | 84 ++++++------------- .../org/apache/lucene/store/IndexInput.java | 32 ++++++- .../apache/lucene/store/RAMInputStream.java | 28 +++++-- .../apache/lucene/store/RAMOutputStream.java | 23 ++++- .../lucene/store/SimpleFSDirectory.java | 7 ++ 9 files changed, 164 insertions(+), 85 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index ffea4834b0b..b7cae74c9e5 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -204,7 +204,11 @@ Optimizations * LUCENE-2531: Fix issue when sorting by a String field that was causing too many fallbacks to compare-by-value (instead of by-ord). (Mike McCandless) - + +* LUCENE-2574: IndexInput exposes copyBytes(IndexOutput, long) to allow for + efficient copying by sub-classes. Optimized copy is implemented for RAM and FS + streams. (Shai Erera) + ======================= Lucene 3.x (not yet released) ======================= Changes in backwards compatibility policy diff --git a/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java b/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java index cc1f7d8644a..d4e7f210c68 100644 --- a/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java +++ b/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java @@ -303,6 +303,17 @@ public class CompoundFileReader extends Directory { return length; } + @Override + public void copyBytes(IndexOutput out, long numBytes) throws IOException { + // Copy first whatever is in the buffer + numBytes -= flushBuffer(out, numBytes); + + // If there are more bytes left to copy, delegate the copy task to the + // base IndexInput, in case it can do an optimized copy. + if (numBytes > 0) { + base.copyBytes(out, numBytes); + } + } } diff --git a/lucene/src/java/org/apache/lucene/store/BufferedIndexInput.java b/lucene/src/java/org/apache/lucene/store/BufferedIndexInput.java index e29372eb9f8..17f97473429 100644 --- a/lucene/src/java/org/apache/lucene/store/BufferedIndexInput.java +++ b/lucene/src/java/org/apache/lucene/store/BufferedIndexInput.java @@ -26,9 +26,9 @@ public abstract class BufferedIndexInput extends IndexInput { public static final int BUFFER_SIZE = 1024; private int bufferSize = BUFFER_SIZE; - + protected byte[] buffer; - + private long bufferStart = 0; // position in file of buffer private int bufferLength = 0; // end of valid bytes private int bufferPosition = 0; // next byte to read @@ -205,4 +205,37 @@ public abstract class BufferedIndexInput extends IndexInput { return clone; } + /** + * Flushes the in-memory bufer to the given output, copying at most + * numBytes. + *

+ * NOTE: this method does not refill the buffer, however it does + * advance the buffer position. + * + * @return the number of bytes actually flushed from the in-memory buffer. + */ + protected int flushBuffer(IndexOutput out, long numBytes) throws IOException { + int toCopy = bufferLength - bufferPosition; + if (toCopy > numBytes) { + toCopy = (int) numBytes; + } + if (toCopy > 0) { + out.writeBytes(buffer, bufferPosition, toCopy); + bufferPosition += toCopy; + } + return toCopy; + } + + @Override + public void copyBytes(IndexOutput out, long numBytes) throws IOException { + assert numBytes >= 0: "numBytes=" + numBytes; + + while (numBytes > 0) { + if (bufferLength == bufferPosition) { + refill(); + } + numBytes -= flushBuffer(out, numBytes); + } + } + } diff --git a/lucene/src/java/org/apache/lucene/store/Directory.java b/lucene/src/java/org/apache/lucene/store/Directory.java index 495ee4f1760..924c3b58ca0 100644 --- a/lucene/src/java/org/apache/lucene/store/Directory.java +++ b/lucene/src/java/org/apache/lucene/store/Directory.java @@ -215,26 +215,11 @@ public abstract class Directory implements Closeable { * overwrite it if it does. */ public void copy(Directory to, String src, String dest) throws IOException { - IndexOutput os = null; - IndexInput is = null; + IndexOutput os = to.createOutput(dest); + IndexInput is = openInput(src); IOException priorException = null; - int bufSize = BufferedIndexOutput.BUFFER_SIZE; - byte[] buf = new byte[bufSize]; try { - // create file in dest directory - os = to.createOutput(dest); - // read current file - is = openInput(src); - // and copy to dest directory - long len = is.length(); - long numRead = 0; - while (numRead < len) { - long left = len - numRead; - int toRead = (int) (bufSize < left ? bufSize : left); - is.readBytes(buf, 0, toRead); - os.writeBytes(buf, toRead); - numRead += toRead; - } + is.copyBytes(os, is.length()); } catch (IOException ioe) { priorException = ioe; } finally { diff --git a/lucene/src/java/org/apache/lucene/store/FSDirectory.java b/lucene/src/java/org/apache/lucene/store/FSDirectory.java index 2520776e751..4f1a9db7234 100644 --- a/lucene/src/java/org/apache/lucene/store/FSDirectory.java +++ b/lucene/src/java/org/apache/lucene/store/FSDirectory.java @@ -18,9 +18,7 @@ package org.apache.lucene.store; */ import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; import java.io.RandomAccessFile; @@ -35,7 +33,6 @@ import java.util.HashSet; import java.util.Set; import org.apache.lucene.store.SimpleFSDirectory.SimpleFSIndexInput; -import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.Constants; @@ -438,47 +435,6 @@ public abstract class FSDirectory extends Directory { return chunkSize; } - @Override - public void copy(Directory to, String src, String dest) throws IOException { - if (to instanceof FSDirectory) { - FSDirectory target = (FSDirectory) to; - target.ensureCanWrite(dest); - FileChannel input = null; - FileChannel output = null; - IOException priorException = null; - try { - input = new FileInputStream(new File(directory, src)).getChannel(); - output = new FileOutputStream(new File(target.directory, dest)).getChannel(); - copy(input, output, input.size()); - } catch (IOException ioe) { - priorException = ioe; - } finally { - IOUtils.closeSafely(priorException, input, output); - } - } else { - super.copy(to, src, dest); - } - } - - /** - * Copies the content of a given {@link FileChannel} to a destination one. The - * copy is done in chunks of 2MB because if transferFrom is used without a - * limit when copying a very large file, then an OOM may be thrown (depends on - * the state of the RAM in the machine, as well as the OS used). Performance - * measurements showed that chunk sizes larger than 2MB do not result in much - * faster file copy, therefore we limit the size to be safe with different - * file sizes and systems. - */ - static void copy(FileChannel input, FileChannel output, long numBytes) throws IOException { - long pos = output.position(); - long writeTo = numBytes + pos; - while (pos < writeTo) { - pos += output.transferFrom(input, pos, Math.min(CHANNEL_CHUNK_SIZE, writeTo - pos)); - } - // transferFrom does not change the position of the channel. Need to change it manually - output.position(pos); - } - protected static class FSIndexOutput extends BufferedIndexOutput { private final FSDirectory parent; private final String name; @@ -501,23 +457,37 @@ public abstract class FSDirectory extends Directory { @Override public void copyBytes(DataInput input, long numBytes) throws IOException { // Optimized copy only if the number of bytes to copy is larger than the - // buffer size, and the given IndexInput supports FileChannel copying .. + // buffer size, and the given IndexInput supports FileChannel copying. // NOTE: the below check relies on NIOIndexInput extending Simple. If that // changes in the future, we should change the check as well. - if (numBytes > BUFFER_SIZE && input instanceof SimpleFSIndexInput) { - // flush any bytes in the buffer - flush(); - // do the optimized copy - FileChannel in = ((SimpleFSIndexInput) input).file.getChannel(); - FileChannel out = file.getChannel(); - copy(in, out, numBytes); - // corrects the position in super (BufferedIndexOutput), so that calls - // to getFilePointer will return the correct pointer. - // Perhaps a specific method is better? - super.seek(out.position()); - } else { + if (numBytes <= BUFFER_SIZE || !(input instanceof SimpleFSIndexInput)) { super.copyBytes(input, numBytes); + return; } + + SimpleFSIndexInput fsInput = (SimpleFSIndexInput) input; + + // flush any bytes in the buffer + flush(); + + // flush any bytes in the input's buffer. + numBytes -= fsInput.flushBuffer(this, numBytes); + + // do the optimized copy + FileChannel in = fsInput.file.getChannel(); + FileChannel out = file.getChannel(); + long pos = out.position(); + long writeTo = numBytes + pos; + while (pos < writeTo) { + pos += out.transferFrom(in, pos, Math.min(CHANNEL_CHUNK_SIZE, writeTo - pos)); + } + // transferFrom does not change the position of the channel. Need to change it manually + out.position(pos); + + // corrects the position in super (BufferedIndexOutput), so that calls + // to getFilePointer will return the correct pointer. + // Perhaps a specific method is better? + super.seek(out.position()); } @Override diff --git a/lucene/src/java/org/apache/lucene/store/IndexInput.java b/lucene/src/java/org/apache/lucene/store/IndexInput.java index 1268c93191d..87dda60de28 100644 --- a/lucene/src/java/org/apache/lucene/store/IndexInput.java +++ b/lucene/src/java/org/apache/lucene/store/IndexInput.java @@ -25,6 +25,9 @@ import java.io.IOException; * @see Directory */ public abstract class IndexInput extends DataInput implements Cloneable,Closeable { + + protected byte[] copyBuf = null; + /** Closes the stream to further operations. */ public abstract void close() throws IOException; @@ -41,4 +44,31 @@ public abstract class IndexInput extends DataInput implements Cloneable,Closeabl /** The number of bytes in the file. */ public abstract long length(); -} \ No newline at end of file + + /** + * Copies numBytes bytes to the given {@link IndexOutput}. + *

+ * NOTE: this method uses an intermediate buffer to copy the bytes. + * Consider overriding it in your implementation, if you can make a better, + * optimized copy. + *

+ * NOTE ensure that there are enough bytes in the input to copy to + * output. Otherwise, different exceptions may be thrown, depending on the + * implementation. + */ + public void copyBytes(IndexOutput out, long numBytes) throws IOException { + assert numBytes >= 0: "numBytes=" + numBytes; + + if (copyBuf == null) { + copyBuf = new byte[BufferedIndexInput.BUFFER_SIZE]; + } + + while (numBytes > 0) { + final int toCopy = (int) (numBytes > copyBuf.length ? copyBuf.length : numBytes); + readBytes(copyBuf, 0, toCopy); + out.writeBytes(copyBuf, 0, toCopy); + numBytes -= toCopy; + } + } + +} diff --git a/lucene/src/java/org/apache/lucene/store/RAMInputStream.java b/lucene/src/java/org/apache/lucene/store/RAMInputStream.java index 8de55114aa5..15f87d5e595 100644 --- a/lucene/src/java/org/apache/lucene/store/RAMInputStream.java +++ b/lucene/src/java/org/apache/lucene/store/RAMInputStream.java @@ -1,7 +1,5 @@ package org.apache.lucene.store; -import java.io.IOException; - /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -19,10 +17,9 @@ import java.io.IOException; * limitations under the License. */ -/** - * A memory-resident {@link IndexInput} implementation. - */ +import java.io.IOException; +/** A memory-resident {@link IndexInput} implementation. */ class RAMInputStream extends IndexInput implements Cloneable { static final int BUFFER_SIZE = RAMOutputStream.BUFFER_SIZE; @@ -104,6 +101,27 @@ class RAMInputStream extends IndexInput implements Cloneable { } } + @Override + public void copyBytes(IndexOutput out, long numBytes) throws IOException { + assert numBytes >= 0: "numBytes=" + numBytes; + + long left = numBytes; + while (left > 0) { + if (bufferPosition == bufferLength) { + ++currentBufferIndex; + switchCurrentBuffer(true); + } + + final int bytesInBuffer = bufferLength - bufferPosition; + final int toCopy = (int) (bytesInBuffer < left ? bytesInBuffer : left); + out.writeBytes(currentBuffer, bufferPosition, toCopy); + bufferPosition += toCopy; + left -= toCopy; + } + + assert left == 0: "Insufficient bytes to copy: numBytes=" + numBytes + " copied=" + (numBytes - left); + } + @Override public long getFilePointer() { return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition; diff --git a/lucene/src/java/org/apache/lucene/store/RAMOutputStream.java b/lucene/src/java/org/apache/lucene/store/RAMOutputStream.java index dd2fe04059e..b07fe07e7fe 100644 --- a/lucene/src/java/org/apache/lucene/store/RAMOutputStream.java +++ b/lucene/src/java/org/apache/lucene/store/RAMOutputStream.java @@ -24,7 +24,6 @@ import java.io.IOException; * * @lucene.internal */ - public class RAMOutputStream extends IndexOutput { static final int BUFFER_SIZE = 1024; @@ -161,4 +160,26 @@ public class RAMOutputStream extends IndexOutput { public long sizeInBytes() { return file.numBuffers() * BUFFER_SIZE; } + + @Override + public void copyBytes(DataInput input, long numBytes) throws IOException { + assert numBytes >= 0: "numBytes=" + numBytes; + + while (numBytes > 0) { + if (bufferPosition == bufferLength) { + currentBufferIndex++; + switchCurrentBuffer(); + } + + int toCopy = currentBuffer.length - bufferPosition; + if (numBytes < toCopy) { + toCopy = (int) numBytes; + } + input.readBytes(currentBuffer, bufferPosition, toCopy, false); + numBytes -= toCopy; + bufferPosition += toCopy; + } + + } + } diff --git a/lucene/src/java/org/apache/lucene/store/SimpleFSDirectory.java b/lucene/src/java/org/apache/lucene/store/SimpleFSDirectory.java index 8cc520eb604..793d91a7160 100644 --- a/lucene/src/java/org/apache/lucene/store/SimpleFSDirectory.java +++ b/lucene/src/java/org/apache/lucene/store/SimpleFSDirectory.java @@ -160,5 +160,12 @@ public class SimpleFSDirectory extends FSDirectory { boolean isFDValid() throws IOException { return file.getFD().valid(); } + + @Override + public void copyBytes(IndexOutput out, long numBytes) throws IOException { + numBytes -= flushBuffer(out, numBytes); + // If out is FSIndexOutput, the copy will be optimized + out.copyBytes(this, numBytes); + } } }