diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index d7b734c09a1..ddbcb71cfd5 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -933,6 +933,10 @@ Bug fixes sloppy query missed documents that exact query matched. Fixed except when for repeating multiterms (e.g. "yes ho yes|no"). (Robert Muir, Doron Cohen) + +* LUCENE-3841: Fix CloseableThreadLocal to also purge stale entries on + get(); this fixes certain cases where we were holding onto objects + for dead threads for too long (Matthew Bellew, Mike McCandless) Optimizations diff --git a/lucene/core/src/java/org/apache/lucene/codecs/FixedGapTermsIndexReader.java b/lucene/core/src/java/org/apache/lucene/codecs/FixedGapTermsIndexReader.java index 49c8b5dbe34..8ebcb628406 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/FixedGapTermsIndexReader.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/FixedGapTermsIndexReader.java @@ -393,9 +393,6 @@ public class FixedGapTermsIndexReader extends TermsIndexReaderBase { if (in != null && !indexLoaded) { in.close(); } - if (termBytesReader != null) { - termBytesReader.close(); - } } protected void seekDir(IndexInput input, long dirOffset) throws IOException { diff --git a/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java b/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java index 97d873fc290..a1792fa7800 100644 --- a/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java +++ b/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java @@ -19,9 +19,10 @@ package org.apache.lucene.util; import java.io.Closeable; import java.lang.ref.WeakReference; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** Java's builtin ThreadLocal has a serious flaw: * it can take an arbitrarily long amount of time to @@ -56,8 +57,19 @@ public class CloseableThreadLocal implements Closeable { private ThreadLocal> t = new ThreadLocal>(); - private Map hardRefs = new HashMap(); + // Use a WeakHashMap so that if a Thread exits and is + // GC'able, its entry may be removed: + private Map hardRefs = new WeakHashMap(); + // Increase this to decrease frequency of purging in get: + private static int PURGE_MULTIPLIER = 20; + + // On each get or set we decrement this; when it hits 0 we + // purge. After purge, we set this to + // PURGE_MULTIPLIER * stillAliveCount. This keeps + // amortized cost of purging linear. + private final AtomicInteger countUntilPurge = new AtomicInteger(PURGE_MULTIPLIER); + protected T initialValue() { return null; } @@ -69,9 +81,11 @@ public class CloseableThreadLocal implements Closeable { if (iv != null) { set(iv); return iv; - } else + } else { return null; + } } else { + maybePurge(); return weakRef.get(); } } @@ -82,13 +96,35 @@ public class CloseableThreadLocal implements Closeable { synchronized(hardRefs) { hardRefs.put(Thread.currentThread(), object); + maybePurge(); + } + } - // Purge dead threads + private void maybePurge() { + if (countUntilPurge.getAndDecrement() == 0) { + purge(); + } + } + + // Purge dead threads + private void purge() { + synchronized(hardRefs) { + int stillAliveCount = 0; for (Iterator it = hardRefs.keySet().iterator(); it.hasNext();) { final Thread t = it.next(); - if (!t.isAlive()) + if (!t.isAlive()) { it.remove(); + } else { + stillAliveCount++; + } } + int nextCount = (1+stillAliveCount) * PURGE_MULTIPLIER; + if (nextCount <= 0) { + // defensive: int overflow! + nextCount = 1000000; + } + + countUntilPurge.set(nextCount); } } diff --git a/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java b/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java index 983f3bab69c..cc1ff3f8824 100644 --- a/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java +++ b/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java @@ -17,7 +17,6 @@ package org.apache.lucene.util; * limitations under the License. */ -import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -46,13 +45,12 @@ public final class PagedBytes { private static final byte[] EMPTY_BYTES = new byte[0]; - public final static class Reader implements Closeable { + public final static class Reader { private final byte[][] blocks; private final int[] blockEnds; private final int blockBits; private final int blockMask; private final int blockSize; - private final CloseableThreadLocal threadBuffers = new CloseableThreadLocal(); public Reader(PagedBytes pagedBytes) { blocks = new byte[pagedBytes.blocks.size()][]; @@ -79,6 +77,7 @@ public final class PagedBytes { **/ public BytesRef fillSlice(BytesRef b, long start, int length) { assert length >= 0: "length=" + length; + assert length <= blockSize+1; final int index = (int) (start >> blockBits); final int offset = (int) (start & blockMask); b.length = length; @@ -88,18 +87,10 @@ public final class PagedBytes { b.offset = offset; } else { // Split - byte[] buffer = threadBuffers.get(); - if (buffer == null) { - buffer = new byte[length]; - threadBuffers.set(buffer); - } else if (buffer.length < length) { - buffer = ArrayUtil.grow(buffer, length); - threadBuffers.set(buffer); - } - b.bytes = buffer; + b.bytes = new byte[length]; b.offset = 0; - System.arraycopy(blocks[index], offset, buffer, 0, blockSize-offset); - System.arraycopy(blocks[1+index], 0, buffer, blockSize-offset, length-(blockSize-offset)); + System.arraycopy(blocks[index], offset, b.bytes, 0, blockSize-offset); + System.arraycopy(blocks[1+index], 0, b.bytes, blockSize-offset, length-(blockSize-offset)); } return b; } @@ -216,25 +207,12 @@ public final class PagedBytes { } assert length >= 0: "length=" + length; b.length = length; - if (blockSize - offset >= length) { - // Within block - b.offset = offset; - b.bytes = blocks[index]; - } else { - // Split - byte[] buffer = threadBuffers.get(); - if (buffer == null) { - buffer = new byte[length]; - threadBuffers.set(buffer); - } else if (buffer.length < length) { - buffer = ArrayUtil.grow(buffer, length); - threadBuffers.set(buffer); - } - b.bytes = buffer; - b.offset = 0; - System.arraycopy(blocks[index], offset, buffer, 0, blockSize-offset); - System.arraycopy(blocks[1+index], 0, buffer, blockSize-offset, length-(blockSize-offset)); - } + // We always alloc a new block when writing w/ prefix; + // we could some day relax that and span two blocks: + assert blockSize - offset >= length; + // Within block + b.offset = offset; + b.bytes = blocks[index]; return b; } @@ -247,10 +225,6 @@ public final class PagedBytes { public int[] getBlockEnds() { return blockEnds; } - - public void close() { - threadBuffers.close(); - } } /** 1<= 32768) { + throw new IllegalArgumentException("max length is 32767 (got " + bytes.length + ")"); + } if (upto + bytes.length + 2 > blockSize) { if (bytes.length + 2 > blockSize) { diff --git a/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java b/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java index 73e84bd4bf7..df8fa2f8966 100644 --- a/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java +++ b/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java @@ -17,7 +17,9 @@ package org.apache.lucene.util; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.apache.lucene.store.DataInput; import org.apache.lucene.store.DataOutput; @@ -26,7 +28,9 @@ public class TestPagedBytes extends LuceneTestCase { public void testDataInputOutput() throws Exception { for(int iter=0;iter<5*RANDOM_MULTIPLIER;iter++) { - final PagedBytes p = new PagedBytes(_TestUtil.nextInt(random, 1, 20)); + final int blockBits = _TestUtil.nextInt(random, 1, 20); + final int blockSize = 1 << blockBits; + final PagedBytes p = new PagedBytes(blockBits); final DataOutput out = p.getDataOutput(); final int numBytes = random.nextInt(10000000); @@ -43,7 +47,7 @@ public class TestPagedBytes extends LuceneTestCase { } } - p.freeze(random.nextBoolean()); + final PagedBytes.Reader reader = p.freeze(random.nextBoolean()); final DataInput in = p.getDataInput(); @@ -59,6 +63,48 @@ public class TestPagedBytes extends LuceneTestCase { } } assertTrue(Arrays.equals(answer, verify)); + + final BytesRef slice = new BytesRef(); + for(int iter2=0;iter2<100;iter2++) { + final int pos = random.nextInt(numBytes-1); + final int len = random.nextInt(Math.min(blockSize+1, numBytes - pos)); + reader.fillSlice(slice, pos, len); + for(int byteUpto=0;byteUpto addresses = new ArrayList(); + final List answers = new ArrayList(); + int totBytes = 0; + while(totBytes < 10000000 && answers.size() < 100000) { + final int len = random.nextInt(Math.min(blockSize-2, 32768)); + final BytesRef b = new BytesRef(); + b.bytes = new byte[len]; + b.length = len; + b.offset = 0; + random.nextBytes(b.bytes); + answers.add(b); + addresses.add((int) p.copyUsingLengthPrefix(b)); + + totBytes += len; + } + + final PagedBytes.Reader reader = p.freeze(random.nextBoolean()); + + final BytesRef slice = new BytesRef(); + + for(int idx=0;idx