diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index c5db1438d8b..c43128cf245 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -142,6 +142,13 @@ Improvements
because the ICU word-breaking algorithm has some issues. This allows for the previous
tokenization used before Lucene 5. (AM, Robert Muir)
+* LUCENE-7409: Changed MMapDirectory's unmapping to work safer, but still with
+ no guarantees. This uses a store-store barrier and yields the current thread
+ before unmapping to allow in-flight requests to finish. The new code no longer
+ uses WeakIdentityMap as it delegates all ByteBuffer reads throgh a new
+ ByteBufferGuard wrapper that is shared between all ByteBufferIndexInput clones.
+ (Robert Muir, Uwe Schindler)
+
Optimizations
* LUCENE-7330, LUCENE-7339: Speed up conjunction queries. (Adrien Grand)
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBufferGuard.java b/lucene/core/src/java/org/apache/lucene/store/ByteBufferGuard.java
new file mode 100644
index 00000000000..2e7ce2650c1
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBufferGuard.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A guard that is created for every {@link ByteBufferIndexInput} that tries on best effort
+ * to reject any access to the {@link ByteBuffer} behind, once it is unmapped. A single instance
+ * of this is used for the original and all clones, so once the original is closed and unmapped
+ * all clones also throw {@link AlreadyClosedException}, triggered by a {@link NullPointerException}.
+ *
+ * This code uses the trick that is also used in
+ * {@link java.lang.invoke.MutableCallSite#syncAll(java.lang.invoke.MutableCallSite[])} to
+ * invalidate switch points. It also yields the current thread to give other threads a chance
+ * to finish in-flight requests...
+ */
+final class ByteBufferGuard {
+
+ /**
+ * Pass in an implementation of this interface to cleanup ByteBuffers.
+ * MMapDirectory implements this to allow unmapping of bytebuffers with private Java APIs.
+ */
+ @FunctionalInterface
+ static interface BufferCleaner {
+ void freeBuffer(String resourceDescription, ByteBuffer b) throws IOException;
+ }
+
+ private final String resourceDescription;
+ private final BufferCleaner cleaner;
+
+ /** not volatile, we use store-store barrier! */
+ private boolean invalidated = false;
+
+ /** the actual store-store barrier. */
+ private final AtomicInteger barrier = new AtomicInteger();
+
+ /**
+ * Creates an instance to be used for a single {@link ByteBufferIndexInput} which
+ * must be shared by all of its clones.
+ */
+ public ByteBufferGuard(String resourceDescription, BufferCleaner cleaner) {
+ this.resourceDescription = resourceDescription;
+ this.cleaner = cleaner;
+ }
+
+ /**
+ * Invalidates this guard and unmaps (if supported).
+ */
+ public void invalidateAndUnmap(ByteBuffer... bufs) throws IOException {
+ if (cleaner != null) {
+ invalidated = true;
+ // this should trigger a happens-before - so flushes all caches
+ barrier.lazySet(0);
+ Thread.yield();
+ for (ByteBuffer b : bufs) {
+ cleaner.freeBuffer(resourceDescription, b);
+ }
+ }
+ }
+
+ private void ensureValid() {
+ if (invalidated) {
+ // this triggers an AlreadyClosedException in ByteBufferIndexInput:
+ throw new NullPointerException();
+ }
+ }
+
+ public void getBytes(ByteBuffer receiver, byte[] dst, int offset, int length) {
+ ensureValid();
+ receiver.get(dst, offset, length);
+ }
+
+ public byte getByte(ByteBuffer receiver) {
+ ensureValid();
+ return receiver.get();
+ }
+
+ public short getShort(ByteBuffer receiver) {
+ ensureValid();
+ return receiver.getShort();
+ }
+
+ public int getInt(ByteBuffer receiver) {
+ ensureValid();
+ return receiver.getInt();
+ }
+
+ public long getLong(ByteBuffer receiver) {
+ ensureValid();
+ return receiver.getLong();
+ }
+
+ public byte getByte(ByteBuffer receiver, int pos) {
+ ensureValid();
+ return receiver.get(pos);
+ }
+
+ public short getShort(ByteBuffer receiver, int pos) {
+ ensureValid();
+ return receiver.getShort(pos);
+ }
+
+ public int getInt(ByteBuffer receiver, int pos) {
+ ensureValid();
+ return receiver.getInt(pos);
+ }
+
+ public long getLong(ByteBuffer receiver, int pos) {
+ ensureValid();
+ return receiver.getLong(pos);
+ }
+
+}
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBufferIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBufferIndexInput.java
index 8e8ef90655a..0f6c733410b 100644
--- a/lucene/core/src/java/org/apache/lucene/store/ByteBufferIndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBufferIndexInput.java
@@ -21,9 +21,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.lucene.util.WeakIdentityMap;
/**
* Base IndexInput implementation that uses an array
@@ -37,35 +34,32 @@ import org.apache.lucene.util.WeakIdentityMap;
* are a power-of-two (chunkSizePower
).
*/
abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessInput {
- protected final BufferCleaner cleaner;
protected final long length;
protected final long chunkSizeMask;
protected final int chunkSizePower;
+ protected final ByteBufferGuard guard;
protected ByteBuffer[] buffers;
protected int curBufIndex = -1;
protected ByteBuffer curBuf; // redundant for speed: buffers[curBufIndex]
protected boolean isClone = false;
- protected final WeakIdentityMap clones;
- public static ByteBufferIndexInput newInstance(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, BufferCleaner cleaner, boolean trackClones) {
- final WeakIdentityMap clones = trackClones ? WeakIdentityMap.newConcurrentHashMap() : null;
+ public static ByteBufferIndexInput newInstance(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, ByteBufferGuard guard) {
if (buffers.length == 1) {
- return new SingleBufferImpl(resourceDescription, buffers[0], length, chunkSizePower, cleaner, clones);
+ return new SingleBufferImpl(resourceDescription, buffers[0], length, chunkSizePower, guard);
} else {
- return new MultiBufferImpl(resourceDescription, buffers, 0, length, chunkSizePower, cleaner, clones);
+ return new MultiBufferImpl(resourceDescription, buffers, 0, length, chunkSizePower, guard);
}
}
- ByteBufferIndexInput(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, BufferCleaner cleaner, WeakIdentityMap clones) {
+ ByteBufferIndexInput(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, ByteBufferGuard guard) {
super(resourceDescription);
this.buffers = buffers;
this.length = length;
this.chunkSizePower = chunkSizePower;
this.chunkSizeMask = (1L << chunkSizePower) - 1L;
- this.clones = clones;
- this.cleaner = cleaner;
+ this.guard = guard;
assert chunkSizePower >= 0 && chunkSizePower <= 30;
assert (length >>> chunkSizePower) < Integer.MAX_VALUE;
}
@@ -73,7 +67,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
@Override
public final byte readByte() throws IOException {
try {
- return curBuf.get();
+ return guard.getByte(curBuf);
} catch (BufferUnderflowException e) {
do {
curBufIndex++;
@@ -83,7 +77,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
curBuf = buffers[curBufIndex];
curBuf.position(0);
} while (!curBuf.hasRemaining());
- return curBuf.get();
+ return guard.getByte(curBuf);
} catch (NullPointerException npe) {
throw new AlreadyClosedException("Already closed: " + this);
}
@@ -92,11 +86,11 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
@Override
public final void readBytes(byte[] b, int offset, int len) throws IOException {
try {
- curBuf.get(b, offset, len);
+ guard.getBytes(curBuf, b, offset, len);
} catch (BufferUnderflowException e) {
int curAvail = curBuf.remaining();
while (len > curAvail) {
- curBuf.get(b, offset, curAvail);
+ guard.getBytes(curBuf, b, offset, curAvail);
len -= curAvail;
offset += curAvail;
curBufIndex++;
@@ -107,7 +101,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
curBuf.position(0);
curAvail = curBuf.remaining();
}
- curBuf.get(b, offset, len);
+ guard.getBytes(curBuf, b, offset, len);
} catch (NullPointerException npe) {
throw new AlreadyClosedException("Already closed: " + this);
}
@@ -116,7 +110,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
@Override
public final short readShort() throws IOException {
try {
- return curBuf.getShort();
+ return guard.getShort(curBuf);
} catch (BufferUnderflowException e) {
return super.readShort();
} catch (NullPointerException npe) {
@@ -127,7 +121,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
@Override
public final int readInt() throws IOException {
try {
- return curBuf.getInt();
+ return guard.getInt(curBuf);
} catch (BufferUnderflowException e) {
return super.readInt();
} catch (NullPointerException npe) {
@@ -138,7 +132,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
@Override
public final long readLong() throws IOException {
try {
- return curBuf.getLong();
+ return guard.getLong(curBuf);
} catch (BufferUnderflowException e) {
return super.readLong();
} catch (NullPointerException npe) {
@@ -181,7 +175,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
public byte readByte(long pos) throws IOException {
try {
final int bi = (int) (pos >> chunkSizePower);
- return buffers[bi].get((int) (pos & chunkSizeMask));
+ return guard.getByte(buffers[bi], (int) (pos & chunkSizeMask));
} catch (IndexOutOfBoundsException ioobe) {
throw new EOFException("seek past EOF: " + this);
} catch (NullPointerException npe) {
@@ -207,7 +201,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
public short readShort(long pos) throws IOException {
final int bi = (int) (pos >> chunkSizePower);
try {
- return buffers[bi].getShort((int) (pos & chunkSizeMask));
+ return guard.getShort(buffers[bi], (int) (pos & chunkSizeMask));
} catch (IndexOutOfBoundsException ioobe) {
// either it's a boundary, or read past EOF, fall back:
setPos(pos, bi);
@@ -221,7 +215,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
public int readInt(long pos) throws IOException {
final int bi = (int) (pos >> chunkSizePower);
try {
- return buffers[bi].getInt((int) (pos & chunkSizeMask));
+ return guard.getInt(buffers[bi], (int) (pos & chunkSizeMask));
} catch (IndexOutOfBoundsException ioobe) {
// either it's a boundary, or read past EOF, fall back:
setPos(pos, bi);
@@ -235,7 +229,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
public long readLong(long pos) throws IOException {
final int bi = (int) (pos >> chunkSizePower);
try {
- return buffers[bi].getLong((int) (pos & chunkSizeMask));
+ return guard.getLong(buffers[bi], (int) (pos & chunkSizeMask));
} catch (IndexOutOfBoundsException ioobe) {
// either it's a boundary, or read past EOF, fall back:
setPos(pos, bi);
@@ -285,11 +279,6 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
final ByteBufferIndexInput clone = newCloneInstance(getFullSliceDescription(sliceDescription), newBuffers, ofs, length);
clone.isClone = true;
-
- // register the new clone in our clone list to clean it up on closing:
- if (clones != null) {
- this.clones.put(clone, Boolean.TRUE);
- }
return clone;
}
@@ -299,9 +288,9 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
protected ByteBufferIndexInput newCloneInstance(String newResourceDescription, ByteBuffer[] newBuffers, int offset, long length) {
if (newBuffers.length == 1) {
newBuffers[0].position(offset);
- return new SingleBufferImpl(newResourceDescription, newBuffers[0].slice(), length, chunkSizePower, this.cleaner, this.clones);
+ return new SingleBufferImpl(newResourceDescription, newBuffers[0].slice(), length, chunkSizePower, this.guard);
} else {
- return new MultiBufferImpl(newResourceDescription, newBuffers, offset, length, chunkSizePower, cleaner, clones);
+ return new MultiBufferImpl(newResourceDescription, newBuffers, offset, length, chunkSizePower, guard);
}
}
@@ -335,25 +324,11 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
// make local copy, then un-set early
final ByteBuffer[] bufs = buffers;
unsetBuffers();
- if (clones != null) {
- clones.remove(this);
- }
if (isClone) return;
- // for extra safety unset also all clones' buffers:
- if (clones != null) {
- for (Iterator it = this.clones.keyIterator(); it.hasNext();) {
- final ByteBufferIndexInput clone = it.next();
- assert clone.isClone;
- clone.unsetBuffers();
- }
- this.clones.clear();
- }
-
- for (final ByteBuffer b : bufs) {
- freeBuffer(b);
- }
+ // tell the guard to invalidate and later unmap the bytebuffers (if supported):
+ guard.invalidateAndUnmap(bufs);
} finally {
unsetBuffers();
}
@@ -367,31 +342,12 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
curBuf = null;
curBufIndex = 0;
}
-
- /**
- * Called when the contents of a buffer will be no longer needed.
- */
- private void freeBuffer(ByteBuffer b) throws IOException {
- if (cleaner != null) {
- cleaner.freeBuffer(this, b);
- }
- }
-
- /**
- * Pass in an implementation of this interface to cleanup ByteBuffers.
- * MMapDirectory implements this to allow unmapping of bytebuffers with private Java APIs.
- */
- @FunctionalInterface
- static interface BufferCleaner {
- void freeBuffer(ByteBufferIndexInput parent, ByteBuffer b) throws IOException;
- }
/** Optimization of ByteBufferIndexInput for when there is only one buffer */
static final class SingleBufferImpl extends ByteBufferIndexInput {
- SingleBufferImpl(String resourceDescription, ByteBuffer buffer, long length, int chunkSizePower,
- BufferCleaner cleaner, WeakIdentityMap clones) {
- super(resourceDescription, new ByteBuffer[] { buffer }, length, chunkSizePower, cleaner, clones);
+ SingleBufferImpl(String resourceDescription, ByteBuffer buffer, long length, int chunkSizePower, ByteBufferGuard guard) {
+ super(resourceDescription, new ByteBuffer[] { buffer }, length, chunkSizePower, guard);
this.curBufIndex = 0;
this.curBuf = buffer;
buffer.position(0);
@@ -426,7 +382,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
@Override
public byte readByte(long pos) throws IOException {
try {
- return curBuf.get((int) pos);
+ return guard.getByte(curBuf, (int) pos);
} catch (IllegalArgumentException e) {
if (pos < 0) {
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
@@ -441,7 +397,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
@Override
public short readShort(long pos) throws IOException {
try {
- return curBuf.getShort((int) pos);
+ return guard.getShort(curBuf, (int) pos);
} catch (IllegalArgumentException e) {
if (pos < 0) {
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
@@ -456,7 +412,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
@Override
public int readInt(long pos) throws IOException {
try {
- return curBuf.getInt((int) pos);
+ return guard.getInt(curBuf, (int) pos);
} catch (IllegalArgumentException e) {
if (pos < 0) {
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
@@ -471,7 +427,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
@Override
public long readLong(long pos) throws IOException {
try {
- return curBuf.getLong((int) pos);
+ return guard.getLong(curBuf, (int) pos);
} catch (IllegalArgumentException e) {
if (pos < 0) {
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
@@ -489,8 +445,8 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
private final int offset;
MultiBufferImpl(String resourceDescription, ByteBuffer[] buffers, int offset, long length, int chunkSizePower,
- BufferCleaner cleaner, WeakIdentityMap clones) {
- super(resourceDescription, buffers, length, chunkSizePower, cleaner, clones);
+ ByteBufferGuard guard) {
+ super(resourceDescription, buffers, length, chunkSizePower, guard);
this.offset = offset;
try {
seek(0L);
diff --git a/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java b/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
index 60ca103a047..c0e35197f0e 100644
--- a/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
@@ -36,7 +36,7 @@ import java.util.concurrent.Future;
import java.lang.invoke.MethodHandle;
import java.lang.reflect.Method;
-import org.apache.lucene.store.ByteBufferIndexInput.BufferCleaner;
+import org.apache.lucene.store.ByteBufferGuard.BufferCleaner;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.SuppressForbidden;
@@ -240,7 +240,7 @@ public class MMapDirectory extends FSDirectory {
final boolean useUnmap = getUseUnmap();
return ByteBufferIndexInput.newInstance(resourceDescription,
map(resourceDescription, c, 0, c.size()),
- c.size(), chunkSizePower, useUnmap ? CLEANER : null, useUnmap);
+ c.size(), chunkSizePower, new ByteBufferGuard(resourceDescription, useUnmap ? CLEANER : null));
}
}
@@ -370,7 +370,7 @@ public class MMapDirectory extends FSDirectory {
final MethodHandle unmapper = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop))
.asType(methodType(void.class, ByteBuffer.class));
- return (BufferCleaner) (ByteBufferIndexInput parent, ByteBuffer buffer) -> {
+ return (BufferCleaner) (String resourceDescription, ByteBuffer buffer) -> {
if (directBufferClass.isInstance(buffer)) {
final Throwable error = AccessController.doPrivileged((PrivilegedAction) () -> {
try {
@@ -381,7 +381,7 @@ public class MMapDirectory extends FSDirectory {
}
});
if (error != null) {
- throw new IOException("Unable to unmap the mapped buffer: " + parent.toString(), error);
+ throw new IOException("Unable to unmap the mapped buffer: " + resourceDescription, error);
}
}
};
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestMmapDirectory.java b/lucene/core/src/test/org/apache/lucene/store/TestMmapDirectory.java
index 153cc5e6d30..b87a21bc9c9 100644
--- a/lucene/core/src/test/org/apache/lucene/store/TestMmapDirectory.java
+++ b/lucene/core/src/test/org/apache/lucene/store/TestMmapDirectory.java
@@ -19,6 +19,9 @@ package org.apache.lucene.store;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+
+// import org.junit.Ignore;
/**
* Tests MMapDirectory
@@ -39,4 +42,37 @@ public class TestMmapDirectory extends BaseDirectoryTestCase {
MMapDirectory.UNMAP_SUPPORTED);
}
+ // TODO: @Ignore("This test is for JVM testing purposes. There are no guarantees that it may not fail with SIGSEGV!")
+ public void testAceWithThreads() throws Exception {
+ for (int iter = 0; iter < 10; iter++) {
+ Directory dir = getDirectory(createTempDir("testAceWithThreads"));
+ IndexOutput out = dir.createOutput("test", IOContext.DEFAULT);
+ for (int i = 0; i < 8 * 1024 * 1024; i++) {
+ out.writeInt(random().nextInt());
+ }
+ out.close();
+ IndexInput in = dir.openInput("test", IOContext.DEFAULT);
+ IndexInput clone = in.clone();
+ final byte accum[] = new byte[32 * 1024 * 1024];
+ final CountDownLatch shotgun = new CountDownLatch(1);
+ Thread t1 = new Thread(() -> {
+ try {
+ shotgun.await();
+ for (int i = 0; i < 10; i++) {
+ clone.seek(0);
+ clone.readBytes(accum, 0, accum.length);
+ }
+ } catch (IOException | AlreadyClosedException ok) {
+ // OK
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ t1.start();
+ shotgun.countDown();
+ in.close();
+ t1.join();
+ dir.close();
+ }
+ }
}