LUCENE-3841: fix CloseableThreadLocal to also purge entries periodically in get(), to avoid holding onto objects for too long

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1300547 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2012-03-14 13:54:27 +00:00
parent d8fff9b873
commit 6a4e492913
5 changed files with 107 additions and 47 deletions

View File

@ -934,6 +934,10 @@ Bug fixes
Fixed except when for repeating multiterms (e.g. "yes ho yes|no").
(Robert Muir, Doron Cohen)
* LUCENE-3841: Fix CloseableThreadLocal to also purge stale entries on
get(); this fixes certain cases where we were holding onto objects
for dead threads for too long (Matthew Bellew, Mike McCandless)
Optimizations
* LUCENE-3653: Improve concurrency in VirtualMethod and AttributeSource by

View File

@ -393,9 +393,6 @@ public class FixedGapTermsIndexReader extends TermsIndexReaderBase {
if (in != null && !indexLoaded) {
in.close();
}
if (termBytesReader != null) {
termBytesReader.close();
}
}
protected void seekDir(IndexInput input, long dirOffset) throws IOException {

View File

@ -19,9 +19,10 @@ package org.apache.lucene.util;
import java.io.Closeable;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/** Java's builtin ThreadLocal has a serious flaw:
* it can take an arbitrarily long amount of time to
@ -56,7 +57,18 @@ public class CloseableThreadLocal<T> implements Closeable {
private ThreadLocal<WeakReference<T>> t = new ThreadLocal<WeakReference<T>>();
private Map<Thread,T> hardRefs = new HashMap<Thread,T>();
// Use a WeakHashMap so that if a Thread exits and is
// GC'able, its entry may be removed:
private Map<Thread,T> hardRefs = new WeakHashMap<Thread,T>();
// Increase this to decrease frequency of purging in get:
private static int PURGE_MULTIPLIER = 20;
// On each get or set we decrement this; when it hits 0 we
// purge. After purge, we set this to
// PURGE_MULTIPLIER * stillAliveCount. This keeps
// amortized cost of purging linear.
private final AtomicInteger countUntilPurge = new AtomicInteger(PURGE_MULTIPLIER);
protected T initialValue() {
return null;
@ -69,9 +81,11 @@ public class CloseableThreadLocal<T> implements Closeable {
if (iv != null) {
set(iv);
return iv;
} else
return null;
} else {
return null;
}
} else {
maybePurge();
return weakRef.get();
}
}
@ -82,14 +96,36 @@ public class CloseableThreadLocal<T> implements Closeable {
synchronized(hardRefs) {
hardRefs.put(Thread.currentThread(), object);
maybePurge();
}
}
private void maybePurge() {
if (countUntilPurge.getAndDecrement() == 0) {
purge();
}
}
// Purge dead threads
private void purge() {
synchronized(hardRefs) {
int stillAliveCount = 0;
for (Iterator<Thread> it = hardRefs.keySet().iterator(); it.hasNext();) {
final Thread t = it.next();
if (!t.isAlive())
if (!t.isAlive()) {
it.remove();
} else {
stillAliveCount++;
}
}
int nextCount = (1+stillAliveCount) * PURGE_MULTIPLIER;
if (nextCount <= 0) {
// defensive: int overflow!
nextCount = 1000000;
}
countUntilPurge.set(nextCount);
}
}
public void close() {

View File

@ -17,7 +17,6 @@ package org.apache.lucene.util;
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -46,13 +45,12 @@ public final class PagedBytes {
private static final byte[] EMPTY_BYTES = new byte[0];
public final static class Reader implements Closeable {
public final static class Reader {
private final byte[][] blocks;
private final int[] blockEnds;
private final int blockBits;
private final int blockMask;
private final int blockSize;
private final CloseableThreadLocal<byte[]> threadBuffers = new CloseableThreadLocal<byte[]>();
public Reader(PagedBytes pagedBytes) {
blocks = new byte[pagedBytes.blocks.size()][];
@ -79,6 +77,7 @@ public final class PagedBytes {
**/
public BytesRef fillSlice(BytesRef b, long start, int length) {
assert length >= 0: "length=" + length;
assert length <= blockSize+1;
final int index = (int) (start >> blockBits);
final int offset = (int) (start & blockMask);
b.length = length;
@ -88,18 +87,10 @@ public final class PagedBytes {
b.offset = offset;
} else {
// Split
byte[] buffer = threadBuffers.get();
if (buffer == null) {
buffer = new byte[length];
threadBuffers.set(buffer);
} else if (buffer.length < length) {
buffer = ArrayUtil.grow(buffer, length);
threadBuffers.set(buffer);
}
b.bytes = buffer;
b.bytes = new byte[length];
b.offset = 0;
System.arraycopy(blocks[index], offset, buffer, 0, blockSize-offset);
System.arraycopy(blocks[1+index], 0, buffer, blockSize-offset, length-(blockSize-offset));
System.arraycopy(blocks[index], offset, b.bytes, 0, blockSize-offset);
System.arraycopy(blocks[1+index], 0, b.bytes, blockSize-offset, length-(blockSize-offset));
}
return b;
}
@ -216,25 +207,12 @@ public final class PagedBytes {
}
assert length >= 0: "length=" + length;
b.length = length;
if (blockSize - offset >= length) {
// We always alloc a new block when writing w/ prefix;
// we could some day relax that and span two blocks:
assert blockSize - offset >= length;
// Within block
b.offset = offset;
b.bytes = blocks[index];
} else {
// Split
byte[] buffer = threadBuffers.get();
if (buffer == null) {
buffer = new byte[length];
threadBuffers.set(buffer);
} else if (buffer.length < length) {
buffer = ArrayUtil.grow(buffer, length);
threadBuffers.set(buffer);
}
b.bytes = buffer;
b.offset = 0;
System.arraycopy(blocks[index], offset, buffer, 0, blockSize-offset);
System.arraycopy(blocks[1+index], 0, buffer, blockSize-offset, length-(blockSize-offset));
}
return b;
}
@ -247,10 +225,6 @@ public final class PagedBytes {
public int[] getBlockEnds() {
return blockEnds;
}
public void close() {
threadBuffers.close();
}
}
/** 1<<blockBits must be bigger than biggest single
@ -375,6 +349,9 @@ public final class PagedBytes {
/** Copy bytes in, writing the length as a 1 or 2 byte
* vInt prefix. */
public long copyUsingLengthPrefix(BytesRef bytes) throws IOException {
if (bytes.length >= 32768) {
throw new IllegalArgumentException("max length is 32767 (got " + bytes.length + ")");
}
if (upto + bytes.length + 2 > blockSize) {
if (bytes.length + 2 > blockSize) {

View File

@ -17,7 +17,9 @@
package org.apache.lucene.util;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
@ -26,7 +28,9 @@ public class TestPagedBytes extends LuceneTestCase {
public void testDataInputOutput() throws Exception {
for(int iter=0;iter<5*RANDOM_MULTIPLIER;iter++) {
final PagedBytes p = new PagedBytes(_TestUtil.nextInt(random, 1, 20));
final int blockBits = _TestUtil.nextInt(random, 1, 20);
final int blockSize = 1 << blockBits;
final PagedBytes p = new PagedBytes(blockBits);
final DataOutput out = p.getDataOutput();
final int numBytes = random.nextInt(10000000);
@ -43,7 +47,7 @@ public class TestPagedBytes extends LuceneTestCase {
}
}
p.freeze(random.nextBoolean());
final PagedBytes.Reader reader = p.freeze(random.nextBoolean());
final DataInput in = p.getDataInput();
@ -59,6 +63,48 @@ public class TestPagedBytes extends LuceneTestCase {
}
}
assertTrue(Arrays.equals(answer, verify));
final BytesRef slice = new BytesRef();
for(int iter2=0;iter2<100;iter2++) {
final int pos = random.nextInt(numBytes-1);
final int len = random.nextInt(Math.min(blockSize+1, numBytes - pos));
reader.fillSlice(slice, pos, len);
for(int byteUpto=0;byteUpto<len;byteUpto++) {
assertEquals(answer[pos + byteUpto], slice.bytes[slice.offset + byteUpto]);
}
}
}
}
public void testLengthPrefix() throws Exception {
for(int iter=0;iter<5*RANDOM_MULTIPLIER;iter++) {
final int blockBits = _TestUtil.nextInt(random, 2, 20);
final int blockSize = 1 << blockBits;
final PagedBytes p = new PagedBytes(blockBits);
final List<Integer> addresses = new ArrayList<Integer>();
final List<BytesRef> answers = new ArrayList<BytesRef>();
int totBytes = 0;
while(totBytes < 10000000 && answers.size() < 100000) {
final int len = random.nextInt(Math.min(blockSize-2, 32768));
final BytesRef b = new BytesRef();
b.bytes = new byte[len];
b.length = len;
b.offset = 0;
random.nextBytes(b.bytes);
answers.add(b);
addresses.add((int) p.copyUsingLengthPrefix(b));
totBytes += len;
}
final PagedBytes.Reader reader = p.freeze(random.nextBoolean());
final BytesRef slice = new BytesRef();
for(int idx=0;idx<answers.size();idx++) {
reader.fillSliceWithPrefix(slice, addresses.get(idx));
assertEquals(answers.get(idx), slice);
}
}
}
}