Make aggregations CacheRecycler-free.

Aggregations were still using CacheRecycler on the reduce phase. They are now
using page-based recycling for both the aggregation phase and the reduce phase.

Close #4929
This commit is contained in:
Adrien Grand 2014-03-13 13:14:44 +01:00
parent 8a1e77c50c
commit 40d67c7e09
19 changed files with 759 additions and 203 deletions

View File

@ -19,61 +19,20 @@
package org.elasticsearch.common.util; package org.elasticsearch.common.util;
import com.google.common.base.Preconditions;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
/** /**
* Base implementation for {@link BytesRefHash} and {@link LongHash}. * Base implementation for {@link BytesRefHash} and {@link LongHash}, or any class that
* needs to map values to dense ords. This class is not thread-safe.
*/ */
// IDs are internally stored as id + 1 so that 0 encodes for an empty slot // IDs are internally stored as id + 1 so that 0 encodes for an empty slot
abstract class AbstractHash implements Releasable { abstract class AbstractHash extends AbstractPagedHashMap {
// Open addressing typically requires having smaller load factors compared to linked lists because
// collisions may result into worse lookup performance.
static final float DEFAULT_MAX_LOAD_FACTOR = 0.6f;
final BigArrays bigArrays;
final float maxLoadFactor;
long size, maxSize;
LongArray ids; LongArray ids;
long mask;
AbstractHash(long capacity, float maxLoadFactor, BigArrays bigArrays) { AbstractHash(long capacity, float maxLoadFactor, BigArrays bigArrays) {
Preconditions.checkArgument(capacity >= 0, "capacity must be >= 0"); super(capacity, maxLoadFactor, bigArrays);
Preconditions.checkArgument(maxLoadFactor > 0 && maxLoadFactor < 1, "maxLoadFactor must be > 0 and < 1"); ids = bigArrays.newLongArray(capacity(), true);
this.bigArrays = bigArrays;
this.maxLoadFactor = maxLoadFactor;
long buckets = 1L + (long) (capacity / maxLoadFactor);
buckets = Math.max(1, Long.highestOneBit(buckets - 1) << 1); // next power of two
assert buckets == Long.highestOneBit(buckets);
maxSize = (long) (buckets * maxLoadFactor);
assert maxSize >= capacity;
size = 0;
ids = bigArrays.newLongArray(buckets, true);
mask = buckets - 1;
}
/**
* Return the number of allocated slots to store this hash table.
*/
public long capacity() {
return ids.size();
}
/**
* Return the number of longs in this hash table.
*/
public long size() {
return size;
}
static long slot(long hash, long mask) {
return hash & mask;
}
static long nextSlot(long curSlot, long mask) {
return (curSlot + 1) & mask; // linear probing
} }
/** /**
@ -87,46 +46,13 @@ abstract class AbstractHash implements Releasable {
return ids.set(index, id + 1) - 1; return ids.set(index, id + 1) - 1;
} }
/** Resize keys to the given capacity. */ protected void resize(long capacity) {
protected void resizeKeys(long capacity) {} ids = bigArrays.resize(ids, capacity);
}
/** Remove key at the given index and */ @Override
protected abstract void removeAndAdd(long index, long id); protected boolean used(long bucket) {
return id(bucket) >= 0;
protected final void grow() {
// The difference of this implementation of grow() compared to standard hash tables is that we are growing in-place, which makes
// the re-mapping of keys to slots a bit more tricky.
assert size == maxSize;
final long prevSize = size;
final long buckets = capacity();
// Resize arrays
final long newBuckets = buckets << 1;
assert newBuckets == Long.highestOneBit(newBuckets) : newBuckets; // power of 2
resizeKeys(newBuckets);
ids = bigArrays.resize(ids, newBuckets);
mask = newBuckets - 1;
// First let's remap in-place: most data will be put in its final position directly
for (long i = 0; i < buckets; ++i) {
final long id = id(i, -1);
if (id != -1) {
removeAndAdd(i, id);
}
}
// The only entries which have not been put in their final position in the previous loop are those that were stored in a slot that
// is < slot(key, mask). This only happens when slot(key, mask) returned a slot that was close to the end of the array and collision
// resolution has put it back in the first slots. This time, collision resolution will have put them at the beginning of the newly
// allocated slots. Let's re-add them to make sure they are in the right slot. This 2nd loop will typically exit very early.
for (long i = buckets; i < newBuckets; ++i) {
final long id = id(i, -1);
if (id != -1) {
removeAndAdd(i, id); // add it back
} else {
break;
}
}
assert size == prevSize;
maxSize = (long) (newBuckets * maxLoadFactor);
assert size < maxSize;
} }
@Override @Override

View File

@ -0,0 +1,127 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import com.carrotsearch.hppc.hash.MurmurHash3;
import com.google.common.base.Preconditions;
import org.elasticsearch.common.lease.Releasable;
/**
 * Base implementation for a hash table that is paged, recycles arrays and grows in-place.
 */
abstract class AbstractPagedHashMap implements Releasable {

    // Open addressing typically requires having smaller load factors compared to linked lists because
    // collisions may result into worse lookup performance.
    static final float DEFAULT_MAX_LOAD_FACTOR = 0.6f;

    /** Hash a long key. */
    static long hash(long value) {
        // Don't use the value directly. Under some cases eg dates, it could be that the low bits don't carry much value and we would like
        // all bits of the hash to carry as much value
        return MurmurHash3.hash(value);
    }

    /** Hash a double key through its raw long bits. */
    static long hash(double value) {
        return hash(Double.doubleToLongBits(value));
    }

    final BigArrays bigArrays;  // page-based array recycler backing all storage
    final float maxLoadFactor;  // fill ratio that triggers grow()
    long size, maxSize;         // current entry count and the grow threshold
    long mask;                  // capacity - 1; capacity is always a power of two (see constructor)

    AbstractPagedHashMap(long capacity, float maxLoadFactor, BigArrays bigArrays) {
        Preconditions.checkArgument(capacity >= 0, "capacity must be >= 0");
        Preconditions.checkArgument(maxLoadFactor > 0 && maxLoadFactor < 1, "maxLoadFactor must be > 0 and < 1");
        this.bigArrays = bigArrays;
        this.maxLoadFactor = maxLoadFactor;
        // Smallest power-of-two bucket count that can hold `capacity` entries
        // without exceeding the load factor.
        long buckets = 1L + (long) (capacity / maxLoadFactor);
        buckets = Math.max(1, Long.highestOneBit(buckets - 1) << 1); // next power of two
        assert buckets == Long.highestOneBit(buckets);
        maxSize = (long) (buckets * maxLoadFactor);
        assert maxSize >= capacity;
        size = 0;
        mask = buckets - 1;
    }

