Redo #12707: Do not rely on isAlive() status of MemorySegment#Scope and make sure IndexInput#close() does not throw IllegalStateException and waits instead (#12785)

This commit is contained in:
Uwe Schindler 2023-11-09 18:40:10 +01:00 committed by GitHub
parent 2cd66fb311
commit 5b43384367
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 64 deletions

View File

@ -208,8 +208,11 @@ Improvements
* GITHUB#12586: Remove over-counting of deleted terms. (Guo Feng)
* GITHUB#12705, GITHUB#12705: Improve handling of NullPointerException and IllegalStateException
in MMapDirectory's IndexInputs. (Uwe Schindler, Michael Sokolov)
* GITHUB#12705, GITHUB#12705, GITHUB#12785: Improve handling of NullPointerException and
IllegalStateException in MMapDirectory's IndexInputs. Also makes sure to close master
MemorySegmentIndexInput while not throwing IllegalStateException (retry in spin loop).
Also improve TestMmapDirectory.testAceWithThreads to run faster and use less resources.
(Uwe Schindler, Maurizio Cimadamore, Michael Sokolov)
* GITHUB#12689: TaskExecutor to cancel all tasks on exception to avoid needless computation. (Luca Cavanna)

View File

@ -105,12 +105,15 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
AlreadyClosedException alreadyClosed(RuntimeException e) {
// we use NPE to signal if this input is closed (to not have checks everywhere). If NPE happens,
// we check the "is closed" condition explicitly by checking that our "curSegment" is null.
// Care must be taken to not leak the NPE to code outside MemorySegmentIndexInput!
if (this.curSegment == null) {
return new AlreadyClosedException("Already closed: " + this);
}
// we also check if the session of all segments is still alive:
if (Arrays.stream(segments).allMatch(s -> s.session().isAlive()) == false) {
return new AlreadyClosedException("Already closed: " + this);
// ISE can be thrown by MemorySegment and contains "closed" in message:
if (e instanceof IllegalStateException
&& e.getMessage() != null
&& e.getMessage().contains("closed")) {
return new AlreadyClosedException("Already closed: " + this, e);
}
// otherwise rethrow unmodified NPE/ISE (as it possibly a bug with passing a null parameter to
// the IndexInput method):
@ -472,17 +475,26 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
return;
}
// make sure all accesses to this IndexInput instance throw NPE:
curSegment = null;
Arrays.fill(segments, null);
// the master IndexInput has a MemorySession and is able
// to release all resources (unmap segments) - a
// side effect is that other threads still using clones
// will throw IllegalStateException
if (session != null) {
session.close();
while (session.isAlive()) {
try {
session.close();
break;
} catch (
@SuppressWarnings("unused")
IllegalStateException e) {
Thread.onSpinWait();
}
}
}
// make sure all accesses to this IndexInput instance throw NPE:
curSegment = null;
Arrays.fill(segments, null);
}
/** Optimization of MemorySegmentIndexInput for when there is only one segment. */

View File

@ -103,12 +103,15 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
AlreadyClosedException alreadyClosed(RuntimeException e) {
// we use NPE to signal if this input is closed (to not have checks everywhere). If NPE happens,
// we check the "is closed" condition explicitly by checking that our "curSegment" is null.
// Care must be taken to not leak the NPE to code outside MemorySegmentIndexInput!
if (this.curSegment == null) {
return new AlreadyClosedException("Already closed: " + this);
}
// we also check if the scope of all segments is still alive:
if (Arrays.stream(segments).allMatch(s -> s.scope().isAlive()) == false) {
return new AlreadyClosedException("Already closed: " + this);
// ISE can be thrown by MemorySegment and contains "closed" in message:
if (e instanceof IllegalStateException
&& e.getMessage() != null
&& e.getMessage().contains("closed")) {
return new AlreadyClosedException("Already closed: " + this, e);
}
// otherwise rethrow unmodified NPE/ISE (as it possibly a bug with passing a null parameter to
// the IndexInput method):
@ -470,17 +473,26 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
return;
}
// make sure all accesses to this IndexInput instance throw NPE:
curSegment = null;
Arrays.fill(segments, null);
// the master IndexInput has an Arena and is able
// to release all resources (unmap segments) - a
// side effect is that other threads still using clones
// will throw IllegalStateException
if (arena != null) {
arena.close();
while (arena.scope().isAlive()) {
try {
arena.close();
break;
} catch (
@SuppressWarnings("unused")
IllegalStateException e) {
Thread.onSpinWait();
}
}
}
// make sure all accesses to this IndexInput instance throw NPE:
curSegment = null;
Arrays.fill(segments, null);
}
/** Optimization of MemorySegmentIndexInput for when there is only one segment. */

