LUCENE-5641: add RateLimiter.getMinPauseCheckBytes for callers to know minimal number of bytes before consulting rate limiter

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1592606 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2014-05-05 18:54:47 +00:00
parent e9d549b889
commit 4c2cca57dd
4 changed files with 118 additions and 26 deletions

View File

@ -162,6 +162,9 @@ Bug fixes
stack frame per character in the expanded suggestion (Robert Muir,
Simon Willnauer, Mike McCandless).
* LUCENE-5641: SimpleRateLimiter would silently rate limit at 8 MB/sec
even if you asked for higher rates. (Mike McCandless)
Test Framework
* LUCENE-5622: Fail tests if they print over the given limit of bytes to

View File

@ -28,6 +28,9 @@ final class RateLimitedIndexOutput extends BufferedIndexOutput {
private final BufferedIndexOutput bufferedDelegate;
private final RateLimiter rateLimiter;
/** How many bytes we've written since we last called rateLimiter.pause. */
private long bytesSinceLastPause;
RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) {
// TODO should we make buffer size configurable
if (delegate instanceof BufferedIndexOutput) {
@ -42,7 +45,11 @@ final class RateLimitedIndexOutput extends BufferedIndexOutput {
@Override
protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
rateLimiter.pause(len);
bytesSinceLastPause += len;
if (bytesSinceLastPause > rateLimiter.getMinPauseCheckBytes()) {
rateLimiter.pause(bytesSinceLastPause);
bytesSinceLastPause = 0;
}
if (bufferedDelegate != null) {
bufferedDelegate.flushBuffer(b, offset, len);
} else {

View File

@ -22,14 +22,15 @@ import org.apache.lucene.util.ThreadInterruptedException;
/** Abstract base class to rate limit IO. Typically implementations are
* shared across multiple IndexInputs or IndexOutputs (for example
* those involved all merging). Those IndexInputs and
* IndexOutputs would call {@link #pause} whenever they
* want to read bytes or write bytes. */
* IndexOutputs would call {@link #pause} whenever the have read
* or written more than {@link #getMinPauseCheckBytes} bytes. */
public abstract class RateLimiter {
/**
* Sets an updated mb per second rate limit.
*/
public abstract void setMbPerSec(double mbPerSec);
/**
* The current mb per second rate limit.
*/
@ -44,13 +45,19 @@ public abstract class RateLimiter {
* */
public abstract long pause(long bytes);
/** How many bytes caller should add up itself before invoking {@link #pause}. */
public abstract long getMinPauseCheckBytes();
/**
* Simple class to rate limit IO.
*/
public static class SimpleRateLimiter extends RateLimiter {
private final static int MIN_PAUSE_CHECK_MSEC = 5;
private volatile double mbPerSec;
private volatile double nsPerByte;
private volatile long lastNS;
private volatile long minPauseCheckBytes;
private long lastNS;
// TODO: we could also allow eg a sub class to dynamically
// determine the allowed rate, eg if an app wants to
@ -67,8 +74,12 @@ public abstract class RateLimiter {
@Override
public void setMbPerSec(double mbPerSec) {
this.mbPerSec = mbPerSec;
nsPerByte = 1000000000. / (1024*1024*mbPerSec);
minPauseCheckBytes = (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024);
}
@Override
public long getMinPauseCheckBytes() {
return minPauseCheckBytes;
}
/**
@ -80,35 +91,52 @@ public abstract class RateLimiter {
}
/** Pauses, if necessary, to keep the instantaneous IO
* rate at or below the target. NOTE: multiple threads
* may safely use this, however the implementation is
* not perfectly thread safe but likely in practice this
* is harmless (just means in some rare cases the rate
* might exceed the target). It's best to call this
* with a biggish count, not one byte at a time.
* @return the pause time in nano seconds
* */
* rate at or below the target. Be sure to only call
* this method when bytes > {@link #getMinPauseCheckBytes},
* otherwise it will pause way too long!
*
* @return the pause time in nano seconds */
@Override
public long pause(long bytes) {
if (bytes == 1) {
return 0;
long startNS = System.nanoTime();
double secondsToPause = (bytes/1024./1024.) / mbPerSec;
long targetNS;
// Sync'd to read + write lastNS:
synchronized (this) {
// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
// maybe we should also offer decayed recent history one?
targetNS = lastNS + (long) (1000000000 * secondsToPause);
if (startNS >= targetNS) {
// OK, current time is already beyond the target sleep time,
// no pausing to do.
// Set to startNS, not targetNS, to enforce the instant rate, not
// the "averaaged over all history" rate:
lastNS = startNS;
return 0;
}
lastNS = targetNS;
}
// TODO: this is purely instantaneous rate; maybe we
// should also offer decayed recent history one?
final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
final long startNS;
long curNS = startNS = System.nanoTime();
if (lastNS < curNS) {
lastNS = curNS;
}
long curNS = startNS;
// While loop because Thread.sleep doesn't always sleep
// enough:
while(true) {
while (true) {
final long pauseNS = targetNS - curNS;
if (pauseNS > 0) {
try {
// NOTE: except maybe on real-time JVMs, minimum realistic sleep time
// is 1 msec; if you pass just 1 nsec the default impl rounds
// this up to 1 msec:
Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
@ -118,6 +146,7 @@ public abstract class RateLimiter {
}
break;
}
return curNS - startNS;
}
}

View File

@ -16,10 +16,17 @@ package org.apache.lucene.store;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Simple testcase for RateLimiter.SimpleRateLimiter
@ -37,4 +44,50 @@ public final class TestRateLimiter extends LuceneTestCase {
assertTrue("we should sleep less than 2 seconds but did: " + convert + " millis", convert < 2000l);
assertTrue("we should sleep at least 1 second but did only: " + convert + " millis", convert > 1000l);
}
public void testThreads() throws Exception {
double targetMBPerSec = 10.0 + 20 * random().nextDouble();
final SimpleRateLimiter limiter = new SimpleRateLimiter(targetMBPerSec);
final CountDownLatch startingGun = new CountDownLatch(1);
Thread[] threads = new Thread[TestUtil.nextInt(random(), 3, 6)];
final AtomicLong totBytes = new AtomicLong();
for(int i=0;i<threads.length;i++) {
threads[i] = new Thread() {
@Override
public void run() {
try {
startingGun.await();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
long bytesSinceLastPause = 0;
for(int i=0;i<500;i++) {
long numBytes = TestUtil.nextInt(random(), 1000, 10000);
totBytes.addAndGet(numBytes);
bytesSinceLastPause += numBytes;
if (bytesSinceLastPause > limiter.getMinPauseCheckBytes()) {
limiter.pause(bytesSinceLastPause);
bytesSinceLastPause = 0;
}
}
}
};
threads[i].start();
}
long startNS = System.nanoTime();
startingGun.countDown();
for(Thread thread : threads) {
thread.join();
}
long endNS = System.nanoTime();
double actualMBPerSec = (totBytes.get()/1024/1024.)/((endNS-startNS)/1000000000.0);
// TODO: this may false trip .... could be we can only assert that it never exceeds the max, so slow jenkins doesn't trip:
double ratio = actualMBPerSec/targetMBPerSec;
assertTrue("targetMBPerSec=" + targetMBPerSec + " actualMBPerSec=" + actualMBPerSec, ratio >= 0.9 && ratio <= 1.1);
}
}