    /**
     * Return the number of allocated slots to store this hash table.
     */
    public long capacity() {
        return mask + 1;
    }

    /**
     * Return the number of longs in this hash table.
     */
    public long size() {
        return size;
    }

    /** Map a hash value to a bucket index. */
    static long slot(long hash, long mask) {
        return hash & mask;
    }

    /** Next bucket in the probe sequence. */
    static long nextSlot(long curSlot, long mask) {
        return (curSlot + 1) & mask; // linear probing
    }

    /** Resize to the given capacity. */
    protected abstract void resize(long capacity);

    /** Return whether the given bucket currently holds an entry. */
    protected abstract boolean used(long bucket);

    /** Remove the entry at the given index and add it back */
    protected abstract void removeAndAdd(long index);

    protected final void grow() {
        // The difference of this implementation of grow() compared to standard hash tables is that we are growing in-place, which makes
        // the re-mapping of keys to slots a bit more tricky.
        assert size == maxSize;
        final long prevSize = size;
        final long buckets = capacity();
        // Resize arrays
        final long newBuckets = buckets << 1;
        assert newBuckets == Long.highestOneBit(newBuckets) : newBuckets; // power of 2
        resize(newBuckets);
        mask = newBuckets - 1;
        // First let's remap in-place: most data will be put in its final position directly
        for (long i = 0; i < buckets; ++i) {
            if (used(i)) {
                removeAndAdd(i);
            }
        }
        // The only entries which have not been put in their final position in the previous loop are those that were stored in a slot that
        // is < slot(key, mask). This only happens when slot(key, mask) returned a slot that was close to the end of the array and collision
        // resolution has put it back in the first slots. This time, collision resolution will have put them at the beginning of the newly
        // allocated slots. Let's re-add them to make sure they are in the right slot. This 2nd loop will typically exit very early.
        for (long i = buckets; i < newBuckets; ++i) {
            if (used(i)) {
                removeAndAdd(i); // add it back
            } else {
                break;
            }
        }
        assert size == prevSize;
        maxSize = (long) (newBuckets * maxLoadFactor);
        assert size < maxSize;
    }
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.lease.Releasables;
* BytesRef values to ids. Collisions are resolved with open addressing and linear * BytesRef values to ids. Collisions are resolved with open addressing and linear
* probing, growth is smooth thanks to {@link BigArrays}, hashes are cached for faster * probing, growth is smooth thanks to {@link BigArrays}, hashes are cached for faster
* re-hashing and capacity is always a multiple of 2 for faster identification of buckets. * re-hashing and capacity is always a multiple of 2 for faster identification of buckets.
* This class is not thread-safe.
*/ */
public final class BytesRefHash extends AbstractHash { public final class BytesRefHash extends AbstractHash {
@ -150,7 +151,9 @@ public final class BytesRefHash extends AbstractHash {
} }
@Override @Override
protected void removeAndAdd(long index, long id) { protected void removeAndAdd(long index) {
final long id = id(index, -1);
assert id >= 0;
final int code = hashes.get(id); final int code = hashes.get(id);
reset(code, id); reset(code, id);
} }

View File

@ -0,0 +1,200 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lease.Releasables;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
 * A hash table from native doubles to objects. This implementation resolves collisions
 * using open-addressing and does not support null values. This class is not thread-safe.
 *
 * NOTE(review): keys are compared with primitive {@code ==} and hashed via
 * {@code Double.doubleToLongBits} (see AbstractPagedHashMap), so NaN keys can never be
 * found again and 0.0/-0.0 hash to different buckets — confirm callers never use such keys.
 */
public class DoubleObjectPagedHashMap<T> extends AbstractPagedHashMap implements Iterable<DoubleObjectPagedHashMap.Cursor<T>> {

    // Parallel arrays: values[i] == null marks bucket i as empty, so null values
    // cannot be stored (see set()).
    private DoubleArray keys;
    private ObjectArray<T> values;

    public DoubleObjectPagedHashMap(BigArrays bigArrays) {
        this(16, bigArrays);
    }

    public DoubleObjectPagedHashMap(long capacity, BigArrays bigArrays) {
        this(capacity, DEFAULT_MAX_LOAD_FACTOR, bigArrays);
    }

    public DoubleObjectPagedHashMap(long capacity, float maxLoadFactor, BigArrays bigArrays) {
        super(capacity, maxLoadFactor, bigArrays);
        keys = bigArrays.newDoubleArray(capacity(), false);
        values = bigArrays.newObjectArray(capacity());
    }

    /**
     * Get the value that is associated with <code>key</code> or null if <code>key</code>
     * was not present in the hash table.
     */
    public T get(double key) {
        for (long i = slot(hash(key), mask); ; i = nextSlot(i, mask)) {
            final T value = values.get(i);
            if (value == null) {
                // empty bucket terminates the probe chain: key is absent
                return null;
            } else if (keys.get(i) == key) {
                return value;
            }
        }
    }

    /**
     * Put this new (key, value) pair into this hash table and return the value
     * that was previously associated with <code>key</code> or null in case of
     * an insertion.
     */
    public T put(double key, T value) {
        if (size >= maxSize) {
            assert size == maxSize;
            grow();
        }
        assert size < maxSize;
        return set(key, value);
    }

    /**
     * Remove the entry which has this key in the hash table and return the
     * associated value or null if there was no entry associated with this key.
     */
    public T remove(double key) {
        for (long i = slot(hash(key), mask); ; i = nextSlot(i, mask)) {
            final T previous = values.set(i, null);
            if (previous == null) {
                return null;
            } else if (keys.get(i) == key) {
                --size;
                // Re-insert the entries that follow in the same probe chain so
                // that open-addressing lookups keep working after the removal.
                for (long j = nextSlot(i, mask); used(j); j = nextSlot(j, mask)) {
                    removeAndAdd(j);
                }
                return previous;
            } else {
                // repair and continue
                values.set(i, previous);
            }
        }
    }

    // Insert or replace without growing; callers must ensure there is room.
    private T set(double key, T value) {
        if (value == null) {
            throw new IllegalArgumentException("Null values are not supported");
        }
        for (long i = slot(hash(key), mask); ; i = nextSlot(i, mask)) {
            final T previous = values.set(i, value);
            if (previous == null) {
                // slot was free
                keys.set(i, key);
                ++size;
                return null;
            } else if (key == keys.get(i)) {
                // we just updated the value
                return previous;
            } else {
                // not the right key, repair and continue
                values.set(i, previous);
            }
        }
    }

    /**
     * Iterate over all used buckets. A single mutable {@link Cursor} instance is
     * reused across calls to next(), so entries must be consumed before advancing.
     */
    @Override
    public Iterator<Cursor<T>> iterator() {
        return new UnmodifiableIterator<Cursor<T>>() {

            boolean cached;          // whether cursor already points at the next entry
            final Cursor<T> cursor;

            {
                cursor = new Cursor<T>();
                cursor.index = -1;
                cached = false;
            }

            @Override
            public boolean hasNext() {
                if (!cached) {
                    // advance to the next used bucket, or past the end
                    while (true) {
                        ++cursor.index;
                        if (cursor.index >= capacity()) {
                            break;
                        } else if (used(cursor.index)) {
                            cursor.key = keys.get(cursor.index);
                            cursor.value = values.get(cursor.index);
                            break;
                        }
                    }
                    cached = true;
                }
                return cursor.index < capacity();
            }

            @Override
            public Cursor<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                cached = false;
                return cursor;
            }
        };
    }

    /** Release the backing arrays to the recycler. */
    @Override
    public boolean release() throws ElasticsearchException {
        Releasables.release(keys, values);
        return true;
    }

    @Override
    protected void resize(long capacity) {
        keys = bigArrays.resize(keys, capacity);
        values = bigArrays.resize(values, capacity);
    }

    @Override
    protected boolean used(long bucket) {
        return values.get(bucket) != null;
    }

    @Override
    protected void removeAndAdd(long index) {
        // Take the entry out (set() will re-increment size) and re-insert it at
        // its correct slot under the current mask.
        final double key = keys.get(index);
        final T value = values.set(index, null);
        --size;
        final T removed = set(key, value);
        assert removed == null;
    }

    /** Mutable view of one hash-table entry, reused by the iterator. */
    public static final class Cursor<T> {
        public long index;
        public double key;
        public T value;
    }
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.common.util; package org.elasticsearch.common.util;
import com.carrotsearch.hppc.hash.MurmurHash3;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
/** /**
@ -27,6 +26,7 @@ import org.elasticsearch.common.lease.Releasables;
* long values to ids. Collisions are resolved with open addressing and linear * long values to ids. Collisions are resolved with open addressing and linear
* probing, growth is smooth thanks to {@link BigArrays} and capacity is always * probing, growth is smooth thanks to {@link BigArrays} and capacity is always
* a multiple of 2 for faster identification of buckets. * a multiple of 2 for faster identification of buckets.
* This class is not thread-safe.
*/ */
// IDs are internally stored as id + 1 so that 0 encodes for an empty slot // IDs are internally stored as id + 1 so that 0 encodes for an empty slot
public final class LongHash extends AbstractHash { public final class LongHash extends AbstractHash {
@ -44,12 +44,6 @@ public final class LongHash extends AbstractHash {
keys = bigArrays.newLongArray(capacity(), false); keys = bigArrays.newLongArray(capacity(), false);
} }
private static long hash(long value) {
// Don't use the value directly. Under some cases eg dates, it could be that the low bits don't carry much value and we would like
// all bits of the hash to carry as much value
return MurmurHash3.hash(value);
}
/** /**
* Return the key at <code>0 &lt;= index &lt;= capacity()</code>. The result is undefined if the slot is unused. * Return the key at <code>0 &lt;= index &lt;= capacity()</code>. The result is undefined if the slot is unused. */ */
*/ */
@ -114,12 +108,15 @@ public final class LongHash extends AbstractHash {
} }
@Override @Override
protected void resizeKeys(long capacity) { protected void resize(long capacity) {
super.resize(capacity);
keys = bigArrays.resize(keys, capacity); keys = bigArrays.resize(keys, capacity);
} }
@Override @Override
protected void removeAndAdd(long index, long id) { protected void removeAndAdd(long index) {
final long id = id(index, -1);
assert id >= 0;
final long key = keys.set(index, 0); final long key = keys.set(index, 0);
reset(key, id); reset(key, id);
} }

View File

@ -0,0 +1,200 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lease.Releasables;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
 * A hash table from native longs to objects. This implementation resolves collisions
 * using open-addressing and does not support null values. This class is not thread-safe.
 */
public class LongObjectPagedHashMap<T> extends AbstractPagedHashMap implements Iterable<LongObjectPagedHashMap.Cursor<T>> {

    // Parallel arrays; a null value marks an empty bucket, hence null values are rejected.
    private LongArray keys;
    private ObjectArray<T> values;

    public LongObjectPagedHashMap(BigArrays bigArrays) {
        this(16, bigArrays);
    }

    public LongObjectPagedHashMap(long capacity, BigArrays bigArrays) {
        this(capacity, DEFAULT_MAX_LOAD_FACTOR, bigArrays);
    }

    public LongObjectPagedHashMap(long capacity, float maxLoadFactor, BigArrays bigArrays) {
        super(capacity, maxLoadFactor, bigArrays);
        keys = bigArrays.newLongArray(capacity(), false);
        values = bigArrays.newObjectArray(capacity());
    }

    /**
     * Get the value that is associated with <code>key</code> or null if <code>key</code>
     * was not present in the hash table.
     */
    public T get(long key) {
        long bucket = slot(hash(key), mask);
        while (true) {
            final T candidate = values.get(bucket);
            if (candidate == null) {
                return null; // an empty bucket ends the probe chain: key is absent
            }
            if (keys.get(bucket) == key) {
                return candidate;
            }
            bucket = nextSlot(bucket, mask);
        }
    }

    /**
     * Put this new (key, value) pair into this hash table and return the value
     * that was previously associated with <code>key</code> or null in case of
     * an insertion.
     */
    public T put(long key, T value) {
        if (size >= maxSize) {
            assert size == maxSize;
            grow();
        }
        assert size < maxSize;
        return set(key, value);
    }

    /**
     * Remove the entry which has this key in the hash table and return the
     * associated value or null if there was no entry associated with this key.
     */
    public T remove(long key) {
        long bucket = slot(hash(key), mask);
        while (true) {
            final T stored = values.set(bucket, null);
            if (stored == null) {
                return null; // reached an empty bucket: key was not present
            }
            if (keys.get(bucket) == key) {
                --size;
                // Re-insert every entry that follows in the same probe chain so
                // that open-addressing lookups still find them after this hole.
                long follower = nextSlot(bucket, mask);
                while (used(follower)) {
                    removeAndAdd(follower);
                    follower = nextSlot(follower, mask);
                }
                return stored;
            }
            // Wrong key: undo the tentative removal and keep probing.
            values.set(bucket, stored);
            bucket = nextSlot(bucket, mask);
        }
    }

    // Insert or replace without growing; callers must have made room already.
    private T set(long key, T value) {
        if (value == null) {
            throw new IllegalArgumentException("Null values are not supported");
        }
        long bucket = slot(hash(key), mask);
        while (true) {
            final T stored = values.set(bucket, value);
            if (stored == null) {
                // Free slot: record the key and grow the entry count.
                keys.set(bucket, key);
                ++size;
                return null;
            }
            if (key == keys.get(bucket)) {
                return stored; // existing mapping was replaced
            }
            // Collision with a different key: restore its value and probe on.
            values.set(bucket, stored);
            bucket = nextSlot(bucket, mask);
        }
    }

    /**
     * Iterate over all used buckets. A single mutable {@link Cursor} is reused,
     * so each entry must be consumed before calling next() again.
     */
    @Override
    public Iterator<Cursor<T>> iterator() {
        return new UnmodifiableIterator<Cursor<T>>() {

            final Cursor<T> cursor = new Cursor<T>();
            boolean positioned;

            {
                cursor.index = -1;
                positioned = false;
            }

            // Move the cursor to the next used bucket, or past the end.
            private void advance() {
                do {
                    ++cursor.index;
                } while (cursor.index < capacity() && !used(cursor.index));
                if (cursor.index < capacity()) {
                    cursor.key = keys.get(cursor.index);
                    cursor.value = values.get(cursor.index);
                }
            }

            @Override
            public boolean hasNext() {
                if (!positioned) {
                    advance();
                    positioned = true;
                }
                return cursor.index < capacity();
            }

            @Override
            public Cursor<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                positioned = false;
                return cursor;
            }
        };
    }

    /** Hand the backing arrays back to the recycler. */
    @Override
    public boolean release() throws ElasticsearchException {
        Releasables.release(keys, values);
        return true;
    }

    @Override
    protected void resize(long capacity) {
        keys = bigArrays.resize(keys, capacity);
        values = bigArrays.resize(values, capacity);
    }

    @Override
    protected boolean used(long bucket) {
        return values.get(bucket) != null;
    }

    @Override
    protected void removeAndAdd(long index) {
        // Pull the entry out (set() re-increments size) and re-insert it at its
        // correct slot under the current mask.
        final long movedKey = keys.get(index);
        final T movedValue = values.set(index, null);
        --size;
        final T prior = set(movedKey, movedValue);
        assert prior == null;
    }

    /** Mutable view of one hash-table entry, reused by the iterator. */
    public static final class Cursor<T> {
        public long index;
        public long key;
        public T value;
    }
}

View File

@ -871,7 +871,7 @@ public class PercolatorService extends AbstractComponent {
for (PercolateShardResponse shardResult : shardResults) { for (PercolateShardResponse shardResult : shardResults) {
aggregationsList.add(shardResult.aggregations()); aggregationsList.add(shardResult.aggregations());
} }
return InternalAggregations.reduce(aggregationsList, cacheRecycler); return InternalAggregations.reduce(aggregationsList, bigArrays);
} }
} }

View File

@ -18,12 +18,12 @@
*/ */
package org.elasticsearch.search.aggregations; package org.elasticsearch.search.aggregations;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -79,19 +79,19 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
protected static class ReduceContext { protected static class ReduceContext {
private final List<InternalAggregation> aggregations; private final List<InternalAggregation> aggregations;
private final CacheRecycler cacheRecycler; private final BigArrays bigArrays;
public ReduceContext(List<InternalAggregation> aggregations, CacheRecycler cacheRecycler) { public ReduceContext(List<InternalAggregation> aggregations, BigArrays bigArrays) {
this.aggregations = aggregations; this.aggregations = aggregations;
this.cacheRecycler = cacheRecycler; this.bigArrays = bigArrays;
} }
public List<InternalAggregation> aggregations() { public List<InternalAggregation> aggregations() {
return aggregations; return aggregations;
} }
public CacheRecycler cacheRecycler() { public BigArrays bigArrays() {
return cacheRecycler; return bigArrays;
} }
} }

View File

@ -20,11 +20,11 @@ package org.elasticsearch.search.aggregations;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.*; import com.google.common.collect.*;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -118,7 +118,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
* @param aggregationsList A list of aggregation to reduce * @param aggregationsList A list of aggregation to reduce
* @return The reduced addAggregation * @return The reduced addAggregation
*/ */
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, CacheRecycler cacheRecycler) { public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, BigArrays bigArrays) {
if (aggregationsList.isEmpty()) { if (aggregationsList.isEmpty()) {
return null; return null;
} }
@ -143,7 +143,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) { for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
List<InternalAggregation> aggregations = entry.getValue(); List<InternalAggregation> aggregations = entry.getValue();
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, cacheRecycler))); reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, bigArrays)));
} }
InternalAggregations result = aggregationsList.get(0); InternalAggregations result = aggregationsList.get(0);
result.reset(reducedAggregations); result.reset(reducedAggregations);
@ -154,10 +154,10 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
* Reduces this aggregations, effectively propagates the reduce to all the sub aggregations * Reduces this aggregations, effectively propagates the reduce to all the sub aggregations
* @param cacheRecycler * @param bigArrays
*/ */
public void reduce(CacheRecycler cacheRecycler) { public void reduce(BigArrays bigArrays) {
for (int i = 0; i < aggregations.size(); i++) { for (int i = 0; i < aggregations.size(); i++) {
InternalAggregation aggregation = aggregations.get(i); InternalAggregation aggregation = aggregations.get(i);
aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), cacheRecycler))); aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), bigArrays)));
} }
} }

View File

@ -66,7 +66,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
List<InternalAggregation> aggregations = reduceContext.aggregations(); List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) { if (aggregations.size() == 1) {
InternalSingleBucketAggregation reduced = ((InternalSingleBucketAggregation) aggregations.get(0)); InternalSingleBucketAggregation reduced = ((InternalSingleBucketAggregation) aggregations.get(0));
reduced.aggregations.reduce(reduceContext.cacheRecycler()); reduced.aggregations.reduce(reduceContext.bigArrays());
return reduced; return reduced;
} }
InternalSingleBucketAggregation reduced = null; InternalSingleBucketAggregation reduced = null;
@ -79,7 +79,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
} }
subAggregationsList.add(((InternalSingleBucketAggregation) aggregation).aggregations); subAggregationsList.add(((InternalSingleBucketAggregation) aggregation).aggregations);
} }
reduced.aggregations = InternalAggregations.reduce(subAggregationsList, reduceContext.cacheRecycler()); reduced.aggregations = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays());
return reduced; return reduced;
} }

View File

@ -18,16 +18,15 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.geogrid; package org.elasticsearch.search.aggregations.bucket.geogrid;
import com.carrotsearch.hppc.LongObjectOpenHashMap;
import org.apache.lucene.util.PriorityQueue; import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.geo.GeoHashUtils; import org.elasticsearch.common.geo.GeoHashUtils;
import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.Aggregations;
@ -106,11 +105,11 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
return 0; return 0;
} }
public Bucket reduce(List<? extends Bucket> buckets, CacheRecycler cacheRecycler) { public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
if (buckets.size() == 1) { if (buckets.size() == 1) {
// we still need to reduce the sub aggs // we still need to reduce the sub aggs
Bucket bucket = buckets.get(0); Bucket bucket = buckets.get(0);
bucket.aggregations.reduce(cacheRecycler); bucket.aggregations.reduce(bigArrays);
return bucket; return bucket;
} }
Bucket reduced = null; Bucket reduced = null;
@ -123,7 +122,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
} }
aggregationsList.add(bucket.aggregations); aggregationsList.add(bucket.aggregations);
} }
reduced.aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
return reduced; return reduced;
} }
@ -184,25 +183,25 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
List<InternalAggregation> aggregations = reduceContext.aggregations(); List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) { if (aggregations.size() == 1) {
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0); InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0);
grid.reduceAndTrimBuckets(reduceContext.cacheRecycler()); grid.reduceAndTrimBuckets(reduceContext.bigArrays());
return grid; return grid;
} }
InternalGeoHashGrid reduced = null; InternalGeoHashGrid reduced = null;
Recycler.V<LongObjectOpenHashMap<List<Bucket>>> buckets = null; LongObjectPagedHashMap<List<Bucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) { for (InternalAggregation aggregation : aggregations) {
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation; InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation;
if (reduced == null) { if (reduced == null) {
reduced = grid; reduced = grid;
} }
if (buckets == null) { if (buckets == null) {
buckets = reduceContext.cacheRecycler().longObjectMap(grid.buckets.size()); buckets = new LongObjectPagedHashMap<List<Bucket>>(grid.buckets.size(), reduceContext.bigArrays());
} }
for (Bucket bucket : grid.buckets) { for (Bucket bucket : grid.buckets) {
List<Bucket> existingBuckets = buckets.v().get(bucket.geohashAsLong); List<Bucket> existingBuckets = buckets.get(bucket.geohashAsLong);
if (existingBuckets == null) { if (existingBuckets == null) {
existingBuckets = new ArrayList<Bucket>(aggregations.size()); existingBuckets = new ArrayList<Bucket>(aggregations.size());
buckets.v().put(bucket.geohashAsLong, existingBuckets); buckets.put(bucket.geohashAsLong, existingBuckets);
} }
existingBuckets.add(bucket); existingBuckets.add(bucket);
} }
@ -214,15 +213,11 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
} }
// TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ? // TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ?
final int size = Math.min(requiredSize, buckets.v().size()); final int size = (int) Math.min(requiredSize, buckets.size());
BucketPriorityQueue ordered = new BucketPriorityQueue(size); BucketPriorityQueue ordered = new BucketPriorityQueue(size);
Object[] internalBuckets = buckets.v().values; for (LongObjectPagedHashMap.Cursor<List<Bucket>> cursor : buckets) {
boolean[] states = buckets.v().allocated; List<Bucket> sameCellBuckets = cursor.value;
for (int i = 0; i < states.length; i++) { ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext.bigArrays()));
if (states[i]) {
List<Bucket> sameCellBuckets = (List<Bucket>) internalBuckets[i];
ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext.cacheRecycler()));
}
} }
buckets.release(); buckets.release();
Bucket[] list = new Bucket[ordered.size()]; Bucket[] list = new Bucket[ordered.size()];
@ -233,11 +228,11 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
return reduced; return reduced;
} }
protected void reduceAndTrimBuckets(CacheRecycler cacheRecycler) { protected void reduceAndTrimBuckets(BigArrays bigArrays) {
if (requiredSize > buckets.size()) { // nothing to trim if (requiredSize > buckets.size()) { // nothing to trim
for (Bucket bucket : buckets) { for (Bucket bucket : buckets) {
bucket.aggregations.reduce(cacheRecycler); bucket.aggregations.reduce(bigArrays);
} }
return; return;
} }
@ -247,7 +242,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
if (trimmedBuckets.size() >= requiredSize) { if (trimmedBuckets.size() >= requiredSize) {
break; break;
} }
bucket.aggregations.reduce(cacheRecycler); bucket.aggregations.reduce(bigArrays);
trimmedBuckets.add(bucket); trimmedBuckets.add(bucket);
} }
buckets = trimmedBuckets; buckets = trimmedBuckets;

View File

@ -21,13 +21,13 @@ package org.elasticsearch.search.aggregations.bucket.histogram;
import com.carrotsearch.hppc.LongObjectOpenHashMap; import com.carrotsearch.hppc.LongObjectOpenHashMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.rounding.Rounding; import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.Aggregations;
@ -100,11 +100,11 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
return aggregations; return aggregations;
} }
<B extends Bucket> B reduce(List<B> buckets, CacheRecycler cacheRecycler) { <B extends Bucket> B reduce(List<B> buckets, BigArrays bigArrays) {
if (buckets.size() == 1) { if (buckets.size() == 1) {
// we only need to reduce the sub aggregations // we only need to reduce the sub aggregations
Bucket bucket = buckets.get(0); Bucket bucket = buckets.get(0);
bucket.aggregations.reduce(cacheRecycler); bucket.aggregations.reduce(bigArrays);
return (B) bucket; return (B) bucket;
} }
List<InternalAggregations> aggregations = new ArrayList<InternalAggregations>(buckets.size()); List<InternalAggregations> aggregations = new ArrayList<InternalAggregations>(buckets.size());
@ -117,7 +117,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
} }
aggregations.add((InternalAggregations) bucket.getAggregations()); aggregations.add((InternalAggregations) bucket.getAggregations());
} }
reduced.aggregations = InternalAggregations.reduce(aggregations, cacheRecycler); reduced.aggregations = InternalAggregations.reduce(aggregations, bigArrays);
return (B) reduced; return (B) reduced;
} }
} }
@ -217,7 +217,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
if (minDocCount == 1) { if (minDocCount == 1) {
for (B bucket : histo.buckets) { for (B bucket : histo.buckets) {
bucket.aggregations.reduce(reduceContext.cacheRecycler()); bucket.aggregations.reduce(reduceContext.bigArrays());
} }
return histo; return histo;
} }
@ -233,7 +233,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
// look ahead on the next bucket without advancing the iter // look ahead on the next bucket without advancing the iter
// so we'll be able to insert elements at the right position // so we'll be able to insert elements at the right position
B nextBucket = list.get(iter.nextIndex()); B nextBucket = list.get(iter.nextIndex());
nextBucket.aggregations.reduce(reduceContext.cacheRecycler()); nextBucket.aggregations.reduce(reduceContext.bigArrays());
if (prevBucket != null) { if (prevBucket != null) {
long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.key); long key = emptyBucketInfo.rounding.nextRoundingValue(prevBucket.key);
while (key != nextBucket.key) { while (key != nextBucket.key) {
@ -249,7 +249,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
if (bucket.getDocCount() < minDocCount) { if (bucket.getDocCount() < minDocCount) {
iter.remove(); iter.remove();
} else { } else {
bucket.aggregations.reduce(reduceContext.cacheRecycler()); bucket.aggregations.reduce(reduceContext.bigArrays());
} }
} }
} }
@ -263,28 +263,25 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
InternalHistogram reduced = (InternalHistogram) aggregations.get(0); InternalHistogram reduced = (InternalHistogram) aggregations.get(0);
Recycler.V<LongObjectOpenHashMap<List<Histogram.Bucket>>> bucketsByKey = reduceContext.cacheRecycler().longObjectMap(-1); LongObjectPagedHashMap<List<B>> bucketsByKey = new LongObjectPagedHashMap<List<B>>(reduceContext.bigArrays());
for (InternalAggregation aggregation : aggregations) { for (InternalAggregation aggregation : aggregations) {
InternalHistogram<B> histogram = (InternalHistogram) aggregation; InternalHistogram<B> histogram = (InternalHistogram) aggregation;
for (B bucket : histogram.buckets) { for (B bucket : histogram.buckets) {
List<Histogram.Bucket> bucketList = bucketsByKey.v().get(bucket.key); List<B> bucketList = bucketsByKey.get(bucket.key);
if (bucketList == null) { if (bucketList == null) {
bucketList = new ArrayList<Histogram.Bucket>(aggregations.size()); bucketList = new ArrayList<B>(aggregations.size());
bucketsByKey.v().put(bucket.key, bucketList); bucketsByKey.put(bucket.key, bucketList);
} }
bucketList.add(bucket); bucketList.add(bucket);
} }
} }
List<B> reducedBuckets = new ArrayList<B>(bucketsByKey.v().size()); List<B> reducedBuckets = new ArrayList<B>((int) bucketsByKey.size());
Object[] buckets = bucketsByKey.v().values; for (LongObjectPagedHashMap.Cursor<List<B>> cursor : bucketsByKey) {
boolean[] allocated = bucketsByKey.v().allocated; List<B> sameTermBuckets = cursor.value;
for (int i = 0; i < allocated.length; i++) { B bucket = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
if (allocated[i]) { if (bucket.getDocCount() >= minDocCount) {
B bucket = ((List<B>) buckets[i]).get(0).reduce(((List<B>) buckets[i]), reduceContext.cacheRecycler()); reducedBuckets.add(bucket);
if (bucket.getDocCount() >= minDocCount) {
reducedBuckets.add(bucket);
}
} }
} }
bucketsByKey.release(); bucketsByKey.release();

View File

@ -19,11 +19,11 @@
package org.elasticsearch.search.aggregations.bucket.range; package org.elasticsearch.search.aggregations.bucket.range;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.Aggregations;
@ -109,11 +109,11 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
return aggregations; return aggregations;
} }
Bucket reduce(List<Bucket> ranges, CacheRecycler cacheRecycler) { Bucket reduce(List<Bucket> ranges, BigArrays bigArrays) {
if (ranges.size() == 1) { if (ranges.size() == 1) {
// we stil need to call reduce on all the sub aggregations // we stil need to call reduce on all the sub aggregations
Bucket bucket = ranges.get(0); Bucket bucket = ranges.get(0);
bucket.aggregations.reduce(cacheRecycler); bucket.aggregations.reduce(bigArrays);
return bucket; return bucket;
} }
Bucket reduced = null; Bucket reduced = null;
@ -126,7 +126,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
} }
aggregationsList.add(range.aggregations); aggregationsList.add(range.aggregations);
} }
reduced.aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
return reduced; return reduced;
} }
@ -226,7 +226,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
if (aggregations.size() == 1) { if (aggregations.size() == 1) {
InternalRange<B> reduced = (InternalRange<B>) aggregations.get(0); InternalRange<B> reduced = (InternalRange<B>) aggregations.get(0);
for (B bucket : reduced.ranges) { for (B bucket : reduced.ranges) {
bucket.aggregations.reduce(reduceContext.cacheRecycler()); bucket.aggregations.reduce(reduceContext.bigArrays());
} }
return reduced; return reduced;
} }
@ -259,7 +259,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
InternalRange reduced = (InternalRange) aggregations.get(0); InternalRange reduced = (InternalRange) aggregations.get(0);
int i = 0; int i = 0;
for (List<Bucket> sameRangeList : rangesList) { for (List<Bucket> sameRangeList : rangesList) {
reduced.ranges.set(i++, (sameRangeList.get(0)).reduce(sameRangeList, reduceContext.cacheRecycler())); reduced.ranges.set(i++, (sameRangeList.get(0)).reduce(sameRangeList, reduceContext.bigArrays()));
} }
return reduced; return reduced;
} }

View File

