add total count to terms stats and compute mean properly, also better use caching

kimchy 2011-04-06 15:50:09 +03:00
parent 35be46df71
commit 6317483fe3
22 changed files with 566 additions and 347 deletions

View File: CacheRecycler.java

@ -19,6 +19,8 @@
package org.elasticsearch.common;
import org.elasticsearch.common.trove.ExtTDoubleObjectHashMap;
import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.common.trove.map.hash.*;
@ -30,8 +32,8 @@ import java.util.Deque;
public class CacheRecycler {
public static void clear() {
doubleObjectHashMap.remove();
longObjectHashMap.remove();
longObjectHashMap2.remove();
longLongHashMap.remove();
intIntHashMap.remove();
floatIntHashMap.remove();
@ -42,35 +44,63 @@ public class CacheRecycler {
intArray.remove();
}
// ----- ExtTLongObjectHashMap ----
// ----- ExtTHashMap -----
private static ThreadLocal<SoftReference<Deque<ExtTLongObjectHashMap>>> longObjectHashMap2 = new ThreadLocal<SoftReference<Deque<ExtTLongObjectHashMap>>>();
private static ThreadLocal<SoftReference<Deque<ExtTHashMap>>> hashMap = new ThreadLocal<SoftReference<Deque<ExtTHashMap>>>();
public static <T> ExtTLongObjectHashMap<T> popLongObjectMap2() {
SoftReference<Deque<ExtTLongObjectHashMap>> ref = longObjectHashMap2.get();
Deque<ExtTLongObjectHashMap> deque = ref == null ? null : ref.get();
public static <K, V> ExtTHashMap<K, V> popHashMap() {
SoftReference<Deque<ExtTHashMap>> ref = hashMap.get();
Deque<ExtTHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<ExtTLongObjectHashMap>();
longObjectHashMap2.set(new SoftReference<Deque<ExtTLongObjectHashMap>>(deque));
deque = new ArrayDeque<ExtTHashMap>();
hashMap.set(new SoftReference<Deque<ExtTHashMap>>(deque));
}
if (deque.isEmpty()) {
return new ExtTLongObjectHashMap();
return new ExtTHashMap();
}
ExtTLongObjectHashMap map = deque.pollFirst();
ExtTHashMap map = deque.pollFirst();
map.clear();
return map;
}
public static void pushLongObjectMap2(ExtTLongObjectHashMap map) {
SoftReference<Deque<ExtTLongObjectHashMap>> ref = longObjectHashMap2.get();
Deque<ExtTLongObjectHashMap> deque = ref == null ? null : ref.get();
public static void pushHashMap(ExtTHashMap map) {
SoftReference<Deque<ExtTHashMap>> ref = hashMap.get();
Deque<ExtTHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<ExtTLongObjectHashMap>();
longObjectHashMap2.set(new SoftReference<Deque<ExtTLongObjectHashMap>>(deque));
deque = new ArrayDeque<ExtTHashMap>();
hashMap.set(new SoftReference<Deque<ExtTHashMap>>(deque));
}
deque.add(map);
}
// ------ ExtTDoubleObjectHashMap -----
private static ThreadLocal<SoftReference<Deque<ExtTDoubleObjectHashMap>>> doubleObjectHashMap = new ThreadLocal<SoftReference<Deque<ExtTDoubleObjectHashMap>>>();
public static <T> ExtTDoubleObjectHashMap<T> popDoubleObjectMap() {
SoftReference<Deque<ExtTDoubleObjectHashMap>> ref = doubleObjectHashMap.get();
Deque<ExtTDoubleObjectHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<ExtTDoubleObjectHashMap>();
doubleObjectHashMap.set(new SoftReference<Deque<ExtTDoubleObjectHashMap>>(deque));
}
if (deque.isEmpty()) {
return new ExtTDoubleObjectHashMap();
}
ExtTDoubleObjectHashMap map = deque.pollFirst();
map.clear();
return map;
}
public static void pushDoubleObjectMap(ExtTDoubleObjectHashMap map) {
SoftReference<Deque<ExtTDoubleObjectHashMap>> ref = doubleObjectHashMap.get();
Deque<ExtTDoubleObjectHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<ExtTDoubleObjectHashMap>();
doubleObjectHashMap.set(new SoftReference<Deque<ExtTDoubleObjectHashMap>>(deque));
}
deque.add(map);
}
// ----- ExtTLongObjectHashMap ----
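The recycler above standardizes a pop/clear/use/push cycle over thread-local, softly referenced deques, so aggregation code can reuse Trove maps instead of reallocating them per request. A minimal usage sketch of that contract for the newly added ExtTHashMap variant; the wrapping method and the try/finally framing are illustrative, not part of the commit:

static void countSomething() {
    // popHashMap() hands back a cleared, reusable map bound to this thread
    ExtTHashMap<String, Long> map = CacheRecycler.popHashMap();
    try {
        map.put("some-term", 1L);
        // ... use the map for the duration of the request ...
    } finally {
        // hand it back so the next request on this thread can reuse it
        CacheRecycler.pushHashMap(map);
    }
}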

View File: NumericFieldData.java

@ -98,6 +98,8 @@ public abstract class NumericFieldData<Doc extends NumericDocFieldData> extends
public abstract void forEachValueInDoc(int docId, LongValueInDocProc proc);
public abstract void forEachValueInDoc(int docId, MissingLongValueInDocProc proc);
public static interface DoubleValueInDocProc {
void onValue(int docId, double value);
}
@ -106,6 +108,12 @@ public abstract class NumericFieldData<Doc extends NumericDocFieldData> extends
void onValue(int docId, long value);
}
public static interface MissingLongValueInDocProc {
void onValue(int docId, long value);
void onMissing(int docId);
}
public static interface MissingDoubleValueInDocProc {
void onValue(int docId, double value);
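The new Missing*ValueInDocProc callbacks let field data report, per document, either each value or an explicit missing signal; the Single/MultiValue*FieldData classes below implement them, and the rewritten facet collectors rely on onMissing for their missing counts. A tiny illustrative implementation (the class name is made up):

// Hypothetical proc that counts values and missing docs through the new callback.
class CountingLongProc implements NumericFieldData.MissingLongValueInDocProc {
    long valueCount;
    long missingDocs;

    @Override public void onValue(int docId, long value) {
        valueCount++;   // called once per value the doc has
    }

    @Override public void onMissing(int docId) {
        missingDocs++;  // called once when the doc has no value at all
    }
}
// Driven by the field data: fieldData.forEachValueInDoc(docId, new CountingLongProc());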

View File: MultiValueByteFieldData.java

@ -126,6 +126,20 @@ public class MultiValueByteFieldData extends ByteFieldData {
}
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {
int loc = ordinal[docId];
if (loc != 0) {
found = true;
proc.onValue(docId, values[loc]);
}
}
if (!found) {
proc.onMissing(docId);
}
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {

View File: SingleValueByteFieldData.java

@ -95,6 +95,15 @@ public class SingleValueByteFieldData extends ByteFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {
proc.onMissing(docId);
return;
}
proc.onValue(docId, values[loc]);
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {

View File: MultiValueDoubleFieldData.java

@ -115,6 +115,20 @@ public class MultiValueDoubleFieldData extends DoubleFieldData {
}
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {
int loc = ordinal[docId];
if (loc != 0) {
found = true;
proc.onValue(docId, (long) values[loc]);
}
}
if (!found) {
proc.onMissing(docId);
}
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {

View File: SingleValueDoubleFieldData.java

@ -88,6 +88,15 @@ public class SingleValueDoubleFieldData extends DoubleFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {
proc.onMissing(docId);
return;
}
proc.onValue(docId, (long) values[loc]);
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {

View File: MultiValueFloatFieldData.java

@ -126,6 +126,20 @@ public class MultiValueFloatFieldData extends FloatFieldData {
}
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {
int loc = ordinal[docId];
if (loc != 0) {
found = true;
proc.onValue(docId, (long) values[loc]);
}
}
if (!found) {
proc.onMissing(docId);
}
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {

View File: SingleValueFloatFieldData.java

@ -95,6 +95,15 @@ public class SingleValueFloatFieldData extends FloatFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {
proc.onMissing(docId);
return;
}
proc.onValue(docId, (long) values[loc]);
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {

View File: MultiValueIntFieldData.java

@ -126,6 +126,20 @@ public class MultiValueIntFieldData extends IntFieldData {
}
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {
int loc = ordinal[docId];
if (loc != 0) {
found = true;
proc.onValue(docId, values[loc]);
}
}
if (!found) {
proc.onMissing(docId);
}
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {

View File: SingleValueIntFieldData.java

@ -95,6 +95,15 @@ public class SingleValueIntFieldData extends IntFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {
proc.onMissing(docId);
return;
}
proc.onValue(docId, values[loc]);
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {

View File: MultiValueLongFieldData.java

@ -142,6 +142,20 @@ public class MultiValueLongFieldData extends LongFieldData {
}
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {
int loc = ordinal[docId];
if (loc != 0) {
found = true;
proc.onValue(docId, values[loc]);
}
}
if (!found) {
proc.onMissing(docId);
}
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
for (int[] ordinal : ordinals) {
proc.onOrdinal(docId, ordinal[docId]);

View File: SingleValueLongFieldData.java

@ -105,6 +105,15 @@ public class SingleValueLongFieldData extends LongFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {
proc.onMissing(docId);
return;
}
proc.onValue(docId, values[loc]);
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
proc.onOrdinal(docId, ordinals[docId]);
}

View File: MultiValueShortFieldData.java

@ -126,6 +126,20 @@ public class MultiValueShortFieldData extends ShortFieldData {
}
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {
int loc = ordinal[docId];
if (loc != 0) {
found = true;
proc.onValue(docId, values[loc]);
}
}
if (!found) {
proc.onMissing(docId);
}
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {

View File: SingleValueShortFieldData.java

@ -95,6 +95,15 @@ public class SingleValueShortFieldData extends ShortFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachValueInDoc(int docId, MissingLongValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {
proc.onMissing(docId);
return;
}
proc.onValue(docId, values[loc]);
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {

View File: TermsStatsFacet.java

@ -67,7 +67,7 @@ public interface TermsStatsFacet extends Facet, Iterable<TermsStatsFacet.Entry>
if (o2 == null) {
return -1;
}
int i = o2.count() - o1.count();
int i = (o2.count() < o1.count() ? -1 : (o1.count() == o2.count() ? 0 : 1));
if (i == 0) {
i = o2.term().compareTo(o1.term());
if (i == 0) {
@ -301,9 +301,13 @@ public interface TermsStatsFacet extends Facet, Iterable<TermsStatsFacet.Entry>
Number getTermAsNumber();
int count();
long count();
int getCount();
long getCount();
long totalCount();
long getTotalCount();
double min();
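Note the comparator change above: with count() widened to long, the old subtraction idiom (o2.count() - o1.count()) is no longer safe, because the difference can overflow an int, or be truncated by a narrowing cast, and flip its sign, corrupting the sort order. The commit switches to an explicit three-way comparison. A purely illustrative demonstration of the failure mode, with values chosen to force the overflow:

static void compareDemo() {
    long bigCount = 3000000000L;
    long smallCount = 1L;
    int broken = (int) (smallCount - bigCount);  // 1294967297: truncation flips the sign
    int fixed = smallCount < bigCount ? -1 : (smallCount == bigCount ? 0 : 1);  // -1, same shape as the new comparator
}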

View File: InternalTermsStatsDoubleFacet.java

@ -19,10 +19,10 @@
package org.elasticsearch.search.facet.termsstats.doubles;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.ExtTDoubleObjectHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -56,15 +56,17 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
public static class DoubleEntry implements Entry {
double term;
int count;
long count;
long totalCount;
double total;
double min;
double max;
public DoubleEntry(double term, int count, double total, double min, double max) {
public DoubleEntry(double term, long count, long totalCount, double total, double min, double max) {
this.term = term;
this.count = count;
this.total = total;
this.totalCount = totalCount;
this.min = min;
this.max = max;
}
@ -85,14 +87,22 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
return termAsNumber();
}
@Override public int count() {
@Override public long count() {
return count;
}
@Override public int getCount() {
@Override public long getCount() {
return count();
}
@Override public long totalCount() {
return this.totalCount;
}
@Override public long getTotalCount() {
return this.totalCount;
}
@Override public double min() {
return this.min;
}
@ -118,7 +128,7 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
}
@Override public double mean() {
return total / count;
return total / totalCount;
}
@Override public double getMean() {
@ -195,12 +205,6 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<ExtTDoubleObjectHashMap<DoubleEntry>>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<ExtTDoubleObjectHashMap<DoubleEntry>>>() {
@Override protected ThreadLocals.CleanableValue<ExtTDoubleObjectHashMap<DoubleEntry>> initialValue() {
return new ThreadLocals.CleanableValue<ExtTDoubleObjectHashMap<DoubleEntry>>(new ExtTDoubleObjectHashMap<DoubleEntry>());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
if (requiredSize == 0) {
@ -214,7 +218,7 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
return facets.get(0);
}
int missing = 0;
ExtTDoubleObjectHashMap<DoubleEntry> map = aggregateCache.get().get();
ExtTDoubleObjectHashMap<DoubleEntry> map = CacheRecycler.popDoubleObjectMap();
map.clear();
for (Facet facet : facets) {
InternalTermsStatsDoubleFacet tsFacet = (InternalTermsStatsDoubleFacet) facet;
@ -224,11 +228,12 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
DoubleEntry current = map.get(doubleEntry.term);
if (current != null) {
current.count += doubleEntry.count;
current.totalCount += doubleEntry.totalCount;
current.total += doubleEntry.total;
if (doubleEntry.min < current.min || Double.isNaN(current.min)) {
if (doubleEntry.min < current.min) {
current.min = doubleEntry.min;
}
if (doubleEntry.max > current.max || Double.isNaN(current.max)) {
if (doubleEntry.max > current.max) {
current.max = doubleEntry.max;
}
} else {
@ -241,6 +246,7 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
if (requiredSize == 0) { // all terms
DoubleEntry[] entries1 = map.values(new DoubleEntry[map.size()]);
Arrays.sort(entries1, comparatorType.comparator());
CacheRecycler.pushDoubleObjectMap(map);
return new InternalTermsStatsDoubleFacet(name, comparatorType, requiredSize, Arrays.asList(entries1), missing);
} else {
Object[] values = map.internalValues();
@ -253,6 +259,7 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
}
ordered.add(value);
}
CacheRecycler.pushDoubleObjectMap(map);
return new InternalTermsStatsDoubleFacet(name, comparatorType, requiredSize, ordered, missing);
}
}
@ -263,6 +270,7 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
static final XContentBuilderString TERMS = new XContentBuilderString("terms");
static final XContentBuilderString TERM = new XContentBuilderString("term");
static final XContentBuilderString COUNT = new XContentBuilderString("count");
static final XContentBuilderString TOTAL_COUNT = new XContentBuilderString("total_count");
static final XContentBuilderString MIN = new XContentBuilderString("min");
static final XContentBuilderString MAX = new XContentBuilderString("max");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
@ -278,6 +286,7 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
builder.startObject();
builder.field(Fields.TERM, ((DoubleEntry) entry).term);
builder.field(Fields.COUNT, entry.count());
builder.field(Fields.TOTAL_COUNT, entry.totalCount());
builder.field(Fields.MIN, entry.min());
builder.field(Fields.MAX, entry.max());
builder.field(Fields.TOTAL, entry.total());
@ -304,7 +313,7 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
int size = in.readVInt();
entries = new ArrayList<DoubleEntry>(size);
for (int i = 0; i < size; i++) {
entries.add(new DoubleEntry(in.readDouble(), in.readVInt(), in.readDouble(), in.readDouble(), in.readDouble()));
entries.add(new DoubleEntry(in.readDouble(), in.readVLong(), in.readVLong(), in.readDouble(), in.readDouble(), in.readDouble()));
}
}
@ -317,7 +326,8 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
out.writeVInt(entries.size());
for (Entry entry : entries) {
out.writeDouble(((DoubleEntry) entry).term);
out.writeVInt(entry.count());
out.writeVLong(entry.count());
out.writeVLong(entry.totalCount());
out.writeDouble(entry.total());
out.writeDouble(entry.min());
out.writeDouble(entry.max());
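mean() above now divides total by totalCount, the number of values that were aggregated, rather than by count, the number of documents that matched the term; with multi-valued value fields the two differ and dividing by count overstates the mean. A small worked example with assumed numbers:

static void meanDemo() {
    long count = 2;                       // documents carrying the term
    long totalCount = 3;                  // values aggregated: 100, 200 and 300
    double total = 100 + 200 + 300;       // 600.0
    double oldMean = total / count;       // 300.0, the pre-commit result, too high
    double newMean = total / totalCount;  // 200.0, the mean of the aggregated values
}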

View File: TermsStatsDoubleFacetCollector.java

@ -22,9 +22,9 @@ package org.elasticsearch.search.facet.termsstats.doubles;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.ExtTDoubleObjectHashMap;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
@ -38,7 +38,10 @@ import org.elasticsearch.search.facet.termsstats.TermsStatsFacet;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.*;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class TermsStatsDoubleFacetCollector extends AbstractFacetCollector {
@ -60,13 +63,9 @@ public class TermsStatsDoubleFacetCollector extends AbstractFacetCollector {
private final FieldDataType valueFieldDataType;
private NumericFieldData valueFieldData;
private final SearchScript script;
private int missing = 0;
private final ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> entries;
private final Aggregator aggregator;
public TermsStatsDoubleFacetCollector(String facetName, String keyFieldName, String valueFieldName, int size, TermsStatsFacet.ComparatorType comparatorType,
SearchContext context, String scriptLang, String script, Map<String, Object> params) {
@ -98,13 +97,13 @@ public class TermsStatsDoubleFacetCollector extends AbstractFacetCollector {
this.valueFieldName = fieldMapper.names().indexName();
this.valueFieldDataType = fieldMapper.fieldDataType();
this.script = null;
this.aggregator = new Aggregator();
} else {
this.valueFieldName = null;
this.valueFieldDataType = null;
this.script = context.scriptService().search(context.lookup(), scriptLang, script, params);
this.aggregator = new ScriptAggregator(this.script);
}
this.entries = popFacets();
}
@Override public void setScorer(Scorer scorer) throws IOException {
@ -115,74 +114,29 @@ public class TermsStatsDoubleFacetCollector extends AbstractFacetCollector {
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
keyFieldData = (NumericFieldData) fieldDataCache.cache(keyFieldDataType, reader, keyFieldName);
if (valueFieldName != null) {
valueFieldData = (NumericFieldData) fieldDataCache.cache(valueFieldDataType, reader, valueFieldName);
}
if (script != null) {
script.setNextReader(reader);
} else {
aggregator.valueFieldData = (NumericFieldData) fieldDataCache.cache(valueFieldDataType, reader, valueFieldName);
}
}
@Override protected void doCollect(int doc) throws IOException {
if (!keyFieldData.hasValue(doc)) {
missing++;
return;
}
double key = keyFieldData.doubleValue(doc);
InternalTermsStatsDoubleFacet.DoubleEntry doubleEntry = entries.get(key);
if (doubleEntry == null) {
doubleEntry = new InternalTermsStatsDoubleFacet.DoubleEntry(key, 1, 0, Double.NaN, Double.NaN);
entries.put(key, doubleEntry);
} else {
doubleEntry.count++;
}
if (script == null) {
if (valueFieldData.multiValued()) {
for (double value : valueFieldData.doubleValues(doc)) {
if (value < doubleEntry.min || Double.isNaN(doubleEntry.min)) {
doubleEntry.min = value;
}
if (value > doubleEntry.max || Double.isNaN(doubleEntry.max)) {
doubleEntry.max = value;
}
doubleEntry.total += value;
}
} else {
double value = valueFieldData.doubleValue(doc);
if (value < doubleEntry.min || Double.isNaN(doubleEntry.min)) {
doubleEntry.min = value;
}
if (value > doubleEntry.max || Double.isNaN(doubleEntry.max)) {
doubleEntry.max = value;
}
doubleEntry.total += value;
}
} else {
script.setNextDocId(doc);
double value = script.runAsDouble();
if (value < doubleEntry.min || Double.isNaN(doubleEntry.min)) {
doubleEntry.min = value;
}
if (value > doubleEntry.max || Double.isNaN(doubleEntry.max)) {
doubleEntry.max = value;
}
doubleEntry.total += value;
}
keyFieldData.forEachValueInDoc(doc, aggregator);
}
@Override public Facet facet() {
if (entries.isEmpty()) {
return new InternalTermsStatsDoubleFacet(facetName, comparatorType, size, ImmutableList.<InternalTermsStatsDoubleFacet.DoubleEntry>of(), missing);
if (aggregator.entries.isEmpty()) {
return new InternalTermsStatsDoubleFacet(facetName, comparatorType, size, ImmutableList.<InternalTermsStatsDoubleFacet.DoubleEntry>of(), aggregator.missing);
}
if (size == 0) { // all terms
// all terms, just return the collection, we will sort it on the way back
return new InternalTermsStatsDoubleFacet(facetName, comparatorType, 0 /* indicates all terms*/, entries.valueCollection(), missing);
return new InternalTermsStatsDoubleFacet(facetName, comparatorType, 0 /* indicates all terms*/, aggregator.entries.valueCollection(), aggregator.missing);
}
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
Object[] values = entries.internalValues();
Object[] values = aggregator.entries.internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
int limit = size * numberOfShards;
int limit = size;
List<InternalTermsStatsDoubleFacet.DoubleEntry> ordered = Lists.newArrayList();
for (int i = 0; i < limit; i++) {
InternalTermsStatsDoubleFacet.DoubleEntry value = (InternalTermsStatsDoubleFacet.DoubleEntry) values[i];
@ -192,34 +146,82 @@ public class TermsStatsDoubleFacetCollector extends AbstractFacetCollector {
ordered.add(value);
}
// that's fine to push here, this thread will be released AFTER the entries have either been serialized
// or processed
pushFacets(entries);
return new InternalTermsStatsDoubleFacet(facetName, comparatorType, size, ordered, missing);
CacheRecycler.pushDoubleObjectMap(aggregator.entries);
return new InternalTermsStatsDoubleFacet(facetName, comparatorType, size, ordered, aggregator.missing);
}
public static class Aggregator implements NumericFieldData.MissingDoubleValueInDocProc {
static ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> popFacets() {
Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>());
final ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> entries = CacheRecycler.popDoubleObjectMap();
int missing;
NumericFieldData valueFieldData;
@Override public void onValue(int docId, double value) {
InternalTermsStatsDoubleFacet.DoubleEntry doubleEntry = entries.get(value);
if (doubleEntry == null) {
doubleEntry = new InternalTermsStatsDoubleFacet.DoubleEntry(value, 1, 0, 0, Double.MAX_VALUE, Double.MIN_VALUE);
entries.put(value, doubleEntry);
} else {
doubleEntry.count++;
}
if (valueFieldData.multiValued()) {
double[] valueValues = valueFieldData.doubleValues(docId);
doubleEntry.totalCount += valueValues.length;
for (double valueValue : valueValues) {
if (valueValue < doubleEntry.min) {
doubleEntry.min = valueValue;
}
if (valueValue > doubleEntry.max) {
doubleEntry.max = valueValue;
}
doubleEntry.total += valueValue;
}
} else {
double valueValue = valueFieldData.doubleValue(docId);
if (valueValue < doubleEntry.min) {
doubleEntry.min = valueValue;
}
if (valueValue > doubleEntry.max) {
doubleEntry.max = valueValue;
}
doubleEntry.totalCount++;
doubleEntry.total += valueValue;
}
}
ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> facets) {
facets.clear();
Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
@Override public void onMissing(int docId) {
missing++;
}
}
static ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>>(new ArrayDeque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>());
public static class ScriptAggregator extends Aggregator {
private final SearchScript script;
public ScriptAggregator(SearchScript script) {
this.script = script;
}
};
@Override public void onValue(int docId, double value) {
InternalTermsStatsDoubleFacet.DoubleEntry doubleEntry = entries.get(value);
if (doubleEntry == null) {
doubleEntry = new InternalTermsStatsDoubleFacet.DoubleEntry(value, 1, 0, 0, Double.MAX_VALUE, Double.MIN_VALUE);
entries.put(value, doubleEntry);
} else {
doubleEntry.count++;
}
script.setNextDocId(docId);
double valueValue = script.runAsDouble();
if (valueValue < doubleEntry.min) {
doubleEntry.min = valueValue;
}
if (valueValue > doubleEntry.max) {
doubleEntry.max = valueValue;
}
doubleEntry.totalCount++;
doubleEntry.total += valueValue;
}
}
}
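The collector rewrite above replaces the inline doCollect logic with a double dispatch: keyFieldData.forEachValueInDoc(doc, aggregator) pushes each key value, or a missing signal, into the Aggregator, which then reads the doc's values from valueFieldData (or, in the ScriptAggregator subclass, from the script). A hedged driver sketch of that flow; it assumes same-package access because the aggregator's entries and valueFieldData fields are package-private, and maxDoc and the field data instances are stand-ins:

// Hypothetical driver, only to show who calls whom after this change.
static InternalTermsStatsDoubleFacet.DoubleEntry[] collectAll(NumericFieldData keyFieldData,
                                                              NumericFieldData valueFieldData,
                                                              int maxDoc) {
    TermsStatsDoubleFacetCollector.Aggregator aggregator = new TermsStatsDoubleFacetCollector.Aggregator();
    aggregator.valueFieldData = valueFieldData;          // normally assigned per segment in doSetNextReader
    for (int doc = 0; doc < maxDoc; doc++) {
        keyFieldData.forEachValueInDoc(doc, aggregator); // calls onValue()/onMissing() back on the aggregator
    }
    // the real collector pushes aggregator.entries back to CacheRecycler once the facet is built
    return aggregator.entries.values(new InternalTermsStatsDoubleFacet.DoubleEntry[aggregator.entries.size()]);
}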

View File: InternalTermsStatsLongFacet.java

@ -19,10 +19,10 @@
package org.elasticsearch.search.facet.termsstats.longs;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -56,12 +56,13 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
public static class LongEntry implements Entry {
long term;
int count;
long count;
long totalCount;
double total;
double min;
double max;
public LongEntry(long term, int count, double total, double min, double max) {
public LongEntry(long term, long count, long totalCount, double total, double min, double max) {
this.term = term;
this.count = count;
this.total = total;
@ -85,14 +86,22 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
return termAsNumber();
}
@Override public int count() {
@Override public long count() {
return count;
}
@Override public int getCount() {
@Override public long getCount() {
return count();
}
@Override public long totalCount() {
return this.totalCount;
}
@Override public long getTotalCount() {
return this.totalCount;
}
@Override public double min() {
return this.min;
}
@ -118,7 +127,7 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
}
@Override public double mean() {
return total / count;
return total / totalCount;
}
@Override public double getMean() {
@ -195,12 +204,6 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<ExtTLongObjectHashMap<LongEntry>>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<ExtTLongObjectHashMap<LongEntry>>>() {
@Override protected ThreadLocals.CleanableValue<ExtTLongObjectHashMap<LongEntry>> initialValue() {
return new ThreadLocals.CleanableValue<ExtTLongObjectHashMap<LongEntry>>(new ExtTLongObjectHashMap<LongEntry>());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
if (requiredSize == 0) {
@ -214,7 +217,7 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
return facets.get(0);
}
int missing = 0;
ExtTLongObjectHashMap<LongEntry> map = aggregateCache.get().get();
ExtTLongObjectHashMap<LongEntry> map = CacheRecycler.popLongObjectMap();
map.clear();
for (Facet facet : facets) {
InternalTermsStatsLongFacet tsFacet = (InternalTermsStatsLongFacet) facet;
@ -224,11 +227,12 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
LongEntry current = map.get(longEntry.term);
if (current != null) {
current.count += longEntry.count;
current.totalCount += longEntry.totalCount;
current.total += longEntry.total;
if (longEntry.min < current.min || Double.isNaN(current.min)) {
if (longEntry.min < current.min) {
current.min = longEntry.min;
}
if (longEntry.max > current.max || Double.isNaN(current.max)) {
if (longEntry.max > current.max) {
current.max = longEntry.max;
}
} else {
@ -241,6 +245,7 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
if (requiredSize == 0) { // all terms
LongEntry[] entries1 = map.values(new LongEntry[map.size()]);
Arrays.sort(entries1, comparatorType.comparator());
CacheRecycler.pushLongObjectMap(map);
return new InternalTermsStatsLongFacet(name, comparatorType, requiredSize, Arrays.asList(entries1), missing);
} else {
Object[] values = map.internalValues();
@ -253,6 +258,7 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
}
ordered.add(value);
}
CacheRecycler.pushLongObjectMap(map);
return new InternalTermsStatsLongFacet(name, comparatorType, requiredSize, ordered, missing);
}
}
@ -263,6 +269,7 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
static final XContentBuilderString TERMS = new XContentBuilderString("terms");
static final XContentBuilderString TERM = new XContentBuilderString("term");
static final XContentBuilderString COUNT = new XContentBuilderString("count");
static final XContentBuilderString TOTAL_COUNT = new XContentBuilderString("total_count");
static final XContentBuilderString MIN = new XContentBuilderString("min");
static final XContentBuilderString MAX = new XContentBuilderString("max");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
@ -278,6 +285,7 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
builder.startObject();
builder.field(Fields.TERM, ((LongEntry) entry).term);
builder.field(Fields.COUNT, entry.count());
builder.field(Fields.TOTAL_COUNT, entry.totalCount());
builder.field(Fields.MIN, entry.min());
builder.field(Fields.MAX, entry.max());
builder.field(Fields.TOTAL, entry.total());
@ -304,7 +312,7 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
int size = in.readVInt();
entries = new ArrayList<LongEntry>(size);
for (int i = 0; i < size; i++) {
entries.add(new LongEntry(in.readLong(), in.readVInt(), in.readDouble(), in.readDouble(), in.readDouble()));
entries.add(new LongEntry(in.readLong(), in.readVLong(), in.readVLong(), in.readDouble(), in.readDouble(), in.readDouble()));
}
}
@ -317,7 +325,8 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
out.writeVInt(entries.size());
for (Entry entry : entries) {
out.writeLong(((LongEntry) entry).term);
out.writeVInt(entry.count());
out.writeVLong(entry.count());
out.writeVLong(entry.totalCount());
out.writeDouble(entry.total());
out.writeDouble(entry.min());
out.writeDouble(entry.max());
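The stream changes above must stay symmetric between writeTo and readFrom: count moves from vInt to vLong and a new totalCount vLong is written immediately after it, so readers and writers have to agree on the new layout. A sketch of the per-entry write order for the long variant; it assumes same-package access to the term field, as in writeTo above:

// Per-entry wire order after this commit; readFrom consumes the same fields in the same order.
static void writeLongEntry(StreamOutput out, InternalTermsStatsLongFacet.LongEntry entry) throws IOException {
    out.writeLong(entry.term);           // unchanged
    out.writeVLong(entry.count());       // was writeVInt before this commit
    out.writeVLong(entry.totalCount());  // new field
    out.writeDouble(entry.total());
    out.writeDouble(entry.min());
    out.writeDouble(entry.max());
}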

View File: TermsStatsLongFacetCollector.java

@ -22,9 +22,9 @@ package org.elasticsearch.search.facet.termsstats.longs;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
@ -38,7 +38,10 @@ import org.elasticsearch.search.facet.termsstats.TermsStatsFacet;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.*;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class TermsStatsLongFacetCollector extends AbstractFacetCollector {
@ -60,13 +63,10 @@ public class TermsStatsLongFacetCollector extends AbstractFacetCollector {
private final FieldDataType valueFieldDataType;
private NumericFieldData valueFieldData;
private final SearchScript script;
private int missing = 0;
private final ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> entries;
private final Aggregator aggregator;
public TermsStatsLongFacetCollector(String facetName, String keyFieldName, String valueFieldName, int size, TermsStatsFacet.ComparatorType comparatorType,
SearchContext context, String scriptLang, String script, Map<String, Object> params) {
@ -98,13 +98,13 @@ public class TermsStatsLongFacetCollector extends AbstractFacetCollector {
this.valueFieldName = fieldMapper.names().indexName();
this.valueFieldDataType = fieldMapper.fieldDataType();
this.script = null;
this.aggregator = new Aggregator();
} else {
this.valueFieldName = null;
this.valueFieldDataType = null;
this.script = context.scriptService().search(context.lookup(), scriptLang, script, params);
this.aggregator = new ScriptAggregator(this.script);
}
this.entries = popFacets();
}
@Override public void setScorer(Scorer scorer) throws IOException {
@ -115,75 +115,31 @@ public class TermsStatsLongFacetCollector extends AbstractFacetCollector {
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
keyFieldData = (NumericFieldData) fieldDataCache.cache(keyFieldDataType, reader, keyFieldName);
if (valueFieldName != null) {
valueFieldData = (NumericFieldData) fieldDataCache.cache(valueFieldDataType, reader, valueFieldName);
}
if (script != null) {
script.setNextReader(reader);
} else {
aggregator.valueFieldData = (NumericFieldData) fieldDataCache.cache(valueFieldDataType, reader, valueFieldName);
}
}
@Override protected void doCollect(int doc) throws IOException {
if (!keyFieldData.hasValue(doc)) {
missing++;
return;
}
long key = keyFieldData.longValue(doc);
InternalTermsStatsLongFacet.LongEntry longEntry = entries.get(key);
if (longEntry == null) {
longEntry = new InternalTermsStatsLongFacet.LongEntry(key, 1, 0, Double.NaN, Double.NaN);
entries.put(key, longEntry);
} else {
longEntry.count++;
}
if (script == null) {
if (valueFieldData.multiValued()) {
for (double value : valueFieldData.doubleValues(doc)) {
if (value < longEntry.min || Double.isNaN(longEntry.min)) {
longEntry.min = value;
}
if (value > longEntry.max || Double.isNaN(longEntry.max)) {
longEntry.max = value;
}
longEntry.total += value;
}
} else {
double value = valueFieldData.doubleValue(doc);
if (value < longEntry.min || Double.isNaN(longEntry.min)) {
longEntry.min = value;
}
if (value > longEntry.max || Double.isNaN(longEntry.max)) {
longEntry.max = value;
}
longEntry.total += value;
}
} else {
script.setNextDocId(doc);
double value = script.runAsDouble();
if (value < longEntry.min || Double.isNaN(longEntry.min)) {
longEntry.min = value;
}
if (value > longEntry.max || Double.isNaN(longEntry.max)) {
longEntry.max = value;
}
longEntry.total += value;
}
keyFieldData.forEachValueInDoc(doc, aggregator);
}
@Override public Facet facet() {
if (entries.isEmpty()) {
return new InternalTermsStatsLongFacet(facetName, comparatorType, size, ImmutableList.<InternalTermsStatsLongFacet.LongEntry>of(), missing);
if (aggregator.entries.isEmpty()) {
return new InternalTermsStatsLongFacet(facetName, comparatorType, size, ImmutableList.<InternalTermsStatsLongFacet.LongEntry>of(), aggregator.missing);
}
if (size == 0) { // all terms
// all terms, just return the collection, we will sort it on the way back
return new InternalTermsStatsLongFacet(facetName, comparatorType, 0 /* indicates all terms*/, entries.valueCollection(), missing);
return new InternalTermsStatsLongFacet(facetName, comparatorType, 0 /* indicates all terms*/, aggregator.entries.valueCollection(), aggregator.missing);
}
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
Object[] values = entries.internalValues();
Object[] values = aggregator.entries.internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
int limit = size * numberOfShards;
int limit = size;
List<InternalTermsStatsLongFacet.LongEntry> ordered = Lists.newArrayList();
for (int i = 0; i < limit; i++) {
InternalTermsStatsLongFacet.LongEntry value = (InternalTermsStatsLongFacet.LongEntry) values[i];
@ -192,34 +148,82 @@ public class TermsStatsLongFacetCollector extends AbstractFacetCollector {
}
ordered.add(value);
}
// that's fine to push here, this thread will be released AFTER the entries have either been serialized
// or processed
pushFacets(entries);
return new InternalTermsStatsLongFacet(facetName, comparatorType, size, ordered, missing);
CacheRecycler.pushLongObjectMap(aggregator.entries);
return new InternalTermsStatsLongFacet(facetName, comparatorType, size, ordered, aggregator.missing);
}
public static class Aggregator implements NumericFieldData.MissingLongValueInDocProc {
static ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> popFacets() {
Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>());
final ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> entries = CacheRecycler.popLongObjectMap();
int missing;
NumericFieldData valueFieldData;
@Override public void onValue(int docId, long value) {
InternalTermsStatsLongFacet.LongEntry longEntry = entries.get(value);
if (longEntry == null) {
longEntry = new InternalTermsStatsLongFacet.LongEntry(value, 1, 0, 0, Double.MAX_VALUE, Double.MIN_VALUE);
entries.put(value, longEntry);
} else {
longEntry.count++;
}
if (valueFieldData.multiValued()) {
double[] valueValues = valueFieldData.doubleValues(docId);
longEntry.totalCount += valueValues.length;
for (double valueValue : valueValues) {
if (valueValue < longEntry.min) {
longEntry.min = valueValue;
}
if (valueValue > longEntry.max) {
longEntry.max = valueValue;
}
longEntry.total += valueValue;
}
} else {
double valueValue = valueFieldData.doubleValue(docId);
if (valueValue < longEntry.min) {
longEntry.min = valueValue;
}
if (valueValue > longEntry.max) {
longEntry.max = valueValue;
}
longEntry.totalCount++;
longEntry.total += valueValue;
}
}
ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> facets) {
facets.clear();
Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
@Override public void onMissing(int docId) {
missing++;
}
}
static ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>>(new ArrayDeque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>());
public static class ScriptAggregator extends Aggregator {
private final SearchScript script;
public ScriptAggregator(SearchScript script) {
this.script = script;
}
};
@Override public void onValue(int docId, long value) {
InternalTermsStatsLongFacet.LongEntry longEntry = entries.get(value);
if (longEntry == null) {
longEntry = new InternalTermsStatsLongFacet.LongEntry(value, 1, 0, 0, Double.MAX_VALUE, Double.MIN_VALUE);
entries.put(value, longEntry);
} else {
longEntry.count++;
}
script.setNextDocId(docId);
double valueValue = script.runAsDouble();
if (valueValue < longEntry.min) {
longEntry.min = valueValue;
}
if (valueValue > longEntry.max) {
longEntry.max = valueValue;
}
longEntry.totalCount++;
longEntry.total += valueValue;
}
}
}

View File: InternalTermsStatsStringFacet.java

@ -19,10 +19,10 @@
package org.elasticsearch.search.facet.termsstats.strings;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -56,14 +56,16 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
public static class StringEntry implements Entry {
String term;
int count;
long count;
long totalCount;
double total;
double min;
double max;
public StringEntry(String term, int count, double total, double min, double max) {
public StringEntry(String term, long count, long totalCount, double total, double min, double max) {
this.term = term;
this.count = count;
this.totalCount = totalCount;
this.total = total;
this.min = min;
this.max = max;
@ -85,14 +87,22 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
return termAsNumber();
}
@Override public int count() {
@Override public long count() {
return count;
}
@Override public int getCount() {
@Override public long getCount() {
return count();
}
@Override public long totalCount() {
return this.totalCount;
}
@Override public long getTotalCount() {
return this.totalCount;
}
@Override public double min() {
return this.min;
}
@ -118,7 +128,7 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
}
@Override public double mean() {
return total / count;
return total / totalCount;
}
@Override public double getMean() {
@ -194,12 +204,6 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<ExtTHashMap<String, StringEntry>>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<ExtTHashMap<String, StringEntry>>>() {
@Override protected ThreadLocals.CleanableValue<ExtTHashMap<String, StringEntry>> initialValue() {
return new ThreadLocals.CleanableValue<ExtTHashMap<String, StringEntry>>(new ExtTHashMap<String, StringEntry>());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
if (requiredSize == 0) {
@ -213,8 +217,7 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
return facets.get(0);
}
int missing = 0;
ExtTHashMap<String, StringEntry> map = aggregateCache.get().get();
map.clear();
ExtTHashMap<String, StringEntry> map = CacheRecycler.popHashMap();
for (Facet facet : facets) {
InternalTermsStatsStringFacet tsFacet = (InternalTermsStatsStringFacet) facet;
missing += tsFacet.missing;
@ -223,11 +226,12 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
StringEntry current = map.get(stringEntry.term());
if (current != null) {
current.count += stringEntry.count;
current.totalCount += stringEntry.totalCount;
current.total += stringEntry.total;
if (stringEntry.min < current.min || Double.isNaN(current.min)) {
if (stringEntry.min < current.min) {
current.min = stringEntry.min;
}
if (stringEntry.max > current.max || Double.isNaN(current.max)) {
if (stringEntry.max > current.max) {
current.max = stringEntry.max;
}
} else {
@ -240,6 +244,7 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
if (requiredSize == 0) { // all terms
StringEntry[] entries1 = map.values().toArray(new StringEntry[map.size()]);
Arrays.sort(entries1, comparatorType.comparator());
CacheRecycler.pushHashMap(map);
return new InternalTermsStatsStringFacet(name, comparatorType, requiredSize, Arrays.asList(entries1), missing);
} else {
Object[] values = map.internalValues();
@ -252,6 +257,7 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
}
ordered.add(value);
}
CacheRecycler.pushHashMap(map);
return new InternalTermsStatsStringFacet(name, comparatorType, requiredSize, ordered, missing);
}
}
@ -262,6 +268,7 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
static final XContentBuilderString TERMS = new XContentBuilderString("terms");
static final XContentBuilderString TERM = new XContentBuilderString("term");
static final XContentBuilderString COUNT = new XContentBuilderString("count");
static final XContentBuilderString TOTAL_COUNT = new XContentBuilderString("total_count");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString MIN = new XContentBuilderString("min");
static final XContentBuilderString MAX = new XContentBuilderString("max");
@ -277,6 +284,7 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
builder.startObject();
builder.field(Fields.TERM, entry.term());
builder.field(Fields.COUNT, entry.count());
builder.field(Fields.TOTAL_COUNT, entry.totalCount());
builder.field(Fields.MIN, entry.min());
builder.field(Fields.MAX, entry.max());
builder.field(Fields.TOTAL, entry.total());
@ -303,7 +311,7 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
int size = in.readVInt();
entries = new ArrayList<StringEntry>(size);
for (int i = 0; i < size; i++) {
entries.add(new StringEntry(in.readUTF(), in.readVInt(), in.readDouble(), in.readDouble(), in.readDouble()));
entries.add(new StringEntry(in.readUTF(), in.readVLong(), in.readVLong(), in.readDouble(), in.readDouble(), in.readDouble()));
}
}
@ -316,7 +324,8 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
out.writeVInt(entries.size());
for (Entry entry : entries) {
out.writeUTF(entry.term());
out.writeVInt(entry.count());
out.writeVLong(entry.count());
out.writeVLong(entry.totalCount());
out.writeDouble(entry.total());
out.writeDouble(entry.min());
out.writeDouble(entry.max());

View File: TermsStatsStringFacetCollector.java

@ -22,9 +22,9 @@ package org.elasticsearch.search.facet.termsstats.strings;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
@ -39,7 +39,10 @@ import org.elasticsearch.search.facet.termsstats.TermsStatsFacet;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.*;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class TermsStatsStringFacetCollector extends AbstractFacetCollector {
@ -61,13 +64,9 @@ public class TermsStatsStringFacetCollector extends AbstractFacetCollector {
private final FieldDataType valueFieldDataType;
private NumericFieldData valueFieldData;
private final SearchScript script;
private int missing = 0;
private final ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry> entries;
private final Aggregator aggregator;
public TermsStatsStringFacetCollector(String facetName, String keyFieldName, String valueFieldName, int size, TermsStatsFacet.ComparatorType comparatorType,
SearchContext context, String scriptLang, String script, Map<String, Object> params) {
@ -99,13 +98,13 @@ public class TermsStatsStringFacetCollector extends AbstractFacetCollector {
this.valueFieldName = fieldMapper.names().indexName();
this.valueFieldDataType = fieldMapper.fieldDataType();
this.script = null;
this.aggregator = new Aggregator();
} else {
this.valueFieldName = null;
this.valueFieldDataType = null;
this.script = context.scriptService().search(context.lookup(), scriptLang, script, params);
this.aggregator = new ScriptAggregator(this.script);
}
this.entries = popFacets();
}
@Override public void setScorer(Scorer scorer) throws IOException {
@ -116,75 +115,30 @@ public class TermsStatsStringFacetCollector extends AbstractFacetCollector {
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
keyFieldData = fieldDataCache.cache(keyFieldDataType, reader, keyFieldName);
if (valueFieldName != null) {
valueFieldData = (NumericFieldData) fieldDataCache.cache(valueFieldDataType, reader, valueFieldName);
}
if (script != null) {
script.setNextReader(reader);
} else {
aggregator.valueFieldData = (NumericFieldData) fieldDataCache.cache(valueFieldDataType, reader, valueFieldName);
}
}
@Override protected void doCollect(int doc) throws IOException {
if (!keyFieldData.hasValue(doc)) {
missing++;
return;
}
String key = keyFieldData.stringValue(doc);
InternalTermsStatsStringFacet.StringEntry stringEntry = entries.get(key);
if (stringEntry == null) {
stringEntry = new InternalTermsStatsStringFacet.StringEntry(key, 1, 0, Double.NaN, Double.NaN);
entries.put(key, stringEntry);
} else {
stringEntry.count++;
}
if (script == null) {
if (valueFieldData.multiValued()) {
for (double value : valueFieldData.doubleValues(doc)) {
if (value < stringEntry.min || Double.isNaN(stringEntry.min)) {
stringEntry.min = value;
}
if (value > stringEntry.max || Double.isNaN(stringEntry.max)) {
stringEntry.max = value;
}
stringEntry.total += value;
}
} else {
double value = valueFieldData.doubleValue(doc);
if (value < stringEntry.min || Double.isNaN(stringEntry.min)) {
stringEntry.min = value;
}
if (value > stringEntry.max || Double.isNaN(stringEntry.max)) {
stringEntry.max = value;
}
stringEntry.total += value;
}
} else {
script.setNextDocId(doc);
double value = script.runAsDouble();
if (value < stringEntry.min || Double.isNaN(stringEntry.min)) {
stringEntry.min = value;
}
if (value > stringEntry.max || Double.isNaN(stringEntry.max)) {
stringEntry.max = value;
}
stringEntry.total += value;
}
keyFieldData.forEachValueInDoc(doc, aggregator);
}
@Override public Facet facet() {
if (entries.isEmpty()) {
return new InternalTermsStatsStringFacet(facetName, comparatorType, size, ImmutableList.<InternalTermsStatsStringFacet.StringEntry>of(), missing);
if (aggregator.entries.isEmpty()) {
return new InternalTermsStatsStringFacet(facetName, comparatorType, size, ImmutableList.<InternalTermsStatsStringFacet.StringEntry>of(), aggregator.missing);
}
if (size == 0) { // all terms
// all terms, just return the collection, we will sort it on the way back
return new InternalTermsStatsStringFacet(facetName, comparatorType, 0 /* indicates all terms*/, entries.values(), missing);
return new InternalTermsStatsStringFacet(facetName, comparatorType, 0 /* indicates all terms*/, aggregator.entries.values(), aggregator.missing);
}
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
Object[] values = entries.internalValues();
Object[] values = aggregator.entries.internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
List<InternalTermsStatsStringFacet.StringEntry> ordered = Lists.newArrayList();
int limit = size * numberOfShards;
int limit = size;
for (int i = 0; i < limit; i++) {
InternalTermsStatsStringFacet.StringEntry value = (InternalTermsStatsStringFacet.StringEntry) values[i];
if (value == null) {
@ -193,34 +147,83 @@ public class TermsStatsStringFacetCollector extends AbstractFacetCollector {
ordered.add(value);
}
// that's fine to push here, this thread will be released AFTER the entries have either been serialized
// or processed
pushFacets(entries);
return new InternalTermsStatsStringFacet(facetName, comparatorType, size, ordered, missing);
CacheRecycler.pushHashMap(aggregator.entries); // fine to push here, we are done with it
return new InternalTermsStatsStringFacet(facetName, comparatorType, size, ordered, aggregator.missing);
}
public static class Aggregator implements FieldData.StringValueInDocProc {
static ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry> popFacets() {
Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>());
final ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry> entries = CacheRecycler.popHashMap();
int missing = 0;
NumericFieldData valueFieldData;
@Override public void onValue(int docId, String value) {
InternalTermsStatsStringFacet.StringEntry stringEntry = entries.get(value);
if (stringEntry == null) {
stringEntry = new InternalTermsStatsStringFacet.StringEntry(value, 1, 0, 0, Double.MAX_VALUE, Double.MIN_VALUE);
entries.put(value, stringEntry);
} else {
stringEntry.count++;
}
if (valueFieldData.multiValued()) {
double[] valueValues = valueFieldData.doubleValues(docId);
stringEntry.totalCount += valueValues.length;
for (double valueValue : valueValues) {
if (valueValue < stringEntry.min) {
stringEntry.min = valueValue;
}
if (valueValue > stringEntry.max) {
stringEntry.max = valueValue;
}
stringEntry.total += valueValue;
}
} else {
double valueValue = valueFieldData.doubleValue(docId);
if (valueValue < stringEntry.min) {
stringEntry.min = valueValue;
}
if (valueValue > stringEntry.max) {
stringEntry.max = valueValue;
}
stringEntry.total += valueValue;
stringEntry.totalCount++;
}
}
ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry> facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry> facets) {
facets.clear();
Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
@Override public void onMissing(int docId) {
missing++;
}
}
static ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>>(new ArrayDeque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>());
public static class ScriptAggregator extends Aggregator {
private final SearchScript script;
public ScriptAggregator(SearchScript script) {
this.script = script;
}
};
@Override public void onValue(int docId, String value) {
InternalTermsStatsStringFacet.StringEntry stringEntry = entries.get(value);
if (stringEntry == null) {
stringEntry = new InternalTermsStatsStringFacet.StringEntry(value, 1, 0, 0, Double.MAX_VALUE, Double.MIN_VALUE);
entries.put(value, stringEntry);
} else {
stringEntry.count++;
}
script.setNextDocId(docId);
double valueValue = script.runAsDouble();
if (valueValue < stringEntry.min) {
stringEntry.min = valueValue;
}
if (valueValue > stringEntry.max) {
stringEntry.max = valueValue;
}
stringEntry.total += valueValue;
stringEntry.totalCount++;
}
}
}

View File: SimpleFacetsTests.java

@ -1480,12 +1480,14 @@ public class SimpleFacetsTests extends AbstractNodesTests {
TermsStatsFacet facet = searchResponse.facets().facet("stats1");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("xxx"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).totalCount(), equalTo(2l));
assertThat(facet.entries().get(0).min(), closeTo(100d, 0.00001d));
assertThat(facet.entries().get(0).max(), closeTo(200d, 0.00001d));
assertThat(facet.entries().get(0).total(), closeTo(300d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("yyy"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).totalCount(), equalTo(1l));
assertThat(facet.entries().get(1).min(), closeTo(500d, 0.00001d));
assertThat(facet.entries().get(1).max(), closeTo(500d, 0.00001d));
assertThat(facet.entries().get(1).total(), closeTo(500d, 0.00001d));
@@ -1493,12 +1495,12 @@ public class SimpleFacetsTests extends AbstractNodesTests {
facet = searchResponse.facets().facet("stats2");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("xxx"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).min(), closeTo(1d, 0.00001d));
assertThat(facet.entries().get(0).max(), closeTo(3d, 0.00001d));
assertThat(facet.entries().get(0).total(), closeTo(8d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("yyy"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).min(), closeTo(5d, 0.00001d));
assertThat(facet.entries().get(1).max(), closeTo(6d, 0.00001d));
assertThat(facet.entries().get(1).total(), closeTo(11d, 0.00001d));
@@ -1506,100 +1508,100 @@ public class SimpleFacetsTests extends AbstractNodesTests {
facet = searchResponse.facets().facet("stats3");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("xxx"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).total(), closeTo(300d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("yyy"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).total(), closeTo(500d, 0.00001d));
facet = searchResponse.facets().facet("stats4");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("xxx"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).total(), closeTo(8d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("yyy"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).total(), closeTo(11d, 0.00001d));
facet = searchResponse.facets().facet("stats5");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("yyy"));
assertThat(facet.entries().get(0).count(), equalTo(1));
assertThat(facet.entries().get(0).count(), equalTo(1l));
assertThat(facet.entries().get(0).total(), closeTo(500d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("xxx"));
assertThat(facet.entries().get(1).count(), equalTo(2));
assertThat(facet.entries().get(1).count(), equalTo(2l));
assertThat(facet.entries().get(1).total(), closeTo(300d, 0.00001d));
facet = searchResponse.facets().facet("stats6");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("yyy"));
assertThat(facet.entries().get(0).count(), equalTo(1));
assertThat(facet.entries().get(0).count(), equalTo(1l));
assertThat(facet.entries().get(0).total(), closeTo(11d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("xxx"));
assertThat(facet.entries().get(1).count(), equalTo(2));
assertThat(facet.entries().get(1).count(), equalTo(2l));
assertThat(facet.entries().get(1).total(), closeTo(8d, 0.00001d));
facet = searchResponse.facets().facet("stats7");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("xxx"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).total(), closeTo(300d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("yyy"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).total(), closeTo(500d, 0.00001d));
facet = searchResponse.facets().facet("stats8");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("xxx"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).total(), closeTo(8d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("yyy"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).total(), closeTo(11d, 0.00001d));
facet = searchResponse.facets().facet("stats9");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("xxx"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).total(), closeTo(300d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("yyy"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).total(), closeTo(500d, 0.00001d));
facet = searchResponse.facets().facet("stats10");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("xxx"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).total(), closeTo(8d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("yyy"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).total(), closeTo(11d, 0.00001d));
facet = searchResponse.facets().facet("stats11");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("yyy"));
assertThat(facet.entries().get(0).count(), equalTo(1));
assertThat(facet.entries().get(0).count(), equalTo(1l));
assertThat(facet.entries().get(0).total(), closeTo(500d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("xxx"));
assertThat(facet.entries().get(1).count(), equalTo(2));
assertThat(facet.entries().get(1).count(), equalTo(2l));
assertThat(facet.entries().get(1).total(), closeTo(300d, 0.00001d));
facet = searchResponse.facets().facet("stats12");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("yyy"));
assertThat(facet.entries().get(0).count(), equalTo(1));
assertThat(facet.entries().get(0).count(), equalTo(1l));
assertThat(facet.entries().get(0).total(), closeTo(11d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("xxx"));
assertThat(facet.entries().get(1).count(), equalTo(2));
assertThat(facet.entries().get(1).count(), equalTo(2l));
assertThat(facet.entries().get(1).total(), closeTo(8d, 0.00001d));
facet = searchResponse.facets().facet("stats13");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("xxx"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).total(), closeTo(600d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("yyy"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).total(), closeTo(1000d, 0.00001d));
}
}
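
For orientation, the assertions in these hunks read count() and totalCount() off terms_stats facet entries produced by requests of roughly this shape (a sketch under assumptions: the index, facet, and field names below are placeholders and the builder calls are not taken from the hunks shown):

    // Hypothetical request from the same era; assumes the usual static imports
    // of QueryBuilders.matchAllQuery and FacetBuilders.termsStatsFacet.
    SearchResponse searchResponse = client.prepareSearch("test")
            .setQuery(matchAllQuery())
            .addFacet(termsStatsFacet("stats1").keyField("field").valueField("num"))
            .execute().actionGet();
    TermsStatsFacet facet = searchResponse.facets().facet("stats1");
    long count = facet.entries().get(0).count();           // now returned as a long
    long totalCount = facet.entries().get(0).totalCount(); // added by this commit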
@@ -1651,12 +1653,12 @@ public class SimpleFacetsTests extends AbstractNodesTests {
TermsStatsFacet facet = searchResponse.facets().facet("stats1");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("100"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).min(), closeTo(100d, 0.00001d));
assertThat(facet.entries().get(0).max(), closeTo(200d, 0.00001d));
assertThat(facet.entries().get(0).total(), closeTo(300d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("200"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).min(), closeTo(500d, 0.00001d));
assertThat(facet.entries().get(1).max(), closeTo(500d, 0.00001d));
assertThat(facet.entries().get(1).total(), closeTo(500d, 0.00001d));
@@ -1664,12 +1666,12 @@ public class SimpleFacetsTests extends AbstractNodesTests {
facet = searchResponse.facets().facet("stats2");
assertThat(facet.entries().size(), equalTo(2));
assertThat(facet.entries().get(0).term(), equalTo("100.1"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(0).count(), equalTo(2l));
assertThat(facet.entries().get(0).min(), closeTo(100d, 0.00001d));
assertThat(facet.entries().get(0).max(), closeTo(200d, 0.00001d));
assertThat(facet.entries().get(0).total(), closeTo(300d, 0.00001d));
assertThat(facet.entries().get(1).term(), equalTo("200.2"));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(1).count(), equalTo(1l));
assertThat(facet.entries().get(1).min(), closeTo(500d, 0.00001d));
assertThat(facet.entries().get(1).max(), closeTo(500d, 0.00001d));
assertThat(facet.entries().get(1).total(), closeTo(500d, 0.00001d));