LUCENE-4071: DWStallControl can deadlock if stalled and flushMemory cannot release the stall state

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1341342 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2012-05-22 08:08:33 +00:00
parent 964e90adb3
commit 2ce7899fc3
4 changed files with 50 additions and 20 deletions

View File

@ -190,10 +190,13 @@ final class DocumentsWriterFlushControl implements MemoryController {
Long bytes = flushingWriters.remove(dwpt); Long bytes = flushingWriters.remove(dwpt);
flushBytes -= bytes.longValue(); flushBytes -= bytes.longValue();
perThreadPool.recycle(dwpt); perThreadPool.recycle(dwpt);
stallControl.updateStalled(this);
assert assertMemory(); assert assertMemory();
} finally { } finally {
notifyAll(); try {
stallControl.updateStalled(this);
} finally {
notifyAll();
}
} }
} }

View File

@ -100,9 +100,19 @@ final class DocumentsWriterStallControl {
*/ */
void updateStalled(MemoryController controller) { void updateStalled(MemoryController controller) {
do { do {
// if we have more flushing / blocked DWPT than numActiveDWPT we stall! final long netBytes = controller.netBytes();
// don't stall if we have queued flushes - threads should be hijacked instead final long flushBytes = controller.flushBytes();
while (controller.netBytes() > controller.stallLimitBytes()) { final long limit = controller.stallLimitBytes();
assert netBytes >= flushBytes;
assert limit > 0;
/*
* We block indexing threads if net bytes grow due to slow flushes.
* Yet, for small RAM buffers and large documents, we can easily
* reach the limit without any ongoing flushes. We need to ensure
* that we don't stall/block if an ongoing or pending flush cannot
* free up enough memory to release the stall lock.
*/
while (netBytes > limit && (netBytes - flushBytes) < limit) {
if (sync.trySetStalled()) { if (sync.trySetStalled()) {
assert wasStalled = true; assert wasStalled = true;
return; return;
@ -125,6 +135,7 @@ final class DocumentsWriterStallControl {
static interface MemoryController { static interface MemoryController {
long netBytes(); long netBytes();
long flushBytes();
long stallLimitBytes(); long stallLimitBytes();
} }
} }

View File

@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController; import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeaks; import com.carrotsearch.randomizedtesting.annotations.ThreadLeaks;
@ -40,6 +41,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
SimpleMemCtrl memCtrl = new SimpleMemCtrl(); SimpleMemCtrl memCtrl = new SimpleMemCtrl();
memCtrl.limit = 1000; memCtrl.limit = 1000;
memCtrl.netBytes = 1000; memCtrl.netBytes = 1000;
memCtrl.flushBytes = 20;
ctrl.updateStalled(memCtrl); ctrl.updateStalled(memCtrl);
Thread[] waitThreads = waitThreads(atLeast(1), ctrl); Thread[] waitThreads = waitThreads(atLeast(1), ctrl);
start(waitThreads); start(waitThreads);
@ -49,6 +51,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
// now stall threads and wake them up again // now stall threads and wake them up again
memCtrl.netBytes = 1001; memCtrl.netBytes = 1001;
memCtrl.flushBytes = 100;
ctrl.updateStalled(memCtrl); ctrl.updateStalled(memCtrl);
waitThreads = waitThreads(atLeast(1), ctrl); waitThreads = waitThreads(atLeast(1), ctrl);
start(waitThreads); start(waitThreads);
@ -56,6 +59,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
assertTrue(ctrl.hasBlocked()); assertTrue(ctrl.hasBlocked());
assertTrue(ctrl.anyStalledThreads()); assertTrue(ctrl.anyStalledThreads());
memCtrl.netBytes = 50; memCtrl.netBytes = 50;
memCtrl.flushBytes = 0;
ctrl.updateStalled(memCtrl); ctrl.updateStalled(memCtrl);
assertFalse(ctrl.anyStalledThreads()); assertFalse(ctrl.anyStalledThreads());
join(waitThreads, 500); join(waitThreads, 500);
@ -76,9 +80,12 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
SimpleMemCtrl memCtrl = new SimpleMemCtrl(); SimpleMemCtrl memCtrl = new SimpleMemCtrl();
memCtrl.limit = 1000; memCtrl.limit = 1000;
memCtrl.netBytes = 1; memCtrl.netBytes = 1;
memCtrl.flushBytes = 0;
int iters = atLeast(1000); int iters = atLeast(1000);
for (int j = 0; j < iters; j++) { for (int j = 0; j < iters; j++) {
memCtrl.netBytes = baseBytes + random().nextInt(1000); memCtrl.netBytes = baseBytes + random().nextInt(1000);
memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes);
ctrl.updateStalled(memCtrl); ctrl.updateStalled(memCtrl);
if (random().nextInt(5) == 0) { // thread 0 only updates if (random().nextInt(5) == 0) { // thread 0 only updates
ctrl.waitIfStalled(); ctrl.waitIfStalled();
@ -112,6 +119,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
SimpleMemCtrl memCtrl = new SimpleMemCtrl(); SimpleMemCtrl memCtrl = new SimpleMemCtrl();
memCtrl.limit = 1000; memCtrl.limit = 1000;
memCtrl.netBytes = 1; memCtrl.netBytes = 1;
memCtrl.flushBytes = 0;
ctrl.updateStalled(memCtrl); ctrl.updateStalled(memCtrl);
final AtomicBoolean stop = new AtomicBoolean(false); final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicBoolean checkPoint = new AtomicBoolean(true); final AtomicBoolean checkPoint = new AtomicBoolean(true);
@ -143,7 +151,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
for (int i = 0; i < iters; i++) { for (int i = 0; i < iters; i++) {
if (checkPoint.get()) { if (checkPoint.get()) {
latches[0].await(5, TimeUnit.SECONDS); assertTrue("timed out waiting for update threads - deadlock?", latches[0].await(10, TimeUnit.SECONDS));
if (!exceptions.isEmpty()) { if (!exceptions.isEmpty()) {
for (Throwable throwable : exceptions) { for (Throwable throwable : exceptions) {
throwable.printStackTrace(); throwable.printStackTrace();
@ -154,7 +162,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
if (!ctrl.anyStalledThreads()) { if (!ctrl.anyStalledThreads()) {
assertTrue( assertTrue(
"control claims no stalled threads but waiter seems to be blocked", "control claims no stalled threads but waiter seems to be blocked",
latches[2].await(3, TimeUnit.SECONDS)); latches[2].await(10, TimeUnit.SECONDS));
} }
checkPoint.set(false); checkPoint.set(false);
@ -171,14 +179,13 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
} }
stop.set(true); stop.set(true);
memCtrl.limit = 1000; latches[1].countDown();
memCtrl.netBytes = 1;
ctrl.updateStalled(memCtrl);
if (checkPoint.get()) {
latches[1].countDown();
}
for (int i = 0; i < threads.length; i++) { for (int i = 0; i < threads.length; i++) {
memCtrl.limit = 1000;
memCtrl.netBytes = 1;
memCtrl.flushBytes = 0;
ctrl.updateStalled(memCtrl);
threads[i].join(2000); threads[i].join(2000);
if (threads[i].isAlive() && threads[i] instanceof Waiter) { if (threads[i].isAlive() && threads[i] instanceof Waiter) {
if (threads[i].getState() == Thread.State.WAITING) { if (threads[i].getState() == Thread.State.WAITING) {
@ -215,9 +222,10 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
CountDownLatch wait = latches[1]; CountDownLatch wait = latches[1];
join.countDown(); join.countDown();
try { try {
wait.await(); assertTrue(wait.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); System.out.println("[Waiter] got interrupted - wait count: " + wait.getCount());
throw new ThreadInterruptedException(e);
} }
} }
} }
@ -253,6 +261,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
SimpleMemCtrl memCtrl = new SimpleMemCtrl(); SimpleMemCtrl memCtrl = new SimpleMemCtrl();
memCtrl.limit = 1000; memCtrl.limit = 1000;
memCtrl.netBytes = release ? 1 : 2000; memCtrl.netBytes = release ? 1 : 2000;
memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes);
while (!stop.get()) { while (!stop.get()) {
int internalIters = release && random().nextBoolean() ? atLeast(5) : 1; int internalIters = release && random().nextBoolean() ? atLeast(5) : 1;
for (int i = 0; i < internalIters; i++) { for (int i = 0; i < internalIters; i++) {
@ -263,9 +272,10 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
CountDownLatch wait = latches[1]; CountDownLatch wait = latches[1];
join.countDown(); join.countDown();
try { try {
wait.await(); assertTrue(wait.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); System.out.println("[Updater] got interrupted - wait count: " + wait.getCount());
throw new ThreadInterruptedException(e);
} }
} }
Thread.yield(); Thread.yield();
@ -338,6 +348,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
private static class SimpleMemCtrl implements MemoryController { private static class SimpleMemCtrl implements MemoryController {
long netBytes; long netBytes;
long limit; long limit;
long flushBytes;
@Override @Override
public long netBytes() { public long netBytes() {
@ -348,6 +359,11 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
public long stallLimitBytes() { public long stallLimitBytes() {
return limit; return limit;
} }
@Override
public long flushBytes() {
return flushBytes;
}
} }
} }

View File

@ -36,7 +36,7 @@ public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase {
boolean anyOpenDelFiles = false; boolean anyOpenDelFiles = false;
DirectoryReader r = IndexReader.open(writer, true); DirectoryReader r = DirectoryReader.open(writer, true);
while (System.currentTimeMillis() < stopTime && !failed.get()) { while (System.currentTimeMillis() < stopTime && !failed.get()) {
if (random().nextBoolean()) { if (random().nextBoolean()) {
@ -63,7 +63,7 @@ public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase {
if (VERBOSE) { if (VERBOSE) {
System.out.println("TEST: now open"); System.out.println("TEST: now open");
} }
r = IndexReader.open(writer, true); r = DirectoryReader.open(writer, true);
} }
if (VERBOSE) { if (VERBOSE) {
System.out.println("TEST: got new reader=" + r); System.out.println("TEST: got new reader=" + r);
@ -110,7 +110,7 @@ public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase {
r2 = writer.getReader(); r2 = writer.getReader();
} else { } else {
writer.commit(); writer.commit();
r2 = IndexReader.open(dir); r2 = DirectoryReader.open(dir);
} }
return newSearcher(r2); return newSearcher(r2);
} }