Fix BytesRef owning issue in string terms aggregations.
The byte[] array that was used to store the term was owned by the BytesRefHash which is used to compute counts. However, the BytesRefHash is released at some point and its content may be recycled. MockPageCacheRecycler has been improved to expose this issue (putting random content into the arrays upon release). Number of documents/terms have been increased in RandomTests to make sure page recycling occurs. Close #5021
This commit is contained in:
parent
444dff7b40
commit
e1c1120949
|
@ -60,6 +60,7 @@ public final class BytesRefHash extends AbstractHash {
|
|||
|
||||
/**
|
||||
* Return the key at <code>0 <e; index <e; capacity()</code>. The result is undefined if the slot is unused.
|
||||
* <p color="red">Beware that the content of the {@link BytesRef} may become invalid as soon as {@link #release()} is called</p>
|
||||
*/
|
||||
public BytesRef get(long id, BytesRef dest) {
|
||||
final long startOffset = startOffsets.get(id);
|
||||
|
|
|
@ -57,7 +57,7 @@ public class StringTerms extends InternalTerms {
|
|||
|
||||
public static class Bucket extends InternalTerms.Bucket {
|
||||
|
||||
final BytesRef termBytes;
|
||||
BytesRef termBytes;
|
||||
|
||||
public Bucket(BytesRef term, long docCount, InternalAggregations aggregations) {
|
||||
super(docCount, aggregations);
|
||||
|
|
|
@ -234,6 +234,8 @@ public class StringTermsAggregator extends BucketsAggregator {
|
|||
final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
|
||||
for (int i = ordered.size() - 1; i >= 0; --i) {
|
||||
final StringTerms.Bucket bucket = (StringTerms.Bucket) ordered.pop();
|
||||
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
|
||||
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
|
||||
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
|
||||
list[i] = bucket;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.test.TestCluster;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
|
@ -51,7 +52,7 @@ public class MockPageCacheRecycler extends PageCacheRecycler {
|
|||
random = new Random(seed);
|
||||
}
|
||||
|
||||
private static <T> V<T> wrap(final V<T> v) {
|
||||
private <T> V<T> wrap(final V<T> v) {
|
||||
ACQUIRED_PAGES.put(v, new Throwable());
|
||||
final Thread t = Thread.currentThread();
|
||||
return new V<T>() {
|
||||
|
@ -67,6 +68,14 @@ public class MockPageCacheRecycler extends PageCacheRecycler {
|
|||
if (t == null) {
|
||||
throw new IllegalStateException("Releasing a page that has not been acquired");
|
||||
}
|
||||
final T ref = v();
|
||||
for (int i = 0; i < Array.getLength(ref); ++i) {
|
||||
if (ref instanceof Object[]) {
|
||||
Array.set(ref, i, null);
|
||||
} else {
|
||||
Array.set(ref, i, (byte) random.nextInt(256));
|
||||
}
|
||||
}
|
||||
return v.release();
|
||||
}
|
||||
|
||||
|
|
|
@ -148,8 +148,9 @@ public class RandomTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
// test long/double/string terms aggs with high number of buckets that require array growth
|
||||
public void testDuelTerms() throws Exception {
|
||||
final int numDocs = atLeast(1000);
|
||||
final int maxNumTerms = randomIntBetween(10, 10000);
|
||||
// These high numbers of docs and terms are important to trigger page recycling
|
||||
final int numDocs = atLeast(10000);
|
||||
final int maxNumTerms = randomIntBetween(10, 100000);
|
||||
|
||||
final IntOpenHashSet valuesSet = new IntOpenHashSet();
|
||||
wipeIndices("idx");
|
||||
|
|
Loading…
Reference in New Issue