improve buffered output

- use the same buffer size if wrapping a buffered output
- directly call flush on the checksum wrapper
This commit is contained in:
Shay Banon 2014-01-09 12:17:02 +01:00
parent eb63bb259d
commit dbecc713d6
3 changed files with 34 additions and 22 deletions

View File

@ -17,10 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.common.lucene.store; package org.apache.lucene.store;
import org.apache.lucene.store.BufferedIndexOutput;
import org.apache.lucene.store.IndexOutput;
import java.io.IOException; import java.io.IOException;
import java.util.zip.Checksum; import java.util.zip.Checksum;
@ -29,15 +26,19 @@ import java.util.zip.Checksum;
*/ */
public class BufferedChecksumIndexOutput extends BufferedIndexOutput { public class BufferedChecksumIndexOutput extends BufferedIndexOutput {
private final IndexOutput out; private final IndexOutput delegate;
private final BufferedIndexOutput bufferedDelegate;
private final Checksum digest; private final Checksum digest;
public BufferedChecksumIndexOutput(IndexOutput out, Checksum digest) { public BufferedChecksumIndexOutput(IndexOutput delegate, Checksum digest) {
// we add 8 to be bigger than the default BufferIndexOutput buffer size so any flush will go directly super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE);
// to the output without being copied over to the delegate buffer if (delegate instanceof BufferedIndexOutput) {
super(BufferedIndexOutput.DEFAULT_BUFFER_SIZE + 64); bufferedDelegate = (BufferedIndexOutput) delegate;
this.out = out; this.delegate = delegate;
} else {
this.delegate = delegate;
bufferedDelegate = null;
}
this.digest = digest; this.digest = digest;
} }
@ -46,7 +47,7 @@ public class BufferedChecksumIndexOutput extends BufferedIndexOutput {
} }
public IndexOutput underlying() { public IndexOutput underlying() {
return this.out; return this.delegate;
} }
// don't override it, base class method simple reads from input and writes to this output // don't override it, base class method simple reads from input and writes to this output
@ -59,14 +60,18 @@ public class BufferedChecksumIndexOutput extends BufferedIndexOutput {
try { try {
super.close(); super.close();
} finally { } finally {
out.close(); delegate.close();
} }
} }
@Override @Override
protected void flushBuffer(byte[] b, int offset, int len) throws IOException { protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
out.writeBytes(b, offset, len); if (bufferedDelegate != null) {
bufferedDelegate.flushBuffer(b, offset, len);
} else {
delegate.writeBytes(b, offset, len);
}
digest.update(b, offset, len); digest.update(b, offset, len);
} }
@ -77,8 +82,11 @@ public class BufferedChecksumIndexOutput extends BufferedIndexOutput {
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
super.flush(); try {
out.flush(); super.flush();
} finally {
delegate.flush();
}
} }
@Override @Override
@ -87,21 +95,21 @@ public class BufferedChecksumIndexOutput extends BufferedIndexOutput {
// but a checksum of the bytes written to this stream, which is the same for each // but a checksum of the bytes written to this stream, which is the same for each
// type of file in lucene // type of file in lucene
super.seek(pos); super.seek(pos);
out.seek(pos); delegate.seek(pos);
} }
@Override @Override
public long length() throws IOException { public long length() throws IOException {
return out.length(); return delegate.length();
} }
@Override @Override
public void setLength(long length) throws IOException { public void setLength(long length) throws IOException {
out.setLength(length); delegate.setLength(length);
} }
@Override @Override
public String toString() { public String toString() {
return out.toString(); return delegate.toString();
} }
} }

View File

@ -83,7 +83,7 @@ public final class RateLimitedFSDirectory extends FilterDirectory{
private final StoreRateLimiting.Listener rateListener; private final StoreRateLimiting.Listener rateListener;
RateLimitedIndexOutput(final RateLimiter rateLimiter, final StoreRateLimiting.Listener rateListener, final IndexOutput delegate) { RateLimitedIndexOutput(final RateLimiter rateLimiter, final StoreRateLimiting.Listener rateListener, final IndexOutput delegate) {
// TODO if Lucene exposed in BufferedIndexOutput#getBufferSize, we could initialize it if the delegate is buffered super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE);
if (delegate instanceof BufferedIndexOutput) { if (delegate instanceof BufferedIndexOutput) {
bufferedDelegate = (BufferedIndexOutput) delegate; bufferedDelegate = (BufferedIndexOutput) delegate;
this.delegate = delegate; this.delegate = delegate;
@ -126,6 +126,11 @@ public final class RateLimitedFSDirectory extends FilterDirectory{
} }
} }
@Override
public void setLength(long length) throws IOException {
delegate.setLength(length);
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try { try {

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Directories; import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.lucene.store.BufferedChecksumIndexOutput;
import org.elasticsearch.common.lucene.store.ChecksumIndexOutput; import org.elasticsearch.common.lucene.store.ChecksumIndexOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;