LUCENE-4537: Separate RateLimiter from Directory

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1407268 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2012-11-08 21:24:43 +00:00
parent f57cdb8fc3
commit d8d3dc2a13
8 changed files with 410 additions and 130 deletions

View File

@ -95,7 +95,11 @@ API Changes
* LUCENE-4520: ValueSource.getSortField no longer throws IOExceptions
(Alan Woodward)
* LUCENE-4537: RateLimiter is now separated from FSDirectory and exposed via
RateLimitingDirectoryWrapper. Any Directory can now be rate-limited.
(Simon Willnauer)
Bug Fixes
* LUCENE-1822: BaseFragListBuilder hard-coded 6 char margin is too naive.

View File

@ -123,9 +123,6 @@ public abstract class FSDirectory extends Directory {
protected final Set<String> staleFiles = synchronizedSet(new HashSet<String>()); // Files written, but not yet sync'ed
private int chunkSize = DEFAULT_READ_CHUNK_SIZE; // LUCENE-1566
// null means no limit
private volatile RateLimiter mergeWriteRateLimiter;
// returns the canonical version of the directory, creating it if it doesn't exist.
private static File getCanonicalPath(File file) throws IOException {
return new File(file.getCanonicalPath());
@ -286,51 +283,7 @@ public abstract class FSDirectory extends Directory {
ensureOpen();
ensureCanWrite(name);
return new FSIndexOutput(this, name, context.context == IOContext.Context.MERGE ? mergeWriteRateLimiter : null);
}
/** Sets the maximum (approx) MB/sec allowed by all write
* IO performed by merging. Pass null to have no limit.
*
* <p><b>NOTE</b>: if merges are already running there is
* no guarantee this new rate will apply to them; it will
* only apply for certain to new merges.
*
* @lucene.experimental */
public void setMaxMergeWriteMBPerSec(Double mbPerSec) {
RateLimiter limiter = mergeWriteRateLimiter;
if (mbPerSec == null) {
if (limiter != null) {
limiter.setMbPerSec(Double.MAX_VALUE);
mergeWriteRateLimiter = null;
}
} else if (limiter != null) {
limiter.setMbPerSec(mbPerSec);
} else {
mergeWriteRateLimiter = new RateLimiter(mbPerSec);
}
}
/**
* Sets the rate limiter to be used to limit (approx) MB/sec allowed
* by all IO performed when merging. Pass null to have no limit.
*
* <p>Passing an instance of rate limiter compared to setting it using
* {@link #setMaxMergeWriteMBPerSec(Double)} allows to use the same limiter
* instance across several directories globally limiting IO when merging
* across them.
*
* @lucene.experimental */
public void setMaxMergeWriteLimiter(RateLimiter mergeWriteRateLimiter) {
this.mergeWriteRateLimiter = mergeWriteRateLimiter;
}
/** See {@link #setMaxMergeWriteMBPerSec}.
*
* @lucene.experimental */
public Double getMaxMergeWriteMBPerSec() {
RateLimiter limiter = mergeWriteRateLimiter;
return limiter == null ? null : limiter.getMbPerSec();
return new FSIndexOutput(this, name);
}
protected void ensureCanWrite(String name) throws IOException {
@ -504,23 +457,18 @@ public abstract class FSDirectory extends Directory {
private final String name;
private final RandomAccessFile file;
private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once
private final RateLimiter rateLimiter;
public FSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter) throws IOException {
public FSIndexOutput(FSDirectory parent, String name) throws IOException {
this.parent = parent;
this.name = name;
file = new RandomAccessFile(new File(parent.directory, name), "rw");
isOpen = true;
this.rateLimiter = rateLimiter;
}
/** output methods: */
@Override
public void flushBuffer(byte[] b, int offset, int size) throws IOException {
assert isOpen;
if (rateLimiter != null) {
rateLimiter.pause(size);
}
file.write(b, offset, size);
}

View File

@ -0,0 +1,223 @@
package org.apache.lucene.store;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.Collection;
import org.apache.lucene.store.IOContext.Context;
/**
*
* A {@link Directory} wrapper that allows {@link IndexOutput} rate limiting using
* {@link Context IO context} specific {@link RateLimiter rate limiters}.
*
* @see #setRateLimiter(RateLimiter, Context)
* @lucene.experimental
*/
public final class RateLimitedDirectoryWrapper extends Directory {
private final Directory delegate;
// we need to be volatile here to make sure we see all the values that are set
// / modified concurrently
private volatile RateLimiter[] contextRateLimiters = new RateLimiter[IOContext.Context
.values().length];
public RateLimitedDirectoryWrapper(Directory wrapped) {
this.delegate = wrapped;
}
public String[] listAll() throws IOException {
ensureOpen();
return delegate.listAll();
}
public boolean fileExists(String name) throws IOException {
ensureOpen();
return delegate.fileExists(name);
}
public void deleteFile(String name) throws IOException {
ensureOpen();
delegate.deleteFile(name);
}
public long fileLength(String name) throws IOException {
ensureOpen();
return delegate.fileLength(name);
}
public IndexOutput createOutput(String name, IOContext context)
throws IOException {
ensureOpen();
final IndexOutput output = delegate.createOutput(name, context);
final RateLimiter limiter = getRateLimiter(context.context);
if (limiter != null) {
return new RateLimitedIndexOutput(limiter, output);
}
return output;
}
public void sync(Collection<String> names) throws IOException {
ensureOpen();
delegate.sync(names);
}
public IndexInput openInput(String name, IOContext context)
throws IOException {
ensureOpen();
return delegate.openInput(name, context);
}
public void close() throws IOException {
isOpen = false;
delegate.close();
}
public IndexInputSlicer createSlicer(String name, IOContext context)
throws IOException {
ensureOpen();
return delegate.createSlicer(name, context);
}
@Override
public Lock makeLock(String name) {
ensureOpen();
return delegate.makeLock(name);
}
@Override
public void clearLock(String name) throws IOException {
ensureOpen();
delegate.clearLock(name);
}
@Override
public void setLockFactory(LockFactory lockFactory) throws IOException {
ensureOpen();
delegate.setLockFactory(lockFactory);
}
@Override
public LockFactory getLockFactory() {
ensureOpen();
return delegate.getLockFactory();
}
@Override
public String getLockID() {
ensureOpen();
return delegate.getLockID();
}
@Override
public String toString() {
return "RateLimitedDirectoryWrapper(" + delegate.toString() + ")";
}
@Override
public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
ensureOpen();
delegate.copy(to, src, dest, context);
}
private RateLimiter getRateLimiter(IOContext.Context context) {
assert context != null;
return contextRateLimiters[context.ordinal()];
}
/**
* Sets the maximum (approx) MB/sec allowed by all write IO performed by
* {@link IndexOutput} created with the given {@link IOContext.Context}. Pass
* <code>null</code> to have no limit.
*
* <p>
* <b>NOTE</b>: For already created {@link IndexOutput} instances there is no
* guarantee this new rate will apply to them; it will only be guaranteed to
* apply for new created {@link IndexOutput} instances.
* <p>
* <b>NOTE</b>: this is an optional operation and might not be respected by
* all Directory implementations. Currently only {@link FSDirectory buffered}
* Directory implementations use rate-limiting.
*
* @throws IllegalArgumentException
* if context is <code>null</code>
* @throws AlreadyClosedException if the {@link Directory} is already closed
* @lucene.experimental
*/
public void setMaxWriteMBPerSec(Double mbPerSec, IOContext.Context context) {
ensureOpen();
if (context == null) {
throw new IllegalArgumentException("Context must not be null");
}
final int ord = context.ordinal();
final RateLimiter limiter = contextRateLimiters[ord];
if (mbPerSec == null) {
if (limiter != null) {
limiter.setMbPerSec(Double.MAX_VALUE);
contextRateLimiters[ord] = null;
}
} else if (limiter != null) {
limiter.setMbPerSec(mbPerSec);
contextRateLimiters[ord] = limiter; // cross the mem barrier again
} else {
contextRateLimiters[ord] = new RateLimiter.SimpleRateLimiter(mbPerSec);
}
}
/**
* Sets the rate limiter to be used to limit (approx) MB/sec allowed by all IO
* performed with the given {@link Context context}. Pass <code>null</code> to
* have no limit.
*
* <p>
* Passing an instance of rate limiter compared to setting it using
* {@link #setMaxWriteMBPerSec(Double, org.apache.lucene.store.IOContext.Context)}
* allows to use the same limiter instance across several directories globally
* limiting IO across them.
*
* @throws IllegalArgumentException
* if context is <code>null</code>
* @throws AlreadyClosedException if the {@link Directory} is already closed
* @lucene.experimental
*/
public void setRateLimiter(RateLimiter mergeWriteRateLimiter,
Context context) {
ensureOpen();
if (context == null) {
throw new IllegalArgumentException("Context must not be null");
}
contextRateLimiters[context.ordinal()] = mergeWriteRateLimiter;
}
/**
* See {@link #setMaxWriteMBPerSec}.
*
* @throws IllegalArgumentException
* if context is <code>null</code>
* @throws AlreadyClosedException if the {@link Directory} is already closed
* @lucene.experimental
*/
public Double getMaxWriteMBPerSec(IOContext.Context context) {
ensureOpen();
if (context == null) {
throw new IllegalArgumentException("Context must not be null");
}
RateLimiter limiter = getRateLimiter(context);
return limiter == null ? null : limiter.getMbPerSec();
}
}

View File

@ -0,0 +1,76 @@
package org.apache.lucene.store;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
/**
* A {@link RateLimiter rate limiting} {@link IndexOutput}
*
* @lucene.internal
*/
final class RateLimitedIndexOutput extends BufferedIndexOutput {
private final IndexOutput delegate;
private final BufferedIndexOutput bufferedDelegate;
private final RateLimiter rateLimiter;
RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) {
// TODO should we make buffer size configurable
if (delegate instanceof BufferedIndexOutput) {
bufferedDelegate = (BufferedIndexOutput) delegate;
this.delegate = delegate;
} else {
this.delegate = delegate;
bufferedDelegate = null;
}
this.rateLimiter = rateLimiter;
}
@Override
protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
rateLimiter.pause(len);
if (bufferedDelegate != null) {
bufferedDelegate.flushBuffer(b, offset, len);
} else {
delegate.writeBytes(b, offset, len);
}
}
@Override
public long length() throws IOException {
return delegate.length();
}
@Override
public void flush() throws IOException {
try {
super.flush();
} finally {
delegate.flush();
}
}
@Override
public void close() throws IOException {
try {
super.close();
} finally {
delegate.close();
}
}
}

View File

@ -19,75 +19,102 @@ package org.apache.lucene.store;
import org.apache.lucene.util.ThreadInterruptedException;
/** Simple class to rate limit IO. Typically it's shared
* across multiple IndexInputs or IndexOutputs (for example
/** Abstract base class to rate limit IO. Typically implementations are
* shared across multiple IndexInputs or IndexOutputs (for example
* those involved all merging). Those IndexInputs and
* IndexOutputs would call {@link #pause} whenever they
* want to read bytes or write bytes. */
public class RateLimiter {
private volatile double mbPerSec;
private volatile double nsPerByte;
private volatile long lastNS;
// TODO: we could also allow eg a sub class to dynamically
// determine the allowed rate, eg if an app wants to
// change the allowed rate over time or something
/** mbPerSec is the MB/sec max IO rate */
public RateLimiter(double mbPerSec) {
setMbPerSec(mbPerSec);
}
public abstract class RateLimiter {
/**
* Sets an updated mb per second rate limit.
*/
public void setMbPerSec(double mbPerSec) {
this.mbPerSec = mbPerSec;
nsPerByte = 1000000000. / (1024*1024*mbPerSec);
}
public abstract void setMbPerSec(double mbPerSec);
/**
* The current mb per second rate limit.
*/
public double getMbPerSec() {
return this.mbPerSec;
}
public abstract double getMbPerSec();
/** Pauses, if necessary, to keep the instantaneous IO
* rate at or below the target. NOTE: multiple threads
* may safely use this, however the implementation is
* not perfectly thread safe but likely in practice this
* is harmless (just means in some rare cases the rate
* might exceed the target). It's best to call this
* with a biggish count, not one byte at a time. */
public void pause(long bytes) {
if (bytes == 1) {
return;
* rate at or below the target.
* <p>
* Note: the implementation is thread-safe
* </p>
* @return the pause time in nano seconds
* */
public abstract long pause(long bytes);
/**
* Simple class to rate limit IO.
*/
public static class SimpleRateLimiter extends RateLimiter {
private volatile double mbPerSec;
private volatile double nsPerByte;
private volatile long lastNS;
// TODO: we could also allow eg a sub class to dynamically
// determine the allowed rate, eg if an app wants to
// change the allowed rate over time or something
/** mbPerSec is the MB/sec max IO rate */
public SimpleRateLimiter(double mbPerSec) {
setMbPerSec(mbPerSec);
}
// TODO: this is purely instantaneous rate; maybe we
// should also offer decayed recent history one?
final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
long curNS = System.nanoTime();
if (lastNS < curNS) {
lastNS = curNS;
/**
* Sets an updated mb per second rate limit.
*/
public void setMbPerSec(double mbPerSec) {
this.mbPerSec = mbPerSec;
nsPerByte = 1000000000. / (1024*1024*mbPerSec);
}
// While loop because Thread.sleep doesn't always sleep
// enough:
while(true) {
final long pauseNS = targetNS - curNS;
if (pauseNS > 0) {
try {
Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
curNS = System.nanoTime();
continue;
/**
* The current mb per second rate limit.
*/
public double getMbPerSec() {
return this.mbPerSec;
}
/** Pauses, if necessary, to keep the instantaneous IO
* rate at or below the target. NOTE: multiple threads
* may safely use this, however the implementation is
* not perfectly thread safe but likely in practice this
* is harmless (just means in some rare cases the rate
* might exceed the target). It's best to call this
* with a biggish count, not one byte at a time.
* @return the pause time in nano seconds
* */
public long pause(long bytes) {
if (bytes == 1) {
return 0;
}
break;
// TODO: this is purely instantaneous rate; maybe we
// should also offer decayed recent history one?
final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
long curNS = System.nanoTime();
if (lastNS < curNS) {
lastNS = curNS;
}
// While loop because Thread.sleep doesn't always sleep
// enough:
while(true) {
final long pauseNS = targetNS - curNS;
if (pauseNS > 0) {
try {
Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
curNS = System.nanoTime();
continue;
}
break;
}
return targetNS;
}
}
}

View File

@ -96,8 +96,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
// is made to delete an open file, we enroll it here.
private Set<String> openFilesDeleted;
final RateLimiter rateLimiter;
private synchronized void init() {
if (openFiles == null) {
openFiles = new HashMap<String,Integer>();
@ -120,19 +118,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
.mBitsToBytes(40 + randomState.nextInt(10)), 5 + randomState.nextInt(5), null);
// force wrapping of lockfactory
this.lockFactory = new MockLockFactoryWrapper(this, delegate.getLockFactory());
// 2% of the time use rate limiter
if (randomState.nextInt(50) == 17) {
// Use RateLimiter
double maxMBPerSec = 10 + 5*(randomState.nextDouble()-0.5);
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec");
}
rateLimiter = new RateLimiter(maxMBPerSec);
} else {
rateLimiter = null;
}
init();
}
@ -447,7 +432,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
ramdir.fileMap.put(name, file);
}
}
//System.out.println(Thread.currentThread().getName() + ": MDW: create " + name);
IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name);
addFileHandle(io, name, Handle.Output);
@ -455,7 +439,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
// throttling REALLY slows down tests, so don't do it very often for SOMETIMES.
if (throttling == Throttling.ALWAYS ||
(throttling == Throttling.SOMETIMES && rateLimiter == null && randomState.nextInt(50) == 0)) {
(throttling == Throttling.SOMETIMES && randomState.nextInt(50) == 0) && !(delegate instanceof RateLimitedDirectoryWrapper)) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: throttling indexOutput");
}

View File

@ -77,11 +77,6 @@ public class MockIndexOutputWrapper extends IndexOutput {
public void writeBytes(byte[] b, int offset, int len) throws IOException {
long freeSpace = dir.maxSize == 0 ? 0 : dir.maxSize - dir.sizeInBytes();
long realUsage = 0;
if (dir.rateLimiter != null && len >= 1000) {
dir.rateLimiter.pause(len);
}
// If MockRAMDir crashed since we were opened, then
// don't write anything:
if (dir.crashed)

View File

@ -37,6 +37,7 @@ import org.apache.lucene.search.*;
import org.apache.lucene.search.FieldCache.CacheEntry;
import org.apache.lucene.search.QueryUtils.FCInvisibleMultiReader;
import org.apache.lucene.store.*;
import org.apache.lucene.store.IOContext.Context;
import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
import org.apache.lucene.util.FieldCacheSanityChecker.Insanity;
import org.junit.*;
@ -947,6 +948,27 @@ public abstract class LuceneTestCase extends Assert {
if (rarely(random)) {
directory = new NRTCachingDirectory(directory, random.nextDouble(), random.nextDouble());
}
if (rarely(random)) {
final double maxMBPerSec = 10 + 5*(random.nextDouble()-0.5);
if (LuceneTestCase.VERBOSE) {
System.out.println("LuceneTestCase: will rate limit output IndexOutput to " + maxMBPerSec + " MB/sec");
}
final RateLimitedDirectoryWrapper rateLimitedDirectoryWrapper = new RateLimitedDirectoryWrapper(directory);
switch (random.nextInt(10)) {
case 3: // sometimes rate limit on flush
rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
break;
case 2: // sometimes rate limit flush & merge
rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.FLUSH);
rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
break;
default:
rateLimitedDirectoryWrapper.setMaxWriteMBPerSec(maxMBPerSec, Context.MERGE);
}
directory = rateLimitedDirectoryWrapper;
}
if (bare) {
BaseDirectoryWrapper base = new BaseDirectoryWrapper(directory);
@ -954,6 +976,7 @@ public abstract class LuceneTestCase extends Assert {
return base;
} else {
MockDirectoryWrapper mock = new MockDirectoryWrapper(random, directory);
mock.setThrottling(TEST_THROTTLING);
closeAfterSuite(new CloseableDirectory(mock, suiteFailureMarker));
return mock;