[STORE] Wrap RateLimiter rather than copy RateLimitedIndexOutput

We clone RateLimitedIndexOutput from lucene just to collect pausing
statistics we can do this in a more straight forward way in a delegating
RateLimiter.

Closes #6625
This commit is contained in:
Simon Willnauer 2014-06-26 11:27:39 +02:00
parent 79af3228ad
commit f0cfdc444f
1 changed files with 19 additions and 51 deletions

View File

@ -22,7 +22,7 @@ import org.apache.lucene.store.IOContext.Context;
import java.io.IOException; import java.io.IOException;
public final class RateLimitedFSDirectory extends FilterDirectory{ public final class RateLimitedFSDirectory extends FilterDirectory {
private final StoreRateLimiting.Provider rateLimitingProvider; private final StoreRateLimiting.Provider rateLimitingProvider;
@ -45,12 +45,9 @@ public final class RateLimitedFSDirectory extends FilterDirectory{
if (type == StoreRateLimiting.Type.NONE || limiter == null) { if (type == StoreRateLimiting.Type.NONE || limiter == null) {
return output; return output;
} }
if (context.context == Context.MERGE) { if (context.context == Context.MERGE || type == StoreRateLimiting.Type.ALL) {
// we are mering, and type is either MERGE or ALL, rate limit... // we are merging, and type is either MERGE or ALL, rate limit...
return new RateLimitedIndexOutput(limiter, rateListener, output); return new RateLimitedIndexOutput(new RateLimiterWrapper(limiter, rateListener), output);
}
if (type == StoreRateLimiting.Type.ALL) {
return new RateLimitedIndexOutput(limiter, rateListener, output);
} }
// we shouldn't really get here... // we shouldn't really get here...
return output; return output;
@ -74,65 +71,36 @@ public final class RateLimitedFSDirectory extends FilterDirectory{
} }
} }
// we wrap the limiter to notify our store if we limited to get statistics
static final class RateLimitedIndexOutput extends BufferedIndexOutput { static final class RateLimiterWrapper extends RateLimiter {
private final RateLimiter delegate;
private final IndexOutput delegate;
private final BufferedIndexOutput bufferedDelegate;
private final RateLimiter rateLimiter;
private final StoreRateLimiting.Listener rateListener; private final StoreRateLimiting.Listener rateListener;
private long bytesSinceLastRateLimit;
RateLimitedIndexOutput(final RateLimiter rateLimiter, final StoreRateLimiting.Listener rateListener, final IndexOutput delegate) { RateLimiterWrapper(RateLimiter delegate, StoreRateLimiting.Listener rateListener) {
super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE); this.delegate = delegate;
if (delegate instanceof BufferedIndexOutput) {
bufferedDelegate = (BufferedIndexOutput) delegate;
this.delegate = delegate;
} else {
this.delegate = delegate;
bufferedDelegate = null;
}
this.rateLimiter = rateLimiter;
this.rateListener = rateListener; this.rateListener = rateListener;
} }
@Override @Override
protected void flushBuffer(byte[] b, int offset, int len) throws IOException { public void setMbPerSec(double mbPerSec) {
bytesSinceLastRateLimit += len; delegate.setMbPerSec(mbPerSec);
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
bytesSinceLastRateLimit = 0;
rateListener.onPause(pause);
}
if (bufferedDelegate != null) {
bufferedDelegate.flushBuffer(b, offset, len);
} else {
delegate.writeBytes(b, offset, len);
}
} }
@Override @Override
public long length() throws IOException { public double getMbPerSec() {
return delegate.length(); return delegate.getMbPerSec();
} }
@Override @Override
public void flush() throws IOException { public long pause(long bytes) {
try { long pause = delegate.pause(bytes);
super.flush(); rateListener.onPause(pause);
} finally { return pause;
delegate.flush();
}
} }
@Override @Override
public void close() throws IOException { public long getMinPauseCheckBytes() {
try { return delegate.getMinPauseCheckBytes();
super.close();
} finally {
delegate.close();
}
} }
} }
} }