mirror of https://github.com/apache/lucene.git
LUCENE-7409: Changed MMapDirectory's unmapping to work safer, but still with no guarantees
This commit is contained in:
parent
cea8a488f0
commit
48cc599936
|
@ -142,6 +142,13 @@ Improvements
|
||||||
because the ICU word-breaking algorithm has some issues. This allows for the previous
|
because the ICU word-breaking algorithm has some issues. This allows for the previous
|
||||||
tokenization used before Lucene 5. (AM, Robert Muir)
|
tokenization used before Lucene 5. (AM, Robert Muir)
|
||||||
|
|
||||||
|
* LUCENE-7409: Changed MMapDirectory's unmapping to work safer, but still with
|
||||||
|
no guarantees. This uses a store-store barrier and yields the current thread
|
||||||
|
before unmapping to allow in-flight requests to finish. The new code no longer
|
||||||
|
uses WeakIdentityMap as it delegates all ByteBuffer reads throgh a new
|
||||||
|
ByteBufferGuard wrapper that is shared between all ByteBufferIndexInput clones.
|
||||||
|
(Robert Muir, Uwe Schindler)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
|
|
||||||
* LUCENE-7330, LUCENE-7339: Speed up conjunction queries. (Adrien Grand)
|
* LUCENE-7330, LUCENE-7339: Speed up conjunction queries. (Adrien Grand)
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.lucene.store;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A guard that is created for every {@link ByteBufferIndexInput} that tries on best effort
|
||||||
|
* to reject any access to the {@link ByteBuffer} behind, once it is unmapped. A single instance
|
||||||
|
* of this is used for the original and all clones, so once the original is closed and unmapped
|
||||||
|
* all clones also throw {@link AlreadyClosedException}, triggered by a {@link NullPointerException}.
|
||||||
|
* <p>
|
||||||
|
* This code uses the trick that is also used in
|
||||||
|
* {@link java.lang.invoke.MutableCallSite#syncAll(java.lang.invoke.MutableCallSite[])} to
|
||||||
|
* invalidate switch points. It also yields the current thread to give other threads a chance
|
||||||
|
* to finish in-flight requests...
|
||||||
|
*/
|
||||||
|
final class ByteBufferGuard {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pass in an implementation of this interface to cleanup ByteBuffers.
|
||||||
|
* MMapDirectory implements this to allow unmapping of bytebuffers with private Java APIs.
|
||||||
|
*/
|
||||||
|
@FunctionalInterface
|
||||||
|
static interface BufferCleaner {
|
||||||
|
void freeBuffer(String resourceDescription, ByteBuffer b) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String resourceDescription;
|
||||||
|
private final BufferCleaner cleaner;
|
||||||
|
|
||||||
|
/** not volatile, we use store-store barrier! */
|
||||||
|
private boolean invalidated = false;
|
||||||
|
|
||||||
|
/** the actual store-store barrier. */
|
||||||
|
private final AtomicInteger barrier = new AtomicInteger();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an instance to be used for a single {@link ByteBufferIndexInput} which
|
||||||
|
* must be shared by all of its clones.
|
||||||
|
*/
|
||||||
|
public ByteBufferGuard(String resourceDescription, BufferCleaner cleaner) {
|
||||||
|
this.resourceDescription = resourceDescription;
|
||||||
|
this.cleaner = cleaner;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalidates this guard and unmaps (if supported).
|
||||||
|
*/
|
||||||
|
public void invalidateAndUnmap(ByteBuffer... bufs) throws IOException {
|
||||||
|
if (cleaner != null) {
|
||||||
|
invalidated = true;
|
||||||
|
// this should trigger a happens-before - so flushes all caches
|
||||||
|
barrier.lazySet(0);
|
||||||
|
Thread.yield();
|
||||||
|
for (ByteBuffer b : bufs) {
|
||||||
|
cleaner.freeBuffer(resourceDescription, b);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ensureValid() {
|
||||||
|
if (invalidated) {
|
||||||
|
// this triggers an AlreadyClosedException in ByteBufferIndexInput:
|
||||||
|
throw new NullPointerException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void getBytes(ByteBuffer receiver, byte[] dst, int offset, int length) {
|
||||||
|
ensureValid();
|
||||||
|
receiver.get(dst, offset, length);
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte getByte(ByteBuffer receiver) {
|
||||||
|
ensureValid();
|
||||||
|
return receiver.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public short getShort(ByteBuffer receiver) {
|
||||||
|
ensureValid();
|
||||||
|
return receiver.getShort();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getInt(ByteBuffer receiver) {
|
||||||
|
ensureValid();
|
||||||
|
return receiver.getInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLong(ByteBuffer receiver) {
|
||||||
|
ensureValid();
|
||||||
|
return receiver.getLong();
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte getByte(ByteBuffer receiver, int pos) {
|
||||||
|
ensureValid();
|
||||||
|
return receiver.get(pos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public short getShort(ByteBuffer receiver, int pos) {
|
||||||
|
ensureValid();
|
||||||
|
return receiver.getShort(pos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getInt(ByteBuffer receiver, int pos) {
|
||||||
|
ensureValid();
|
||||||
|
return receiver.getInt(pos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLong(ByteBuffer receiver, int pos) {
|
||||||
|
ensureValid();
|
||||||
|
return receiver.getLong(pos);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -21,9 +21,6 @@ import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.BufferUnderflowException;
|
import java.nio.BufferUnderflowException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Iterator;
|
|
||||||
|
|
||||||
import org.apache.lucene.util.WeakIdentityMap;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base IndexInput implementation that uses an array
|
* Base IndexInput implementation that uses an array
|
||||||
|
@ -37,35 +34,32 @@ import org.apache.lucene.util.WeakIdentityMap;
|
||||||
* are a power-of-two (<code>chunkSizePower</code>).
|
* are a power-of-two (<code>chunkSizePower</code>).
|
||||||
*/
|
*/
|
||||||
abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessInput {
|
abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessInput {
|
||||||
protected final BufferCleaner cleaner;
|
|
||||||
protected final long length;
|
protected final long length;
|
||||||
protected final long chunkSizeMask;
|
protected final long chunkSizeMask;
|
||||||
protected final int chunkSizePower;
|
protected final int chunkSizePower;
|
||||||
|
protected final ByteBufferGuard guard;
|
||||||
|
|
||||||
protected ByteBuffer[] buffers;
|
protected ByteBuffer[] buffers;
|
||||||
protected int curBufIndex = -1;
|
protected int curBufIndex = -1;
|
||||||
protected ByteBuffer curBuf; // redundant for speed: buffers[curBufIndex]
|
protected ByteBuffer curBuf; // redundant for speed: buffers[curBufIndex]
|
||||||
|
|
||||||
protected boolean isClone = false;
|
protected boolean isClone = false;
|
||||||
protected final WeakIdentityMap<ByteBufferIndexInput,Boolean> clones;
|
|
||||||
|
|
||||||
public static ByteBufferIndexInput newInstance(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, BufferCleaner cleaner, boolean trackClones) {
|
public static ByteBufferIndexInput newInstance(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, ByteBufferGuard guard) {
|
||||||
final WeakIdentityMap<ByteBufferIndexInput,Boolean> clones = trackClones ? WeakIdentityMap.<ByteBufferIndexInput,Boolean>newConcurrentHashMap() : null;
|
|
||||||
if (buffers.length == 1) {
|
if (buffers.length == 1) {
|
||||||
return new SingleBufferImpl(resourceDescription, buffers[0], length, chunkSizePower, cleaner, clones);
|
return new SingleBufferImpl(resourceDescription, buffers[0], length, chunkSizePower, guard);
|
||||||
} else {
|
} else {
|
||||||
return new MultiBufferImpl(resourceDescription, buffers, 0, length, chunkSizePower, cleaner, clones);
|
return new MultiBufferImpl(resourceDescription, buffers, 0, length, chunkSizePower, guard);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteBufferIndexInput(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, BufferCleaner cleaner, WeakIdentityMap<ByteBufferIndexInput,Boolean> clones) {
|
ByteBufferIndexInput(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, ByteBufferGuard guard) {
|
||||||
super(resourceDescription);
|
super(resourceDescription);
|
||||||
this.buffers = buffers;
|
this.buffers = buffers;
|
||||||
this.length = length;
|
this.length = length;
|
||||||
this.chunkSizePower = chunkSizePower;
|
this.chunkSizePower = chunkSizePower;
|
||||||
this.chunkSizeMask = (1L << chunkSizePower) - 1L;
|
this.chunkSizeMask = (1L << chunkSizePower) - 1L;
|
||||||
this.clones = clones;
|
this.guard = guard;
|
||||||
this.cleaner = cleaner;
|
|
||||||
assert chunkSizePower >= 0 && chunkSizePower <= 30;
|
assert chunkSizePower >= 0 && chunkSizePower <= 30;
|
||||||
assert (length >>> chunkSizePower) < Integer.MAX_VALUE;
|
assert (length >>> chunkSizePower) < Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
@ -73,7 +67,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
@Override
|
@Override
|
||||||
public final byte readByte() throws IOException {
|
public final byte readByte() throws IOException {
|
||||||
try {
|
try {
|
||||||
return curBuf.get();
|
return guard.getByte(curBuf);
|
||||||
} catch (BufferUnderflowException e) {
|
} catch (BufferUnderflowException e) {
|
||||||
do {
|
do {
|
||||||
curBufIndex++;
|
curBufIndex++;
|
||||||
|
@ -83,7 +77,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
curBuf = buffers[curBufIndex];
|
curBuf = buffers[curBufIndex];
|
||||||
curBuf.position(0);
|
curBuf.position(0);
|
||||||
} while (!curBuf.hasRemaining());
|
} while (!curBuf.hasRemaining());
|
||||||
return curBuf.get();
|
return guard.getByte(curBuf);
|
||||||
} catch (NullPointerException npe) {
|
} catch (NullPointerException npe) {
|
||||||
throw new AlreadyClosedException("Already closed: " + this);
|
throw new AlreadyClosedException("Already closed: " + this);
|
||||||
}
|
}
|
||||||
|
@ -92,11 +86,11 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
@Override
|
@Override
|
||||||
public final void readBytes(byte[] b, int offset, int len) throws IOException {
|
public final void readBytes(byte[] b, int offset, int len) throws IOException {
|
||||||
try {
|
try {
|
||||||
curBuf.get(b, offset, len);
|
guard.getBytes(curBuf, b, offset, len);
|
||||||
} catch (BufferUnderflowException e) {
|
} catch (BufferUnderflowException e) {
|
||||||
int curAvail = curBuf.remaining();
|
int curAvail = curBuf.remaining();
|
||||||
while (len > curAvail) {
|
while (len > curAvail) {
|
||||||
curBuf.get(b, offset, curAvail);
|
guard.getBytes(curBuf, b, offset, curAvail);
|
||||||
len -= curAvail;
|
len -= curAvail;
|
||||||
offset += curAvail;
|
offset += curAvail;
|
||||||
curBufIndex++;
|
curBufIndex++;
|
||||||
|
@ -107,7 +101,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
curBuf.position(0);
|
curBuf.position(0);
|
||||||
curAvail = curBuf.remaining();
|
curAvail = curBuf.remaining();
|
||||||
}
|
}
|
||||||
curBuf.get(b, offset, len);
|
guard.getBytes(curBuf, b, offset, len);
|
||||||
} catch (NullPointerException npe) {
|
} catch (NullPointerException npe) {
|
||||||
throw new AlreadyClosedException("Already closed: " + this);
|
throw new AlreadyClosedException("Already closed: " + this);
|
||||||
}
|
}
|
||||||
|
@ -116,7 +110,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
@Override
|
@Override
|
||||||
public final short readShort() throws IOException {
|
public final short readShort() throws IOException {
|
||||||
try {
|
try {
|
||||||
return curBuf.getShort();
|
return guard.getShort(curBuf);
|
||||||
} catch (BufferUnderflowException e) {
|
} catch (BufferUnderflowException e) {
|
||||||
return super.readShort();
|
return super.readShort();
|
||||||
} catch (NullPointerException npe) {
|
} catch (NullPointerException npe) {
|
||||||
|
@ -127,7 +121,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
@Override
|
@Override
|
||||||
public final int readInt() throws IOException {
|
public final int readInt() throws IOException {
|
||||||
try {
|
try {
|
||||||
return curBuf.getInt();
|
return guard.getInt(curBuf);
|
||||||
} catch (BufferUnderflowException e) {
|
} catch (BufferUnderflowException e) {
|
||||||
return super.readInt();
|
return super.readInt();
|
||||||
} catch (NullPointerException npe) {
|
} catch (NullPointerException npe) {
|
||||||
|
@ -138,7 +132,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
@Override
|
@Override
|
||||||
public final long readLong() throws IOException {
|
public final long readLong() throws IOException {
|
||||||
try {
|
try {
|
||||||
return curBuf.getLong();
|
return guard.getLong(curBuf);
|
||||||
} catch (BufferUnderflowException e) {
|
} catch (BufferUnderflowException e) {
|
||||||
return super.readLong();
|
return super.readLong();
|
||||||
} catch (NullPointerException npe) {
|
} catch (NullPointerException npe) {
|
||||||
|
@ -181,7 +175,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
public byte readByte(long pos) throws IOException {
|
public byte readByte(long pos) throws IOException {
|
||||||
try {
|
try {
|
||||||
final int bi = (int) (pos >> chunkSizePower);
|
final int bi = (int) (pos >> chunkSizePower);
|
||||||
return buffers[bi].get((int) (pos & chunkSizeMask));
|
return guard.getByte(buffers[bi], (int) (pos & chunkSizeMask));
|
||||||
} catch (IndexOutOfBoundsException ioobe) {
|
} catch (IndexOutOfBoundsException ioobe) {
|
||||||
throw new EOFException("seek past EOF: " + this);
|
throw new EOFException("seek past EOF: " + this);
|
||||||
} catch (NullPointerException npe) {
|
} catch (NullPointerException npe) {
|
||||||
|
@ -207,7 +201,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
public short readShort(long pos) throws IOException {
|
public short readShort(long pos) throws IOException {
|
||||||
final int bi = (int) (pos >> chunkSizePower);
|
final int bi = (int) (pos >> chunkSizePower);
|
||||||
try {
|
try {
|
||||||
return buffers[bi].getShort((int) (pos & chunkSizeMask));
|
return guard.getShort(buffers[bi], (int) (pos & chunkSizeMask));
|
||||||
} catch (IndexOutOfBoundsException ioobe) {
|
} catch (IndexOutOfBoundsException ioobe) {
|
||||||
// either it's a boundary, or read past EOF, fall back:
|
// either it's a boundary, or read past EOF, fall back:
|
||||||
setPos(pos, bi);
|
setPos(pos, bi);
|
||||||
|
@ -221,7 +215,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
public int readInt(long pos) throws IOException {
|
public int readInt(long pos) throws IOException {
|
||||||
final int bi = (int) (pos >> chunkSizePower);
|
final int bi = (int) (pos >> chunkSizePower);
|
||||||
try {
|
try {
|
||||||
return buffers[bi].getInt((int) (pos & chunkSizeMask));
|
return guard.getInt(buffers[bi], (int) (pos & chunkSizeMask));
|
||||||
} catch (IndexOutOfBoundsException ioobe) {
|
} catch (IndexOutOfBoundsException ioobe) {
|
||||||
// either it's a boundary, or read past EOF, fall back:
|
// either it's a boundary, or read past EOF, fall back:
|
||||||
setPos(pos, bi);
|
setPos(pos, bi);
|
||||||
|
@ -235,7 +229,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
public long readLong(long pos) throws IOException {
|
public long readLong(long pos) throws IOException {
|
||||||
final int bi = (int) (pos >> chunkSizePower);
|
final int bi = (int) (pos >> chunkSizePower);
|
||||||
try {
|
try {
|
||||||
return buffers[bi].getLong((int) (pos & chunkSizeMask));
|
return guard.getLong(buffers[bi], (int) (pos & chunkSizeMask));
|
||||||
} catch (IndexOutOfBoundsException ioobe) {
|
} catch (IndexOutOfBoundsException ioobe) {
|
||||||
// either it's a boundary, or read past EOF, fall back:
|
// either it's a boundary, or read past EOF, fall back:
|
||||||
setPos(pos, bi);
|
setPos(pos, bi);
|
||||||
|
@ -285,11 +279,6 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
|
|
||||||
final ByteBufferIndexInput clone = newCloneInstance(getFullSliceDescription(sliceDescription), newBuffers, ofs, length);
|
final ByteBufferIndexInput clone = newCloneInstance(getFullSliceDescription(sliceDescription), newBuffers, ofs, length);
|
||||||
clone.isClone = true;
|
clone.isClone = true;
|
||||||
|
|
||||||
// register the new clone in our clone list to clean it up on closing:
|
|
||||||
if (clones != null) {
|
|
||||||
this.clones.put(clone, Boolean.TRUE);
|
|
||||||
}
|
|
||||||
|
|
||||||
return clone;
|
return clone;
|
||||||
}
|
}
|
||||||
|
@ -299,9 +288,9 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
protected ByteBufferIndexInput newCloneInstance(String newResourceDescription, ByteBuffer[] newBuffers, int offset, long length) {
|
protected ByteBufferIndexInput newCloneInstance(String newResourceDescription, ByteBuffer[] newBuffers, int offset, long length) {
|
||||||
if (newBuffers.length == 1) {
|
if (newBuffers.length == 1) {
|
||||||
newBuffers[0].position(offset);
|
newBuffers[0].position(offset);
|
||||||
return new SingleBufferImpl(newResourceDescription, newBuffers[0].slice(), length, chunkSizePower, this.cleaner, this.clones);
|
return new SingleBufferImpl(newResourceDescription, newBuffers[0].slice(), length, chunkSizePower, this.guard);
|
||||||
} else {
|
} else {
|
||||||
return new MultiBufferImpl(newResourceDescription, newBuffers, offset, length, chunkSizePower, cleaner, clones);
|
return new MultiBufferImpl(newResourceDescription, newBuffers, offset, length, chunkSizePower, guard);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -335,25 +324,11 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
// make local copy, then un-set early
|
// make local copy, then un-set early
|
||||||
final ByteBuffer[] bufs = buffers;
|
final ByteBuffer[] bufs = buffers;
|
||||||
unsetBuffers();
|
unsetBuffers();
|
||||||
if (clones != null) {
|
|
||||||
clones.remove(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isClone) return;
|
if (isClone) return;
|
||||||
|
|
||||||
// for extra safety unset also all clones' buffers:
|
// tell the guard to invalidate and later unmap the bytebuffers (if supported):
|
||||||
if (clones != null) {
|
guard.invalidateAndUnmap(bufs);
|
||||||
for (Iterator<ByteBufferIndexInput> it = this.clones.keyIterator(); it.hasNext();) {
|
|
||||||
final ByteBufferIndexInput clone = it.next();
|
|
||||||
assert clone.isClone;
|
|
||||||
clone.unsetBuffers();
|
|
||||||
}
|
|
||||||
this.clones.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (final ByteBuffer b : bufs) {
|
|
||||||
freeBuffer(b);
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
unsetBuffers();
|
unsetBuffers();
|
||||||
}
|
}
|
||||||
|
@ -367,31 +342,12 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
curBuf = null;
|
curBuf = null;
|
||||||
curBufIndex = 0;
|
curBufIndex = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Called when the contents of a buffer will be no longer needed.
|
|
||||||
*/
|
|
||||||
private void freeBuffer(ByteBuffer b) throws IOException {
|
|
||||||
if (cleaner != null) {
|
|
||||||
cleaner.freeBuffer(this, b);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pass in an implementation of this interface to cleanup ByteBuffers.
|
|
||||||
* MMapDirectory implements this to allow unmapping of bytebuffers with private Java APIs.
|
|
||||||
*/
|
|
||||||
@FunctionalInterface
|
|
||||||
static interface BufferCleaner {
|
|
||||||
void freeBuffer(ByteBufferIndexInput parent, ByteBuffer b) throws IOException;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Optimization of ByteBufferIndexInput for when there is only one buffer */
|
/** Optimization of ByteBufferIndexInput for when there is only one buffer */
|
||||||
static final class SingleBufferImpl extends ByteBufferIndexInput {
|
static final class SingleBufferImpl extends ByteBufferIndexInput {
|
||||||
|
|
||||||
SingleBufferImpl(String resourceDescription, ByteBuffer buffer, long length, int chunkSizePower,
|
SingleBufferImpl(String resourceDescription, ByteBuffer buffer, long length, int chunkSizePower, ByteBufferGuard guard) {
|
||||||
BufferCleaner cleaner, WeakIdentityMap<ByteBufferIndexInput,Boolean> clones) {
|
super(resourceDescription, new ByteBuffer[] { buffer }, length, chunkSizePower, guard);
|
||||||
super(resourceDescription, new ByteBuffer[] { buffer }, length, chunkSizePower, cleaner, clones);
|
|
||||||
this.curBufIndex = 0;
|
this.curBufIndex = 0;
|
||||||
this.curBuf = buffer;
|
this.curBuf = buffer;
|
||||||
buffer.position(0);
|
buffer.position(0);
|
||||||
|
@ -426,7 +382,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
@Override
|
@Override
|
||||||
public byte readByte(long pos) throws IOException {
|
public byte readByte(long pos) throws IOException {
|
||||||
try {
|
try {
|
||||||
return curBuf.get((int) pos);
|
return guard.getByte(curBuf, (int) pos);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
if (pos < 0) {
|
if (pos < 0) {
|
||||||
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
|
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
|
||||||
|
@ -441,7 +397,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
@Override
|
@Override
|
||||||
public short readShort(long pos) throws IOException {
|
public short readShort(long pos) throws IOException {
|
||||||
try {
|
try {
|
||||||
return curBuf.getShort((int) pos);
|
return guard.getShort(curBuf, (int) pos);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
if (pos < 0) {
|
if (pos < 0) {
|
||||||
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
|
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
|
||||||
|
@ -456,7 +412,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
@Override
|
@Override
|
||||||
public int readInt(long pos) throws IOException {
|
public int readInt(long pos) throws IOException {
|
||||||
try {
|
try {
|
||||||
return curBuf.getInt((int) pos);
|
return guard.getInt(curBuf, (int) pos);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
if (pos < 0) {
|
if (pos < 0) {
|
||||||
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
|
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
|
||||||
|
@ -471,7 +427,7 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
@Override
|
@Override
|
||||||
public long readLong(long pos) throws IOException {
|
public long readLong(long pos) throws IOException {
|
||||||
try {
|
try {
|
||||||
return curBuf.getLong((int) pos);
|
return guard.getLong(curBuf, (int) pos);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
if (pos < 0) {
|
if (pos < 0) {
|
||||||
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
|
throw new IllegalArgumentException("Seeking to negative position: " + this, e);
|
||||||
|
@ -489,8 +445,8 @@ abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessIn
|
||||||
private final int offset;
|
private final int offset;
|
||||||
|
|
||||||
MultiBufferImpl(String resourceDescription, ByteBuffer[] buffers, int offset, long length, int chunkSizePower,
|
MultiBufferImpl(String resourceDescription, ByteBuffer[] buffers, int offset, long length, int chunkSizePower,
|
||||||
BufferCleaner cleaner, WeakIdentityMap<ByteBufferIndexInput,Boolean> clones) {
|
ByteBufferGuard guard) {
|
||||||
super(resourceDescription, buffers, length, chunkSizePower, cleaner, clones);
|
super(resourceDescription, buffers, length, chunkSizePower, guard);
|
||||||
this.offset = offset;
|
this.offset = offset;
|
||||||
try {
|
try {
|
||||||
seek(0L);
|
seek(0L);
|
||||||
|
|
|
@ -36,7 +36,7 @@ import java.util.concurrent.Future;
|
||||||
import java.lang.invoke.MethodHandle;
|
import java.lang.invoke.MethodHandle;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
|
||||||
import org.apache.lucene.store.ByteBufferIndexInput.BufferCleaner;
|
import org.apache.lucene.store.ByteBufferGuard.BufferCleaner;
|
||||||
import org.apache.lucene.util.Constants;
|
import org.apache.lucene.util.Constants;
|
||||||
import org.apache.lucene.util.SuppressForbidden;
|
import org.apache.lucene.util.SuppressForbidden;
|
||||||
|
|
||||||
|
@ -240,7 +240,7 @@ public class MMapDirectory extends FSDirectory {
|
||||||
final boolean useUnmap = getUseUnmap();
|
final boolean useUnmap = getUseUnmap();
|
||||||
return ByteBufferIndexInput.newInstance(resourceDescription,
|
return ByteBufferIndexInput.newInstance(resourceDescription,
|
||||||
map(resourceDescription, c, 0, c.size()),
|
map(resourceDescription, c, 0, c.size()),
|
||||||
c.size(), chunkSizePower, useUnmap ? CLEANER : null, useUnmap);
|
c.size(), chunkSizePower, new ByteBufferGuard(resourceDescription, useUnmap ? CLEANER : null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -370,7 +370,7 @@ public class MMapDirectory extends FSDirectory {
|
||||||
final MethodHandle unmapper = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop))
|
final MethodHandle unmapper = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop))
|
||||||
.asType(methodType(void.class, ByteBuffer.class));
|
.asType(methodType(void.class, ByteBuffer.class));
|
||||||
|
|
||||||
return (BufferCleaner) (ByteBufferIndexInput parent, ByteBuffer buffer) -> {
|
return (BufferCleaner) (String resourceDescription, ByteBuffer buffer) -> {
|
||||||
if (directBufferClass.isInstance(buffer)) {
|
if (directBufferClass.isInstance(buffer)) {
|
||||||
final Throwable error = AccessController.doPrivileged((PrivilegedAction<Throwable>) () -> {
|
final Throwable error = AccessController.doPrivileged((PrivilegedAction<Throwable>) () -> {
|
||||||
try {
|
try {
|
||||||
|
@ -381,7 +381,7 @@ public class MMapDirectory extends FSDirectory {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
throw new IOException("Unable to unmap the mapped buffer: " + parent.toString(), error);
|
throw new IOException("Unable to unmap the mapped buffer: " + resourceDescription, error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -19,6 +19,9 @@ package org.apache.lucene.store;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
// import org.junit.Ignore;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests MMapDirectory
|
* Tests MMapDirectory
|
||||||
|
@ -39,4 +42,37 @@ public class TestMmapDirectory extends BaseDirectoryTestCase {
|
||||||
MMapDirectory.UNMAP_SUPPORTED);
|
MMapDirectory.UNMAP_SUPPORTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: @Ignore("This test is for JVM testing purposes. There are no guarantees that it may not fail with SIGSEGV!")
|
||||||
|
public void testAceWithThreads() throws Exception {
|
||||||
|
for (int iter = 0; iter < 10; iter++) {
|
||||||
|
Directory dir = getDirectory(createTempDir("testAceWithThreads"));
|
||||||
|
IndexOutput out = dir.createOutput("test", IOContext.DEFAULT);
|
||||||
|
for (int i = 0; i < 8 * 1024 * 1024; i++) {
|
||||||
|
out.writeInt(random().nextInt());
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
IndexInput in = dir.openInput("test", IOContext.DEFAULT);
|
||||||
|
IndexInput clone = in.clone();
|
||||||
|
final byte accum[] = new byte[32 * 1024 * 1024];
|
||||||
|
final CountDownLatch shotgun = new CountDownLatch(1);
|
||||||
|
Thread t1 = new Thread(() -> {
|
||||||
|
try {
|
||||||
|
shotgun.await();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
clone.seek(0);
|
||||||
|
clone.readBytes(accum, 0, accum.length);
|
||||||
|
}
|
||||||
|
} catch (IOException | AlreadyClosedException ok) {
|
||||||
|
// OK
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
t1.start();
|
||||||
|
shotgun.countDown();
|
||||||
|
in.close();
|
||||||
|
t1.join();
|
||||||
|
dir.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue