LUCENE-3416: Allow to pass an instance of RateLimiter to FSDirectory allowing to rate limit merge IO across several directories / instances

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1167353 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2011-09-09 20:35:38 +00:00
parent 710e77f5f7
commit 397d68e080
2 changed files with 37 additions and 11 deletions

View File

@ -123,8 +123,7 @@ public abstract class FSDirectory extends Directory {
private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566 private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566
// null means no limite // null means no limite
private Double maxMergeWriteMBPerSec; private volatile RateLimiter mergeWriteRateLimiter;
private RateLimiter mergeWriteRateLimiter;
// returns the canonical version of the directory, creating it if it doesn't exist. // returns the canonical version of the directory, creating it if it doesn't exist.
private static File getCanonicalPath(File file) throws IOException { private static File getCanonicalPath(File file) throws IOException {
@ -305,25 +304,40 @@ public abstract class FSDirectory extends Directory {
* only apply for certain to new merges. * only apply for certain to new merges.
* *
* @lucene.experimental */ * @lucene.experimental */
public synchronized void setMaxMergeWriteMBPerSec(Double mbPerSec) { public void setMaxMergeWriteMBPerSec(Double mbPerSec) {
maxMergeWriteMBPerSec = mbPerSec; RateLimiter limiter = mergeWriteRateLimiter;
if (mbPerSec == null) { if (mbPerSec == null) {
if (mergeWriteRateLimiter != null) { if (limiter != null) {
mergeWriteRateLimiter.setMaxRate(Double.MAX_VALUE); limiter.setMbPerSec(Double.MAX_VALUE);
mergeWriteRateLimiter = null; mergeWriteRateLimiter = null;
} }
} else if (mergeWriteRateLimiter != null) { } else if (limiter != null) {
mergeWriteRateLimiter.setMaxRate(mbPerSec); limiter.setMbPerSec(mbPerSec);
} else { } else {
mergeWriteRateLimiter = new RateLimiter(mbPerSec); mergeWriteRateLimiter = new RateLimiter(mbPerSec);
} }
} }
/**
* Sets the rate limiter to be used to limit (approx) MB/sec allowed
* by all IO performed when merging. Pass null to have no limit.
*
* <p>Passing an instance of rate limiter compared to setting it using
* {@link #setMaxMergeWriteMBPerSec(Double)} allows to use the same limiter
* instance across several directories globally limiting IO when merging
* across them.
*
* @lucene.experimental */
public void setMaxMergeWriteLimiter(RateLimiter mergeWriteRateLimiter) {
this.mergeWriteRateLimiter = mergeWriteRateLimiter;
}
/** See {@link #setMaxMergeWriteMBPerSec}. /** See {@link #setMaxMergeWriteMBPerSec}.
* *
* @lucene.experimental */ * @lucene.experimental */
public Double getMaxMergeWriteMBPerSec() { public Double getMaxMergeWriteMBPerSec() {
return maxMergeWriteMBPerSec; RateLimiter limiter = mergeWriteRateLimiter;
return limiter == null ? null : limiter.getMbPerSec();
} }
protected void ensureCanWrite(String name) throws IOException { protected void ensureCanWrite(String name) throws IOException {

View File

@ -26,6 +26,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
* want to read bytes or write bytes. */ * want to read bytes or write bytes. */
public class RateLimiter { public class RateLimiter {
private volatile double mbPerSec;
private volatile double nsPerByte; private volatile double nsPerByte;
private volatile long lastNS; private volatile long lastNS;
@ -35,13 +36,24 @@ public class RateLimiter {
/** mbPerSec is the MB/sec max IO rate */ /** mbPerSec is the MB/sec max IO rate */
public RateLimiter(double mbPerSec) { public RateLimiter(double mbPerSec) {
setMaxRate(mbPerSec); setMbPerSec(mbPerSec);
} }
public void setMaxRate(double mbPerSec) { /**
* Sets an updated mb per second rate limit.
*/
public void setMbPerSec(double mbPerSec) {
this.mbPerSec = mbPerSec;
nsPerByte = 1000000000. / (1024*1024*mbPerSec); nsPerByte = 1000000000. / (1024*1024*mbPerSec);
} }
/**
* The current mb per second rate limit.
*/
public double getMbPerSec() {
return this.mbPerSec;
}
/** Pauses, if necessary, to keep the instantaneous IO /** Pauses, if necessary, to keep the instantaneous IO
* rate at or below the target. NOTE: multiple threads * rate at or below the target. NOTE: multiple threads
* may safely use this, however the implementation is * may safely use this, however the implementation is