From 4267734e804c18a66a77889a7a569fe8c3060890 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 14 Aug 2020 16:04:58 +0200 Subject: [PATCH] Ensure DWPTPool never release any new DWPT after it's closed (#1751) The DWPTPool should not release new DPWTs after it's closed. Yet, if the pool is in a state where it's preventing new writers from being created in order to swap the delete queue it might get closed and in that case we miss to throw an AlreadyClosedException and release a new writer which violates the condition that the pool is empty after it's closed and all remaining DWPTs have been aborted. --- .../index/DocumentsWriterPerThreadPool.java | 14 ++- .../TestDocumentsWriterPerThreadPool.java | 96 +++++++++++++++++++ .../index/TestIndexWriterWithThreads.java | 1 - 3 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index ce6956c55fb..5db3ec4294c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -96,6 +96,10 @@ final class DocumentsWriterPerThreadPool implements Iterable + new DocumentsWriterPerThread(Version.LATEST.major, "", directory, directory, + newIndexWriterConfig(), new DocumentsWriterDeleteQueue(null), null, new AtomicLong(), false)); + + DocumentsWriterPerThread first = pool.getAndLock(); + assertEquals(1, pool.size()); + DocumentsWriterPerThread second = pool.getAndLock(); + assertEquals(2, pool.size()); + pool.marksAsFreeAndUnlock(first); + assertEquals(2, pool.size()); + DocumentsWriterPerThread third = pool.getAndLock(); + assertSame(first, third); + assertEquals(2, pool.size()); + pool.checkout(third); + assertEquals(1, pool.size()); + + pool.close(); + assertEquals(1, pool.size()); + pool.marksAsFreeAndUnlock(second); + assertEquals(1, pool.size()); + for (DocumentsWriterPerThread lastPerThead : pool.filterAndLock(x -> true)) { + pool.checkout(lastPerThead); + lastPerThead.unlock(); + } + assertEquals(0, pool.size()); + } + } + + public void testCloseWhileNewWritersLocked() throws IOException, InterruptedException { + try (Directory directory = newDirectory()) { + DocumentsWriterPerThreadPool pool = new DocumentsWriterPerThreadPool(() -> + new DocumentsWriterPerThread(Version.LATEST.major, "", directory, directory, + newIndexWriterConfig(), new DocumentsWriterDeleteQueue(null), null, new AtomicLong(), false)); + + DocumentsWriterPerThread first = pool.getAndLock(); + pool.lockNewWriters(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread(() -> { + try { + latch.countDown(); + pool.getAndLock(); + fail(); + } catch (AlreadyClosedException e) { + // fine + } catch (IOException e) { + throw new AssertionError(e); + } + }); + t.start(); + latch.await(); + while (t.getState().equals(Thread.State.WAITING) == false) { + Thread.yield(); + } + first.unlock(); + pool.close(); + pool.unlockNewWriters(); + for (DocumentsWriterPerThread perThread : pool.filterAndLock(x -> true)) { + assertTrue(pool.checkout(perThread)); + perThread.unlock(); + } + assertEquals(0, pool.size()); + } + } +} diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java index 4ff6686ab59..c4f379ef083 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java @@ -59,7 +59,6 @@ public class TestIndexWriterWithThreads extends LuceneTestCase { private final CyclicBarrier syncStart; boolean diskFull; Throwable error; - AlreadyClosedException ace; IndexWriter writer; boolean noErrors; volatile int addCount;