From f0cfdc444f45d88777249b4ac38f98f95eb63568 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 26 Jun 2014 11:27:39 +0200 Subject: [PATCH] [STORE] Wrap RateLimiter rather than copy RateLimitedIndexOutput We clone RateLimitedIndexOutput from lucene just to collect pausing statistics we can do this in a more straight forward way in a delegating RateLimiter. Closes #6625 --- .../lucene/store/RateLimitedFSDirectory.java | 70 +++++-------------- 1 file changed, 19 insertions(+), 51 deletions(-) diff --git a/src/main/java/org/apache/lucene/store/RateLimitedFSDirectory.java b/src/main/java/org/apache/lucene/store/RateLimitedFSDirectory.java index 30dafb48df5..22032473262 100644 --- a/src/main/java/org/apache/lucene/store/RateLimitedFSDirectory.java +++ b/src/main/java/org/apache/lucene/store/RateLimitedFSDirectory.java @@ -22,7 +22,7 @@ import org.apache.lucene.store.IOContext.Context; import java.io.IOException; -public final class RateLimitedFSDirectory extends FilterDirectory{ +public final class RateLimitedFSDirectory extends FilterDirectory { private final StoreRateLimiting.Provider rateLimitingProvider; @@ -45,12 +45,9 @@ public final class RateLimitedFSDirectory extends FilterDirectory{ if (type == StoreRateLimiting.Type.NONE || limiter == null) { return output; } - if (context.context == Context.MERGE) { - // we are mering, and type is either MERGE or ALL, rate limit... - return new RateLimitedIndexOutput(limiter, rateListener, output); - } - if (type == StoreRateLimiting.Type.ALL) { - return new RateLimitedIndexOutput(limiter, rateListener, output); + if (context.context == Context.MERGE || type == StoreRateLimiting.Type.ALL) { + // we are merging, and type is either MERGE or ALL, rate limit... + return new RateLimitedIndexOutput(new RateLimiterWrapper(limiter, rateListener), output); } // we shouldn't really get here... return output; @@ -74,65 +71,36 @@ public final class RateLimitedFSDirectory extends FilterDirectory{ } } - - static final class RateLimitedIndexOutput extends BufferedIndexOutput { - - private final IndexOutput delegate; - private final BufferedIndexOutput bufferedDelegate; - private final RateLimiter rateLimiter; + // we wrap the limiter to notify our store if we limited to get statistics + static final class RateLimiterWrapper extends RateLimiter { + private final RateLimiter delegate; private final StoreRateLimiting.Listener rateListener; - private long bytesSinceLastRateLimit; - RateLimitedIndexOutput(final RateLimiter rateLimiter, final StoreRateLimiting.Listener rateListener, final IndexOutput delegate) { - super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE); - if (delegate instanceof BufferedIndexOutput) { - bufferedDelegate = (BufferedIndexOutput) delegate; - this.delegate = delegate; - } else { - this.delegate = delegate; - bufferedDelegate = null; - } - this.rateLimiter = rateLimiter; + RateLimiterWrapper(RateLimiter delegate, StoreRateLimiting.Listener rateListener) { + this.delegate = delegate; this.rateListener = rateListener; } @Override - protected void flushBuffer(byte[] b, int offset, int len) throws IOException { - bytesSinceLastRateLimit += len; - if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) { - long pause = rateLimiter.pause(bytesSinceLastRateLimit); - bytesSinceLastRateLimit = 0; - rateListener.onPause(pause); - } - if (bufferedDelegate != null) { - bufferedDelegate.flushBuffer(b, offset, len); - } else { - delegate.writeBytes(b, offset, len); - } - + public void setMbPerSec(double mbPerSec) { + delegate.setMbPerSec(mbPerSec); } @Override - public long length() throws IOException { - return delegate.length(); + public double getMbPerSec() { + return delegate.getMbPerSec(); } @Override - public void flush() throws IOException { - try { - super.flush(); - } finally { - delegate.flush(); - } + public long pause(long bytes) { + long pause = delegate.pause(bytes); + rateListener.onPause(pause); + return pause; } @Override - public void close() throws IOException { - try { - super.close(); - } finally { - delegate.close(); - } + public long getMinPauseCheckBytes() { + return delegate.getMinPauseCheckBytes(); } } }