Use a confined Arena for IOContext.READONCE (#13535)

Use a confined Arena for IOContext.READONCE.

This change will require inputs opened with READONCE to be consumed and closed on the creating thread. Further testing and assertions can be added as a follow up.
This commit is contained in:
Chris Hegarty 2024-07-10 09:39:35 +01:00 committed by GitHub
parent ef215d87ab
commit da41215a67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 153 additions and 33 deletions

View File

@ -829,7 +829,7 @@ class SimpleTextDocValuesReader extends DocValuesProducer {
clone.seek(0);
// checksum is fixed-width encoded with 20 bytes, plus 1 byte for newline (the space is included
// in SimpleTextUtil.CHECKSUM):
long footerStartPos = data.length() - (SimpleTextUtil.CHECKSUM.length + 21);
long footerStartPos = clone.length() - (SimpleTextUtil.CHECKSUM.length + 21);
ChecksumIndexInput input = new BufferedChecksumIndexInput(clone);
while (true) {
SimpleTextUtil.readLine(input, scratch);

View File

@ -227,7 +227,7 @@ class SimpleTextPointsReader extends PointsReader {
// checksum is fixed-width encoded with 20 bytes, plus 1 byte for newline (the space is included
// in SimpleTextUtil.CHECKSUM):
long footerStartPos = dataIn.length() - (SimpleTextUtil.CHECKSUM.length + 21);
long footerStartPos = clone.length() - (SimpleTextUtil.CHECKSUM.length + 21);
ChecksumIndexInput input = new BufferedChecksumIndexInput(clone);
while (true) {
SimpleTextUtil.readLine(input, scratch);

View File

@ -55,7 +55,12 @@ public record IOContext(
*/
public static final IOContext DEFAULT = new IOContext(Constants.DEFAULT_READADVICE);
/** A default context for reads with {@link ReadAdvice#SEQUENTIAL}. */
/**
* A default context for reads with {@link ReadAdvice#SEQUENTIAL}.
*
* <p>This context should only be used when the read operations will be performed in the same
* thread as the thread that opens the underlying storage.
*/
public static final IOContext READONCE = new IOContext(ReadAdvice.SEQUENTIAL);
@SuppressWarnings("incomplete-switch")

View File

@ -53,6 +53,7 @@ abstract class MemorySegmentIndexInput extends IndexInput
final long length;
final long chunkSizeMask;
final int chunkSizePower;
final boolean confined;
final Arena arena;
final MemorySegment[] segments;
@ -67,12 +68,15 @@ abstract class MemorySegmentIndexInput extends IndexInput
Arena arena,
MemorySegment[] segments,
long length,
int chunkSizePower) {
int chunkSizePower,
boolean confined) {
assert Arrays.stream(segments).map(MemorySegment::scope).allMatch(arena.scope()::equals);
if (segments.length == 1) {
return new SingleSegmentImpl(resourceDescription, arena, segments[0], length, chunkSizePower);
return new SingleSegmentImpl(
resourceDescription, arena, segments[0], length, chunkSizePower, confined);
} else {
return new MultiSegmentImpl(resourceDescription, arena, segments, 0, length, chunkSizePower);
return new MultiSegmentImpl(
resourceDescription, arena, segments, 0, length, chunkSizePower, confined);
}
}
@ -81,12 +85,14 @@ abstract class MemorySegmentIndexInput extends IndexInput
Arena arena,
MemorySegment[] segments,
long length,
int chunkSizePower) {
int chunkSizePower,
boolean confined) {
super(resourceDescription);
this.arena = arena;
this.segments = segments;
this.length = length;
this.chunkSizePower = chunkSizePower;
this.confined = confined;
this.chunkSizeMask = (1L << chunkSizePower) - 1L;
this.curSegment = segments[0];
}
@ -97,6 +103,12 @@ abstract class MemorySegmentIndexInput extends IndexInput
}
}
void ensureAccessible() {
if (confined && curSegment.isAccessibleBy(Thread.currentThread()) == false) {
throw new IllegalStateException("confined");
}
}
// the unused parameter is just to silence javac about unused variables
RuntimeException handlePositionalIOOBE(RuntimeException unused, String action, long pos)
throws IOException {
@ -570,6 +582,7 @@ abstract class MemorySegmentIndexInput extends IndexInput
/** Builds the actual sliced IndexInput (may apply extra offset in subclasses). * */
MemorySegmentIndexInput buildSlice(String sliceDescription, long offset, long length) {
ensureOpen();
ensureAccessible();
final long sliceEnd = offset + length;
final int startIndex = (int) (offset >>> chunkSizePower);
@ -591,7 +604,8 @@ abstract class MemorySegmentIndexInput extends IndexInput
null, // clones don't have an Arena, as they can't close)
slices[0].asSlice(offset, length),
length,
chunkSizePower);
chunkSizePower,
confined);
} else {
return new MultiSegmentImpl(
newResourceDescription,
@ -599,7 +613,8 @@ abstract class MemorySegmentIndexInput extends IndexInput
slices,
offset,
length,
chunkSizePower);
chunkSizePower,
confined);
}
}
@ -643,8 +658,15 @@ abstract class MemorySegmentIndexInput extends IndexInput
Arena arena,
MemorySegment segment,
long length,
int chunkSizePower) {
super(resourceDescription, arena, new MemorySegment[] {segment}, length, chunkSizePower);
int chunkSizePower,
boolean confined) {
super(
resourceDescription,
arena,
new MemorySegment[] {segment},
length,
chunkSizePower,
confined);
this.curSegmentIndex = 0;
}
@ -740,8 +762,9 @@ abstract class MemorySegmentIndexInput extends IndexInput
MemorySegment[] segments,
long offset,
long length,
int chunkSizePower) {
super(resourceDescription, arena, segments, length, chunkSizePower);
int chunkSizePower,
boolean confined) {
super(resourceDescription, arena, segments, length, chunkSizePower, confined);
this.offset = offset;
try {
seek(0L);

View File

@ -45,7 +45,8 @@ final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexIn
path = Unwrappable.unwrapAll(path);
boolean success = false;
final Arena arena = Arena.ofShared();
final boolean confined = context == IOContext.READONCE;
final Arena arena = confined ? Arena.ofConfined() : Arena.ofShared();
try (var fc = FileChannel.open(path, StandardOpenOption.READ)) {
final long fileSize = fc.size();
final IndexInput in =
@ -61,7 +62,8 @@ final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexIn
preload,
fileSize),
fileSize,
chunkSizePower);
chunkSizePower,
confined);
success = true;
return in;
} finally {

View File

@ -19,9 +19,14 @@ package org.apache.lucene.store;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.lucene.tests.store.BaseDirectoryTestCase;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.NamedThreadFactory;
/** Tests MMapDirectory */
// See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows
@ -117,4 +122,54 @@ public class TestMMapDirectory extends BaseDirectoryTestCase {
}
}
}
// Opens the input with ReadAdvice.READONCE to ensure slice and clone are appropriately confined
public void testConfined() throws Exception {
final int size = 16;
byte[] bytes = new byte[size];
random().nextBytes(bytes);
try (Directory dir = new MMapDirectory(createTempDir("testConfined"))) {
try (IndexOutput out = dir.createOutput("test", IOContext.DEFAULT)) {
out.writeBytes(bytes, 0, bytes.length);
}
try (var in = dir.openInput("test", IOContext.READONCE);
var executor = Executors.newFixedThreadPool(1, new NamedThreadFactory("testConfined"))) {
// ensure accessible
assertEquals(16L, in.slice("test", 0, in.length()).length());
assertEquals(15L, in.slice("test", 1, in.length() - 1).length());
// ensure not accessible
Callable<Object> task1 = () -> in.slice("test", 0, in.length());
var x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(task1)));
assertTrue(x.getMessage().contains("confined"));
int offset = random().nextInt((int) in.length());
int length = (int) in.length() - offset;
Callable<Object> task2 = () -> in.slice("test", offset, length);
x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(task2)));
assertTrue(x.getMessage().contains("confined"));
// slice.slice
var slice = in.slice("test", 0, in.length());
Callable<Object> task3 = () -> slice.slice("test", 0, in.length());
x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(task3)));
assertTrue(x.getMessage().contains("confined"));
// slice.clone
x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(slice::clone)));
assertTrue(x.getMessage().contains("confined"));
}
}
}
static final Class<IllegalStateException> ISE = IllegalStateException.class;
static Object getAndUnwrap(Future<Object> future) throws Throwable {
try {
return future.get();
} catch (ExecutionException ee) {
throw ee.getCause();
}
}
}

