improve terms stats facet internal data structure and sorting

This commit is contained in:
kimchy 2011-02-23 05:04:45 +02:00
parent 2845cbefaa
commit 9cccfc3bd3
10 changed files with 329 additions and 85 deletions

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.trove;
import org.elasticsearch.common.trove.map.TDoubleObjectMap;
import org.elasticsearch.common.trove.map.hash.TDoubleObjectHashMap;
/**
 * An extension of Trove's {@link TDoubleObjectHashMap} that exposes the internal
 * values array, allowing callers to sort/iterate the values without allocating an
 * intermediate collection.
 *
 * @param <V> the type of the values stored in the map
 */
public class ExtTDoubleObjectHashMap<V> extends TDoubleObjectHashMap<V> {

    public ExtTDoubleObjectHashMap() {
    }

    public ExtTDoubleObjectHashMap(int initialCapacity) {
        super(initialCapacity);
    }

    public ExtTDoubleObjectHashMap(int initialCapacity, float loadFactor) {
        super(initialCapacity, loadFactor);
    }

    public ExtTDoubleObjectHashMap(int initialCapacity, float loadFactor, double noEntryKey) {
        super(initialCapacity, loadFactor, noEntryKey);
    }

    public ExtTDoubleObjectHashMap(TDoubleObjectMap<V> map) {
        super(map);
    }

    /**
     * Internal method to get the actual values array backing this map. Slots that do
     * not hold an entry may contain {@code null}; callers must handle such holes.
     * The array is shared with the map, so mutating it mutates the map's state.
     */
    public Object[] internalValues() {
        return this._values;
    }
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.trove;
import org.elasticsearch.common.trove.map.hash.THashMap;
import java.util.Map;
/**
 * An extension of Trove's {@link THashMap} that exposes the internal values array,
 * allowing callers to sort/iterate the values without allocating an intermediate
 * collection.
 *
 * @param <K> the type of keys maintained by this map
 * @param <V> the type of mapped values
 */
public class ExtTHashMap<K, V> extends THashMap<K, V> {

    public ExtTHashMap() {
    }

    public ExtTHashMap(int initialCapacity) {
        super(initialCapacity);
    }

    public ExtTHashMap(int initialCapacity, float loadFactor) {
        super(initialCapacity, loadFactor);
    }

    public ExtTHashMap(Map<K, V> map) {
        super(map);
    }

    public ExtTHashMap(THashMap<K, V> map) {
        super(map);
    }

    /**
     * Internal method to get the actual values array backing this map. Slots that do
     * not hold an entry may contain {@code null}; callers must handle such holes.
     * The array is shared with the map, so mutating it mutates the map's state.
     */
    public Object[] internalValues() {
        return this._values;
    }
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.trove;
import org.elasticsearch.common.trove.map.TLongObjectMap;
import org.elasticsearch.common.trove.map.hash.TLongObjectHashMap;
/**
 * An extension of Trove's {@link TLongObjectHashMap} that exposes the internal
 * values array, allowing callers to sort/iterate the values without allocating an
 * intermediate collection.
 *
 * @param <V> the type of the values stored in the map
 */
public class ExtTLongObjectHashMap<V> extends TLongObjectHashMap<V> {

    public ExtTLongObjectHashMap() {
    }

    public ExtTLongObjectHashMap(int initialCapacity) {
        super(initialCapacity);
    }

    public ExtTLongObjectHashMap(int initialCapacity, float loadFactor) {
        super(initialCapacity, loadFactor);
    }

    public ExtTLongObjectHashMap(int initialCapacity, float loadFactor, long noEntryKey) {
        super(initialCapacity, loadFactor, noEntryKey);
    }

    public ExtTLongObjectHashMap(TLongObjectMap<V> map) {
        super(map);
    }

    /**
     * Internal method to get the actual values array backing this map. Slots that do
     * not hold an entry may contain {@code null}; callers must handle such holes.
     * The array is shared with the map, so mutating it mutates the map's state.
     */
    public Object[] internalValues() {
        return this._values;
    }
}

View File

@ -60,6 +60,13 @@ public interface TermsStatsFacet extends Facet, Iterable<TermsStatsFacet.Entry>
COUNT((byte) 0, new Comparator<Entry>() {
@Override public int compare(Entry o1, Entry o2) {
// push nulls to the end
if (o1 == null) {
return 1;
}
if (o2 == null) {
return -1;
}
int i = o2.count() - o1.count();
if (i == 0) {
i = o2.term().compareTo(o1.term());
@ -76,6 +83,13 @@ public interface TermsStatsFacet extends Facet, Iterable<TermsStatsFacet.Entry>
REVERSE_COUNT((byte) 1, new Comparator<Entry>() {
@Override public int compare(Entry o1, Entry o2) {
// push nulls to the end
if (o1 == null) {
return 1;
}
if (o2 == null) {
return -1;
}
return -COUNT.comparator().compare(o1, o2);
}
}),
@ -85,6 +99,13 @@ public interface TermsStatsFacet extends Facet, Iterable<TermsStatsFacet.Entry>
TERM((byte) 2, new Comparator<Entry>() {
@Override public int compare(Entry o1, Entry o2) {
// push nulls to the end
if (o1 == null) {
return 1;
}
if (o2 == null) {
return -1;
}
int i = o1.compareTo(o2);
if (i == 0) {
i = COUNT.comparator().compare(o1, o2);
@ -98,12 +119,26 @@ public interface TermsStatsFacet extends Facet, Iterable<TermsStatsFacet.Entry>
REVERSE_TERM((byte) 3, new Comparator<Entry>() {
@Override public int compare(Entry o1, Entry o2) {
// push nulls to the end
if (o1 == null) {
return 1;
}
if (o2 == null) {
return -1;
}
return -TERM.comparator().compare(o1, o2);
}
}),
TOTAL((byte) 4, new Comparator<Entry>() {
@Override public int compare(Entry o1, Entry o2) {
// push nulls to the end
if (o1 == null) {
return 1;
}
if (o2 == null) {
return -1;
}
if (o2.total() < o1.total()) {
return -1;
} else if (o2.total() == o1.total()) {
@ -116,6 +151,13 @@ public interface TermsStatsFacet extends Facet, Iterable<TermsStatsFacet.Entry>
REVERSE_TOTAL((byte) 5, new Comparator<Entry>() {
@Override public int compare(Entry o1, Entry o2) {
// push nulls to the end
if (o1 == null) {
return 1;
}
if (o2 == null) {
return -1;
}
return -TOTAL.comparator().compare(o1, o2);
}
});

View File

@ -19,12 +19,11 @@
package org.elasticsearch.search.facet.termsstats.doubles;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.map.hash.THashMap;
import org.elasticsearch.common.trove.ExtTDoubleObjectHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
@ -176,9 +175,9 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<THashMap<String, DoubleEntry>>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<THashMap<String, DoubleEntry>>>() {
@Override protected ThreadLocals.CleanableValue<THashMap<String, DoubleEntry>> initialValue() {
return new ThreadLocals.CleanableValue<THashMap<String, DoubleEntry>>(new THashMap<String, DoubleEntry>());
private static ThreadLocal<ThreadLocals.CleanableValue<ExtTDoubleObjectHashMap<DoubleEntry>>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<ExtTDoubleObjectHashMap<DoubleEntry>>>() {
@Override protected ThreadLocals.CleanableValue<ExtTDoubleObjectHashMap<DoubleEntry>> initialValue() {
return new ThreadLocals.CleanableValue<ExtTDoubleObjectHashMap<DoubleEntry>>(new ExtTDoubleObjectHashMap<DoubleEntry>());
}
};
@ -195,31 +194,39 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet {
return facets.get(0);
}
int missing = 0;
THashMap<String, DoubleEntry> map = aggregateCache.get().get();
ExtTDoubleObjectHashMap<DoubleEntry> map = aggregateCache.get().get();
map.clear();
for (Facet facet : facets) {
InternalTermsStatsDoubleFacet tsFacet = (InternalTermsStatsDoubleFacet) facet;
missing += tsFacet.missing;
for (Entry entry : tsFacet) {
DoubleEntry doubleEntry = (DoubleEntry) entry;
DoubleEntry current = map.get(doubleEntry.term());
DoubleEntry current = map.get(doubleEntry.term);
if (current != null) {
current.count += doubleEntry.count;
current.total += doubleEntry.total;
} else {
map.put(doubleEntry.term(), doubleEntry);
map.put(doubleEntry.term, doubleEntry);
}
}
}
// sort
if (requiredSize == 0) { // all terms
DoubleEntry[] entries1 = map.values().toArray(new DoubleEntry[map.size()]);
DoubleEntry[] entries1 = map.values(new DoubleEntry[map.size()]);
Arrays.sort(entries1, comparatorType.comparator());
return new InternalTermsStatsDoubleFacet(name, comparatorType, requiredSize, Arrays.asList(entries1), missing);
} else {
TreeSet<DoubleEntry> ordered = new BoundedTreeSet<DoubleEntry>(comparatorType.comparator(), requiredSize);
ordered.addAll(map.values());
Object[] values = map.internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
List<DoubleEntry> ordered = new ArrayList<DoubleEntry>();
for (int i = 0; i < requiredSize; i++) {
DoubleEntry value = (DoubleEntry) values[i];
if (value == null) {
break;
}
ordered.add(value);
}
return new InternalTermsStatsDoubleFacet(name, comparatorType, requiredSize, ordered, missing);
}
}

View File

@ -22,10 +22,10 @@ package org.elasticsearch.search.facet.termsstats.doubles;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.map.hash.TDoubleObjectHashMap;
import org.elasticsearch.common.trove.ExtTDoubleObjectHashMap;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.NumericFieldData;
@ -38,10 +38,7 @@ import org.elasticsearch.search.facet.termsstats.TermsStatsFacet;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeSet;
import java.util.*;
public class TermsStatsDoubleFacetCollector extends AbstractFacetCollector {
@ -69,7 +66,7 @@ public class TermsStatsDoubleFacetCollector extends AbstractFacetCollector {
private int missing = 0;
private final TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> entries;
private final ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> entries;
public TermsStatsDoubleFacetCollector(String facetName, String keyFieldName, String valueFieldName, int size, TermsStatsFacet.ComparatorType comparatorType,
SearchContext context, String scriptLang, String script, Map<String, Object> params) {
@ -164,8 +161,18 @@ public class TermsStatsDoubleFacetCollector extends AbstractFacetCollector {
return new InternalTermsStatsDoubleFacet(facetName, comparatorType, 0 /* indicates all terms*/, entries.valueCollection(), missing);
}
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
TreeSet<InternalTermsStatsDoubleFacet.DoubleEntry> ordered = new BoundedTreeSet<InternalTermsStatsDoubleFacet.DoubleEntry>(comparatorType.comparator(), size * numberOfShards);
ordered.addAll(entries.valueCollection());
Object[] values = entries.internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
int limit = size * numberOfShards;
List<InternalTermsStatsDoubleFacet.DoubleEntry> ordered = Lists.newArrayList();
for (int i = 0; i < limit; i++) {
InternalTermsStatsDoubleFacet.DoubleEntry value = (InternalTermsStatsDoubleFacet.DoubleEntry) values[i];
if (value == null) {
break;
}
ordered.add(value);
}
// that's fine to push here, this thread will be released AFTER the entries have either been serialized
// or processed
@ -174,27 +181,27 @@ public class TermsStatsDoubleFacetCollector extends AbstractFacetCollector {
}
static TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> popFacets() {
Deque<TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>> deque = cache.get().get();
static ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> popFacets() {
Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>());
deque.add(new ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>());
}
TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> facets = deque.pollFirst();
ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> facets) {
static void pushFacets(ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry> facets) {
facets.clear();
Deque<TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>> deque = cache.get().get();
Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>>(new ArrayDeque<TDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>());
static ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>>(new ArrayDeque<ExtTDoubleObjectHashMap<InternalTermsStatsDoubleFacet.DoubleEntry>>());
}
};
}

View File

@ -19,12 +19,11 @@
package org.elasticsearch.search.facet.termsstats.longs;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.map.hash.THashMap;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
@ -176,9 +175,9 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<THashMap<String, LongEntry>>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<THashMap<String, LongEntry>>>() {
@Override protected ThreadLocals.CleanableValue<THashMap<String, LongEntry>> initialValue() {
return new ThreadLocals.CleanableValue<THashMap<String, LongEntry>>(new THashMap<String, LongEntry>());
private static ThreadLocal<ThreadLocals.CleanableValue<ExtTLongObjectHashMap<LongEntry>>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<ExtTLongObjectHashMap<LongEntry>>>() {
@Override protected ThreadLocals.CleanableValue<ExtTLongObjectHashMap<LongEntry>> initialValue() {
return new ThreadLocals.CleanableValue<ExtTLongObjectHashMap<LongEntry>>(new ExtTLongObjectHashMap<LongEntry>());
}
};
@ -195,31 +194,39 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet {
return facets.get(0);
}
int missing = 0;
THashMap<String, LongEntry> map = aggregateCache.get().get();
ExtTLongObjectHashMap<LongEntry> map = aggregateCache.get().get();
map.clear();
for (Facet facet : facets) {
InternalTermsStatsLongFacet tsFacet = (InternalTermsStatsLongFacet) facet;
missing += tsFacet.missing;
for (Entry entry : tsFacet) {
LongEntry longEntry = (LongEntry) entry;
LongEntry current = map.get(longEntry.term());
LongEntry current = map.get(longEntry.term);
if (current != null) {
current.count += longEntry.count;
current.total += longEntry.total;
} else {
map.put(longEntry.term(), longEntry);
map.put(longEntry.term, longEntry);
}
}
}
// sort
if (requiredSize == 0) { // all terms
LongEntry[] entries1 = map.values().toArray(new LongEntry[map.size()]);
LongEntry[] entries1 = map.values(new LongEntry[map.size()]);
Arrays.sort(entries1, comparatorType.comparator());
return new InternalTermsStatsLongFacet(name, comparatorType, requiredSize, Arrays.asList(entries1), missing);
} else {
TreeSet<LongEntry> ordered = new BoundedTreeSet<LongEntry>(comparatorType.comparator(), requiredSize);
ordered.addAll(map.values());
Object[] values = map.internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
List<LongEntry> ordered = new ArrayList<LongEntry>();
for (int i = 0; i < requiredSize; i++) {
LongEntry value = (LongEntry) values[i];
if (value == null) {
break;
}
ordered.add(value);
}
return new InternalTermsStatsLongFacet(name, comparatorType, requiredSize, ordered, missing);
}
}

View File

@ -22,10 +22,10 @@ package org.elasticsearch.search.facet.termsstats.longs;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.map.hash.TLongObjectHashMap;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.NumericFieldData;
@ -38,10 +38,7 @@ import org.elasticsearch.search.facet.termsstats.TermsStatsFacet;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeSet;
import java.util.*;
public class TermsStatsLongFacetCollector extends AbstractFacetCollector {
@ -69,7 +66,7 @@ public class TermsStatsLongFacetCollector extends AbstractFacetCollector {
private int missing = 0;
private final TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> entries;
private final ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> entries;
public TermsStatsLongFacetCollector(String facetName, String keyFieldName, String valueFieldName, int size, TermsStatsFacet.ComparatorType comparatorType,
SearchContext context, String scriptLang, String script, Map<String, Object> params) {
@ -163,10 +160,20 @@ public class TermsStatsLongFacetCollector extends AbstractFacetCollector {
// all terms, just return the collection, we will sort it on the way back
return new InternalTermsStatsLongFacet(facetName, comparatorType, 0 /* indicates all terms*/, entries.valueCollection(), missing);
}
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
TreeSet<InternalTermsStatsLongFacet.LongEntry> ordered = new BoundedTreeSet<InternalTermsStatsLongFacet.LongEntry>(comparatorType.comparator(), size * numberOfShards);
ordered.addAll(entries.valueCollection());
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
Object[] values = entries.internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
int limit = size * numberOfShards;
List<InternalTermsStatsLongFacet.LongEntry> ordered = Lists.newArrayList();
for (int i = 0; i < limit; i++) {
InternalTermsStatsLongFacet.LongEntry value = (InternalTermsStatsLongFacet.LongEntry) values[i];
if (value == null) {
break;
}
ordered.add(value);
}
// that's fine to push here, this thread will be released AFTER the entries have either been serialized
// or processed
pushFacets(entries);
@ -174,27 +181,27 @@ public class TermsStatsLongFacetCollector extends AbstractFacetCollector {
}
static TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> popFacets() {
Deque<TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>> deque = cache.get().get();
static ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> popFacets() {
Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>());
deque.add(new ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>());
}
TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> facets = deque.pollFirst();
ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> facets) {
static void pushFacets(ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry> facets) {
facets.clear();
Deque<TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>> deque = cache.get().get();
Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>>(new ArrayDeque<TLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>());
static ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>>(new ArrayDeque<ExtTLongObjectHashMap<InternalTermsStatsLongFacet.LongEntry>>());
}
};
}

View File

@ -19,12 +19,11 @@
package org.elasticsearch.search.facet.termsstats.strings;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.map.hash.THashMap;
import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
@ -175,9 +174,9 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<THashMap<String, StringEntry>>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<THashMap<String, StringEntry>>>() {
@Override protected ThreadLocals.CleanableValue<THashMap<String, StringEntry>> initialValue() {
return new ThreadLocals.CleanableValue<THashMap<String, StringEntry>>(new THashMap<String, StringEntry>());
private static ThreadLocal<ThreadLocals.CleanableValue<ExtTHashMap<String, StringEntry>>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<ExtTHashMap<String, StringEntry>>>() {
@Override protected ThreadLocals.CleanableValue<ExtTHashMap<String, StringEntry>> initialValue() {
return new ThreadLocals.CleanableValue<ExtTHashMap<String, StringEntry>>(new ExtTHashMap<String, StringEntry>());
}
};
@ -194,7 +193,7 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
return facets.get(0);
}
int missing = 0;
THashMap<String, StringEntry> map = aggregateCache.get().get();
ExtTHashMap<String, StringEntry> map = aggregateCache.get().get();
map.clear();
for (Facet facet : facets) {
InternalTermsStatsStringFacet tsFacet = (InternalTermsStatsStringFacet) facet;
@ -217,8 +216,16 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet {
Arrays.sort(entries1, comparatorType.comparator());
return new InternalTermsStatsStringFacet(name, comparatorType, requiredSize, Arrays.asList(entries1), missing);
} else {
TreeSet<StringEntry> ordered = new BoundedTreeSet<StringEntry>(comparatorType.comparator(), requiredSize);
ordered.addAll(map.values());
Object[] values = map.internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
List<StringEntry> ordered = new ArrayList<StringEntry>();
for (int i = 0; i < requiredSize; i++) {
StringEntry value = (StringEntry) values[i];
if (value == null) {
break;
}
ordered.add(value);
}
return new InternalTermsStatsStringFacet(name, comparatorType, requiredSize, ordered, missing);
}
}

View File

@ -22,10 +22,10 @@ package org.elasticsearch.search.facet.termsstats.strings;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.map.hash.THashMap;
import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
@ -39,10 +39,7 @@ import org.elasticsearch.search.facet.termsstats.TermsStatsFacet;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeSet;
import java.util.*;
public class TermsStatsStringFacetCollector extends AbstractFacetCollector {
@ -70,7 +67,7 @@ public class TermsStatsStringFacetCollector extends AbstractFacetCollector {
private int missing = 0;
private final THashMap<String, InternalTermsStatsStringFacet.StringEntry> entries;
private final ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry> entries;
public TermsStatsStringFacetCollector(String facetName, String keyFieldName, String valueFieldName, int size, TermsStatsFacet.ComparatorType comparatorType,
SearchContext context, String scriptLang, String script, Map<String, Object> params) {
@ -165,8 +162,18 @@ public class TermsStatsStringFacetCollector extends AbstractFacetCollector {
return new InternalTermsStatsStringFacet(facetName, comparatorType, 0 /* indicates all terms*/, entries.values(), missing);
}
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
TreeSet<InternalTermsStatsStringFacet.StringEntry> ordered = new BoundedTreeSet<InternalTermsStatsStringFacet.StringEntry>(comparatorType.comparator(), size * numberOfShards);
ordered.addAll(entries.values());
Object[] values = entries.internalValues();
Arrays.sort(values, (Comparator) comparatorType.comparator());
List<InternalTermsStatsStringFacet.StringEntry> ordered = Lists.newArrayList();
int limit = size * numberOfShards;
for (int i = 0; i < limit; i++) {
InternalTermsStatsStringFacet.StringEntry value = (InternalTermsStatsStringFacet.StringEntry) values[i];
if (value == null) {
break;
}
ordered.add(value);
}
// that's fine to push here, this thread will be released AFTER the entries have either been serialized
// or processed
@ -175,27 +182,27 @@ public class TermsStatsStringFacetCollector extends AbstractFacetCollector {
}
static THashMap<String, InternalTermsStatsStringFacet.StringEntry> popFacets() {
Deque<THashMap<String, InternalTermsStatsStringFacet.StringEntry>> deque = cache.get().get();
static ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry> popFacets() {
Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new THashMap<String, InternalTermsStatsStringFacet.StringEntry>());
deque.add(new ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>());
}
THashMap<String, InternalTermsStatsStringFacet.StringEntry> facets = deque.pollFirst();
ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry> facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(THashMap<String, InternalTermsStatsStringFacet.StringEntry> facets) {
static void pushFacets(ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry> facets) {
facets.clear();
Deque<THashMap<String, InternalTermsStatsStringFacet.StringEntry>> deque = cache.get().get();
Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}
static ThreadLocal<ThreadLocals.CleanableValue<Deque<THashMap<String, InternalTermsStatsStringFacet.StringEntry>>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<THashMap<String, InternalTermsStatsStringFacet.StringEntry>>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<THashMap<String, InternalTermsStatsStringFacet.StringEntry>>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<THashMap<String, InternalTermsStatsStringFacet.StringEntry>>>(new ArrayDeque<THashMap<String, InternalTermsStatsStringFacet.StringEntry>>());
static ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>>(new ArrayDeque<ExtTHashMap<String, InternalTermsStatsStringFacet.StringEntry>>());
}
};
}