LUCENE-4158: Simplify DocumentsWriterStallControl to prevent further deadlocks

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1352535 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2012-06-21 14:02:18 +00:00
parent 0ae03a37cf
commit c4e4b36037
4 changed files with 106 additions and 163 deletions

View File

@ -26,7 +26,6 @@ import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
/** /**
@ -41,7 +40,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
* {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address
* space exhaustion. * space exhaustion.
*/ */
final class DocumentsWriterFlushControl implements MemoryController { final class DocumentsWriterFlushControl {
private final long hardMaxBytesPerDWPT; private final long hardMaxBytesPerDWPT;
private long activeBytes = 0; private long activeBytes = 0;
@ -88,7 +87,7 @@ final class DocumentsWriterFlushControl implements MemoryController {
return flushBytes + activeBytes; return flushBytes + activeBytes;
} }
public long stallLimitBytes() { private long stallLimitBytes() {
final double maxRamMB = config.getRAMBufferSizeMB(); final double maxRamMB = config.getRAMBufferSizeMB();
return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE; return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE;
} }
@ -178,7 +177,7 @@ final class DocumentsWriterFlushControl implements MemoryController {
} }
return flushingDWPT; return flushingDWPT;
} finally { } finally {
stallControl.updateStalled(this); updateStallState();
assert assertMemory(); assert assertMemory();
} }
} }
@ -192,13 +191,30 @@ final class DocumentsWriterFlushControl implements MemoryController {
assert assertMemory(); assert assertMemory();
} finally { } finally {
try { try {
stallControl.updateStalled(this); updateStallState();
} finally { } finally {
notifyAll(); notifyAll();
} }
} }
} }
private final void updateStallState() {
assert Thread.holdsLock(this);
final long limit = stallLimitBytes();
/*
* we block indexing threads if net byte grows due to slow flushes
* yet, for small ram buffers and large documents we can easily
* reach the limit without any ongoing flushes. we need to ensure
* that we don't stall/block if an ongoing or pending flush can
* not free up enough memory to release the stall lock.
*/
final boolean stall = ((activeBytes + flushBytes) > limit) &&
(activeBytes < limit) &&
!closed;
stallControl.updateStalled(stall);
}
public synchronized void waitForFlush() { public synchronized void waitForFlush() {
while (flushingWriters.size() != 0) { while (flushingWriters.size() != 0) {
try { try {
@ -238,7 +254,7 @@ final class DocumentsWriterFlushControl implements MemoryController {
// Take it out of the loop this DWPT is stale // Take it out of the loop this DWPT is stale
perThreadPool.replaceForFlush(state, closed); perThreadPool.replaceForFlush(state, closed);
} finally { } finally {
stallControl.updateStalled(this); updateStallState();
} }
} }
@ -288,7 +304,7 @@ final class DocumentsWriterFlushControl implements MemoryController {
} }
return null; return null;
} finally { } finally {
stallControl.updateStalled(this); updateStallState();
} }
} }
@ -304,7 +320,7 @@ final class DocumentsWriterFlushControl implements MemoryController {
synchronized (this) { synchronized (this) {
final DocumentsWriterPerThread poll; final DocumentsWriterPerThread poll;
if ((poll = flushQueue.poll()) != null) { if ((poll = flushQueue.poll()) != null) {
stallControl.updateStalled(this); updateStallState();
return poll; return poll;
} }
fullFlush = this.fullFlush; fullFlush = this.fullFlush;
@ -458,7 +474,7 @@ final class DocumentsWriterFlushControl implements MemoryController {
assert assertBlockedFlushes(documentsWriter.deleteQueue); assert assertBlockedFlushes(documentsWriter.deleteQueue);
flushQueue.addAll(fullFlushBuffer); flushQueue.addAll(fullFlushBuffer);
fullFlushBuffer.clear(); fullFlushBuffer.clear();
stallControl.updateStalled(this); updateStallState();
} }
assert assertActiveDeleteQueue(documentsWriter.deleteQueue); assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
} }
@ -537,7 +553,7 @@ final class DocumentsWriterFlushControl implements MemoryController {
} }
} finally { } finally {
fullFlush = false; fullFlush = false;
stallControl.updateStalled(this); updateStallState();
} }
} }
@ -572,7 +588,7 @@ final class DocumentsWriterFlushControl implements MemoryController {
fullFlush = false; fullFlush = false;
flushQueue.clear(); flushQueue.clear();
blockedFlushes.clear(); blockedFlushes.clear();
stallControl.updateStalled(this); updateStallState();
} }
} }

View File

@ -16,7 +16,8 @@ package org.apache.lucene.index;
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.IdentityHashMap;
import java.util.Map;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
@ -37,107 +38,81 @@ import org.apache.lucene.util.ThreadInterruptedException;
* continue indexing. * continue indexing.
*/ */
final class DocumentsWriterStallControl { final class DocumentsWriterStallControl {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer { private volatile boolean stalled;
private int numWaiting; // only with assert
Sync() { private boolean wasStalled; // only with assert
setState(0); private final Map<Thread, Boolean> waiting = new IdentityHashMap<Thread, Boolean>(); // only with assert
}
boolean isHealthy() {
return getState() == 0;
}
boolean trySetStalled() {
int state = getState();
return compareAndSetState(state, state + 1);
}
boolean tryReset() {
final int oldState = getState();
if (oldState == 0) {
return true;
}
if (compareAndSetState(oldState, 0)) {
return releaseShared(0);
}
return false;
}
@Override
public int tryAcquireShared(int acquires) {
return getState() == 0 ? 1 : -1;
}
@Override
public boolean tryReleaseShared(int newState) {
return (getState() == 0);
}
}
private final Sync sync = new Sync();
volatile boolean wasStalled = false; // only with asserts
boolean anyStalledThreads() {
return !sync.isHealthy();
}
/** /**
* Update the stalled flag status. This method will set the stalled flag to * Update the stalled flag status. This method will set the stalled flag to
* <code>true</code> iff the number of flushing * <code>true</code> iff the number of flushing
* {@link DocumentsWriterPerThread} is greater than the number of active * {@link DocumentsWriterPerThread} is greater than the number of active
* {@link DocumentsWriterPerThread}. Otherwise it will reset the * {@link DocumentsWriterPerThread}. Otherwise it will reset the
* {@link DocumentsWriterStallControl} to healthy and release all threads waiting on * {@link DocumentsWriterStallControl} to healthy and release all threads
* {@link #waitIfStalled()} * waiting on {@link #waitIfStalled()}
*/ */
void updateStalled(MemoryController controller) { synchronized void updateStalled(boolean stalled) {
do { this.stalled = stalled;
final long netBytes = controller.netBytes(); if (stalled) {
final long flushBytes = controller.flushBytes(); wasStalled = true;
final long limit = controller.stallLimitBytes(); }
assert netBytes >= flushBytes; notifyAll();
assert limit > 0;
/*
* we block indexing threads if net byte grows due to slow flushes
* yet, for small ram buffers and large documents we can easily
* reach the limit without any ongoing flushes. we need to ensure
* that we don't stall/block if an ongoing or pending flush can
* not free up enough memory to release the stall lock.
*/
while (netBytes > limit && (netBytes - flushBytes) < limit) {
if (sync.trySetStalled()) {
assert wasStalled = true;
return;
}
}
} while (!sync.tryReset());
} }
/**
* Blocks if documents writing is currently in a stalled state.
*
*/
void waitIfStalled() { void waitIfStalled() {
try { if (stalled) {
sync.acquireSharedInterruptibly(0); synchronized (this) {
} catch (InterruptedException e) { boolean hasWaited = false;
throw new ThreadInterruptedException(e); while (stalled) {
try {
assert hasWaited || incWaiters();
assert (hasWaited = true);
wait();
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
assert !hasWaited || decrWaiters();
}
} }
} }
boolean hasBlocked() { // for tests boolean anyStalledThreads() {
return sync.hasQueuedThreads(); return stalled;
} }
static interface MemoryController {
long netBytes(); private boolean incWaiters() {
long flushBytes(); numWaiting++;
long stallLimitBytes(); assert waiting.put(Thread.currentThread(), Boolean.TRUE) == null;
return numWaiting > 0;
}
private boolean decrWaiters() {
numWaiting--;
assert waiting.remove(Thread.currentThread()) != null;
return numWaiting >= 0;
}
synchronized boolean hasBlocked() { // for tests
return numWaiting > 0;
}
boolean isHealthy() { // for tests
return !stalled; // volatile read!
}
synchronized boolean isThreadQueued(Thread t) { // for tests
return waiting.containsKey(t);
} }
public boolean isHealthy() { synchronized boolean wasStalled() { // for tests
return sync.isHealthy(); return wasStalled;
}
public boolean isThreadQueued(Thread t) {
return sync.isQueued(t);
} }
} }

View File

@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
@ -38,11 +37,8 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
public void testSimpleStall() throws InterruptedException { public void testSimpleStall() throws InterruptedException {
DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(); DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
SimpleMemCtrl memCtrl = new SimpleMemCtrl();
memCtrl.limit = 1000; ctrl.updateStalled(false);
memCtrl.netBytes = 1000;
memCtrl.flushBytes = 20;
ctrl.updateStalled(memCtrl);
Thread[] waitThreads = waitThreads(atLeast(1), ctrl); Thread[] waitThreads = waitThreads(atLeast(1), ctrl);
start(waitThreads); start(waitThreads);
assertFalse(ctrl.hasBlocked()); assertFalse(ctrl.hasBlocked());
@ -50,43 +46,31 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
join(waitThreads, 10); join(waitThreads, 10);
// now stall threads and wake them up again // now stall threads and wake them up again
memCtrl.netBytes = 1001; ctrl.updateStalled(true);
memCtrl.flushBytes = 100;
ctrl.updateStalled(memCtrl);
waitThreads = waitThreads(atLeast(1), ctrl); waitThreads = waitThreads(atLeast(1), ctrl);
start(waitThreads); start(waitThreads);
awaitState(100, Thread.State.WAITING, waitThreads); awaitState(100, Thread.State.WAITING, waitThreads);
assertTrue(ctrl.hasBlocked()); assertTrue(ctrl.hasBlocked());
assertTrue(ctrl.anyStalledThreads()); assertTrue(ctrl.anyStalledThreads());
memCtrl.netBytes = 50; ctrl.updateStalled(false);
memCtrl.flushBytes = 0;
ctrl.updateStalled(memCtrl);
assertFalse(ctrl.anyStalledThreads()); assertFalse(ctrl.anyStalledThreads());
join(waitThreads, 500); join(waitThreads, 500);
} }
public void testRandom() throws InterruptedException { public void testRandom() throws InterruptedException {
final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(); final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
SimpleMemCtrl memCtrl = new SimpleMemCtrl(); ctrl.updateStalled(false);
memCtrl.limit = 1000;
memCtrl.netBytes = 1;
ctrl.updateStalled(memCtrl);
Thread[] stallThreads = new Thread[atLeast(3)]; Thread[] stallThreads = new Thread[atLeast(3)];
for (int i = 0; i < stallThreads.length; i++) { for (int i = 0; i < stallThreads.length; i++) {
final int threadId = i; final int threadId = i;
final int stallProbability = 1 +random().nextInt(10);
stallThreads[i] = new Thread() { stallThreads[i] = new Thread() {
public void run() { public void run() {
int baseBytes = threadId % 2 == 0 ? 500 : 700;
SimpleMemCtrl memCtrl = new SimpleMemCtrl();
memCtrl.limit = 1000;
memCtrl.netBytes = 1;
memCtrl.flushBytes = 0;
int iters = atLeast(1000); int iters = atLeast(1000);
for (int j = 0; j < iters; j++) { for (int j = 0; j < iters; j++) {
memCtrl.netBytes = baseBytes + random().nextInt(1000); ctrl.updateStalled(random().nextInt(stallProbability) == 0);
memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes);
ctrl.updateStalled(memCtrl);
if (random().nextInt(5) == 0) { // thread 0 only updates if (random().nextInt(5) == 0) { // thread 0 only updates
ctrl.waitIfStalled(); ctrl.waitIfStalled();
} }
@ -102,7 +86,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
*/ */
while ((System.currentTimeMillis() - time) < 100 * 1000 while ((System.currentTimeMillis() - time) < 100 * 1000
&& !terminated(stallThreads)) { && !terminated(stallThreads)) {
ctrl.updateStalled(memCtrl); ctrl.updateStalled(false);
if (random().nextBoolean()) { if (random().nextBoolean()) {
Thread.yield(); Thread.yield();
} else { } else {
@ -116,11 +100,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
public void testAccquireReleaseRace() throws InterruptedException { public void testAccquireReleaseRace() throws InterruptedException {
final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(); final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
SimpleMemCtrl memCtrl = new SimpleMemCtrl(); ctrl.updateStalled(false);
memCtrl.limit = 1000;
memCtrl.netBytes = 1;
memCtrl.flushBytes = 0;
ctrl.updateStalled(memCtrl);
final AtomicBoolean stop = new AtomicBoolean(false); final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicBoolean checkPoint = new AtomicBoolean(true); final AtomicBoolean checkPoint = new AtomicBoolean(true);
@ -191,10 +171,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
for (int i = 0; i < threads.length; i++) { for (int i = 0; i < threads.length; i++) {
memCtrl.limit = 1000; ctrl.updateStalled(false);
memCtrl.netBytes = 1;
memCtrl.flushBytes = 0;
ctrl.updateStalled(memCtrl);
threads[i].join(2000); threads[i].join(2000);
if (threads[i].isAlive() && threads[i] instanceof Waiter) { if (threads[i].isAlive() && threads[i] instanceof Waiter) {
if (threads[i].getState() == Thread.State.WAITING) { if (threads[i].getState() == Thread.State.WAITING) {
@ -290,14 +267,11 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
public void run() { public void run() {
try { try {
SimpleMemCtrl memCtrl = new SimpleMemCtrl();
memCtrl.limit = 1000;
memCtrl.netBytes = release ? 1 : 2000;
memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes);
while (!stop.get()) { while (!stop.get()) {
int internalIters = release && random().nextBoolean() ? atLeast(5) : 1; int internalIters = release && random().nextBoolean() ? atLeast(5) : 1;
for (int i = 0; i < internalIters; i++) { for (int i = 0; i < internalIters; i++) {
ctrl.updateStalled(memCtrl); ctrl.updateStalled(random().nextBoolean());
} }
if (checkPoint.get()) { if (checkPoint.get()) {
sync.updateJoin.countDown(); sync.updateJoin.countDown();
@ -379,28 +353,6 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
+ " ms"); + " ms");
} }
private static class SimpleMemCtrl implements MemoryController {
long netBytes;
long limit;
long flushBytes;
@Override
public long netBytes() {
return netBytes;
}
@Override
public long stallLimitBytes() {
return limit;
}
@Override
public long flushBytes() {
return flushBytes;
}
}
private static final class Synchronizer { private static final class Synchronizer {
volatile CountDownLatch waiter; volatile CountDownLatch waiter;
volatile CountDownLatch updateJoin; volatile CountDownLatch updateJoin;

View File

@ -109,7 +109,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
assertTrue(maxRAMBytes < flushControl.peakActiveBytes); assertTrue(maxRAMBytes < flushControl.peakActiveBytes);
} }
if (ensureNotStalled) { if (ensureNotStalled) {
assertFalse(docsWriter.flushControl.stallControl.wasStalled); assertFalse(docsWriter.flushControl.stallControl.wasStalled());
} }
writer.close(); writer.close();
assertEquals(0, flushControl.activeBytes()); assertEquals(0, flushControl.activeBytes());
@ -222,7 +222,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
assertEquals(numDocumentsToIndex, r.numDocs()); assertEquals(numDocumentsToIndex, r.numDocs());
assertEquals(numDocumentsToIndex, r.maxDoc()); assertEquals(numDocumentsToIndex, r.maxDoc());
if (!flushPolicy.flushOnRAM()) { if (!flushPolicy.flushOnRAM()) {
assertFalse("never stall if we don't flush on RAM", docsWriter.flushControl.stallControl.wasStalled); assertFalse("never stall if we don't flush on RAM", docsWriter.flushControl.stallControl.wasStalled());
assertFalse("never block if we don't flush on RAM", docsWriter.flushControl.stallControl.hasBlocked()); assertFalse("never block if we don't flush on RAM", docsWriter.flushControl.stallControl.hasBlocked());
} }
r.close(); r.close();
@ -275,7 +275,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
docsWriter.flushControl.stallControl.hasBlocked()); docsWriter.flushControl.stallControl.hasBlocked());
} }
if (docsWriter.flushControl.peakNetBytes > (2.d * iwc.getRAMBufferSizeMB() * 1024.d * 1024.d)) { if (docsWriter.flushControl.peakNetBytes > (2.d * iwc.getRAMBufferSizeMB() * 1024.d * 1024.d)) {
assertTrue(docsWriter.flushControl.stallControl.wasStalled); assertTrue(docsWriter.flushControl.stallControl.wasStalled());
} }
assertActiveBytesAfter(flushControl); assertActiveBytesAfter(flushControl);
writer.close(true); writer.close(true);