From da41215a678f7c1a72cef558594031a99db90d88 Mon Sep 17 00:00:00 2001 From: Chris Hegarty <62058229+ChrisHegarty@users.noreply.github.com> Date: Wed, 10 Jul 2024 09:39:35 +0100 Subject: [PATCH] Use a confined Arena for IOContext.READONCE (#13535) Use a confined Arena for IOContext.READONCE. This change will require inputs opened with READONCE to be consumed and closed on the creating thread. Further testing and assertions can be added as a follow up. --- .../simpletext/SimpleTextDocValuesReader.java | 2 +- .../simpletext/SimpleTextPointsReader.java | 2 +- .../org/apache/lucene/store/IOContext.java | 7 ++- .../lucene/store/MemorySegmentIndexInput.java | 43 +++++++++++---- .../MemorySegmentIndexInputProvider.java | 6 +- .../lucene/store/TestMMapDirectory.java | 55 +++++++++++++++++++ .../tests/store/MockDirectoryWrapper.java | 11 ++-- .../tests/store/MockIndexInputWrapper.java | 39 ++++++++++++- .../SlowClosingMockIndexInputWrapper.java | 4 +- .../SlowOpeningMockIndexInputWrapper.java | 5 +- .../lucene/tests/util/LuceneTestCase.java | 12 ++-- 11 files changed, 153 insertions(+), 33 deletions(-) diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java index f58ff0873ca..435c2f73fdf 100644 --- a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java +++ b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextDocValuesReader.java @@ -829,7 +829,7 @@ class SimpleTextDocValuesReader extends DocValuesProducer { clone.seek(0); // checksum is fixed-width encoded with 20 bytes, plus 1 byte for newline (the space is included // in SimpleTextUtil.CHECKSUM): - long footerStartPos = data.length() - (SimpleTextUtil.CHECKSUM.length + 21); + long footerStartPos = clone.length() - (SimpleTextUtil.CHECKSUM.length + 21); ChecksumIndexInput input = new BufferedChecksumIndexInput(clone); while (true) { 
SimpleTextUtil.readLine(input, scratch); diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextPointsReader.java b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextPointsReader.java index be0e98f906a..5d6c41663ca 100644 --- a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextPointsReader.java +++ b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextPointsReader.java @@ -227,7 +227,7 @@ class SimpleTextPointsReader extends PointsReader { // checksum is fixed-width encoded with 20 bytes, plus 1 byte for newline (the space is included // in SimpleTextUtil.CHECKSUM): - long footerStartPos = dataIn.length() - (SimpleTextUtil.CHECKSUM.length + 21); + long footerStartPos = clone.length() - (SimpleTextUtil.CHECKSUM.length + 21); ChecksumIndexInput input = new BufferedChecksumIndexInput(clone); while (true) { SimpleTextUtil.readLine(input, scratch); diff --git a/lucene/core/src/java/org/apache/lucene/store/IOContext.java b/lucene/core/src/java/org/apache/lucene/store/IOContext.java index b2d82af20f8..f318b3a9015 100644 --- a/lucene/core/src/java/org/apache/lucene/store/IOContext.java +++ b/lucene/core/src/java/org/apache/lucene/store/IOContext.java @@ -55,7 +55,12 @@ public record IOContext( */ public static final IOContext DEFAULT = new IOContext(Constants.DEFAULT_READADVICE); - /** A default context for reads with {@link ReadAdvice#SEQUENTIAL}. */ + /** + * A default context for reads with {@link ReadAdvice#SEQUENTIAL}. + * + *
<p>
This context should only be used when the read operations will be performed in the same + * thread as the thread that opens the underlying storage. + */ public static final IOContext READONCE = new IOContext(ReadAdvice.SEQUENTIAL); @SuppressWarnings("incomplete-switch") diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java index 68f1e771195..e9805f0f7a6 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -53,6 +53,7 @@ abstract class MemorySegmentIndexInput extends IndexInput final long length; final long chunkSizeMask; final int chunkSizePower; + final boolean confined; final Arena arena; final MemorySegment[] segments; @@ -67,12 +68,15 @@ abstract class MemorySegmentIndexInput extends IndexInput Arena arena, MemorySegment[] segments, long length, - int chunkSizePower) { + int chunkSizePower, + boolean confined) { assert Arrays.stream(segments).map(MemorySegment::scope).allMatch(arena.scope()::equals); if (segments.length == 1) { - return new SingleSegmentImpl(resourceDescription, arena, segments[0], length, chunkSizePower); + return new SingleSegmentImpl( + resourceDescription, arena, segments[0], length, chunkSizePower, confined); } else { - return new MultiSegmentImpl(resourceDescription, arena, segments, 0, length, chunkSizePower); + return new MultiSegmentImpl( + resourceDescription, arena, segments, 0, length, chunkSizePower, confined); } } @@ -81,12 +85,14 @@ abstract class MemorySegmentIndexInput extends IndexInput Arena arena, MemorySegment[] segments, long length, - int chunkSizePower) { + int chunkSizePower, + boolean confined) { super(resourceDescription); this.arena = arena; this.segments = segments; this.length = length; this.chunkSizePower = chunkSizePower; + this.confined = confined; this.chunkSizeMask = (1L << 
chunkSizePower) - 1L; this.curSegment = segments[0]; } @@ -97,6 +103,12 @@ abstract class MemorySegmentIndexInput extends IndexInput } } + void ensureAccessible() { + if (confined && curSegment.isAccessibleBy(Thread.currentThread()) == false) { + throw new IllegalStateException("confined"); + } + } + // the unused parameter is just to silence javac about unused variables RuntimeException handlePositionalIOOBE(RuntimeException unused, String action, long pos) throws IOException { @@ -570,6 +582,7 @@ abstract class MemorySegmentIndexInput extends IndexInput /** Builds the actual sliced IndexInput (may apply extra offset in subclasses). * */ MemorySegmentIndexInput buildSlice(String sliceDescription, long offset, long length) { ensureOpen(); + ensureAccessible(); final long sliceEnd = offset + length; final int startIndex = (int) (offset >>> chunkSizePower); @@ -591,7 +604,8 @@ abstract class MemorySegmentIndexInput extends IndexInput null, // clones don't have an Arena, as they can't close) slices[0].asSlice(offset, length), length, - chunkSizePower); + chunkSizePower, + confined); } else { return new MultiSegmentImpl( newResourceDescription, @@ -599,7 +613,8 @@ abstract class MemorySegmentIndexInput extends IndexInput slices, offset, length, - chunkSizePower); + chunkSizePower, + confined); } } @@ -643,8 +658,15 @@ abstract class MemorySegmentIndexInput extends IndexInput Arena arena, MemorySegment segment, long length, - int chunkSizePower) { - super(resourceDescription, arena, new MemorySegment[] {segment}, length, chunkSizePower); + int chunkSizePower, + boolean confined) { + super( + resourceDescription, + arena, + new MemorySegment[] {segment}, + length, + chunkSizePower, + confined); this.curSegmentIndex = 0; } @@ -740,8 +762,9 @@ abstract class MemorySegmentIndexInput extends IndexInput MemorySegment[] segments, long offset, long length, - int chunkSizePower) { - super(resourceDescription, arena, segments, length, chunkSizePower); + int chunkSizePower, + 
boolean confined) { + super(resourceDescription, arena, segments, length, chunkSizePower, confined); this.offset = offset; try { seek(0L); diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java index e1655101d75..08f6149746b 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java @@ -45,7 +45,8 @@ final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexIn path = Unwrappable.unwrapAll(path); boolean success = false; - final Arena arena = Arena.ofShared(); + final boolean confined = context == IOContext.READONCE; + final Arena arena = confined ? Arena.ofConfined() : Arena.ofShared(); try (var fc = FileChannel.open(path, StandardOpenOption.READ)) { final long fileSize = fc.size(); final IndexInput in = @@ -61,7 +62,8 @@ final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexIn preload, fileSize), fileSize, - chunkSizePower); + chunkSizePower, + confined); success = true; return in; } finally { diff --git a/lucene/core/src/test/org/apache/lucene/store/TestMMapDirectory.java b/lucene/core/src/test/org/apache/lucene/store/TestMMapDirectory.java index 39d3dbda9ac..f7c49c9b661 100644 --- a/lucene/core/src/test/org/apache/lucene/store/TestMMapDirectory.java +++ b/lucene/core/src/test/org/apache/lucene/store/TestMMapDirectory.java @@ -19,9 +19,14 @@ package org.apache.lucene.store; import java.io.IOException; import java.nio.file.Path; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.lucene.tests.store.BaseDirectoryTestCase; import org.apache.lucene.util.Constants; +import 
org.apache.lucene.util.NamedThreadFactory; /** Tests MMapDirectory */ // See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows @@ -117,4 +122,54 @@ public class TestMMapDirectory extends BaseDirectoryTestCase { } } } + + // Opens the input with IOContext.READONCE to ensure slice and clone are appropriately confined + public void testConfined() throws Exception { + final int size = 16; + byte[] bytes = new byte[size]; + random().nextBytes(bytes); + + try (Directory dir = new MMapDirectory(createTempDir("testConfined"))) { + try (IndexOutput out = dir.createOutput("test", IOContext.DEFAULT)) { + out.writeBytes(bytes, 0, bytes.length); + } + + try (var in = dir.openInput("test", IOContext.READONCE); + var executor = Executors.newFixedThreadPool(1, new NamedThreadFactory("testConfined"))) { + // ensure accessible + assertEquals(16L, in.slice("test", 0, in.length()).length()); + assertEquals(15L, in.slice("test", 1, in.length() - 1).length()); + + // ensure not accessible + Callable task1 = () -> in.slice("test", 0, in.length()); + var x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(task1))); + assertTrue(x.getMessage().contains("confined")); + + int offset = random().nextInt((int) in.length()); + int length = (int) in.length() - offset; + Callable task2 = () -> in.slice("test", offset, length); + x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(task2))); + assertTrue(x.getMessage().contains("confined")); + + // slice.slice + var slice = in.slice("test", 0, in.length()); + Callable task3 = () -> slice.slice("test", 0, in.length()); + x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(task3))); + assertTrue(x.getMessage().contains("confined")); + // slice.clone + x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(slice::clone))); + assertTrue(x.getMessage().contains("confined")); + } + } + } + + static final Class ISE = IllegalStateException.class; + + static Object getAndUnwrap(Future future) throws 
Throwable { + try { + return future.get(); + } catch (ExecutionException ee) { + throw ee.getCause(); + } + } } diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockDirectoryWrapper.java index 0411a2c183b..2589d082fc9 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockDirectoryWrapper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockDirectoryWrapper.java @@ -812,8 +812,9 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper { false); } - IndexInput delegateInput = - in.openInput(name, LuceneTestCase.newIOContext(randomState, context)); + context = LuceneTestCase.newIOContext(randomState, context); + final boolean confined = context == IOContext.READONCE; + IndexInput delegateInput = in.openInput(name, context); final IndexInput ii; int randomInt = randomState.nextInt(500); @@ -822,15 +823,15 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper { System.out.println( "MockDirectoryWrapper: using SlowClosingMockIndexInputWrapper for file " + name); } - ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput); + ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput, confined); } else if (useSlowOpenClosers && randomInt == 1) { if (LuceneTestCase.VERBOSE) { System.out.println( "MockDirectoryWrapper: using SlowOpeningMockIndexInputWrapper for file " + name); } - ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput); + ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput, confined); } else { - ii = new MockIndexInputWrapper(this, name, delegateInput, null); + ii = new MockIndexInputWrapper(this, name, delegateInput, null, confined); } addFileHandle(ii, name, Handle.Input); return ii; diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockIndexInputWrapper.java 
b/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockIndexInputWrapper.java index b25bd155783..87279008614 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockIndexInputWrapper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockIndexInputWrapper.java @@ -39,10 +39,16 @@ public class MockIndexInputWrapper extends FilterIndexInput { // Which MockIndexInputWrapper we were cloned from, or null if we are not a clone: private final MockIndexInputWrapper parent; + private final boolean confined; + private final Thread thread; /** Sole constructor */ public MockIndexInputWrapper( - MockDirectoryWrapper dir, String name, IndexInput delegate, MockIndexInputWrapper parent) { + MockDirectoryWrapper dir, + String name, + IndexInput delegate, + MockIndexInputWrapper parent, + boolean confined) { super("MockIndexInputWrapper(name=" + name + " delegate=" + delegate + ")", delegate); // If we are a clone then our parent better not be a clone! @@ -51,6 +57,8 @@ public class MockIndexInputWrapper extends FilterIndexInput { this.parent = parent; this.name = name; this.dir = dir; + this.confined = confined; + this.thread = Thread.currentThread(); } @Override @@ -84,6 +92,12 @@ public class MockIndexInputWrapper extends FilterIndexInput { } } + private void ensureAccessible() { + if (confined && thread != Thread.currentThread()) { + throw new RuntimeException("Abusing from another thread!"); + } + } + @Override public MockIndexInputWrapper clone() { ensureOpen(); @@ -93,7 +107,7 @@ public class MockIndexInputWrapper extends FilterIndexInput { dir.inputCloneCount.incrementAndGet(); IndexInput iiclone = in.clone(); MockIndexInputWrapper clone = - new MockIndexInputWrapper(dir, name, iiclone, parent != null ? parent : this); + new MockIndexInputWrapper(dir, name, iiclone, parent != null ? 
parent : this, confined); // Pending resolution on LUCENE-686 we may want to // uncomment this code so that we also track that all // clones get closed: @@ -120,25 +134,29 @@ public class MockIndexInputWrapper extends FilterIndexInput { dir.inputCloneCount.incrementAndGet(); IndexInput slice = in.slice(sliceDescription, offset, length); MockIndexInputWrapper clone = - new MockIndexInputWrapper(dir, sliceDescription, slice, parent != null ? parent : this); + new MockIndexInputWrapper( + dir, sliceDescription, slice, parent != null ? parent : this, confined); return clone; } @Override public long getFilePointer() { ensureOpen(); + ensureAccessible(); return in.getFilePointer(); } @Override public void seek(long pos) throws IOException { ensureOpen(); + ensureAccessible(); in.seek(pos); } @Override public void prefetch(long offset, long length) throws IOException { ensureOpen(); + ensureAccessible(); in.prefetch(offset, length); } @@ -151,90 +169,105 @@ public class MockIndexInputWrapper extends FilterIndexInput { @Override public byte readByte() throws IOException { ensureOpen(); + ensureAccessible(); return in.readByte(); } @Override public void readBytes(byte[] b, int offset, int len) throws IOException { ensureOpen(); + ensureAccessible(); in.readBytes(b, offset, len); } @Override public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException { ensureOpen(); + ensureAccessible(); in.readBytes(b, offset, len, useBuffer); } @Override public void readFloats(float[] floats, int offset, int len) throws IOException { ensureOpen(); + ensureAccessible(); in.readFloats(floats, offset, len); } @Override public short readShort() throws IOException { ensureOpen(); + ensureAccessible(); return in.readShort(); } @Override public int readInt() throws IOException { ensureOpen(); + ensureAccessible(); return in.readInt(); } @Override public long readLong() throws IOException { ensureOpen(); + ensureAccessible(); return in.readLong(); } @Override public 
String readString() throws IOException { ensureOpen(); + ensureAccessible(); return in.readString(); } @Override public int readVInt() throws IOException { ensureOpen(); + ensureAccessible(); return in.readVInt(); } @Override public long readVLong() throws IOException { ensureOpen(); + ensureAccessible(); return in.readVLong(); } @Override public int readZInt() throws IOException { ensureOpen(); + ensureAccessible(); return in.readZInt(); } @Override public long readZLong() throws IOException { ensureOpen(); + ensureAccessible(); return in.readZLong(); } @Override public void skipBytes(long numBytes) throws IOException { ensureOpen(); + ensureAccessible(); super.skipBytes(numBytes); } @Override public Map readMapOfStrings() throws IOException { ensureOpen(); + ensureAccessible(); return in.readMapOfStrings(); } @Override public Set readSetOfStrings() throws IOException { ensureOpen(); + ensureAccessible(); return in.readSetOfStrings(); } diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/SlowClosingMockIndexInputWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/tests/store/SlowClosingMockIndexInputWrapper.java index 73197a66155..1f9e61f5195 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/SlowClosingMockIndexInputWrapper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/SlowClosingMockIndexInputWrapper.java @@ -35,8 +35,8 @@ class SlowClosingMockIndexInputWrapper extends MockIndexInputWrapper { } public SlowClosingMockIndexInputWrapper( - MockDirectoryWrapper dir, String name, IndexInput delegate) { - super(dir, name, delegate, null); + MockDirectoryWrapper dir, String name, IndexInput delegate, boolean confined) { + super(dir, name, delegate, null, confined); } @SuppressForbidden(reason = "Thread sleep") diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/SlowOpeningMockIndexInputWrapper.java 
b/lucene/test-framework/src/java/org/apache/lucene/tests/store/SlowOpeningMockIndexInputWrapper.java index da0e13537c9..033785af9c7 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/SlowOpeningMockIndexInputWrapper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/SlowOpeningMockIndexInputWrapper.java @@ -35,8 +35,9 @@ class SlowOpeningMockIndexInputWrapper extends MockIndexInputWrapper { @SuppressForbidden(reason = "Thread sleep") public SlowOpeningMockIndexInputWrapper( - MockDirectoryWrapper dir, String name, IndexInput delegate) throws IOException { - super(dir, name, delegate, null); + MockDirectoryWrapper dir, String name, IndexInput delegate, boolean confined) + throws IOException { + super(dir, name, delegate, null, confined); try { Thread.sleep(50); } catch (InterruptedException ie) { diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java index c649fd18fa5..c61968d557e 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java @@ -1780,6 +1780,9 @@ public abstract class LuceneTestCase extends Assert { /** TODO: javadoc */ public static IOContext newIOContext(Random random, IOContext oldContext) { + if (oldContext == IOContext.READONCE) { + return oldContext; // don't mess with the READONCE singleton + } final int randomNumDocs = random.nextInt(4192); final int size = random.nextInt(512) * randomNumDocs; if (oldContext.flushInfo() != null) { @@ -1798,19 +1801,16 @@ public abstract class LuceneTestCase extends Assert { random.nextBoolean(), TestUtil.nextInt(random, 1, 100))); } else { - // Make a totally random IOContext: + // Make a totally random IOContext, except READONCE which has semantic implications final IOContext context; - switch (random.nextInt(4)) { + switch 
(random.nextInt(3)) { case 0: context = IOContext.DEFAULT; break; case 1: - context = IOContext.READONCE; - break; - case 2: context = new IOContext(new MergeInfo(randomNumDocs, size, true, -1)); break; - case 3: + case 2: context = new IOContext(new FlushInfo(randomNumDocs, size)); break; default: