From 40d67c7e091194340297aa6eed721e988977b811 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 13 Mar 2014 13:14:44 +0100 Subject: [PATCH] Make aggregations CacheRecycler-free. Aggregations were still using CacheRecycler on the reduce phase. They are now using page-based recycling for both the aggregation phase and the reduce phase. Close #4929 --- .../common/util/AbstractHash.java | 96 +-------- .../common/util/AbstractPagedHashMap.java | 127 +++++++++++ .../common/util/BytesRefHash.java | 5 +- .../common/util/DoubleObjectPagedHashMap.java | 200 ++++++++++++++++++ .../elasticsearch/common/util/LongHash.java | 15 +- .../common/util/LongObjectPagedHashMap.java | 200 ++++++++++++++++++ .../percolator/PercolatorService.java | 2 +- .../aggregations/InternalAggregation.java | 12 +- .../aggregations/InternalAggregations.java | 10 +- .../InternalSingleBucketAggregation.java | 4 +- .../bucket/geogrid/InternalGeoHashGrid.java | 39 ++-- .../bucket/histogram/InternalHistogram.java | 39 ++-- .../bucket/range/InternalRange.java | 12 +- .../bucket/terms/DoubleTerms.java | 29 ++- .../bucket/terms/InternalTerms.java | 16 +- .../aggregations/bucket/terms/LongTerms.java | 29 ++- .../controller/SearchPhaseController.java | 9 +- .../common/util/DoubleObjectHashMapTests.java | 59 ++++++ .../common/util/LongObjectHashMapTests.java | 59 ++++++ 19 files changed, 759 insertions(+), 203 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/util/AbstractPagedHashMap.java create mode 100644 src/main/java/org/elasticsearch/common/util/DoubleObjectPagedHashMap.java create mode 100644 src/main/java/org/elasticsearch/common/util/LongObjectPagedHashMap.java create mode 100644 src/test/java/org/elasticsearch/common/util/DoubleObjectHashMapTests.java create mode 100644 src/test/java/org/elasticsearch/common/util/LongObjectHashMapTests.java diff --git a/src/main/java/org/elasticsearch/common/util/AbstractHash.java b/src/main/java/org/elasticsearch/common/util/AbstractHash.java index 8dcb4b1ed4d..5f261c7a9bb 100644 --- a/src/main/java/org/elasticsearch/common/util/AbstractHash.java +++ b/src/main/java/org/elasticsearch/common/util/AbstractHash.java @@ -19,61 +19,20 @@ package org.elasticsearch.common.util; -import com.google.common.base.Preconditions; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; /** - * Base implementation for {@link BytesRefHash} and {@link LongHash}. + * Base implementation for {@link BytesRefHash} and {@link LongHash}, or any class that + * needs to map values to dense ords. This class is not thread-safe. */ // IDs are internally stored as id + 1 so that 0 encodes for an empty slot -abstract class AbstractHash implements Releasable { +abstract class AbstractHash extends AbstractPagedHashMap { - // Open addressing typically requires having smaller load factors compared to linked lists because - // collisions may result into worse lookup performance. 
- static final float DEFAULT_MAX_LOAD_FACTOR = 0.6f; - - final BigArrays bigArrays; - final float maxLoadFactor; - long size, maxSize; LongArray ids; - long mask; AbstractHash(long capacity, float maxLoadFactor, BigArrays bigArrays) { - Preconditions.checkArgument(capacity >= 0, "capacity must be >= 0"); - Preconditions.checkArgument(maxLoadFactor > 0 && maxLoadFactor < 1, "maxLoadFactor must be > 0 and < 1"); - this.bigArrays = bigArrays; - this.maxLoadFactor = maxLoadFactor; - long buckets = 1L + (long) (capacity / maxLoadFactor); - buckets = Math.max(1, Long.highestOneBit(buckets - 1) << 1); // next power of two - assert buckets == Long.highestOneBit(buckets); - maxSize = (long) (buckets * maxLoadFactor); - assert maxSize >= capacity; - size = 0; - ids = bigArrays.newLongArray(buckets, true); - mask = buckets - 1; - } - - /** - * Return the number of allocated slots to store this hash table. - */ - public long capacity() { - return ids.size(); - } - - /** - * Return the number of longs in this hash table. - */ - public long size() { - return size; - } - - static long slot(long hash, long mask) { - return hash & mask; - } - - static long nextSlot(long curSlot, long mask) { - return (curSlot + 1) & mask; // linear probing + super(capacity, maxLoadFactor, bigArrays); + ids = bigArrays.newLongArray(capacity(), true); } /** @@ -87,46 +46,13 @@ abstract class AbstractHash implements Releasable { return ids.set(index, id + 1) - 1; } - /** Resize keys to the given capacity. */ - protected void resizeKeys(long capacity) {} + protected void resize(long capacity) { + ids = bigArrays.resize(ids, capacity); + } - /** Remove key at the given index and */ - protected abstract void removeAndAdd(long index, long id); - - protected final void grow() { - // The difference of this implementation of grow() compared to standard hash tables is that we are growing in-place, which makes - // the re-mapping of keys to slots a bit more tricky. - assert size == maxSize; - final long prevSize = size; - final long buckets = capacity(); - // Resize arrays - final long newBuckets = buckets << 1; - assert newBuckets == Long.highestOneBit(newBuckets) : newBuckets; // power of 2 - resizeKeys(newBuckets); - ids = bigArrays.resize(ids, newBuckets); - mask = newBuckets - 1; - // First let's remap in-place: most data will be put in its final position directly - for (long i = 0; i < buckets; ++i) { - final long id = id(i, -1); - if (id != -1) { - removeAndAdd(i, id); - } - } - // The only entries which have not been put in their final position in the previous loop are those that were stored in a slot that - // is < slot(key, mask). This only happens when slot(key, mask) returned a slot that was close to the end of the array and colision - // resolution has put it back in the first slots. This time, collision resolution will have put them at the beginning of the newly - // allocated slots. Let's re-add them to make sure they are in the right slot. This 2nd loop will typically exit very early. 
- for (long i = buckets; i < newBuckets; ++i) { - final long id = id(i, -1); - if (id != -1) { - removeAndAdd(i, id); // add it back - } else { - break; - } - } - assert size == prevSize; - maxSize = (long) (newBuckets * maxLoadFactor); - assert size < maxSize; + @Override + protected boolean used(long bucket) { + return id(bucket) >= 0; } @Override diff --git a/src/main/java/org/elasticsearch/common/util/AbstractPagedHashMap.java b/src/main/java/org/elasticsearch/common/util/AbstractPagedHashMap.java new file mode 100644 index 00000000000..0793da83f7f --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/AbstractPagedHashMap.java @@ -0,0 +1,127 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + +import com.carrotsearch.hppc.hash.MurmurHash3; +import com.google.common.base.Preconditions; +import org.elasticsearch.common.lease.Releasable; + +/** + * Base implementation for a hash table that is paged, recycles arrays and grows in-place. + */ +abstract class AbstractPagedHashMap implements Releasable { + + // Open addressing typically requires having smaller load factors compared to linked lists because + // collisions may result into worse lookup performance. + static final float DEFAULT_MAX_LOAD_FACTOR = 0.6f; + + static long hash(long value) { + // Don't use the value directly. Under some cases eg dates, it could be that the low bits don't carry much value and we would like + // all bits of the hash to carry as much value + return MurmurHash3.hash(value); + } + + static long hash(double value) { + return hash(Double.doubleToLongBits(value)); + } + + final BigArrays bigArrays; + final float maxLoadFactor; + long size, maxSize; + long mask; + + AbstractPagedHashMap(long capacity, float maxLoadFactor, BigArrays bigArrays) { + Preconditions.checkArgument(capacity >= 0, "capacity must be >= 0"); + Preconditions.checkArgument(maxLoadFactor > 0 && maxLoadFactor < 1, "maxLoadFactor must be > 0 and < 1"); + this.bigArrays = bigArrays; + this.maxLoadFactor = maxLoadFactor; + long buckets = 1L + (long) (capacity / maxLoadFactor); + buckets = Math.max(1, Long.highestOneBit(buckets - 1) << 1); // next power of two + assert buckets == Long.highestOneBit(buckets); + maxSize = (long) (buckets * maxLoadFactor); + assert maxSize >= capacity; + size = 0; + mask = buckets - 1; + } + + /** + * Return the number of allocated slots to store this hash table. + */ + public long capacity() { + return mask + 1; + } + + /** + * Return the number of longs in this hash table. 
+ */ + public long size() { + return size; + } + + static long slot(long hash, long mask) { + return hash & mask; + } + + static long nextSlot(long curSlot, long mask) { + return (curSlot + 1) & mask; // linear probing + } + + /** Resize to the given capacity. */ + protected abstract void resize(long capacity); + + protected abstract boolean used(long bucket); + + /** Remove the entry at the given index and add it back */ + protected abstract void removeAndAdd(long index); + + protected final void grow() { + // The difference of this implementation of grow() compared to standard hash tables is that we are growing in-place, which makes + // the re-mapping of keys to slots a bit more tricky. + assert size == maxSize; + final long prevSize = size; + final long buckets = capacity(); + // Resize arrays + final long newBuckets = buckets << 1; + assert newBuckets == Long.highestOneBit(newBuckets) : newBuckets; // power of 2 + resize(newBuckets); + mask = newBuckets - 1; + // First let's remap in-place: most data will be put in its final position directly + for (long i = 0; i < buckets; ++i) { + if (used(i)) { + removeAndAdd(i); + } + } + // The only entries which have not been put in their final position in the previous loop are those that were stored in a slot that + // is < slot(key, mask). This only happens when slot(key, mask) returned a slot that was close to the end of the array and colision + // resolution has put it back in the first slots. This time, collision resolution will have put them at the beginning of the newly + // allocated slots. Let's re-add them to make sure they are in the right slot. This 2nd loop will typically exit very early. + for (long i = buckets; i < newBuckets; ++i) { + if (used(i)) { + removeAndAdd(i); // add it back + } else { + break; + } + } + assert size == prevSize; + maxSize = (long) (newBuckets * maxLoadFactor); + assert size < maxSize; + } + +} diff --git a/src/main/java/org/elasticsearch/common/util/BytesRefHash.java b/src/main/java/org/elasticsearch/common/util/BytesRefHash.java index 119b0be9b0d..956dc3e3489 100644 --- a/src/main/java/org/elasticsearch/common/util/BytesRefHash.java +++ b/src/main/java/org/elasticsearch/common/util/BytesRefHash.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.lease.Releasables; * BytesRef values to ids. Collisions are resolved with open addressing and linear * probing, growth is smooth thanks to {@link BigArrays}, hashes are cached for faster * re-hashing and capacity is always a multiple of 2 for faster identification of buckets. + * This class is not thread-safe. */ public final class BytesRefHash extends AbstractHash { @@ -150,7 +151,9 @@ public final class BytesRefHash extends AbstractHash { } @Override - protected void removeAndAdd(long index, long id) { + protected void removeAndAdd(long index) { + final long id = id(index, -1); + assert id >= 0; final int code = hashes.get(id); reset(code, id); } diff --git a/src/main/java/org/elasticsearch/common/util/DoubleObjectPagedHashMap.java b/src/main/java/org/elasticsearch/common/util/DoubleObjectPagedHashMap.java new file mode 100644 index 00000000000..574744ca984 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/DoubleObjectPagedHashMap.java @@ -0,0 +1,200 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + + +import com.google.common.collect.UnmodifiableIterator; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.lease.Releasables; + +import java.util.Iterator; +import java.util.NoSuchElementException; + + +/** + * A hash table from native doubles to objects. This implementation resolves collisions + * using open-addressing and does not support null values. This class is not thread-safe. + */ +public class DoubleObjectPagedHashMap extends AbstractPagedHashMap implements Iterable> { + + private DoubleArray keys; + private ObjectArray values; + + public DoubleObjectPagedHashMap(BigArrays bigArrays) { + this(16, bigArrays); + } + + public DoubleObjectPagedHashMap(long capacity, BigArrays bigArrays) { + this(capacity, DEFAULT_MAX_LOAD_FACTOR, bigArrays); + } + + public DoubleObjectPagedHashMap(long capacity, float maxLoadFactor, BigArrays bigArrays) { + super(capacity, maxLoadFactor, bigArrays); + keys = bigArrays.newDoubleArray(capacity(), false); + values = bigArrays.newObjectArray(capacity()); + } + + /** + * Get the value that is associated with key or null if key + * was not present in the hash table. + */ + public T get(double key) { + for (long i = slot(hash(key), mask); ; i = nextSlot(i, mask)) { + final T value = values.get(i); + if (value == null) { + return null; + } else if (keys.get(i) == key) { + return value; + } + } + } + + /** + * Put this new (key, value) pair into this hash table and return the value + * that was previously associated with key or null in case of + * an insertion. + */ + public T put(double key, T value) { + if (size >= maxSize) { + assert size == maxSize; + grow(); + } + assert size < maxSize; + return set(key, value); + } + + /** + * Remove the entry which has this key in the hash table and return the + * associated value or null if there was no entry associated with this key. 
+ */ + public T remove(double key) { + for (long i = slot(hash(key), mask); ; i = nextSlot(i, mask)) { + final T previous = values.set(i, null); + if (previous == null) { + return null; + } else if (keys.get(i) == key) { + --size; + for (long j = nextSlot(i, mask); used(j); j = nextSlot(j, mask)) { + removeAndAdd(j); + } + return previous; + } else { + // repair and continue + values.set(i, previous); + } + } + } + + private T set(double key, T value) { + if (value == null) { + throw new IllegalArgumentException("Null values are not supported"); + } + for (long i = slot(hash(key), mask); ; i = nextSlot(i, mask)) { + final T previous = values.set(i, value); + if (previous == null) { + // slot was free + keys.set(i, key); + ++size; + return null; + } else if (key == keys.get(i)) { + // we just updated the value + return previous; + } else { + // not the right key, repair and continue + values.set(i, previous); + } + } + } + + @Override + public Iterator> iterator() { + return new UnmodifiableIterator>() { + + boolean cached; + final Cursor cursor; + { + cursor = new Cursor(); + cursor.index = -1; + cached = false; + } + + @Override + public boolean hasNext() { + if (!cached) { + while (true) { + ++cursor.index; + if (cursor.index >= capacity()) { + break; + } else if (used(cursor.index)) { + cursor.key = keys.get(cursor.index); + cursor.value = values.get(cursor.index); + break; + } + } + cached = true; + } + return cursor.index < capacity(); + } + + @Override + public Cursor next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + cached = false; + return cursor; + } + + }; + } + + @Override + public boolean release() throws ElasticsearchException { + Releasables.release(keys, values); + return true; + } + + @Override + protected void resize(long capacity) { + keys = bigArrays.resize(keys, capacity); + values = bigArrays.resize(values, capacity); + } + + @Override + protected boolean used(long bucket) { + return values.get(bucket) != null; + } + + @Override + protected void removeAndAdd(long index) { + final double key = keys.get(index); + final T value = values.set(index, null); + --size; + final T removed = set(key, value); + assert removed == null; + } + + public static final class Cursor { + public long index; + public double key; + public T value; + } + +} diff --git a/src/main/java/org/elasticsearch/common/util/LongHash.java b/src/main/java/org/elasticsearch/common/util/LongHash.java index f77053d96ed..dbaf2af3632 100644 --- a/src/main/java/org/elasticsearch/common/util/LongHash.java +++ b/src/main/java/org/elasticsearch/common/util/LongHash.java @@ -19,7 +19,6 @@ package org.elasticsearch.common.util; -import com.carrotsearch.hppc.hash.MurmurHash3; import org.elasticsearch.common.lease.Releasables; /** @@ -27,6 +26,7 @@ import org.elasticsearch.common.lease.Releasables; * long values to ids. Collisions are resolved with open addressing and linear * probing, growth is smooth thanks to {@link BigArrays} and capacity is always * a multiple of 2 for faster identification of buckets. + * This class is not thread-safe. */ // IDs are internally stored as id + 1 so that 0 encodes for an empty slot public final class LongHash extends AbstractHash { @@ -44,12 +44,6 @@ public final class LongHash extends AbstractHash { keys = bigArrays.newLongArray(capacity(), false); } - private static long hash(long value) { - // Don't use the value directly. 
Under some cases eg dates, it could be that the low bits don't carry much value and we would like - // all bits of the hash to carry as much value - return MurmurHash3.hash(value); - } - /** * Return the key at 0 <e; index <e; capacity(). The result is undefined if the slot is unused. */ @@ -114,12 +108,15 @@ public final class LongHash extends AbstractHash { } @Override - protected void resizeKeys(long capacity) { + protected void resize(long capacity) { + super.resize(capacity); keys = bigArrays.resize(keys, capacity); } @Override - protected void removeAndAdd(long index, long id) { + protected void removeAndAdd(long index) { + final long id = id(index, -1); + assert id >= 0; final long key = keys.set(index, 0); reset(key, id); } diff --git a/src/main/java/org/elasticsearch/common/util/LongObjectPagedHashMap.java b/src/main/java/org/elasticsearch/common/util/LongObjectPagedHashMap.java new file mode 100644 index 00000000000..18af93140cb --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/LongObjectPagedHashMap.java @@ -0,0 +1,200 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + + +import com.google.common.collect.UnmodifiableIterator; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.lease.Releasables; + +import java.util.Iterator; +import java.util.NoSuchElementException; + + +/** + * A hash table from native longs to objects. This implementation resolves collisions + * using open-addressing and does not support null values. This class is not thread-safe. + */ +public class LongObjectPagedHashMap extends AbstractPagedHashMap implements Iterable> { + + private LongArray keys; + private ObjectArray values; + + public LongObjectPagedHashMap(BigArrays bigArrays) { + this(16, bigArrays); + } + + public LongObjectPagedHashMap(long capacity, BigArrays bigArrays) { + this(capacity, DEFAULT_MAX_LOAD_FACTOR, bigArrays); + } + + public LongObjectPagedHashMap(long capacity, float maxLoadFactor, BigArrays bigArrays) { + super(capacity, maxLoadFactor, bigArrays); + keys = bigArrays.newLongArray(capacity(), false); + values = bigArrays.newObjectArray(capacity()); + } + + /** + * Get the value that is associated with key or null if key + * was not present in the hash table. + */ + public T get(long key) { + for (long i = slot(hash(key), mask); ; i = nextSlot(i, mask)) { + final T value = values.get(i); + if (value == null) { + return null; + } else if (keys.get(i) == key) { + return value; + } + } + } + + /** + * Put this new (key, value) pair into this hash table and return the value + * that was previously associated with key or null in case of + * an insertion. 
+ */ + public T put(long key, T value) { + if (size >= maxSize) { + assert size == maxSize; + grow(); + } + assert size < maxSize; + return set(key, value); + } + + /** + * Remove the entry which has this key in the hash table and return the + * associated value or null if there was no entry associated with this key. + */ + public T remove(long key) { + for (long i = slot(hash(key), mask); ; i = nextSlot(i, mask)) { + final T previous = values.set(i, null); + if (previous == null) { + return null; + } else if (keys.get(i) == key) { + --size; + for (long j = nextSlot(i, mask); used(j); j = nextSlot(j, mask)) { + removeAndAdd(j); + } + return previous; + } else { + // repair and continue + values.set(i, previous); + } + } + } + + private T set(long key, T value) { + if (value == null) { + throw new IllegalArgumentException("Null values are not supported"); + } + for (long i = slot(hash(key), mask); ; i = nextSlot(i, mask)) { + final T previous = values.set(i, value); + if (previous == null) { + // slot was free + keys.set(i, key); + ++size; + return null; + } else if (key == keys.get(i)) { + // we just updated the value + return previous; + } else { + // not the right key, repair and continue + values.set(i, previous); + } + } + } + + @Override + public Iterator> iterator() { + return new UnmodifiableIterator>() { + + boolean cached; + final Cursor cursor; + { + cursor = new Cursor(); + cursor.index = -1; + cached = false; + } + + @Override + public boolean hasNext() { + if (!cached) { + while (true) { + ++cursor.index; + if (cursor.index >= capacity()) { + break; + } else if (used(cursor.index)) { + cursor.key = keys.get(cursor.index); + cursor.value = values.get(cursor.index); + break; + } + } + cached = true; + } + return cursor.index < capacity(); + } + + @Override + public Cursor next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + cached = false; + return cursor; + } + + }; + } + + @Override + public boolean release() throws ElasticsearchException { + Releasables.release(keys, values); + return true; + } + + @Override + protected void resize(long capacity) { + keys = bigArrays.resize(keys, capacity); + values = bigArrays.resize(values, capacity); + } + + @Override + protected boolean used(long bucket) { + return values.get(bucket) != null; + } + + @Override + protected void removeAndAdd(long index) { + final long key = keys.get(index); + final T value = values.set(index, null); + --size; + final T removed = set(key, value); + assert removed == null; + } + + public static final class Cursor { + public long index; + public long key; + public T value; + } + +} diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index 60ea74997cc..ff758ab5946 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -871,7 +871,7 @@ public class PercolatorService extends AbstractComponent { for (PercolateShardResponse shardResult : shardResults) { aggregationsList.add(shardResult.aggregations()); } - return InternalAggregations.reduce(aggregationsList, cacheRecycler); + return InternalAggregations.reduce(aggregationsList, bigArrays); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 6f43e9eced0..a2e61516659 100644 --- 
a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -18,12 +18,12 @@ */ package org.elasticsearch.search.aggregations; -import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilderString; @@ -79,19 +79,19 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St protected static class ReduceContext { private final List aggregations; - private final CacheRecycler cacheRecycler; + private final BigArrays bigArrays; - public ReduceContext(List aggregations, CacheRecycler cacheRecycler) { + public ReduceContext(List aggregations, BigArrays bigArrays) { this.aggregations = aggregations; - this.cacheRecycler = cacheRecycler; + this.bigArrays = bigArrays; } public List aggregations() { return aggregations; } - public CacheRecycler cacheRecycler() { - return cacheRecycler; + public BigArrays bigArrays() { + return bigArrays; } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 17d93874eab..7b3226d6291 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -20,11 +20,11 @@ package org.elasticsearch.search.aggregations; import com.google.common.base.Function; import com.google.common.collect.*; -import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; @@ -118,7 +118,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl * @param aggregationsList A list of aggregation to reduce * @return The reduced addAggregation */ - public static InternalAggregations reduce(List aggregationsList, CacheRecycler cacheRecycler) { + public static InternalAggregations reduce(List aggregationsList, BigArrays bigArrays) { if (aggregationsList.isEmpty()) { return null; } @@ -143,7 +143,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl for (Map.Entry> entry : aggByName.entrySet()) { List aggregations = entry.getValue(); InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand - reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, cacheRecycler))); + reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, bigArrays))); } InternalAggregations result = aggregationsList.get(0); result.reset(reducedAggregations); @@ -154,10 +154,10 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl * 
Reduces this aggregations, effectively propagates the reduce to all the sub aggregations * @param cacheRecycler */ - public void reduce(CacheRecycler cacheRecycler) { + public void reduce(BigArrays bigArrays) { for (int i = 0; i < aggregations.size(); i++) { InternalAggregation aggregation = aggregations.get(i); - aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), cacheRecycler))); + aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), bigArrays))); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index 638a98a916b..78a23baaea9 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -66,7 +66,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio List aggregations = reduceContext.aggregations(); if (aggregations.size() == 1) { InternalSingleBucketAggregation reduced = ((InternalSingleBucketAggregation) aggregations.get(0)); - reduced.aggregations.reduce(reduceContext.cacheRecycler()); + reduced.aggregations.reduce(reduceContext.bigArrays()); return reduced; } InternalSingleBucketAggregation reduced = null; @@ -79,7 +79,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio } subAggregationsList.add(((InternalSingleBucketAggregation) aggregation).aggregations); } - reduced.aggregations = InternalAggregations.reduce(subAggregationsList, reduceContext.cacheRecycler()); + reduced.aggregations = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays()); return reduced; } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java index 98d1e11316d..6c03df1790e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java @@ -18,16 +18,15 @@ */ package org.elasticsearch.search.aggregations.bucket.geogrid; -import com.carrotsearch.hppc.LongObjectOpenHashMap; import org.apache.lucene.util.PriorityQueue; -import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.common.geo.GeoHashUtils; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongObjectPagedHashMap; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.Aggregations; @@ -106,11 +105,11 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG return 0; } - public Bucket reduce(List buckets, CacheRecycler cacheRecycler) { + public Bucket reduce(List buckets, BigArrays bigArrays) { if (buckets.size() == 1) { // we still need to reduce the sub aggs Bucket bucket = 
buckets.get(0); - bucket.aggregations.reduce(cacheRecycler); + bucket.aggregations.reduce(bigArrays); return bucket; } Bucket reduced = null; @@ -123,7 +122,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG } aggregationsList.add(bucket.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); + reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays); return reduced; } @@ -184,25 +183,25 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG List aggregations = reduceContext.aggregations(); if (aggregations.size() == 1) { InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0); - grid.reduceAndTrimBuckets(reduceContext.cacheRecycler()); + grid.reduceAndTrimBuckets(reduceContext.bigArrays()); return grid; } InternalGeoHashGrid reduced = null; - Recycler.V>> buckets = null; + LongObjectPagedHashMap> buckets = null; for (InternalAggregation aggregation : aggregations) { InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation; if (reduced == null) { reduced = grid; } if (buckets == null) { - buckets = reduceContext.cacheRecycler().longObjectMap(grid.buckets.size()); + buckets = new LongObjectPagedHashMap>(grid.buckets.size(), reduceContext.bigArrays()); } for (Bucket bucket : grid.buckets) { - List existingBuckets = buckets.v().get(bucket.geohashAsLong); + List existingBuckets = buckets.get(bucket.geohashAsLong); if (existingBuckets == null) { existingBuckets = new ArrayList(aggregations.size()); - buckets.v().put(bucket.geohashAsLong, existingBuckets); + buckets.put(bucket.geohashAsLong, existingBuckets); } existingBuckets.add(bucket); } @@ -214,15 +213,11 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG } // TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ? 
- final int size = Math.min(requiredSize, buckets.v().size()); + final int size = (int) Math.min(requiredSize, buckets.size()); BucketPriorityQueue ordered = new BucketPriorityQueue(size); - Object[] internalBuckets = buckets.v().values; - boolean[] states = buckets.v().allocated; - for (int i = 0; i < states.length; i++) { - if (states[i]) { - List sameCellBuckets = (List) internalBuckets[i]; - ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext.cacheRecycler())); - } + for (LongObjectPagedHashMap.Cursor> cursor : buckets) { + List sameCellBuckets = cursor.value; + ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext.bigArrays())); } buckets.release(); Bucket[] list = new Bucket[ordered.size()]; @@ -233,11 +228,11 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG return reduced; } - protected void reduceAndTrimBuckets(CacheRecycler cacheRecycler) { + protected void reduceAndTrimBuckets(BigArrays bigArrays) { if (requiredSize > buckets.size()) { // nothing to trim for (Bucket bucket : buckets) { - bucket.aggregations.reduce(cacheRecycler); + bucket.aggregations.reduce(bigArrays); } return; } @@ -247,7 +242,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG if (trimmedBuckets.size() >= requiredSize) { break; } - bucket.aggregations.reduce(cacheRecycler); + bucket.aggregations.reduce(bigArrays); trimmedBuckets.add(bucket); } buckets = trimmedBuckets; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 73eed06ba70..13c058b459e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -21,13 +21,13 @@ package org.elasticsearch.search.aggregations.bucket.histogram; import com.carrotsearch.hppc.LongObjectOpenHashMap; import com.google.common.collect.Lists; import org.apache.lucene.util.CollectionUtil; -import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.rounding.Rounding; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongObjectPagedHashMap; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.Aggregations; @@ -100,11 +100,11 @@ public class InternalHistogram extends Inter return aggregations; } - B reduce(List buckets, CacheRecycler cacheRecycler) { + B reduce(List buckets, BigArrays bigArrays) { if (buckets.size() == 1) { // we only need to reduce the sub aggregations Bucket bucket = buckets.get(0); - bucket.aggregations.reduce(cacheRecycler); + bucket.aggregations.reduce(bigArrays); return (B) bucket; } List aggregations = new ArrayList(buckets.size()); @@ -117,7 +117,7 @@ public class InternalHistogram extends Inter } aggregations.add((InternalAggregations) bucket.getAggregations()); } - reduced.aggregations = InternalAggregations.reduce(aggregations, cacheRecycler); + reduced.aggregations = 
InternalAggregations.reduce(aggregations, bigArrays); return (B) reduced; } } @@ -217,7 +217,7 @@ public class InternalHistogram extends Inter if (minDocCount == 1) { for (B bucket : histo.buckets) { - bucket.aggregations.reduce(reduceContext.cacheRecycler()); + bucket.aggregations.reduce(reduceContext.bigArrays()); } return histo; } @@ -233,7 +233,7 @@ public class InternalHistogram extends Inter // look ahead on the next bucket without advancing the iter // so we'll be able to insert elements at the right position B nextBucket = list.get(iter.nextIndex()); - nextBucket.aggregations.reduce(reduceContext.cacheRecycler()); + nextBucket.aggregations.reduce(reduceContext.bigArrays()); if (prevBucket != null) { long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.key); while (key != nextBucket.key) { @@ -249,7 +249,7 @@ public class InternalHistogram extends Inter if (bucket.getDocCount() < minDocCount) { iter.remove(); } else { - bucket.aggregations.reduce(reduceContext.cacheRecycler()); + bucket.aggregations.reduce(reduceContext.bigArrays()); } } } @@ -263,28 +263,25 @@ public class InternalHistogram extends Inter InternalHistogram reduced = (InternalHistogram) aggregations.get(0); - Recycler.V>> bucketsByKey = reduceContext.cacheRecycler().longObjectMap(-1); + LongObjectPagedHashMap> bucketsByKey = new LongObjectPagedHashMap>(reduceContext.bigArrays()); for (InternalAggregation aggregation : aggregations) { InternalHistogram histogram = (InternalHistogram) aggregation; for (B bucket : histogram.buckets) { - List bucketList = bucketsByKey.v().get(bucket.key); + List bucketList = bucketsByKey.get(bucket.key); if (bucketList == null) { - bucketList = new ArrayList(aggregations.size()); - bucketsByKey.v().put(bucket.key, bucketList); + bucketList = new ArrayList(aggregations.size()); + bucketsByKey.put(bucket.key, bucketList); } bucketList.add(bucket); } } - List reducedBuckets = new ArrayList(bucketsByKey.v().size()); - Object[] buckets = bucketsByKey.v().values; - boolean[] allocated = bucketsByKey.v().allocated; - for (int i = 0; i < allocated.length; i++) { - if (allocated[i]) { - B bucket = ((List) buckets[i]).get(0).reduce(((List) buckets[i]), reduceContext.cacheRecycler()); - if (bucket.getDocCount() >= minDocCount) { - reducedBuckets.add(bucket); - } + List reducedBuckets = new ArrayList((int) bucketsByKey.size()); + for (LongObjectPagedHashMap.Cursor> cursor : bucketsByKey) { + List sameTermBuckets = cursor.value; + B bucket = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays()); + if (bucket.getDocCount() >= minDocCount) { + reducedBuckets.add(bucket); } } bucketsByKey.release(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java index 9a7b1badef5..73b6abaa9db 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java @@ -19,11 +19,11 @@ package org.elasticsearch.search.aggregations.bucket.range; import com.google.common.collect.Lists; -import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentBuilder; 
import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.Aggregations; @@ -109,11 +109,11 @@ public class InternalRange extends InternalAggre return aggregations; } - Bucket reduce(List ranges, CacheRecycler cacheRecycler) { + Bucket reduce(List ranges, BigArrays bigArrays) { if (ranges.size() == 1) { // we stil need to call reduce on all the sub aggregations Bucket bucket = ranges.get(0); - bucket.aggregations.reduce(cacheRecycler); + bucket.aggregations.reduce(bigArrays); return bucket; } Bucket reduced = null; @@ -126,7 +126,7 @@ public class InternalRange extends InternalAggre } aggregationsList.add(range.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); + reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays); return reduced; } @@ -226,7 +226,7 @@ public class InternalRange extends InternalAggre if (aggregations.size() == 1) { InternalRange reduced = (InternalRange) aggregations.get(0); for (B bucket : reduced.ranges) { - bucket.aggregations.reduce(reduceContext.cacheRecycler()); + bucket.aggregations.reduce(reduceContext.bigArrays()); } return reduced; } @@ -259,7 +259,7 @@ public class InternalRange extends InternalAggre InternalRange reduced = (InternalRange) aggregations.get(0); int i = 0; for (List sameRangeList : rangesList) { - reduced.ranges.set(i++, (sameRangeList.get(0)).reduce(sameRangeList, reduceContext.cacheRecycler())); + reduced.ranges.set(i++, (sameRangeList.get(0)).reduce(sameRangeList, reduceContext.bigArrays())); } return reduced; } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index fef848b1d86..b941df09c67 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -18,12 +18,11 @@ */ package org.elasticsearch.search.aggregations.bucket.terms; -import com.carrotsearch.hppc.DoubleObjectOpenHashMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.DoubleObjectPagedHashMap; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -112,12 +111,12 @@ public class DoubleTerms extends InternalTerms { List aggregations = reduceContext.aggregations(); if (aggregations.size() == 1) { InternalTerms terms = (InternalTerms) aggregations.get(0); - terms.trimExcessEntries(reduceContext.cacheRecycler()); + terms.trimExcessEntries(reduceContext.bigArrays()); return terms; } InternalTerms reduced = null; - Recycler.V>> buckets = null; + DoubleObjectPagedHashMap> buckets = null; for (InternalAggregation aggregation : aggregations) { InternalTerms terms = (InternalTerms) aggregation; if (terms instanceof UnmappedTerms) { @@ -127,13 +126,13 @@ public class DoubleTerms extends InternalTerms { reduced = terms; } if (buckets == null) { - buckets = reduceContext.cacheRecycler().doubleObjectMap(terms.buckets.size()); + buckets = new DoubleObjectPagedHashMap>(terms.buckets.size(), reduceContext.bigArrays()); } for (Terms.Bucket bucket 
: terms.buckets) { - List existingBuckets = buckets.v().get(((Bucket) bucket).term); + List existingBuckets = buckets.get(((Bucket) bucket).term); if (existingBuckets == null) { existingBuckets = new ArrayList(aggregations.size()); - buckets.v().put(((Bucket) bucket).term, existingBuckets); + buckets.put(((Bucket) bucket).term, existingBuckets); } existingBuckets.add((Bucket) bucket); } @@ -145,17 +144,13 @@ public class DoubleTerms extends InternalTerms { } // TODO: would it be better to sort the backing array buffer of hppc map directly instead of using a PQ? - final int size = Math.min(requiredSize, buckets.v().size()); + final int size = (int) Math.min(requiredSize, buckets.size()); BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); - boolean[] states = buckets.v().allocated; - Object[] internalBuckets = buckets.v().values; - for (int i = 0; i < states.length; i++) { - if (states[i]) { - List sameTermBuckets = (List) internalBuckets[i]; - final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler()); - if (b.getDocCount() >= minDocCount) { - ordered.insertWithOverflow(b); - } + for (DoubleObjectPagedHashMap.Cursor> cursor : buckets) { + List sameTermBuckets = cursor.value; + final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays()); + if (b.getDocCount() >= minDocCount) { + ordered.insertWithOverflow(b); } } buckets.release(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 529d564d65a..f4619eb8d87 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -20,9 +20,9 @@ package org.elasticsearch.search.aggregations.bucket.terms; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -58,10 +58,10 @@ public abstract class InternalTerms extends InternalAggregation implements Terms return aggregations; } - public Bucket reduce(List buckets, CacheRecycler cacheRecycler) { + public Bucket reduce(List buckets, BigArrays bigArrays) { if (buckets.size() == 1) { Bucket bucket = buckets.get(0); - bucket.aggregations.reduce(cacheRecycler); + bucket.aggregations.reduce(bigArrays); return bucket; } Bucket reduced = null; @@ -74,7 +74,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms } aggregationsList.add(bucket.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); + reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays); return reduced; } } @@ -117,7 +117,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms List aggregations = reduceContext.aggregations(); if (aggregations.size() == 1) { InternalTerms terms = (InternalTerms) aggregations.get(0); - terms.trimExcessEntries(reduceContext.cacheRecycler()); + terms.trimExcessEntries(reduceContext.bigArrays()); return 
terms; } @@ -154,7 +154,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); for (Map.Entry> entry : buckets.entrySet()) { List sameTermBuckets = entry.getValue(); - final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler()); + final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays()); if (b.docCount >= minDocCount) { ordered.insertWithOverflow(b); } @@ -167,7 +167,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms return reduced; } - final void trimExcessEntries(CacheRecycler cacheRecycler) { + final void trimExcessEntries(BigArrays bigArrays) { final List newBuckets = Lists.newArrayList(); for (Bucket b : buckets) { if (newBuckets.size() >= requiredSize) { @@ -175,7 +175,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms } if (b.docCount >= minDocCount) { newBuckets.add(b); - b.aggregations.reduce(cacheRecycler); + b.aggregations.reduce(bigArrays); } } buckets = newBuckets; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index 77e0923947d..40a4a27adb1 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -18,13 +18,12 @@ */ package org.elasticsearch.search.aggregations.bucket.terms; -import com.carrotsearch.hppc.LongObjectOpenHashMap; import com.google.common.primitives.Longs; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.LongObjectPagedHashMap; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -109,12 +108,12 @@ public class LongTerms extends InternalTerms { List aggregations = reduceContext.aggregations(); if (aggregations.size() == 1) { InternalTerms terms = (InternalTerms) aggregations.get(0); - terms.trimExcessEntries(reduceContext.cacheRecycler()); + terms.trimExcessEntries(reduceContext.bigArrays()); return terms; } InternalTerms reduced = null; - Recycler.V>> buckets = null; + LongObjectPagedHashMap> buckets = null; for (InternalAggregation aggregation : aggregations) { InternalTerms terms = (InternalTerms) aggregation; if (terms instanceof UnmappedTerms) { @@ -124,13 +123,13 @@ public class LongTerms extends InternalTerms { reduced = terms; } if (buckets == null) { - buckets = reduceContext.cacheRecycler().longObjectMap(terms.buckets.size()); + buckets = new LongObjectPagedHashMap>(terms.buckets.size(), reduceContext.bigArrays()); } for (Terms.Bucket bucket : terms.buckets) { - List existingBuckets = buckets.v().get(((Bucket) bucket).term); + List existingBuckets = buckets.get(((Bucket) bucket).term); if (existingBuckets == null) { existingBuckets = new ArrayList(aggregations.size()); - buckets.v().put(((Bucket) bucket).term, existingBuckets); + buckets.put(((Bucket) bucket).term, existingBuckets); } existingBuckets.add((Bucket) bucket); } @@ -142,17 +141,13 @@ public class LongTerms extends 
InternalTerms { } // TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ? - final int size = Math.min(requiredSize, buckets.v().size()); + final int size = (int) Math.min(requiredSize, buckets.size()); BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); - Object[] internalBuckets = buckets.v().values; - boolean[] states = buckets.v().allocated; - for (int i = 0; i < states.length; i++) { - if (states[i]) { - List sameTermBuckets = (List) internalBuckets[i]; - final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler()); - if (b.getDocCount() >= minDocCount) { - ordered.insertWithOverflow(b); - } + for (LongObjectPagedHashMap.Cursor> cursor : buckets) { + List sameTermBuckets = cursor.value; + final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays()); + if (b.getDocCount() >= minDocCount) { + ordered.insertWithOverflow(b); } } buckets.release(); diff --git a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java index 8289c514323..325246cdaa8 100644 --- a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java +++ b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java @@ -26,10 +26,11 @@ import org.apache.lucene.index.Term; import org.apache.lucene.search.*; import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.cache.recycler.CacheRecycler; -import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.collect.HppcMaps; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.dfs.AggregatedDfs; @@ -67,12 +68,14 @@ public class SearchPhaseController extends AbstractComponent { public static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0]; private final CacheRecycler cacheRecycler; + private final BigArrays bigArrays; private final boolean optimizeSingleShard; @Inject - public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler) { + public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler, BigArrays bigArrays) { super(settings); this.cacheRecycler = cacheRecycler; + this.bigArrays = bigArrays; this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true); } @@ -431,7 +434,7 @@ public class SearchPhaseController extends AbstractComponent { for (AtomicArray.Entry entry : queryResults) { aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); } - aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); + aggregations = InternalAggregations.reduce(aggregationsList, bigArrays); } } diff --git a/src/test/java/org/elasticsearch/common/util/DoubleObjectHashMapTests.java b/src/test/java/org/elasticsearch/common/util/DoubleObjectHashMapTests.java new file mode 100644 index 00000000000..a7a6b7d64d6 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/util/DoubleObjectHashMapTests.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + +import com.carrotsearch.hppc.DoubleObjectOpenHashMap; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +public class DoubleObjectHashMapTests extends ElasticsearchTestCase { + + @Test + public void duel() { + final DoubleObjectOpenHashMap map1 = new DoubleObjectOpenHashMap(); + final DoubleObjectPagedHashMap map2 = new DoubleObjectPagedHashMap(randomInt(42), 0.6f + randomFloat() * 0.39f, BigArraysTests.randombigArrays()); + final int maxKey = randomIntBetween(1, 10000); + final int iters = atLeast(10000); + for (int i = 0; i < iters; ++i) { + final boolean put = randomBoolean(); + final int iters2 = randomIntBetween(1, 100); + for (int j = 0; j < iters2; ++j) { + final double key = randomInt(maxKey); + if (put) { + final Object value = new Object(); + assertSame(map1.put(key, value), map2.put(key, value)); + } else { + assertSame(map1.remove(key), map2.remove(key)); + } + assertEquals(map1.size(), map2.size()); + } + } + for (int i = 0; i <= maxKey; ++i) { + assertSame(map1.get(i), map2.get(i)); + } + final DoubleObjectOpenHashMap copy = new DoubleObjectOpenHashMap(); + for (DoubleObjectPagedHashMap.Cursor cursor : map2) { + copy.put(cursor.key, cursor.value); + } + map2.release(); + assertEquals(map1, copy); + } + +} diff --git a/src/test/java/org/elasticsearch/common/util/LongObjectHashMapTests.java b/src/test/java/org/elasticsearch/common/util/LongObjectHashMapTests.java new file mode 100644 index 00000000000..68a2de534d4 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/util/LongObjectHashMapTests.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.util; + +import com.carrotsearch.hppc.LongObjectOpenHashMap; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +public class LongObjectHashMapTests extends ElasticsearchTestCase { + + @Test + public void duel() { + final LongObjectOpenHashMap map1 = new LongObjectOpenHashMap(); + final LongObjectPagedHashMap map2 = new LongObjectPagedHashMap(randomInt(42), 0.6f + randomFloat() * 0.39f, BigArraysTests.randombigArrays()); + final int maxKey = randomIntBetween(1, 10000); + final int iters = atLeast(10000); + for (int i = 0; i < iters; ++i) { + final boolean put = randomBoolean(); + final int iters2 = randomIntBetween(1, 100); + for (int j = 0; j < iters2; ++j) { + final long key = randomInt(maxKey); + if (put) { + final Object value = new Object(); + assertSame(map1.put(key, value), map2.put(key, value)); + } else { + assertSame(map1.remove(key), map2.remove(key)); + } + assertEquals(map1.size(), map2.size()); + } + } + for (int i = 0; i <= maxKey; ++i) { + assertSame(map1.get(i), map2.get(i)); + } + final LongObjectOpenHashMap copy = new LongObjectOpenHashMap(); + for (LongObjectPagedHashMap.Cursor cursor : map2) { + copy.put(cursor.key, cursor.value); + } + map2.release(); + assertEquals(map1, copy); + } + +}
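
A note for readers who want the intuition behind AbstractPagedHashMap.grow(): the table doubles its backing arrays in place and then migrates entries by removing and re-inserting them under the new mask. The following is a minimal, self-contained sketch of that idea, not the implementation above: plain Java arrays stand in for the BigArrays pages, the bit mixer stands in for MurmurHash3, and the class name is invented for illustration.

import java.util.Arrays;

/** Toy model of AbstractPagedHashMap's in-place growth: open addressing over plain arrays. */
final class GrowInPlaceSketch {

    private static final float MAX_LOAD_FACTOR = 0.6f;

    private long[] keys;
    private Object[] values; // null marks a free bucket
    private int mask;        // capacity is a power of two, so (hash & mask) picks a bucket
    private int size, maxSize;

    GrowInPlaceSketch(int expectedSize) {
        int buckets = 8;
        while (buckets * MAX_LOAD_FACTOR < expectedSize) {
            buckets <<= 1;
        }
        keys = new long[buckets];
        values = new Object[buckets];
        mask = buckets - 1;
        maxSize = (int) (buckets * MAX_LOAD_FACTOR);
    }

    // Stand-in for the MurmurHash3 mixing used in the patch: spread the key's entropy over
    // all bits so that keys with low-entropy low bits (dates, round numbers) still disperse.
    private static int hash(long key) {
        key ^= key >>> 33;
        key *= 0xff51afd7ed558ccdL;
        key ^= key >>> 33;
        return (int) key;
    }

    public Object get(long key) {
        for (int i = hash(key) & mask; ; i = (i + 1) & mask) { // linear probing
            if (values[i] == null) {
                return null; // free bucket: the key is absent
            } else if (keys[i] == key) {
                return values[i];
            }
        }
    }

    public Object put(long key, Object value) {
        if (size >= maxSize) {
            grow();
        }
        return set(key, value);
    }

    private Object set(long key, Object value) {
        for (int i = hash(key) & mask; ; i = (i + 1) & mask) {
            if (values[i] == null) {      // free bucket: insert here
                keys[i] = key;
                values[i] = value;
                ++size;
                return null;
            } else if (keys[i] == key) {  // existing key: overwrite
                Object previous = values[i];
                values[i] = value;
                return previous;
            }                             // otherwise keep probing
        }
    }

    // Remove the entry at the given bucket and insert it again so that it migrates to the
    // bucket it belongs to under the current (possibly just doubled) mask.
    private void removeAndAdd(int index) {
        final long key = keys[index];
        final Object value = values[index];
        values[index] = null;
        --size;
        set(key, value);
    }

    private void grow() {
        final int oldBuckets = mask + 1;
        final int newBuckets = oldBuckets << 1;
        keys = Arrays.copyOf(keys, newBuckets);     // grow in place: existing entries
        values = Arrays.copyOf(values, newBuckets); // keep their index for now
        mask = newBuckets - 1;
        maxSize = (int) (newBuckets * MAX_LOAD_FACTOR);
        // First pass: re-add every old entry; most land directly in their final bucket.
        for (int i = 0; i < oldBuckets; ++i) {
            if (values[i] != null) {
                removeAndAdd(i);
            }
        }
        // Second pass: entries that had wrapped around the end of the old array were
        // re-added at the start of the new half; fix them up, stopping at the first free bucket.
        for (int i = oldBuckets; i < newBuckets && values[i] != null; ++i) {
            removeAndAdd(i);
        }
    }
}

The reason for growing in place rather than rehashing into a brand-new table is that, with paged arrays, only the additional pages need to be allocated while the existing ones keep their content, which keeps peak memory and allocation during growth low.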
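
The per-aggregation reduce changes above (LongTerms, DoubleTerms, InternalHistogram, InternalGeoHashGrid) all follow the same shape: group the buckets coming from every shard by key in a paged map, merge each group, then release the map. Below is a condensed sketch of that pattern written against the LongObjectPagedHashMap API introduced by this patch; ShardBucket, ReduceGroupingSketch and the doc-count-only merge are hypothetical simplifications, and the real classes also reduce sub-aggregations and trim the result with a priority queue.

import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongObjectPagedHashMap;

public class ReduceGroupingSketch {

    /** Hypothetical stand-in for a per-shard bucket keyed by a long (term, rounding key, geohash...). */
    public static class ShardBucket {
        public final long key;
        public final long docCount;

        public ShardBucket(long key, long docCount) {
            this.key = key;
            this.docCount = docCount;
        }
    }

    public static List<ShardBucket> reduce(List<List<ShardBucket>> shards, BigArrays bigArrays) {
        // Group buckets that share the same key: one List<ShardBucket> per distinct key.
        LongObjectPagedHashMap<List<ShardBucket>> byKey =
                new LongObjectPagedHashMap<List<ShardBucket>>(shards.size(), bigArrays);
        for (List<ShardBucket> shard : shards) {
            for (ShardBucket bucket : shard) {
                List<ShardBucket> sameKey = byKey.get(bucket.key);
                if (sameKey == null) {
                    sameKey = new ArrayList<ShardBucket>(shards.size());
                    byKey.put(bucket.key, sameKey);
                }
                sameKey.add(bucket);
            }
        }
        // Merge every group into a single bucket. The iterator reuses one Cursor instance,
        // so key and value are consumed inside the loop body rather than stored for later.
        List<ShardBucket> reduced = new ArrayList<ShardBucket>((int) byKey.size());
        for (LongObjectPagedHashMap.Cursor<List<ShardBucket>> cursor : byKey) {
            long docCount = 0;
            for (ShardBucket bucket : cursor.value) {
                docCount += bucket.docCount;
            }
            reduced.add(new ShardBucket(cursor.key, docCount));
        }
        byKey.release(); // hand the pages back to BigArrays for recycling
        return reduced;
    }
}

Since the map's pages come from BigArrays, the reduce phase now goes through the same page recycling as the shard-level aggregation phase, which is the point of this change; the one obligation the pattern adds is calling release() once the grouping is done.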