only use rateLimiter for sleeping in MDW/MockIO

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1392604 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2012-10-01 21:07:37 +00:00
parent 251744ce34
commit 637541add4
3 changed files with 22 additions and 177 deletions

View File

@ -37,7 +37,6 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThrottledIndexOutput;
import org.apache.lucene.util._TestUtil;
/**
@ -73,7 +72,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
private Set<String> openFilesForWrite = new HashSet<String>();
Set<String> openLocks = Collections.synchronizedSet(new HashSet<String>());
volatile boolean crashed;
private ThrottledIndexOutput throttledOutput;
private Throttling throttling = Throttling.SOMETIMES;
final AtomicInteger inputCloneCount = new AtomicInteger();
@ -112,22 +110,16 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
// called from different threads; else test failures may
// not be reproducible from the original seed
this.randomState = new Random(random.nextInt());
this.throttledOutput = new ThrottledIndexOutput(ThrottledIndexOutput
.mBitsToBytes(40 + randomState.nextInt(10)), 5 + randomState.nextInt(5), null);
// force wrapping of lockfactory
this.lockFactory = new MockLockFactoryWrapper(this, delegate.getLockFactory());
// 2% of the time use rate limiter
if (randomState.nextInt(50) == 17) {
// Use RateLimiter
double maxMBPerSec = 10 + 5*(randomState.nextDouble()-0.5);
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec");
}
rateLimiter = new RateLimiter(maxMBPerSec);
} else {
rateLimiter = null;
// NOTE: we init rateLimiter always but we only
// sometimes use it (by default) in createOutput:
double maxMBPerSec = 10 + 5*(randomState.nextDouble()-0.5);
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec");
}
rateLimiter = new RateLimiter(maxMBPerSec);
init();
}
@ -441,22 +433,25 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
ramdir.fileMap.put(name, file);
}
}
//System.out.println(Thread.currentThread().getName() + ": MDW: create " + name);
IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name);
addFileHandle(io, name, Handle.Output);
openFilesForWrite.add(name);
RateLimiter thisRateLimiter;
// throttling REALLY slows down tests, so don't do it very often for SOMETIMES.
if (throttling == Throttling.ALWAYS ||
(throttling == Throttling.SOMETIMES && rateLimiter == null && randomState.nextInt(50) == 0)) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: throttling indexOutput");
}
return throttledOutput.newFromDelegate(io);
thisRateLimiter = rateLimiter;
} else {
return io;
thisRateLimiter = null;
}
//System.out.println(Thread.currentThread().getName() + ": MDW: create " + name);
IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name, thisRateLimiter);
addFileHandle(io, name, Handle.Output);
openFilesForWrite.add(name);
return io;
}
private static enum Handle {

View File

@ -32,15 +32,17 @@ public class MockIndexOutputWrapper extends IndexOutput {
private MockDirectoryWrapper dir;
private final IndexOutput delegate;
private boolean first=true;
private final RateLimiter rateLimiter;
final String name;
byte[] singleByte = new byte[1];
/** Construct an empty output buffer. */
public MockIndexOutputWrapper(MockDirectoryWrapper dir, IndexOutput delegate, String name) {
public MockIndexOutputWrapper(MockDirectoryWrapper dir, IndexOutput delegate, String name, RateLimiter rateLimiter) {
this.dir = dir;
this.name = name;
this.delegate = delegate;
this.rateLimiter = rateLimiter;
}
@Override
@ -78,8 +80,8 @@ public class MockIndexOutputWrapper extends IndexOutput {
long freeSpace = dir.maxSize == 0 ? 0 : dir.maxSize - dir.sizeInBytes();
long realUsage = 0;
if (dir.rateLimiter != null && len >= 1000) {
dir.rateLimiter.pause(len);
if (rateLimiter != null && len >= 1000) {
rateLimiter.pause(len);
}
// If MockRAMDir crashed since we were opened, then

View File

@ -1,152 +0,0 @@
package org.apache.lucene.util;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexOutput;
/**
* Intentionally slow IndexOutput for testing.
*/
public class ThrottledIndexOutput extends IndexOutput {
public static final int DEFAULT_MIN_WRITTEN_BYTES = 1024;
private final int bytesPerSecond;
private IndexOutput delegate;
private long flushDelayMillis;
private long closeDelayMillis;
private long seekDelayMillis;
private long pendingBytes;
private long minBytesWritten;
private long timeElapsed;
private final byte[] bytes = new byte[1];
public ThrottledIndexOutput newFromDelegate(IndexOutput output) {
return new ThrottledIndexOutput(bytesPerSecond, flushDelayMillis,
closeDelayMillis, seekDelayMillis, minBytesWritten, output);
}
public ThrottledIndexOutput(int bytesPerSecond, long delayInMillis,
IndexOutput delegate) {
this(bytesPerSecond, delayInMillis, delayInMillis, delayInMillis,
DEFAULT_MIN_WRITTEN_BYTES, delegate);
}
public ThrottledIndexOutput(int bytesPerSecond, long delays,
int minBytesWritten, IndexOutput delegate) {
this(bytesPerSecond, delays, delays, delays, minBytesWritten, delegate);
}
public static final int mBitsToBytes(int mbits) {
return mbits * 125000;
}
public ThrottledIndexOutput(int bytesPerSecond, long flushDelayMillis,
long closeDelayMillis, long seekDelayMillis, long minBytesWritten,
IndexOutput delegate) {
assert bytesPerSecond > 0;
this.delegate = delegate;
this.bytesPerSecond = bytesPerSecond;
this.flushDelayMillis = flushDelayMillis;
this.closeDelayMillis = closeDelayMillis;
this.seekDelayMillis = seekDelayMillis;
this.minBytesWritten = minBytesWritten;
}
@Override
public void flush() throws IOException {
sleep(flushDelayMillis);
delegate.flush();
}
@Override
public void close() throws IOException {
try {
sleep(closeDelayMillis + getDelay(true));
} finally {
delegate.close();
}
}
@Override
public long getFilePointer() {
return delegate.getFilePointer();
}
@Override
public void seek(long pos) throws IOException {
sleep(seekDelayMillis);
delegate.seek(pos);
}
@Override
public long length() throws IOException {
return delegate.length();
}
@Override
public void writeByte(byte b) throws IOException {
bytes[0] = b;
writeBytes(bytes, 0, 1);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
final long before = System.nanoTime();
delegate.writeBytes(b, offset, length);
timeElapsed += System.nanoTime() - before;
pendingBytes += length;
sleep(getDelay(false));
}
protected long getDelay(boolean closing) {
if (pendingBytes > 0 && (closing || pendingBytes > minBytesWritten)) {
long actualBps = (timeElapsed / pendingBytes) * 1000000000l; // nano to sec
if (actualBps > bytesPerSecond) {
long expected = (pendingBytes * 1000l / bytesPerSecond) ;
final long delay = expected - (timeElapsed / 1000000l) ;
pendingBytes = 0;
timeElapsed = 0;
return delay;
}
}
return 0;
}
private static final void sleep(long ms) {
if (ms <= 0)
return;
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
@Override
public void setLength(long length) throws IOException {
delegate.setLength(length);
}
@Override
public void copyBytes(DataInput input, long numBytes) throws IOException {
delegate.copyBytes(input, numBytes);
}
}