diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java index fcb66de5e32..0c24d7ba49f 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java @@ -41,6 +41,7 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoDeletionPolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.ThrottledIndexOutput; import org.apache.lucene.util._TestUtil; /** @@ -76,6 +77,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper { private Set openFilesForWrite = new HashSet(); Set openLocks = Collections.synchronizedSet(new HashSet()); volatile boolean crashed; + private ThrottledIndexOutput throttledOutput; private Throttling throttling = Throttling.SOMETIMES; final AtomicInteger inputCloneCount = new AtomicInteger(); @@ -114,16 +116,22 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper { // called from different threads; else test failures may // not be reproducible from the original seed this.randomState = new Random(random.nextInt()); + this.throttledOutput = new ThrottledIndexOutput(ThrottledIndexOutput + .mBitsToBytes(40 + randomState.nextInt(10)), 5 + randomState.nextInt(5), null); // force wrapping of lockfactory this.lockFactory = new MockLockFactoryWrapper(this, delegate.getLockFactory()); - // NOTE: we init rateLimiter always but we only - // sometimes use it (by default) in createOutput: - double maxMBPerSec = 10 + 5*(randomState.nextDouble()-0.5); - if (LuceneTestCase.VERBOSE) { - System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec"); + // 2% of the time use rate limiter + if (randomState.nextInt(50) == 17) { + // Use RateLimiter + double maxMBPerSec = 10 + 5*(randomState.nextDouble()-0.5); + if (LuceneTestCase.VERBOSE) { + System.out.println("MockDirectoryWrapper: will rate limit output IO to " + maxMBPerSec + " MB/sec"); + } + rateLimiter = new RateLimiter(maxMBPerSec); + } else { + rateLimiter = null; } - rateLimiter = new RateLimiter(maxMBPerSec); init(); } @@ -439,25 +447,22 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper { ramdir.fileMap.put(name, file); } } - - RateLimiter thisRateLimiter; - + + //System.out.println(Thread.currentThread().getName() + ": MDW: create " + name); + IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name); + addFileHandle(io, name, Handle.Output); + openFilesForWrite.add(name); + // throttling REALLY slows down tests, so don't do it very often for SOMETIMES. if (throttling == Throttling.ALWAYS || (throttling == Throttling.SOMETIMES && rateLimiter == null && randomState.nextInt(50) == 0)) { if (LuceneTestCase.VERBOSE) { System.out.println("MockDirectoryWrapper: throttling indexOutput"); } - thisRateLimiter = rateLimiter; + return throttledOutput.newFromDelegate(io); } else { - thisRateLimiter = null; + return io; } - - //System.out.println(Thread.currentThread().getName() + ": MDW: create " + name); - IndexOutput io = new MockIndexOutputWrapper(this, delegate.createOutput(name, LuceneTestCase.newIOContext(randomState, context)), name, thisRateLimiter); - addFileHandle(io, name, Handle.Output); - openFilesForWrite.add(name); - return io; } private static enum Handle { diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java index fd77adcd962..058dc978efc 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockIndexOutputWrapper.java @@ -32,17 +32,15 @@ public class MockIndexOutputWrapper extends IndexOutput { private MockDirectoryWrapper dir; private final IndexOutput delegate; private boolean first=true; - private final RateLimiter rateLimiter; final String name; byte[] singleByte = new byte[1]; /** Construct an empty output buffer. */ - public MockIndexOutputWrapper(MockDirectoryWrapper dir, IndexOutput delegate, String name, RateLimiter rateLimiter) { + public MockIndexOutputWrapper(MockDirectoryWrapper dir, IndexOutput delegate, String name) { this.dir = dir; this.name = name; this.delegate = delegate; - this.rateLimiter = rateLimiter; } @Override @@ -80,8 +78,8 @@ public class MockIndexOutputWrapper extends IndexOutput { long freeSpace = dir.maxSize == 0 ? 0 : dir.maxSize - dir.sizeInBytes(); long realUsage = 0; - if (rateLimiter != null && len >= 1000) { - rateLimiter.pause(len); + if (dir.rateLimiter != null && len >= 1000) { + dir.rateLimiter.pause(len); } // If MockRAMDir crashed since we were opened, then diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/ThrottledIndexOutput.java b/lucene/test-framework/src/java/org/apache/lucene/util/ThrottledIndexOutput.java new file mode 100644 index 00000000000..b25106b7d67 --- /dev/null +++ b/lucene/test-framework/src/java/org/apache/lucene/util/ThrottledIndexOutput.java @@ -0,0 +1,152 @@ +package org.apache.lucene.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.io.IOException; + +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.IndexOutput; + +/** + * Intentionally slow IndexOutput for testing. + */ +public class ThrottledIndexOutput extends IndexOutput { + public static final int DEFAULT_MIN_WRITTEN_BYTES = 1024; + private final int bytesPerSecond; + private IndexOutput delegate; + private long flushDelayMillis; + private long closeDelayMillis; + private long seekDelayMillis; + private long pendingBytes; + private long minBytesWritten; + private long timeElapsed; + private final byte[] bytes = new byte[1]; + + public ThrottledIndexOutput newFromDelegate(IndexOutput output) { + return new ThrottledIndexOutput(bytesPerSecond, flushDelayMillis, + closeDelayMillis, seekDelayMillis, minBytesWritten, output); + } + + public ThrottledIndexOutput(int bytesPerSecond, long delayInMillis, + IndexOutput delegate) { + this(bytesPerSecond, delayInMillis, delayInMillis, delayInMillis, + DEFAULT_MIN_WRITTEN_BYTES, delegate); + } + + public ThrottledIndexOutput(int bytesPerSecond, long delays, + int minBytesWritten, IndexOutput delegate) { + this(bytesPerSecond, delays, delays, delays, minBytesWritten, delegate); + } + + public static final int mBitsToBytes(int mbits) { + return mbits * 125000; + } + + public ThrottledIndexOutput(int bytesPerSecond, long flushDelayMillis, + long closeDelayMillis, long seekDelayMillis, long minBytesWritten, + IndexOutput delegate) { + assert bytesPerSecond > 0; + this.delegate = delegate; + this.bytesPerSecond = bytesPerSecond; + this.flushDelayMillis = flushDelayMillis; + this.closeDelayMillis = closeDelayMillis; + this.seekDelayMillis = seekDelayMillis; + this.minBytesWritten = minBytesWritten; + } + + @Override + public void flush() throws IOException { + sleep(flushDelayMillis); + delegate.flush(); + } + + @Override + public void close() throws IOException { + try { + sleep(closeDelayMillis + getDelay(true)); + } finally { + delegate.close(); + } + } + + @Override + public long getFilePointer() { + return delegate.getFilePointer(); + } + + @Override + public void seek(long pos) throws IOException { + sleep(seekDelayMillis); + delegate.seek(pos); + } + + @Override + public long length() throws IOException { + return delegate.length(); + } + + @Override + public void writeByte(byte b) throws IOException { + bytes[0] = b; + writeBytes(bytes, 0, 1); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + final long before = System.nanoTime(); + delegate.writeBytes(b, offset, length); + timeElapsed += System.nanoTime() - before; + pendingBytes += length; + sleep(getDelay(false)); + + } + + protected long getDelay(boolean closing) { + if (pendingBytes > 0 && (closing || pendingBytes > minBytesWritten)) { + long actualBps = (timeElapsed / pendingBytes) * 1000000000l; // nano to sec + if (actualBps > bytesPerSecond) { + long expected = (pendingBytes * 1000l / bytesPerSecond) ; + final long delay = expected - (timeElapsed / 1000000l) ; + pendingBytes = 0; + timeElapsed = 0; + return delay; + } + } + return 0; + + } + + private static final void sleep(long ms) { + if (ms <= 0) + return; + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } + } + + @Override + public void setLength(long length) throws IOException { + delegate.setLength(length); + } + + @Override + public void copyBytes(DataInput input, long numBytes) throws IOException { + delegate.copyBytes(input, numBytes); + } +}