From 5b43384367bfbbe5c33046b940fd0e8fc29dd9bf Mon Sep 17 00:00:00 2001 From: Uwe Schindler Date: Thu, 9 Nov 2023 18:40:10 +0100 Subject: [PATCH] Redo #12707: Do not rely on isAlive() status of MemorySegment#Scope and make sure IndexInput#close() does not throw IllegalStateException and waits instead (#12785) --- lucene/CHANGES.txt | 7 +- .../lucene/store/MemorySegmentIndexInput.java | 28 +++++-- .../lucene/store/MemorySegmentIndexInput.java | 28 +++++-- .../lucene/store/MemorySegmentIndexInput.java | 28 +++++-- .../lucene/store/TestMmapDirectory.java | 73 +++++++++---------- 5 files changed, 100 insertions(+), 64 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index db102c52562..470666c822b 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -208,8 +208,11 @@ Improvements * GITHUB#12586: Remove over-counting of deleted terms. (Guo Feng) -* GITHUB#12705, GITHUB#12705: Improve handling of NullPointerException and IllegalStateException - in MMapDirectory's IndexInputs. (Uwe Schindler, Michael Sokolov) +* GITHUB#12705, GITHUB#12705, GITHUB#12785: Improve handling of NullPointerException and + IllegalStateException in MMapDirectory's IndexInputs. Also makes sure to close master + MemorySegmentIndexInput while not throwing IllegalStateException (retry in spin loop). + Also improve TestMmapDirectory.testAceWithThreads to run faster and use less resources. + (Uwe Schindler, Maurizio Cimadamore, Michael Sokolov) * GITHUB#12689: TaskExecutor to cancel all tasks on exception to avoid needless computation. (Luca Cavanna) diff --git a/lucene/core/src/java19/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java19/org/apache/lucene/store/MemorySegmentIndexInput.java index ee102a12d4e..44ecff57cb1 100644 --- a/lucene/core/src/java19/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java19/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -105,12 +105,15 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces AlreadyClosedException alreadyClosed(RuntimeException e) { // we use NPE to signal if this input is closed (to not have checks everywhere). If NPE happens, // we check the "is closed" condition explicitly by checking that our "curSegment" is null. + // Care must be taken to not leak the NPE to code outside MemorySegmentIndexInput! if (this.curSegment == null) { return new AlreadyClosedException("Already closed: " + this); } - // we also check if the session of all segments is still alive: - if (Arrays.stream(segments).allMatch(s -> s.session().isAlive()) == false) { - return new AlreadyClosedException("Already closed: " + this); + // ISE can be thrown by MemorySegment and contains "closed" in message: + if (e instanceof IllegalStateException + && e.getMessage() != null + && e.getMessage().contains("closed")) { + return new AlreadyClosedException("Already closed: " + this, e); } // otherwise rethrow unmodified NPE/ISE (as it possibly a bug with passing a null parameter to // the IndexInput method): @@ -472,17 +475,26 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces return; } - // make sure all accesses to this IndexInput instance throw NPE: - curSegment = null; - Arrays.fill(segments, null); - // the master IndexInput has a MemorySession and is able // to release all resources (unmap segments) - a // side effect is that other threads still using clones // will throw IllegalStateException if (session != null) { - session.close(); + while (session.isAlive()) { + try { + session.close(); + break; + } catch ( + @SuppressWarnings("unused") + IllegalStateException e) { + Thread.onSpinWait(); + } + } } + + // make sure all accesses to this IndexInput instance throw NPE: + curSegment = null; + Arrays.fill(segments, null); } /** Optimization of MemorySegmentIndexInput for when there is only one segment. */ diff --git a/lucene/core/src/java20/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java20/org/apache/lucene/store/MemorySegmentIndexInput.java index 627c5edbd5a..0f2fdccac90 100644 --- a/lucene/core/src/java20/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java20/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -103,12 +103,15 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces AlreadyClosedException alreadyClosed(RuntimeException e) { // we use NPE to signal if this input is closed (to not have checks everywhere). If NPE happens, // we check the "is closed" condition explicitly by checking that our "curSegment" is null. + // Care must be taken to not leak the NPE to code outside MemorySegmentIndexInput! if (this.curSegment == null) { return new AlreadyClosedException("Already closed: " + this); } - // we also check if the scope of all segments is still alive: - if (Arrays.stream(segments).allMatch(s -> s.scope().isAlive()) == false) { - return new AlreadyClosedException("Already closed: " + this); + // ISE can be thrown by MemorySegment and contains "closed" in message: + if (e instanceof IllegalStateException + && e.getMessage() != null + && e.getMessage().contains("closed")) { + return new AlreadyClosedException("Already closed: " + this, e); } // otherwise rethrow unmodified NPE/ISE (as it possibly a bug with passing a null parameter to // the IndexInput method): @@ -470,17 +473,26 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces return; } - // make sure all accesses to this IndexInput instance throw NPE: - curSegment = null; - Arrays.fill(segments, null); - // the master IndexInput has an Arena and is able // to release all resources (unmap segments) - a // side effect is that other threads still using clones // will throw IllegalStateException if (arena != null) { - arena.close(); + while (arena.scope().isAlive()) { + try { + arena.close(); + break; + } catch ( + @SuppressWarnings("unused") + IllegalStateException e) { + Thread.onSpinWait(); + } + } } + + // make sure all accesses to this IndexInput instance throw NPE: + curSegment = null; + Arrays.fill(segments, null); } /** Optimization of MemorySegmentIndexInput for when there is only one segment. */ diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java index 627c5edbd5a..0f2fdccac90 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -103,12 +103,15 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces AlreadyClosedException alreadyClosed(RuntimeException e) { // we use NPE to signal if this input is closed (to not have checks everywhere). If NPE happens, // we check the "is closed" condition explicitly by checking that our "curSegment" is null. + // Care must be taken to not leak the NPE to code outside MemorySegmentIndexInput! if (this.curSegment == null) { return new AlreadyClosedException("Already closed: " + this); } - // we also check if the scope of all segments is still alive: - if (Arrays.stream(segments).allMatch(s -> s.scope().isAlive()) == false) { - return new AlreadyClosedException("Already closed: " + this); + // ISE can be thrown by MemorySegment and contains "closed" in message: + if (e instanceof IllegalStateException + && e.getMessage() != null + && e.getMessage().contains("closed")) { + return new AlreadyClosedException("Already closed: " + this, e); } // otherwise rethrow unmodified NPE/ISE (as it possibly a bug with passing a null parameter to // the IndexInput method): @@ -470,17 +473,26 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces return; } - // make sure all accesses to this IndexInput instance throw NPE: - curSegment = null; - Arrays.fill(segments, null); - // the master IndexInput has an Arena and is able // to release all resources (unmap segments) - a // side effect is that other threads still using clones // will throw IllegalStateException if (arena != null) { - arena.close(); + while (arena.scope().isAlive()) { + try { + arena.close(); + break; + } catch ( + @SuppressWarnings("unused") + IllegalStateException e) { + Thread.onSpinWait(); + } + } } + + // make sure all accesses to this IndexInput instance throw NPE: + curSegment = null; + Arrays.fill(segments, null); } /** Optimization of MemorySegmentIndexInput for when there is only one segment. */ diff --git a/lucene/core/src/test/org/apache/lucene/store/TestMmapDirectory.java b/lucene/core/src/test/org/apache/lucene/store/TestMmapDirectory.java index a7156383aab..a0b013d0608 100644 --- a/lucene/core/src/test/org/apache/lucene/store/TestMmapDirectory.java +++ b/lucene/core/src/test/org/apache/lucene/store/TestMmapDirectory.java @@ -60,48 +60,45 @@ public class TestMmapDirectory extends BaseDirectoryTestCase { public void testAceWithThreads() throws Exception { assumeTrue("Test requires MemorySegmentIndexInput", isMemorySegmentImpl()); - final int iters = RANDOM_MULTIPLIER * (TEST_NIGHTLY ? 50 : 10); - for (int iter = 0; iter < iters; iter++) { - Directory dir = getDirectory(createTempDir("testAceWithThreads")); - IndexOutput out = dir.createOutput("test", IOContext.DEFAULT); - Random random = random(); - for (int i = 0; i < 8 * 1024 * 1024; i++) { - out.writeInt(random.nextInt()); + final int nInts = 8 * 1024 * 1024; + + try (Directory dir = getDirectory(createTempDir("testAceWithThreads"))) { + try (IndexOutput out = dir.createOutput("test", IOContext.DEFAULT)) { + final Random random = random(); + for (int i = 0; i < nInts; i++) { + out.writeInt(random.nextInt()); + } } - out.close(); - IndexInput in = dir.openInput("test", IOContext.DEFAULT); - IndexInput clone = in.clone(); - final byte[] accum = new byte[32 * 1024 * 1024]; - final CountDownLatch shotgun = new CountDownLatch(1); - Thread t1 = - new Thread( - () -> { - try { - shotgun.await(); - for (int i = 0; i < 10; i++) { - clone.seek(0); - clone.readBytes(accum, 0, accum.length); + + final int iters = RANDOM_MULTIPLIER * (TEST_NIGHTLY ? 50 : 10); + for (int iter = 0; iter < iters; iter++) { + final IndexInput in = dir.openInput("test", IOContext.DEFAULT); + final IndexInput clone = in.clone(); + final byte[] accum = new byte[nInts * Integer.BYTES]; + final CountDownLatch shotgun = new CountDownLatch(1); + final Thread t1 = + new Thread( + () -> { + try { + shotgun.await(); + for (int i = 0; i < 10; i++) { + clone.seek(0); + clone.readBytes(accum, 0, accum.length); + } + } catch ( + @SuppressWarnings("unused") + AlreadyClosedException ok) { + // OK + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); } - } catch (@SuppressWarnings("unused") IOException | AlreadyClosedException ok) { - // OK - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - t1.start(); - shotgun.countDown(); - try { + }); + t1.start(); + shotgun.countDown(); + // this triggers "bad behaviour": closing input while other threads are running in.close(); - } catch ( - @SuppressWarnings("unused") - IllegalStateException ise) { - // this may also happen and is a valid exception, informing our user that, e.g., a query is - // running! - // "java.lang.IllegalStateException: Cannot close while another thread is accessing the - // segment" + t1.join(); } - t1.join(); - dir.close(); } }