@ -18,12 +18,11 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.terms; package org.elasticsearch.search.aggregations.bucket.terms;
import com.carrotsearch.hppc.DoubleObjectOpenHashMap;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.DoubleObjectPagedHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
@ -112,12 +111,12 @@ public class DoubleTerms extends InternalTerms {
List<InternalAggregation> aggregations = reduceContext.aggregations(); List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) { if (aggregations.size() == 1) {
InternalTerms terms = (InternalTerms) aggregations.get(0); InternalTerms terms = (InternalTerms) aggregations.get(0);
terms.trimExcessEntries(reduceContext.cacheRecycler()); terms.trimExcessEntries(reduceContext.bigArrays());
return terms; return terms;
} }
InternalTerms reduced = null; InternalTerms reduced = null;
Recycler.V<DoubleObjectOpenHashMap<List<Bucket>>> buckets = null; DoubleObjectPagedHashMap<List<Bucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) { for (InternalAggregation aggregation : aggregations) {
InternalTerms terms = (InternalTerms) aggregation; InternalTerms terms = (InternalTerms) aggregation;
if (terms instanceof UnmappedTerms) { if (terms instanceof UnmappedTerms) {
@ -127,13 +126,13 @@ public class DoubleTerms extends InternalTerms {
reduced = terms; reduced = terms;
} }
if (buckets == null) { if (buckets == null) {
buckets = reduceContext.cacheRecycler().doubleObjectMap(terms.buckets.size()); buckets = new DoubleObjectPagedHashMap<List<Bucket>>(terms.buckets.size(), reduceContext.bigArrays());
} }
for (Terms.Bucket bucket : terms.buckets) { for (Terms.Bucket bucket : terms.buckets) {
List<Bucket> existingBuckets = buckets.v().get(((Bucket) bucket).term); List<Bucket> existingBuckets = buckets.get(((Bucket) bucket).term);
if (existingBuckets == null) { if (existingBuckets == null) {
existingBuckets = new ArrayList<Bucket>(aggregations.size()); existingBuckets = new ArrayList<Bucket>(aggregations.size());
buckets.v().put(((Bucket) bucket).term, existingBuckets); buckets.put(((Bucket) bucket).term, existingBuckets);
} }
existingBuckets.add((Bucket) bucket); existingBuckets.add((Bucket) bucket);
} }
@ -145,17 +144,13 @@ public class DoubleTerms extends InternalTerms {
} }
// TODO: would it be better to sort the backing array buffer of hppc map directly instead of using a PQ? // TODO: would it be better to sort the backing array buffer of hppc map directly instead of using a PQ?
final int size = Math.min(requiredSize, buckets.v().size()); final int size = (int) Math.min(requiredSize, buckets.size());
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
boolean[] states = buckets.v().allocated; for (DoubleObjectPagedHashMap.Cursor<List<DoubleTerms.Bucket>> cursor : buckets) {
Object[] internalBuckets = buckets.v().values; List<DoubleTerms.Bucket> sameTermBuckets = cursor.value;
for (int i = 0; i < states.length; i++) { final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
if (states[i]) { if (b.getDocCount() >= minDocCount) {
List<DoubleTerms.Bucket> sameTermBuckets = (List<DoubleTerms.Bucket>) internalBuckets[i]; ordered.insertWithOverflow(b);
final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler());
if (b.getDocCount() >= minDocCount) {
ordered.insertWithOverflow(b);
}
} }
} }
buckets.release(); buckets.release();

View File

@ -20,9 +20,9 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
@ -58,10 +58,10 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
return aggregations; return aggregations;
} }
public Bucket reduce(List<? extends Bucket> buckets, CacheRecycler cacheRecycler) { public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
if (buckets.size() == 1) { if (buckets.size() == 1) {
Bucket bucket = buckets.get(0); Bucket bucket = buckets.get(0);
bucket.aggregations.reduce(cacheRecycler); bucket.aggregations.reduce(bigArrays);
return bucket; return bucket;
} }
Bucket reduced = null; Bucket reduced = null;
@ -74,7 +74,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
} }
aggregationsList.add(bucket.aggregations); aggregationsList.add(bucket.aggregations);
} }
reduced.aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
return reduced; return reduced;
} }
} }
@ -117,7 +117,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
List<InternalAggregation> aggregations = reduceContext.aggregations(); List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) { if (aggregations.size() == 1) {
InternalTerms terms = (InternalTerms) aggregations.get(0); InternalTerms terms = (InternalTerms) aggregations.get(0);
terms.trimExcessEntries(reduceContext.cacheRecycler()); terms.trimExcessEntries(reduceContext.bigArrays());
return terms; return terms;
} }
@ -154,7 +154,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
for (Map.Entry<Text, List<Bucket>> entry : buckets.entrySet()) { for (Map.Entry<Text, List<Bucket>> entry : buckets.entrySet()) {
List<Bucket> sameTermBuckets = entry.getValue(); List<Bucket> sameTermBuckets = entry.getValue();
final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler()); final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
if (b.docCount >= minDocCount) { if (b.docCount >= minDocCount) {
ordered.insertWithOverflow(b); ordered.insertWithOverflow(b);
} }
@ -167,7 +167,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
return reduced; return reduced;
} }
final void trimExcessEntries(CacheRecycler cacheRecycler) { final void trimExcessEntries(BigArrays bigArrays) {
final List<Bucket> newBuckets = Lists.newArrayList(); final List<Bucket> newBuckets = Lists.newArrayList();
for (Bucket b : buckets) { for (Bucket b : buckets) {
if (newBuckets.size() >= requiredSize) { if (newBuckets.size() >= requiredSize) {
@ -175,7 +175,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
} }
if (b.docCount >= minDocCount) { if (b.docCount >= minDocCount) {
newBuckets.add(b); newBuckets.add(b);
b.aggregations.reduce(cacheRecycler); b.aggregations.reduce(bigArrays);
} }
} }
buckets = newBuckets; buckets = newBuckets;

View File

@ -18,13 +18,12 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.terms; package org.elasticsearch.search.aggregations.bucket.terms;
import com.carrotsearch.hppc.LongObjectOpenHashMap;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
@ -109,12 +108,12 @@ public class LongTerms extends InternalTerms {
List<InternalAggregation> aggregations = reduceContext.aggregations(); List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) { if (aggregations.size() == 1) {
InternalTerms terms = (InternalTerms) aggregations.get(0); InternalTerms terms = (InternalTerms) aggregations.get(0);
terms.trimExcessEntries(reduceContext.cacheRecycler()); terms.trimExcessEntries(reduceContext.bigArrays());
return terms; return terms;
} }
InternalTerms reduced = null; InternalTerms reduced = null;
Recycler.V<LongObjectOpenHashMap<List<Bucket>>> buckets = null; LongObjectPagedHashMap<List<Bucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) { for (InternalAggregation aggregation : aggregations) {
InternalTerms terms = (InternalTerms) aggregation; InternalTerms terms = (InternalTerms) aggregation;
if (terms instanceof UnmappedTerms) { if (terms instanceof UnmappedTerms) {
@ -124,13 +123,13 @@ public class LongTerms extends InternalTerms {
reduced = terms; reduced = terms;
} }
if (buckets == null) { if (buckets == null) {
buckets = reduceContext.cacheRecycler().longObjectMap(terms.buckets.size()); buckets = new LongObjectPagedHashMap<List<Bucket>>(terms.buckets.size(), reduceContext.bigArrays());
} }
for (Terms.Bucket bucket : terms.buckets) { for (Terms.Bucket bucket : terms.buckets) {
List<Bucket> existingBuckets = buckets.v().get(((Bucket) bucket).term); List<Bucket> existingBuckets = buckets.get(((Bucket) bucket).term);
if (existingBuckets == null) { if (existingBuckets == null) {
existingBuckets = new ArrayList<Bucket>(aggregations.size()); existingBuckets = new ArrayList<Bucket>(aggregations.size());
buckets.v().put(((Bucket) bucket).term, existingBuckets); buckets.put(((Bucket) bucket).term, existingBuckets);
} }
existingBuckets.add((Bucket) bucket); existingBuckets.add((Bucket) bucket);
} }
@ -142,17 +141,13 @@ public class LongTerms extends InternalTerms {
} }
// TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ? // TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ?
final int size = Math.min(requiredSize, buckets.v().size()); final int size = (int) Math.min(requiredSize, buckets.size());
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
Object[] internalBuckets = buckets.v().values; for (LongObjectPagedHashMap.Cursor<List<LongTerms.Bucket>> cursor : buckets) {
boolean[] states = buckets.v().allocated; List<LongTerms.Bucket> sameTermBuckets = cursor.value;
for (int i = 0; i < states.length; i++) { final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
if (states[i]) { if (b.getDocCount() >= minDocCount) {
List<LongTerms.Bucket> sameTermBuckets = (List<LongTerms.Bucket>) internalBuckets[i]; ordered.insertWithOverflow(b);
final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler());
if (b.getDocCount() >= minDocCount) {
ordered.insertWithOverflow(b);
}
} }
} }
buckets.release(); buckets.release();

View File

@ -26,10 +26,11 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.*; import org.apache.lucene.search.*;
import org.apache.lucene.util.PriorityQueue; import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.AggregatedDfs;
@ -67,12 +68,14 @@ public class SearchPhaseController extends AbstractComponent {
public static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0]; public static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
private final CacheRecycler cacheRecycler; private final CacheRecycler cacheRecycler;
private final BigArrays bigArrays;
private final boolean optimizeSingleShard; private final boolean optimizeSingleShard;
@Inject @Inject
public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler) { public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler, BigArrays bigArrays) {
super(settings); super(settings);
this.cacheRecycler = cacheRecycler; this.cacheRecycler = cacheRecycler;
this.bigArrays = bigArrays;
this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true); this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true);
} }
@ -431,7 +434,7 @@ public class SearchPhaseController extends AbstractComponent {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) { for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
} }
aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
} }
} }

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import com.carrotsearch.hppc.DoubleObjectOpenHashMap;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
public class DoubleObjectHashMapTests extends ElasticsearchTestCase {
@Test
public void duel() {
final DoubleObjectOpenHashMap<Object> map1 = new DoubleObjectOpenHashMap<Object>();
final DoubleObjectPagedHashMap<Object> map2 = new DoubleObjectPagedHashMap<Object>(randomInt(42), 0.6f + randomFloat() * 0.39f, BigArraysTests.randombigArrays());
final int maxKey = randomIntBetween(1, 10000);
final int iters = atLeast(10000);
for (int i = 0; i < iters; ++i) {
final boolean put = randomBoolean();
final int iters2 = randomIntBetween(1, 100);
for (int j = 0; j < iters2; ++j) {
final double key = randomInt(maxKey);
if (put) {
final Object value = new Object();
assertSame(map1.put(key, value), map2.put(key, value));
} else {
assertSame(map1.remove(key), map2.remove(key));
}
assertEquals(map1.size(), map2.size());
}
}
for (int i = 0; i <= maxKey; ++i) {
assertSame(map1.get(i), map2.get(i));
}
final DoubleObjectOpenHashMap<Object> copy = new DoubleObjectOpenHashMap<Object>();
for (DoubleObjectPagedHashMap.Cursor<Object> cursor : map2) {
copy.put(cursor.key, cursor.value);
}
map2.release();
assertEquals(map1, copy);
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import com.carrotsearch.hppc.LongObjectOpenHashMap;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
public class LongObjectHashMapTests extends ElasticsearchTestCase {
@Test
public void duel() {
final LongObjectOpenHashMap<Object> map1 = new LongObjectOpenHashMap<Object>();
final LongObjectPagedHashMap<Object> map2 = new LongObjectPagedHashMap<Object>(randomInt(42), 0.6f + randomFloat() * 0.39f, BigArraysTests.randombigArrays());
final int maxKey = randomIntBetween(1, 10000);
final int iters = atLeast(10000);
for (int i = 0; i < iters; ++i) {
final boolean put = randomBoolean();
final int iters2 = randomIntBetween(1, 100);
for (int j = 0; j < iters2; ++j) {
final long key = randomInt(maxKey);
if (put) {
final Object value = new Object();
assertSame(map1.put(key, value), map2.put(key, value));
} else {
assertSame(map1.remove(key), map2.remove(key));
}
assertEquals(map1.size(), map2.size());
}
}
for (int i = 0; i <= maxKey; ++i) {
assertSame(map1.get(i), map2.get(i));
}
final LongObjectOpenHashMap<Object> copy = new LongObjectOpenHashMap<Object>();
for (LongObjectPagedHashMap.Cursor<Object> cursor : map2) {
copy.put(cursor.key, cursor.value);
}
map2.release();
assertEquals(map1, copy);
}
}