View File

@ -103,12 +103,15 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
AlreadyClosedException alreadyClosed(RuntimeException e) {
// we use NPE to signal if this input is closed (to not have checks everywhere). If NPE happens,
// we check the "is closed" condition explicitly by checking that our "curSegment" is null.
// Care must be taken to not leak the NPE to code outside MemorySegmentIndexInput!
if (this.curSegment == null) {
return new AlreadyClosedException("Already closed: " + this);
}
// we also check if the scope of all segments is still alive:
if (Arrays.stream(segments).allMatch(s -> s.scope().isAlive()) == false) {
return new AlreadyClosedException("Already closed: " + this);
// ISE can be thrown by MemorySegment and contains "closed" in message:
if (e instanceof IllegalStateException
&& e.getMessage() != null
&& e.getMessage().contains("closed")) {
return new AlreadyClosedException("Already closed: " + this, e);
}
// otherwise rethrow unmodified NPE/ISE (as it possibly a bug with passing a null parameter to
// the IndexInput method):
@ -470,17 +473,26 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
return;
}
// make sure all accesses to this IndexInput instance throw NPE:
curSegment = null;
Arrays.fill(segments, null);
// the master IndexInput has an Arena and is able
// to release all resources (unmap segments) - a
// side effect is that other threads still using clones
// will throw IllegalStateException
if (arena != null) {
arena.close();
while (arena.scope().isAlive()) {
try {
arena.close();
break;
} catch (
@SuppressWarnings("unused")
IllegalStateException e) {
Thread.onSpinWait();
}
}
}
// make sure all accesses to this IndexInput instance throw NPE:
curSegment = null;
Arrays.fill(segments, null);
}
/** Optimization of MemorySegmentIndexInput for when there is only one segment. */

View File

@ -60,48 +60,45 @@ public class TestMmapDirectory extends BaseDirectoryTestCase {
public void testAceWithThreads() throws Exception {
assumeTrue("Test requires MemorySegmentIndexInput", isMemorySegmentImpl());
final int iters = RANDOM_MULTIPLIER * (TEST_NIGHTLY ? 50 : 10);
for (int iter = 0; iter < iters; iter++) {
Directory dir = getDirectory(createTempDir("testAceWithThreads"));
IndexOutput out = dir.createOutput("test", IOContext.DEFAULT);
Random random = random();
for (int i = 0; i < 8 * 1024 * 1024; i++) {
out.writeInt(random.nextInt());
final int nInts = 8 * 1024 * 1024;
try (Directory dir = getDirectory(createTempDir("testAceWithThreads"))) {
try (IndexOutput out = dir.createOutput("test", IOContext.DEFAULT)) {
final Random random = random();
for (int i = 0; i < nInts; i++) {
out.writeInt(random.nextInt());
}
}
out.close();
IndexInput in = dir.openInput("test", IOContext.DEFAULT);
IndexInput clone = in.clone();
final byte[] accum = new byte[32 * 1024 * 1024];
final CountDownLatch shotgun = new CountDownLatch(1);
Thread t1 =
new Thread(
() -> {
try {
shotgun.await();
for (int i = 0; i < 10; i++) {
clone.seek(0);
clone.readBytes(accum, 0, accum.length);
final int iters = RANDOM_MULTIPLIER * (TEST_NIGHTLY ? 50 : 10);
for (int iter = 0; iter < iters; iter++) {
final IndexInput in = dir.openInput("test", IOContext.DEFAULT);
final IndexInput clone = in.clone();
final byte[] accum = new byte[nInts * Integer.BYTES];
final CountDownLatch shotgun = new CountDownLatch(1);
final Thread t1 =
new Thread(
() -> {
try {
shotgun.await();
for (int i = 0; i < 10; i++) {
clone.seek(0);
clone.readBytes(accum, 0, accum.length);
}
} catch (
@SuppressWarnings("unused")
AlreadyClosedException ok) {
// OK
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
} catch (@SuppressWarnings("unused") IOException | AlreadyClosedException ok) {
// OK
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t1.start();
shotgun.countDown();
try {
});
t1.start();
shotgun.countDown();
// this triggers "bad behaviour": closing input while other threads are running
in.close();
} catch (
@SuppressWarnings("unused")
IllegalStateException ise) {
// this may also happen and is a valid exception, informing our user that, e.g., a query is
// running!
// "java.lang.IllegalStateException: Cannot close while another thread is accessing the
// segment"
t1.join();
}
t1.join();
dir.close();
}
}