From 2ce7899fc3b26deacadd7e67dda954aa66ad650d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 22 May 2012 08:08:33 +0000 Subject: [PATCH] LUCENE-4071: DWStallControl can deadlock if stalled and flushMemory can not release the stall state git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1341342 13f79535-47bb-0310-9956-ffa450edef68 --- .../index/DocumentsWriterFlushControl.java | 7 +++- .../index/DocumentsWriterStallControl.java | 17 ++++++-- .../TestDocumentsWriterStallControl.java | 40 +++++++++++++------ .../apache/lucene/index/TestNRTThreads.java | 6 +-- 4 files changed, 50 insertions(+), 20 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index bb57bb01bf8..39d2359509f 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -190,10 +190,13 @@ final class DocumentsWriterFlushControl implements MemoryController { Long bytes = flushingWriters.remove(dwpt); flushBytes -= bytes.longValue(); perThreadPool.recycle(dwpt); - stallControl.updateStalled(this); assert assertMemory(); } finally { - notifyAll(); + try { + stallControl.updateStalled(this); + } finally { + notifyAll(); + } } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java index 80dbf9b01e4..93b22e4e47b 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java @@ -100,9 +100,19 @@ final class DocumentsWriterStallControl { */ void updateStalled(MemoryController controller) { do { - // if we have more flushing / blocked DWPT than numActiveDWPT we stall! - // don't stall if we have queued flushes - threads should be hijacked instead - while (controller.netBytes() > controller.stallLimitBytes()) { + final long netBytes = controller.netBytes(); + final long flushBytes = controller.flushBytes(); + final long limit = controller.stallLimitBytes(); + assert netBytes >= flushBytes; + assert limit > 0; + /* + * we block indexing threads if net byte grows due to slow flushes + * yet, for small ram buffers and large documents we can easily + * reach the limit without any ongoing flushes. we need to ensure + * that we don't stall/block if an ongoing or pending flush can + * not free up enough memory to release the stall lock. + */ + while (netBytes > limit && (netBytes - flushBytes) < limit) { if (sync.trySetStalled()) { assert wasStalled = true; return; @@ -125,6 +135,7 @@ final class DocumentsWriterStallControl { static interface MemoryController { long netBytes(); + long flushBytes(); long stallLimitBytes(); } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java index 46052131015..3a9fe2b0ac4 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController; import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.ThreadInterruptedException; import com.carrotsearch.randomizedtesting.annotations.ThreadLeaks; @@ -40,6 +41,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { SimpleMemCtrl memCtrl = new SimpleMemCtrl(); memCtrl.limit = 1000; memCtrl.netBytes = 1000; + memCtrl.flushBytes = 20; ctrl.updateStalled(memCtrl); Thread[] waitThreads = waitThreads(atLeast(1), ctrl); start(waitThreads); @@ -49,6 +51,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { // now stall threads and wake them up again memCtrl.netBytes = 1001; + memCtrl.flushBytes = 100; ctrl.updateStalled(memCtrl); waitThreads = waitThreads(atLeast(1), ctrl); start(waitThreads); @@ -56,6 +59,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { assertTrue(ctrl.hasBlocked()); assertTrue(ctrl.anyStalledThreads()); memCtrl.netBytes = 50; + memCtrl.flushBytes = 0; ctrl.updateStalled(memCtrl); assertFalse(ctrl.anyStalledThreads()); join(waitThreads, 500); @@ -76,9 +80,12 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { SimpleMemCtrl memCtrl = new SimpleMemCtrl(); memCtrl.limit = 1000; memCtrl.netBytes = 1; + memCtrl.flushBytes = 0; + int iters = atLeast(1000); for (int j = 0; j < iters; j++) { memCtrl.netBytes = baseBytes + random().nextInt(1000); + memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes); ctrl.updateStalled(memCtrl); if (random().nextInt(5) == 0) { // thread 0 only updates ctrl.waitIfStalled(); @@ -112,6 +119,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { SimpleMemCtrl memCtrl = new SimpleMemCtrl(); memCtrl.limit = 1000; memCtrl.netBytes = 1; + memCtrl.flushBytes = 0; ctrl.updateStalled(memCtrl); final AtomicBoolean stop = new AtomicBoolean(false); final AtomicBoolean checkPoint = new AtomicBoolean(true); @@ -143,7 +151,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { for (int i = 0; i < iters; i++) { if (checkPoint.get()) { - latches[0].await(5, TimeUnit.SECONDS); + assertTrue("timed out waiting for update threads - deadlock?", latches[0].await(10, TimeUnit.SECONDS)); if (!exceptions.isEmpty()) { for (Throwable throwable : exceptions) { throwable.printStackTrace(); @@ -154,7 +162,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { if (!ctrl.anyStalledThreads()) { assertTrue( "control claims no stalled threads but waiter seems to be blocked", - latches[2].await(3, TimeUnit.SECONDS)); + latches[2].await(10, TimeUnit.SECONDS)); } checkPoint.set(false); @@ -171,14 +179,13 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { } stop.set(true); - memCtrl.limit = 1000; - memCtrl.netBytes = 1; - ctrl.updateStalled(memCtrl); - if (checkPoint.get()) { - latches[1].countDown(); - } + latches[1].countDown(); for (int i = 0; i < threads.length; i++) { + memCtrl.limit = 1000; + memCtrl.netBytes = 1; + memCtrl.flushBytes = 0; + ctrl.updateStalled(memCtrl); threads[i].join(2000); if (threads[i].isAlive() && threads[i] instanceof Waiter) { if (threads[i].getState() == Thread.State.WAITING) { @@ -215,9 +222,10 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { CountDownLatch wait = latches[1]; join.countDown(); try { - wait.await(); + assertTrue(wait.await(10, TimeUnit.SECONDS)); } catch (InterruptedException e) { - throw new RuntimeException(e); + System.out.println("[Waiter] got interrupted - wait count: " + wait.getCount()); + throw new ThreadInterruptedException(e); } } } @@ -253,6 +261,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { SimpleMemCtrl memCtrl = new SimpleMemCtrl(); memCtrl.limit = 1000; memCtrl.netBytes = release ? 1 : 2000; + memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes); while (!stop.get()) { int internalIters = release && random().nextBoolean() ? atLeast(5) : 1; for (int i = 0; i < internalIters; i++) { @@ -263,9 +272,10 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { CountDownLatch wait = latches[1]; join.countDown(); try { - wait.await(); + assertTrue(wait.await(10, TimeUnit.SECONDS)); } catch (InterruptedException e) { - throw new RuntimeException(e); + System.out.println("[Updater] got interrupted - wait count: " + wait.getCount()); + throw new ThreadInterruptedException(e); } } Thread.yield(); @@ -338,6 +348,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { private static class SimpleMemCtrl implements MemoryController { long netBytes; long limit; + long flushBytes; @Override public long netBytes() { @@ -348,6 +359,11 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { public long stallLimitBytes() { return limit; } + + @Override + public long flushBytes() { + return flushBytes; + } } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestNRTThreads.java b/lucene/core/src/test/org/apache/lucene/index/TestNRTThreads.java index e5b74f155cc..e9af7cb9e72 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestNRTThreads.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestNRTThreads.java @@ -36,7 +36,7 @@ public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase { boolean anyOpenDelFiles = false; - DirectoryReader r = IndexReader.open(writer, true); + DirectoryReader r = DirectoryReader.open(writer, true); while (System.currentTimeMillis() < stopTime && !failed.get()) { if (random().nextBoolean()) { @@ -63,7 +63,7 @@ public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase { if (VERBOSE) { System.out.println("TEST: now open"); } - r = IndexReader.open(writer, true); + r = DirectoryReader.open(writer, true); } if (VERBOSE) { System.out.println("TEST: got new reader=" + r); @@ -110,7 +110,7 @@ public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase { r2 = writer.getReader(); } else { writer.commit(); - r2 = IndexReader.open(dir); + r2 = DirectoryReader.open(dir); } return newSearcher(r2); }