View File

@ -812,8 +812,9 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
false);
}
IndexInput delegateInput =
in.openInput(name, LuceneTestCase.newIOContext(randomState, context));
context = LuceneTestCase.newIOContext(randomState, context);
final boolean confined = context == IOContext.READONCE;
IndexInput delegateInput = in.openInput(name, context);
final IndexInput ii;
int randomInt = randomState.nextInt(500);
@ -822,15 +823,15 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
System.out.println(
"MockDirectoryWrapper: using SlowClosingMockIndexInputWrapper for file " + name);
}
ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput);
ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput, confined);
} else if (useSlowOpenClosers && randomInt == 1) {
if (LuceneTestCase.VERBOSE) {
System.out.println(
"MockDirectoryWrapper: using SlowOpeningMockIndexInputWrapper for file " + name);
}
ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput);
ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput, confined);
} else {
ii = new MockIndexInputWrapper(this, name, delegateInput, null);
ii = new MockIndexInputWrapper(this, name, delegateInput, null, confined);
}
addFileHandle(ii, name, Handle.Input);
return ii;

View File

@ -39,10 +39,16 @@ public class MockIndexInputWrapper extends FilterIndexInput {
// Which MockIndexInputWrapper we were cloned from, or null if we are not a clone:
private final MockIndexInputWrapper parent;
private final boolean confined;
private final Thread thread;
/** Sole constructor */
public MockIndexInputWrapper(
MockDirectoryWrapper dir, String name, IndexInput delegate, MockIndexInputWrapper parent) {
MockDirectoryWrapper dir,
String name,
IndexInput delegate,
MockIndexInputWrapper parent,
boolean confined) {
super("MockIndexInputWrapper(name=" + name + " delegate=" + delegate + ")", delegate);
// If we are a clone then our parent better not be a clone!
@ -51,6 +57,8 @@ public class MockIndexInputWrapper extends FilterIndexInput {
this.parent = parent;
this.name = name;
this.dir = dir;
this.confined = confined;
this.thread = Thread.currentThread();
}
@Override
@ -84,6 +92,12 @@ public class MockIndexInputWrapper extends FilterIndexInput {
}
}
private void ensureAccessible() {
if (confined && thread != Thread.currentThread()) {
throw new RuntimeException("Abusing from another thread!");
}
}
@Override
public MockIndexInputWrapper clone() {
ensureOpen();
@ -93,7 +107,7 @@ public class MockIndexInputWrapper extends FilterIndexInput {
dir.inputCloneCount.incrementAndGet();
IndexInput iiclone = in.clone();
MockIndexInputWrapper clone =
new MockIndexInputWrapper(dir, name, iiclone, parent != null ? parent : this);
new MockIndexInputWrapper(dir, name, iiclone, parent != null ? parent : this, confined);
// Pending resolution on LUCENE-686 we may want to
// uncomment this code so that we also track that all
// clones get closed:
@ -120,25 +134,29 @@ public class MockIndexInputWrapper extends FilterIndexInput {
dir.inputCloneCount.incrementAndGet();
IndexInput slice = in.slice(sliceDescription, offset, length);
MockIndexInputWrapper clone =
new MockIndexInputWrapper(dir, sliceDescription, slice, parent != null ? parent : this);
new MockIndexInputWrapper(
dir, sliceDescription, slice, parent != null ? parent : this, confined);
return clone;
}
@Override
public long getFilePointer() {
ensureOpen();
ensureAccessible();
return in.getFilePointer();
}
@Override
public void seek(long pos) throws IOException {
ensureOpen();
ensureAccessible();
in.seek(pos);
}
@Override
public void prefetch(long offset, long length) throws IOException {
ensureOpen();
ensureAccessible();
in.prefetch(offset, length);
}
@ -151,90 +169,105 @@ public class MockIndexInputWrapper extends FilterIndexInput {
@Override
public byte readByte() throws IOException {
ensureOpen();
ensureAccessible();
return in.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
ensureOpen();
ensureAccessible();
in.readBytes(b, offset, len);
}
@Override
public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
ensureOpen();
ensureAccessible();
in.readBytes(b, offset, len, useBuffer);
}
@Override
public void readFloats(float[] floats, int offset, int len) throws IOException {
ensureOpen();
ensureAccessible();
in.readFloats(floats, offset, len);
}
@Override
public short readShort() throws IOException {
ensureOpen();
ensureAccessible();
return in.readShort();
}
@Override
public int readInt() throws IOException {
ensureOpen();
ensureAccessible();
return in.readInt();
}
@Override
public long readLong() throws IOException {
ensureOpen();
ensureAccessible();
return in.readLong();
}
@Override
public String readString() throws IOException {
ensureOpen();
ensureAccessible();
return in.readString();
}
@Override
public int readVInt() throws IOException {
ensureOpen();
ensureAccessible();
return in.readVInt();
}
@Override
public long readVLong() throws IOException {
ensureOpen();
ensureAccessible();
return in.readVLong();
}
@Override
public int readZInt() throws IOException {
ensureOpen();
ensureAccessible();
return in.readZInt();
}
@Override
public long readZLong() throws IOException {
ensureOpen();
ensureAccessible();
return in.readZLong();
}
@Override
public void skipBytes(long numBytes) throws IOException {
ensureOpen();
ensureAccessible();
super.skipBytes(numBytes);
}
@Override
public Map<String, String> readMapOfStrings() throws IOException {
ensureOpen();
ensureAccessible();
return in.readMapOfStrings();
}
@Override
public Set<String> readSetOfStrings() throws IOException {
ensureOpen();
ensureAccessible();
return in.readSetOfStrings();
}

View File

@ -35,8 +35,8 @@ class SlowClosingMockIndexInputWrapper extends MockIndexInputWrapper {
}
public SlowClosingMockIndexInputWrapper(
MockDirectoryWrapper dir, String name, IndexInput delegate) {
super(dir, name, delegate, null);
MockDirectoryWrapper dir, String name, IndexInput delegate, boolean confined) {
super(dir, name, delegate, null, confined);
}
@SuppressForbidden(reason = "Thread sleep")

View File

@ -35,8 +35,9 @@ class SlowOpeningMockIndexInputWrapper extends MockIndexInputWrapper {
@SuppressForbidden(reason = "Thread sleep")
public SlowOpeningMockIndexInputWrapper(
MockDirectoryWrapper dir, String name, IndexInput delegate) throws IOException {
super(dir, name, delegate, null);
MockDirectoryWrapper dir, String name, IndexInput delegate, boolean confined)
throws IOException {
super(dir, name, delegate, null, confined);
try {
Thread.sleep(50);
} catch (InterruptedException ie) {

View File

@ -1780,6 +1780,9 @@ public abstract class LuceneTestCase extends Assert {
/** TODO: javadoc */
public static IOContext newIOContext(Random random, IOContext oldContext) {
if (oldContext == IOContext.READONCE) {
return oldContext; // don't mess with the READONCE singleton
}
final int randomNumDocs = random.nextInt(4192);
final int size = random.nextInt(512) * randomNumDocs;
if (oldContext.flushInfo() != null) {
@ -1798,19 +1801,16 @@ public abstract class LuceneTestCase extends Assert {
random.nextBoolean(),
TestUtil.nextInt(random, 1, 100)));
} else {
// Make a totally random IOContext:
// Make a totally random IOContext, except READONCE which has semantic implications
final IOContext context;
switch (random.nextInt(4)) {
switch (random.nextInt(3)) {
case 0:
context = IOContext.DEFAULT;
break;
case 1:
context = IOContext.READONCE;
break;
case 2:
context = new IOContext(new MergeInfo(randomNumDocs, size, true, -1));
break;
case 3:
case 2:
context = new IOContext(new FlushInfo(randomNumDocs, size));
break;
default: