Terms Facet: Performance improvements, closes #822.

This commit is contained in:
kimchy 2011-04-01 16:30:45 +03:00
parent fb8a389380
commit e4cbdfa05b
58 changed files with 2906 additions and 406 deletions
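For context: this change adds an execution_hint option to the terms facet plus new ordinals-based collectors that count directly into recycled int[] buffers. A minimal sketch of requesting the map-based path through the Java API, mirroring the benchmark further down (the "test" index and "s_value" field names are placeholders, and the 0.16-era client API is assumed):

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.facet.FacetBuilders.termsFacet;

public class ExecutionHintSketch {
    // Ask for the hash-map based collector explicitly; without the hint (and with no
    // script or regex pattern), the new ordinals-based collectors are picked automatically.
    public static SearchResponse facetWithMapHint(Client client) {
        return client.prepareSearch("test")
                .setQuery(matchAllQuery())
                .addFacet(termsFacet("s_value").field("s_value").executionHint("map"))
                .execute().actionGet();
    }
}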

View File

@ -2,6 +2,7 @@
<dictionary name="kimchy">
<words>
<w>addr</w>
<w>aggregators</w>
<w>amazonaws</w>
<w>apis</w>
<w>appendable</w>
@ -121,6 +122,7 @@
<w>rackspace</w>
<w>rebalance</w>
<w>rebalancing</w>
<w>recycler</w>
<w>regex</w>
<w>reparse</w>
<w>reparsed</w>
@ -133,6 +135,7 @@
<w>scriptable</w>
<w>searchable</w>
<w>segs</w>
<w>sentinal</w>
<w>serializers</w>
<w>sigar</w>
<w>slurper</w>

View File

@ -61,17 +61,17 @@ public class TermsFacetSearchBenchmark {
Client client = clientNode.client();
long COUNT = SizeValue.parseSizeValue("5m").singles();
long COUNT = SizeValue.parseSizeValue("2m").singles();
int BATCH = 100;
int QUERY_WARMUP = 20;
int QUERY_COUNT = 200;
int NUMBER_OF_TERMS = 200;
int NUMBER_OF_MULTI_VALUE_TERMS = 5;
int NUMBER_OF_MULTI_VALUE_TERMS = 10;
int STRING_TERM_SIZE = 5;
long[] lValues = new long[NUMBER_OF_TERMS];
for (int i = 0; i < NUMBER_OF_TERMS; i++) {
lValues[i] = i;
lValues[i] = ThreadLocalRandom.current().nextLong();
}
String[] sValues = new String[NUMBER_OF_TERMS];
for (int i = 0; i < NUMBER_OF_TERMS; i++) {
@ -104,6 +104,12 @@ public class TermsFacetSearchBenchmark {
}
builder.endArray();
builder.startArray("lm_value");
for (int k = 0; k < NUMBER_OF_MULTI_VALUE_TERMS; k++) {
builder.value(lValues[ThreadLocalRandom.current().nextInt(sValues.length)]);
}
builder.endArray();
builder.endObject();
request.add(Requests.indexRequest("test").type("type1").id(Integer.toString(counter))
@ -127,12 +133,14 @@ public class TermsFacetSearchBenchmark {
}
}
client.admin().indices().prepareRefresh().execute().actionGet();
System.out.println("--> Number of docs in index: " + client.prepareCount().setQuery(matchAllQuery()).execute().actionGet().count());
COUNT = client.prepareCount().setQuery(matchAllQuery()).execute().actionGet().count();
System.out.println("--> Number of docs in index: " + COUNT);
long totalQueryTime = 0;
// S_VALUE
client.admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
System.out.println("--> Warmup (s_value) ...");
// run just the child query, warm up first
@ -163,6 +171,40 @@ public class TermsFacetSearchBenchmark {
}
System.out.println("--> Terms Facet (s_value) " + (totalQueryTime / QUERY_COUNT) + "ms");
// S_VALUE (Map)
client.admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
System.out.println("--> Warmup (s_value) ...");
// run just the child query, warm up first
for (int j = 0; j < QUERY_WARMUP; j++) {
SearchResponse searchResponse = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("s_value").field("s_value").executionHint("map"))
.execute().actionGet();
if (j == 0) {
System.out.println("--> Loading (s_value) took: " + searchResponse.took());
}
if (searchResponse.hits().totalHits() != COUNT) {
System.err.println("--> mismatch on hits");
}
}
System.out.println("--> Warmup (s_value) DONE");
totalQueryTime = 0;
for (int j = 0; j < QUERY_COUNT; j++) {
SearchResponse searchResponse = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("s_value").field("s_value").executionHint("map"))
.execute().actionGet();
if (searchResponse.hits().totalHits() != COUNT) {
System.err.println("--> mismatch on hits");
}
totalQueryTime += searchResponse.tookInMillis();
}
System.out.println("--> Terms Facet (map) (s_value) " + (totalQueryTime / QUERY_COUNT) + "ms");
// L VALUE
client.admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
System.out.println("--> Warmup (l_value) ...");
@ -194,6 +236,8 @@ public class TermsFacetSearchBenchmark {
}
System.out.println("--> Terms Facet (l_value) " + (totalQueryTime / QUERY_COUNT) + "ms");
// SM VALUE
client.admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
System.out.println("--> Warmup (sm_value) ...");
@ -226,6 +270,75 @@ public class TermsFacetSearchBenchmark {
}
System.out.println("--> Terms Facet (sm_value) " + (totalQueryTime / QUERY_COUNT) + "ms");
// SM VALUE (map)
client.admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
System.out.println("--> Warmup (sm_value) ...");
// run just the child query, warm up first
for (int j = 0; j < QUERY_WARMUP; j++) {
SearchResponse searchResponse = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("sm_value").field("sm_value").executionHint("map"))
.execute().actionGet();
if (j == 0) {
System.out.println("--> Loading (sm_value) took: " + searchResponse.took());
}
if (searchResponse.hits().totalHits() != COUNT) {
System.err.println("--> mismatch on hits");
}
}
System.out.println("--> Warmup (sm_value) DONE");
totalQueryTime = 0;
for (int j = 0; j < QUERY_COUNT; j++) {
SearchResponse searchResponse = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("sm_value").field("sm_value").executionHint("map"))
.execute().actionGet();
if (searchResponse.hits().totalHits() != COUNT) {
System.err.println("--> mismatch on hits");
}
totalQueryTime += searchResponse.tookInMillis();
}
System.out.println("--> Terms Facet (map) (sm_value) " + (totalQueryTime / QUERY_COUNT) + "ms");
// LM VALUE
client.admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
System.out.println("--> Warmup (lm_value) ...");
// run just the child query, warm up first
for (int j = 0; j < QUERY_WARMUP; j++) {
SearchResponse searchResponse = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("lm_value").field("lm_value"))
.execute().actionGet();
if (j == 0) {
System.out.println("--> Loading (lm_value) took: " + searchResponse.took());
}
if (searchResponse.hits().totalHits() != COUNT) {
System.err.println("--> mismatch on hits");
}
}
System.out.println("--> Warmup (lm_value) DONE");
totalQueryTime = 0;
for (int j = 0; j < QUERY_COUNT; j++) {
SearchResponse searchResponse = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("lm_value").field("lm_value"))
.execute().actionGet();
if (searchResponse.hits().totalHits() != COUNT) {
System.err.println("--> mismatch on hits");
}
totalQueryTime += searchResponse.tookInMillis();
}
System.out.println("--> Terms Facet (lm_value) " + (totalQueryTime / QUERY_COUNT) + "ms");
clientNode.close();
node1.close();

View File

@ -46,6 +46,7 @@ import org.elasticsearch.client.transport.action.ClientTransportActionModule;
import org.elasticsearch.client.transport.support.InternalTransportClient;
import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Injector;
@ -229,6 +230,7 @@ public class TransportClient extends AbstractClient {
// ignore
}
CacheRecycler.clear();
ThreadLocals.clearReferencesThreadLocals();
}

View File

@ -0,0 +1,291 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import org.elasticsearch.common.trove.map.hash.*;
import java.lang.ref.SoftReference;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
public class CacheRecycler {
public static void clear() {
intIntHashMap.remove();
floatIntHashMap.remove();
doubleIntHashMap.remove();
shortIntHashMap.remove();
longIntHashMap.remove();
objectIntHashMap.remove();
intArray.remove();
}
// ----- TIntIntHashMap ----
private static ThreadLocal<SoftReference<Deque<TIntIntHashMap>>> intIntHashMap = new ThreadLocal<SoftReference<Deque<TIntIntHashMap>>>();
public static TIntIntHashMap popIntIntMap() {
SoftReference<Deque<TIntIntHashMap>> ref = intIntHashMap.get();
Deque<TIntIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TIntIntHashMap>();
intIntHashMap.set(new SoftReference<Deque<TIntIntHashMap>>(deque));
}
if (deque.isEmpty()) {
return new TIntIntHashMap();
}
TIntIntHashMap map = deque.pollFirst();
map.clear();
return map;
}
public static void pushIntIntMap(TIntIntHashMap map) {
SoftReference<Deque<TIntIntHashMap>> ref = intIntHashMap.get();
Deque<TIntIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TIntIntHashMap>();
intIntHashMap.set(new SoftReference<Deque<TIntIntHashMap>>(deque));
}
deque.add(map);
}
// ----- TFloatIntHashMap ---
private static ThreadLocal<SoftReference<Deque<TFloatIntHashMap>>> floatIntHashMap = new ThreadLocal<SoftReference<Deque<TFloatIntHashMap>>>();
public static TFloatIntHashMap popFloatIntMap() {
SoftReference<Deque<TFloatIntHashMap>> ref = floatIntHashMap.get();
Deque<TFloatIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TFloatIntHashMap>();
floatIntHashMap.set(new SoftReference<Deque<TFloatIntHashMap>>(deque));
}
if (deque.isEmpty()) {
return new TFloatIntHashMap();
}
TFloatIntHashMap map = deque.pollFirst();
map.clear();
return map;
}
public static void pushFloatIntMap(TFloatIntHashMap map) {
SoftReference<Deque<TFloatIntHashMap>> ref = floatIntHashMap.get();
Deque<TFloatIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TFloatIntHashMap>();
floatIntHashMap.set(new SoftReference<Deque<TFloatIntHashMap>>(deque));
}
deque.add(map);
}
// ----- TDoubleIntHashMap ---
private static ThreadLocal<SoftReference<Deque<TDoubleIntHashMap>>> doubleIntHashMap = new ThreadLocal<SoftReference<Deque<TDoubleIntHashMap>>>();
public static TDoubleIntHashMap popDoubleIntMap() {
SoftReference<Deque<TDoubleIntHashMap>> ref = doubleIntHashMap.get();
Deque<TDoubleIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TDoubleIntHashMap>();
doubleIntHashMap.set(new SoftReference<Deque<TDoubleIntHashMap>>(deque));
}
if (deque.isEmpty()) {
return new TDoubleIntHashMap();
}
TDoubleIntHashMap map = deque.pollFirst();
map.clear();
return map;
}
public static void pushDoubleIntMap(TDoubleIntHashMap map) {
SoftReference<Deque<TDoubleIntHashMap>> ref = doubleIntHashMap.get();
Deque<TDoubleIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TDoubleIntHashMap>();
doubleIntHashMap.set(new SoftReference<Deque<TDoubleIntHashMap>>(deque));
}
deque.add(map);
}
// ----- TByteIntHashMap ---
private static ThreadLocal<SoftReference<Deque<TByteIntHashMap>>> byteIntHashMap = new ThreadLocal<SoftReference<Deque<TByteIntHashMap>>>();
public static TByteIntHashMap popByteIntMap() {
SoftReference<Deque<TByteIntHashMap>> ref = byteIntHashMap.get();
Deque<TByteIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TByteIntHashMap>();
byteIntHashMap.set(new SoftReference<Deque<TByteIntHashMap>>(deque));
}
if (deque.isEmpty()) {
return new TByteIntHashMap();
}
TByteIntHashMap map = deque.pollFirst();
map.clear();
return map;
}
public static void pushByteIntMap(TByteIntHashMap map) {
SoftReference<Deque<TByteIntHashMap>> ref = byteIntHashMap.get();
Deque<TByteIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TByteIntHashMap>();
byteIntHashMap.set(new SoftReference<Deque<TByteIntHashMap>>(deque));
}
deque.add(map);
}
// ----- TShortIntHashMap ---
private static ThreadLocal<SoftReference<Deque<TShortIntHashMap>>> shortIntHashMap = new ThreadLocal<SoftReference<Deque<TShortIntHashMap>>>();
public static TShortIntHashMap popShortIntMap() {
SoftReference<Deque<TShortIntHashMap>> ref = shortIntHashMap.get();
Deque<TShortIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TShortIntHashMap>();
shortIntHashMap.set(new SoftReference<Deque<TShortIntHashMap>>(deque));
}
if (deque.isEmpty()) {
return new TShortIntHashMap();
}
TShortIntHashMap map = deque.pollFirst();
map.clear();
return map;
}
public static void pushShortIntMap(TShortIntHashMap map) {
SoftReference<Deque<TShortIntHashMap>> ref = shortIntHashMap.get();
Deque<TShortIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TShortIntHashMap>();
shortIntHashMap.set(new SoftReference<Deque<TShortIntHashMap>>(deque));
}
deque.add(map);
}
// ----- TLongIntHashMap ----
private static ThreadLocal<SoftReference<Deque<TLongIntHashMap>>> longIntHashMap = new ThreadLocal<SoftReference<Deque<TLongIntHashMap>>>();
public static TLongIntHashMap popLongIntMap() {
SoftReference<Deque<TLongIntHashMap>> ref = longIntHashMap.get();
Deque<TLongIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TLongIntHashMap>();
longIntHashMap.set(new SoftReference<Deque<TLongIntHashMap>>(deque));
}
if (deque.isEmpty()) {
return new TLongIntHashMap();
}
TLongIntHashMap map = deque.pollFirst();
map.clear();
return map;
}
public static void pushLongIntMap(TLongIntHashMap map) {
SoftReference<Deque<TLongIntHashMap>> ref = longIntHashMap.get();
Deque<TLongIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TLongIntHashMap>();
longIntHashMap.set(new SoftReference<Deque<TLongIntHashMap>>(deque));
}
deque.add(map);
}
// ------ TObjectIntHashMap -----
private static ThreadLocal<SoftReference<Deque<TObjectIntHashMap>>> objectIntHashMap = new ThreadLocal<SoftReference<Deque<TObjectIntHashMap>>>();
@SuppressWarnings({"unchecked"})
public static <T> TObjectIntHashMap<T> popObjectIntMap() {
SoftReference<Deque<TObjectIntHashMap>> ref = objectIntHashMap.get();
Deque<TObjectIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TObjectIntHashMap>();
objectIntHashMap.set(new SoftReference<Deque<TObjectIntHashMap>>(deque));
}
if (deque.isEmpty()) {
return new TObjectIntHashMap();
}
TObjectIntHashMap map = deque.pollFirst();
map.clear();
return map;
}
public static <T> void pushObjectIntMap(TObjectIntHashMap<T> map) {
SoftReference<Deque<TObjectIntHashMap>> ref = objectIntHashMap.get();
Deque<TObjectIntHashMap> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<TObjectIntHashMap>();
objectIntHashMap.set(new SoftReference<Deque<TObjectIntHashMap>>(deque));
}
deque.add(map);
}
// ----- int[] -----
private static ThreadLocal<SoftReference<Deque<int[]>>> intArray = new ThreadLocal<SoftReference<Deque<int[]>>>();
public static int[] popIntArray(int size) {
return popIntArray(size, 0);
}
public static int[] popIntArray(int size, int sentinal) {
SoftReference<Deque<int[]>> ref = intArray.get();
Deque<int[]> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<int[]>();
intArray.set(new SoftReference<Deque<int[]>>(deque));
}
if (deque.isEmpty()) {
return new int[size];
}
int[] ints = deque.pollFirst();
if (ints.length < size) {
return new int[size];
}
Arrays.fill(ints, sentinal);
return ints;
}
public static void pushIntArray(int[] ints) {
SoftReference<Deque<int[]>> ref = intArray.get();
Deque<int[]> deque = ref == null ? null : ref.get();
if (deque == null) {
deque = new ArrayDeque<int[]>();
intArray.set(new SoftReference<Deque<int[]>>(deque));
}
deque.add(ints);
}
}
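CacheRecycler keeps per-thread, softly referenced deques of reusable Trove maps and int[] buffers so the facet collectors can avoid re-allocating them on every request. A minimal usage sketch of the pop/push pattern the collectors in this commit follow (the standalone class and the key 42 are illustrative only):

import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.trove.map.hash.TIntIntHashMap;

public class CacheRecyclerUsageSketch {
    public static void main(String[] args) {
        // Borrow a map from this thread's pool (allocated lazily on first use).
        TIntIntHashMap counts = CacheRecycler.popIntIntMap();
        try {
            counts.adjustOrPutValue(42, 1, 1); // bump the count for key 42
        } finally {
            // Return the map so the next popIntIntMap() on this thread can reuse it.
            CacheRecycler.pushIntIntMap(counts);
        }
    }
}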

View File

@ -95,6 +95,12 @@ public abstract class FieldData<Doc extends DocFieldData> {
void onMissing(int docId);
}
public abstract void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc);
public static interface OrdinalInDocProc {
void onOrdinal(int docId, int ordinal);
}
/**
* The type of this field data.
*/

View File

@ -47,6 +47,10 @@ public abstract class ByteFieldData extends NumericFieldData<ByteDocFieldData> {
return 1 * values.length + RamUsage.NUM_BYTES_ARRAY_HEADER;
}
public final byte[] values() {
return this.values;
}
abstract public byte value(int docId);
abstract public byte[] values(int docId);

View File

@ -122,6 +122,12 @@ public class MultiValueByteFieldData extends ByteFieldData {
}
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
for (int[] ordinal : ordinals) {
proc.onOrdinal(docId, ordinal[docId]);
}
}
@Override public double[] doubleValues(int docId) {
int length = 0;
for (int[] ordinal : ordinals) {

View File

@ -88,6 +88,10 @@ public class SingleValueByteFieldData extends ByteFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
proc.onOrdinal(docId, ordinals[docId]);
}
@Override public byte value(int docId) {
return values[ordinals[docId]];
}

View File

@ -47,6 +47,10 @@ public abstract class DoubleFieldData extends NumericFieldData<DoubleDocFieldDat
return RamUsage.NUM_BYTES_DOUBLE * values.length + RamUsage.NUM_BYTES_ARRAY_HEADER;
}
public final double[] values() {
return this.values;
}
abstract public double value(int docId);
abstract public double[] values(int docId);

View File

@ -111,6 +111,12 @@ public class MultiValueDoubleFieldData extends DoubleFieldData {
}
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
for (int[] ordinal : ordinals) {
proc.onOrdinal(docId, ordinal[docId]);
}
}
@Override public double[] doubleValues(int docId) {
return values(docId);
}

View File

@ -81,6 +81,10 @@ public class SingleValueDoubleFieldData extends DoubleFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
proc.onOrdinal(docId, ordinals[docId]);
}
@Override public double[] doubleValues(int docId) {
return values(docId);
}

View File

@ -47,6 +47,10 @@ public abstract class FloatFieldData extends NumericFieldData<FloatDocFieldData>
return RamUsage.NUM_BYTES_FLOAT * values.length + RamUsage.NUM_BYTES_ARRAY_HEADER;
}
public final float[] values() {
return this.values;
}
abstract public float value(int docId);
abstract public float[] values(int docId);

View File

@ -122,6 +122,12 @@ public class MultiValueFloatFieldData extends FloatFieldData {
}
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
for (int[] ordinal : ordinals) {
proc.onOrdinal(docId, ordinal[docId]);
}
}
@Override public double[] doubleValues(int docId) {
int length = 0;
for (int[] ordinal : ordinals) {

View File

@ -88,6 +88,10 @@ public class SingleValueFloatFieldData extends FloatFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
proc.onOrdinal(docId, ordinals[docId]);
}
@Override public double[] doubleValues(int docId) {
int loc = ordinals[docId];
if (loc == 0) {

View File

@ -47,6 +47,10 @@ public abstract class IntFieldData extends NumericFieldData<IntDocFieldData> {
return RamUsage.NUM_BYTES_INT * values.length + RamUsage.NUM_BYTES_ARRAY_HEADER;
}
public final int[] values() {
return this.values;
}
abstract public int value(int docId);
abstract public int[] values(int docId);

View File

@ -122,6 +122,12 @@ public class MultiValueIntFieldData extends IntFieldData {
}
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
for (int[] ordinal : ordinals) {
proc.onOrdinal(docId, ordinal[docId]);
}
}
@Override public double[] doubleValues(int docId) {
int length = 0;
for (int[] ordinal : ordinals) {

View File

@ -88,6 +88,10 @@ public class SingleValueIntFieldData extends IntFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
proc.onOrdinal(docId, ordinals[docId]);
}
@Override public double[] doubleValues(int docId) {
int loc = ordinals[docId];
if (loc == 0) {

View File

@ -57,6 +57,10 @@ public abstract class LongFieldData extends NumericFieldData<LongDocFieldData> {
return RamUsage.NUM_BYTES_LONG * values.length + RamUsage.NUM_BYTES_ARRAY_HEADER;
}
public final long[] values() {
return this.values;
}
abstract public long value(int docId);
abstract public long[] values(int docId);

View File

@ -124,6 +124,12 @@ public class MultiValueLongFieldData extends LongFieldData {
}
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
for (int[] ordinal : ordinals) {
proc.onOrdinal(docId, ordinal[docId]);
}
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
boolean found = false;
for (int[] ordinal : ordinals) {

View File

@ -89,6 +89,10 @@ public class SingleValueLongFieldData extends LongFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
proc.onOrdinal(docId, ordinals[docId]);
}
@Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
int loc = ordinals[docId];
if (loc == 0) {

View File

@ -122,6 +122,12 @@ public class MultiValueShortFieldData extends ShortFieldData {
}
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
for (int[] ordinal : ordinals) {
proc.onOrdinal(docId, ordinal[docId]);
}
}
@Override public double[] doubleValues(int docId) {
int length = 0;
for (int[] ordinal : ordinals) {

View File

@ -47,6 +47,10 @@ public abstract class ShortFieldData extends NumericFieldData<ShortDocFieldData>
return RamUsage.NUM_BYTES_SHORT * values.length + RamUsage.NUM_BYTES_ARRAY_HEADER;
}
public final short[] values() {
return this.values;
}
abstract public short value(int docId);
abstract public short[] values(int docId);

View File

@ -88,6 +88,10 @@ public class SingleValueShortFieldData extends ShortFieldData {
proc.onValue(docId, values[loc]);
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
proc.onOrdinal(docId, ordinals[docId]);
}
@Override public short value(int docId) {
return values[ordinals[docId]];
}

View File

@ -84,6 +84,12 @@ public class MultiValueStringFieldData extends StringFieldData {
}
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
for (int[] ordinal : ordinals) {
proc.onOrdinal(docId, ordinal[docId]);
}
}
@Override public String value(int docId) {
for (int[] ordinal : ordinals) {
int loc = ordinal[docId];

View File

@ -51,8 +51,8 @@ public class SingleValueStringFieldData extends StringFieldData {
return ordinals;
}
String[] values() {
return this.values;
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
proc.onOrdinal(docId, ordinals[docId]);
}
@Override public boolean multiValued() {

View File

@ -50,6 +50,10 @@ public abstract class StringFieldData extends FieldData<StringDocFieldData> {
return size;
}
public String[] values() {
return this.values;
}
abstract public String value(int docId);
abstract public String[] values(int docId);

View File

@ -108,6 +108,12 @@ public class MultiValueGeoPointFieldData extends GeoPointFieldData {
}
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
for (int[] ordinal : ordinals) {
proc.onOrdinal(docId, ordinal[docId]);
}
}
@Override public GeoPoint value(int docId) {
for (int[] ordinal : ordinals) {
int loc = ordinal[docId];

View File

@ -80,6 +80,10 @@ public class SingleValueGeoPointFieldData extends GeoPointFieldData {
proc.onValue(docId, GeoHashUtils.encode(lat[loc], lon[loc]));
}
@Override public void forEachOrdinalInDoc(int docId, OrdinalInDocProc proc) {
proc.onOrdinal(docId, ordinals[docId]);
}
@Override public GeoPoint value(int docId) {
int loc = ordinals[docId];
if (loc == 0) {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
@ -302,6 +303,7 @@ public final class InternalNode implements Node {
}
stopWatch.stop();
CacheRecycler.clear();
ThreadLocals.clearReferencesThreadLocals();
if (logger.isTraceEnabled()) {

View File

@ -46,6 +46,7 @@ public class TermsFacetBuilder extends AbstractFacetBuilder {
private String script;
private String lang;
private Map<String, Object> params;
String executionHint;
/**
* Construct a new term facet with the provided facet name.
@ -163,6 +164,14 @@ public class TermsFacetBuilder extends AbstractFacetBuilder {
return this;
}
/**
* An execution hint for how the facet is computed.
*/
public TermsFacetBuilder executionHint(String executionHint) {
this.executionHint = executionHint;
return this;
}
/**
* A parameter that will be passed to the script.
*
@ -233,6 +242,10 @@ public class TermsFacetBuilder extends AbstractFacetBuilder {
}
}
if (executionHint != null) {
builder.field("execution_hint", executionHint);
}
builder.endObject();
addFilterFacetAndGlobal(builder, params);

View File

@ -33,16 +33,24 @@ import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetCollector;
import org.elasticsearch.search.facet.FacetProcessor;
import org.elasticsearch.search.facet.terms.bytes.TermsByteFacetCollector;
import org.elasticsearch.search.facet.terms.bytes.TermsByteOrdinalsFacetCollector;
import org.elasticsearch.search.facet.terms.doubles.TermsDoubleFacetCollector;
import org.elasticsearch.search.facet.terms.doubles.TermsDoubleOrdinalsFacetCollector;
import org.elasticsearch.search.facet.terms.floats.TermsFloatFacetCollector;
import org.elasticsearch.search.facet.terms.floats.TermsFloatOrdinalsFacetCollector;
import org.elasticsearch.search.facet.terms.index.IndexNameFacetCollector;
import org.elasticsearch.search.facet.terms.ints.TermsIntFacetCollector;
import org.elasticsearch.search.facet.terms.ints.TermsIntOrdinalsFacetCollector;
import org.elasticsearch.search.facet.terms.ip.TermsIpFacetCollector;
import org.elasticsearch.search.facet.terms.ip.TermsIpOrdinalsFacetCollector;
import org.elasticsearch.search.facet.terms.longs.TermsLongFacetCollector;
import org.elasticsearch.search.facet.terms.longs.TermsLongOrdinalsFacetCollector;
import org.elasticsearch.search.facet.terms.shorts.TermsShortFacetCollector;
import org.elasticsearch.search.facet.terms.shorts.TermsShortOrdinalsFacetCollector;
import org.elasticsearch.search.facet.terms.strings.FieldsTermsStringFacetCollector;
import org.elasticsearch.search.facet.terms.strings.ScriptTermsStringFieldFacetCollector;
import org.elasticsearch.search.facet.terms.strings.TermsStringFacetCollector;
import org.elasticsearch.search.facet.terms.strings.TermsStringOrdinalsFacetCollector;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
@ -77,6 +85,7 @@ public class TermsFacetProcessor extends AbstractComponent implements FacetProce
String script = null;
Map<String, Object> params = null;
boolean allTerms = false;
String executionHint = null;
String currentFieldName = null;
XContentParser.Token token;
@ -120,6 +129,8 @@ public class TermsFacetProcessor extends AbstractComponent implements FacetProce
script = parser.text();
} else if ("lang".equals(currentFieldName)) {
scriptLang = parser.text();
} else if ("execution_hint".equals(currentFieldName) || "executionHint".equals(currentFieldName)) {
executionHint = parser.textOrNull();
}
}
}
@ -142,19 +153,51 @@ public class TermsFacetProcessor extends AbstractComponent implements FacetProce
FieldMapper fieldMapper = context.mapperService().smartNameFieldMapper(field);
if (fieldMapper != null) {
if (fieldMapper instanceof IpFieldMapper) {
return new TermsIpFacetCollector(facetName, field, size, comparatorType, allTerms, context, scriptLang, script, params);
if (script != null || "map".equals(executionHint)) {
return new TermsIpFacetCollector(facetName, field, size, comparatorType, allTerms, context, scriptLang, script, params);
} else {
return new TermsIpOrdinalsFacetCollector(facetName, field, size, comparatorType, allTerms, context, null);
}
} else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.LONG) {
return new TermsLongFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
if (script != null || "map".equals(executionHint)) {
return new TermsLongFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
} else {
return new TermsLongOrdinalsFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded);
}
} else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.DOUBLE) {
return new TermsDoubleFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
if (script != null) {
return new TermsDoubleFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
} else {
return new TermsDoubleOrdinalsFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded);
}
} else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.INT) {
return new TermsIntFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
if (script != null || "map".equals(executionHint)) {
return new TermsIntFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
} else {
return new TermsIntOrdinalsFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded);
}
} else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.FLOAT) {
return new TermsFloatFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
if (script != null || "map".equals(executionHint)) {
return new TermsFloatFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
} else {
return new TermsFloatOrdinalsFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded);
}
} else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.SHORT) {
return new TermsShortFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
if (script != null || "map".equals(executionHint)) {
return new TermsShortFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
} else {
return new TermsShortOrdinalsFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded);
}
} else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.BYTE) {
return new TermsByteFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
if (script != null || "map".equals(executionHint)) {
return new TermsByteFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, scriptLang, script, params);
} else {
return new TermsByteOrdinalsFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded);
}
} else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.STRING) {
if (script == null && pattern == null && !"map".equals(executionHint)) {
return new TermsStringOrdinalsFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded);
}
}
}
return new TermsStringFacetCollector(facetName, field, size, comparatorType, allTerms, context, excluded, pattern, scriptLang, script, params);

View File

@ -19,11 +19,11 @@
package org.elasticsearch.search.facet.terms.bytes;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TByteIntIterator;
import org.elasticsearch.common.trove.map.hash.TByteIntHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -167,19 +167,12 @@ public class InternalByteTermsFacet extends InternalTermsFacet {
}
private static ThreadLocal<ThreadLocals.CleanableValue<TByteIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TByteIntHashMap>>() {
@Override protected ThreadLocals.CleanableValue<TByteIntHashMap> initialValue() {
return new ThreadLocals.CleanableValue<TByteIntHashMap>(new TByteIntHashMap());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
return facets.get(0);
}
InternalByteTermsFacet first = (InternalByteTermsFacet) facets.get(0);
TByteIntHashMap aggregated = aggregateCache.get().get();
TByteIntHashMap aggregated = CacheRecycler.popByteIntMap();
aggregated.clear();
long missing = 0;
@ -198,6 +191,9 @@ public class InternalByteTermsFacet extends InternalTermsFacet {
}
first.entries = ordered;
first.missing = missing;
CacheRecycler.pushByteIntMap(aggregated);
return first;
}

View File

@ -22,10 +22,10 @@ package org.elasticsearch.search.facet.terms.bytes;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TByteIntIterator;
import org.elasticsearch.common.trove.map.hash.TByteIntHashMap;
import org.elasticsearch.common.trove.set.hash.TByteHashSet;
@ -38,11 +38,11 @@ import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
@ -51,12 +51,6 @@ import java.util.Set;
*/
public class TermsByteFacetCollector extends AbstractFacetCollector {
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TByteIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TByteIntHashMap>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<TByteIntHashMap>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<TByteIntHashMap>>(new ArrayDeque<TByteIntHashMap>());
}
};
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
@ -107,9 +101,9 @@ public class TermsByteFacetCollector extends AbstractFacetCollector {
}
if (this.script == null && excluded.isEmpty()) {
aggregator = new StaticAggregatorValueProc(popFacets());
aggregator = new StaticAggregatorValueProc(CacheRecycler.popByteIntMap());
} else {
aggregator = new AggregatorValueProc(popFacets(), excluded, this.script);
aggregator = new AggregatorValueProc(CacheRecycler.popByteIntMap(), excluded, this.script);
}
if (allTerms) {
@ -144,35 +138,30 @@ public class TermsByteFacetCollector extends AbstractFacetCollector {
@Override public Facet facet() {
TByteIntHashMap facets = aggregator.facets();
if (facets.isEmpty()) {
pushFacets(facets);
CacheRecycler.pushByteIntMap(facets);
return new InternalByteTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalByteTermsFacet.ByteEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalByteTermsFacet.ByteEntry> ordered = new BoundedTreeSet<InternalByteTermsFacet.ByteEntry>(comparatorType.comparator(), size * numberOfShards);
for (TByteIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalByteTermsFacet.ByteEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TByteIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalByteTermsFacet.ByteEntry(it.key(), it.value()));
}
InternalByteTermsFacet.ByteEntry[] list = new InternalByteTermsFacet.ByteEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalByteTermsFacet.ByteEntry) ordered.pop();
}
CacheRecycler.pushByteIntMap(facets);
return new InternalByteTermsFacet(facetName, comparatorType, size, Arrays.asList(list), aggregator.missing());
} else {
BoundedTreeSet<InternalByteTermsFacet.ByteEntry> ordered = new BoundedTreeSet<InternalByteTermsFacet.ByteEntry>(comparatorType.comparator(), size);
for (TByteIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalByteTermsFacet.ByteEntry(it.key(), it.value()));
}
CacheRecycler.pushByteIntMap(facets);
return new InternalByteTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
pushFacets(facets);
return new InternalByteTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}
static TByteIntHashMap popFacets() {
Deque<TByteIntHashMap> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TByteIntHashMap());
}
TByteIntHashMap facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TByteIntHashMap facets) {
facets.clear();
Deque<TByteIntHashMap> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.bytes;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.trove.set.hash.TByteHashSet;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.bytes.ByteFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class TermsByteOrdinalsFacetCollector extends AbstractFacetCollector {
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int numberOfShards;
private final int minCount;
private final FieldDataType fieldDataType;
private ByteFieldData fieldData;
private final List<ReaderAggregator> aggregators;
private ReaderAggregator current;
long missing;
private final TByteHashSet excluded;
public TermsByteOrdinalsFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<String> excluded) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();
this.size = size;
this.comparatorType = comparatorType;
this.numberOfShards = context.numberOfShards();
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
if (smartMappers == null || !smartMappers.hasMapper()) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms byte facet collector on it");
} else {
// add type filter if there is exact doc mapper associated with it
if (smartMappers.hasDocMapper()) {
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
}
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.BYTE) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of byte type, can't run terms byte facet collector on it");
}
this.indexFieldName = smartMappers.mapper().names().indexName();
this.fieldDataType = smartMappers.mapper().fieldDataType();
}
if (excluded == null || excluded.isEmpty()) {
this.excluded = null;
} else {
this.excluded = new TByteHashSet(excluded.size());
for (String s : excluded) {
this.excluded.add(Byte.parseByte(s));
}
}
// minCount is offset by -1
if (allTerms) {
minCount = -1;
} else {
minCount = 0;
}
this.aggregators = new ArrayList<ReaderAggregator>(context.searcher().subReaders().length);
}
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
if (current != null) {
missing += current.counts[0];
if (current.values.length > 1) {
aggregators.add(current);
}
}
fieldData = (ByteFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
current = new ReaderAggregator(fieldData);
}
@Override protected void doCollect(int doc) throws IOException {
fieldData.forEachOrdinalInDoc(doc, current);
}
@Override public Facet facet() {
if (current != null) {
missing += current.counts[0];
// if we have values for this one, add it
if (current.values.length > 1) {
aggregators.add(current);
}
}
AggregatorPriorityQueue queue = new AggregatorPriorityQueue(aggregators.size());
for (ReaderAggregator aggregator : aggregators) {
CacheRecycler.pushIntArray(aggregator.counts); // release it here; we are still on the same thread, so it won't be corrupted
if (aggregator.nextPosition()) {
queue.add(aggregator);
}
}
// YUCK, we repeat the same logic, but once with an optimized priority queue for smaller sizes
if (size < EntryPriorityQueue.LIMIT) {
// optimize to use priority size
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
byte value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalByteTermsFacet.ByteEntry entry = new InternalByteTermsFacet.ByteEntry(value, count);
ordered.insertWithOverflow(entry);
}
}
}
InternalByteTermsFacet.ByteEntry[] list = new InternalByteTermsFacet.ByteEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalByteTermsFacet.ByteEntry) ordered.pop();
}
return new InternalByteTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing);
}
BoundedTreeSet<InternalByteTermsFacet.ByteEntry> ordered = new BoundedTreeSet<InternalByteTermsFacet.ByteEntry>(comparatorType.comparator(), size);
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
byte value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalByteTermsFacet.ByteEntry entry = new InternalByteTermsFacet.ByteEntry(value, count);
ordered.add(entry);
}
}
}
return new InternalByteTermsFacet(facetName, comparatorType, size, ordered, missing);
}
public static class ReaderAggregator implements FieldData.OrdinalInDocProc {
final byte[] values;
final int[] counts;
int position = 0;
byte current;
public ReaderAggregator(ByteFieldData fieldData) {
this.values = fieldData.values();
this.counts = CacheRecycler.popIntArray(fieldData.values().length);
}
@Override public void onOrdinal(int docId, int ordinal) {
counts[ordinal]++;
}
public boolean nextPosition() {
if (++position >= values.length) {
return false;
}
current = values[position];
return true;
}
}
public static class AggregatorPriorityQueue extends PriorityQueue<ReaderAggregator> {
public AggregatorPriorityQueue(int size) {
initialize(size);
}
@Override protected boolean lessThan(ReaderAggregator a, ReaderAggregator b) {
return a.current < b.current;
}
}
}
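The gain in the ordinals collectors comes from replacing a per-document hash-map update with a plain counts[ordinal]++ against the segment's distinct-values array, using a buffer borrowed from CacheRecycler. A toy, self-contained sketch of that counting step (the arrays are made-up data, not the committed code):

public class OrdinalCountingSketch {
    public static void main(String[] args) {
        byte[] values = {0, 3, 7, 9};           // distinct values of one segment; ordinal 0 means "no value"
        int[] docOrdinals = {1, 2, 1, 3, 0, 1}; // ordinal of each collected doc
        int[] counts = new int[values.length];  // in the real collector: CacheRecycler.popIntArray(values.length)

        for (int ord : docOrdinals) {
            counts[ord]++;                      // O(1) per doc, no hashing or boxing
        }
        for (int ord = 1; ord < values.length; ord++) {
            System.out.println("value " + values[ord] + " -> count " + counts[ord]);
        }
        // counts[0] is the missing count, which is what the real collector adds to `missing`.
    }
}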

View File

@ -19,11 +19,11 @@
package org.elasticsearch.search.facet.terms.doubles;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TDoubleIntIterator;
import org.elasticsearch.common.trove.map.hash.TDoubleIntHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -169,19 +169,12 @@ public class InternalDoubleTermsFacet extends InternalTermsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<TDoubleIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TDoubleIntHashMap>>() {
@Override protected ThreadLocals.CleanableValue<TDoubleIntHashMap> initialValue() {
return new ThreadLocals.CleanableValue<TDoubleIntHashMap>(new TDoubleIntHashMap());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
return facets.get(0);
}
InternalDoubleTermsFacet first = (InternalDoubleTermsFacet) facets.get(0);
TDoubleIntHashMap aggregated = aggregateCache.get().get();
TDoubleIntHashMap aggregated = CacheRecycler.popDoubleIntMap();
aggregated.clear();
long missing = 0;
for (Facet facet : facets) {
@ -199,6 +192,9 @@ public class InternalDoubleTermsFacet extends InternalTermsFacet {
}
first.entries = ordered;
first.missing = missing;
CacheRecycler.pushDoubleIntMap(aggregated);
return first;
}

View File

@ -22,10 +22,10 @@ package org.elasticsearch.search.facet.terms.doubles;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TDoubleIntIterator;
import org.elasticsearch.common.trove.map.hash.TDoubleIntHashMap;
import org.elasticsearch.common.trove.set.hash.TDoubleHashSet;
@ -38,11 +38,11 @@ import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
@ -51,12 +51,6 @@ import java.util.Set;
*/
public class TermsDoubleFacetCollector extends AbstractFacetCollector {
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TDoubleIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TDoubleIntHashMap>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<TDoubleIntHashMap>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<TDoubleIntHashMap>>(new ArrayDeque<TDoubleIntHashMap>());
}
};
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
@ -107,9 +101,9 @@ public class TermsDoubleFacetCollector extends AbstractFacetCollector {
}
if (this.script == null && excluded.isEmpty()) {
aggregator = new StaticAggregatorValueProc(popFacets());
aggregator = new StaticAggregatorValueProc(CacheRecycler.popDoubleIntMap());
} else {
aggregator = new AggregatorValueProc(popFacets(), excluded, this.script);
aggregator = new AggregatorValueProc(CacheRecycler.popDoubleIntMap(), excluded, this.script);
}
if (allTerms) {
@ -144,35 +138,30 @@ public class TermsDoubleFacetCollector extends AbstractFacetCollector {
@Override public Facet facet() {
TDoubleIntHashMap facets = aggregator.facets();
if (facets.isEmpty()) {
pushFacets(facets);
CacheRecycler.pushDoubleIntMap(facets);
return new InternalDoubleTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalDoubleTermsFacet.DoubleEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry> ordered = new BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry>(comparatorType.comparator(), size * numberOfShards);
for (TDoubleIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TDoubleIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value()));
}
InternalDoubleTermsFacet.DoubleEntry[] list = new InternalDoubleTermsFacet.DoubleEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalDoubleTermsFacet.DoubleEntry) ordered.pop();
}
CacheRecycler.pushDoubleIntMap(facets);
return new InternalDoubleTermsFacet(facetName, comparatorType, size, Arrays.asList(list), aggregator.missing());
} else {
BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry> ordered = new BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry>(comparatorType.comparator(), size);
for (TDoubleIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value()));
}
CacheRecycler.pushDoubleIntMap(facets);
return new InternalDoubleTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
pushFacets(facets);
return new InternalDoubleTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}
static TDoubleIntHashMap popFacets() {
Deque<TDoubleIntHashMap> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TDoubleIntHashMap());
}
TDoubleIntHashMap facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TDoubleIntHashMap facets) {
facets.clear();
Deque<TDoubleIntHashMap> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.doubles;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.trove.set.hash.TDoubleHashSet;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class TermsDoubleOrdinalsFacetCollector extends AbstractFacetCollector {
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int numberOfShards;
private final int minCount;
private final FieldDataType fieldDataType;
private DoubleFieldData fieldData;
private final List<ReaderAggregator> aggregators;
private ReaderAggregator current;
long missing;
private final TDoubleHashSet excluded;
public TermsDoubleOrdinalsFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<String> excluded) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();
this.size = size;
this.comparatorType = comparatorType;
this.numberOfShards = context.numberOfShards();
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
if (smartMappers == null || !smartMappers.hasMapper()) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms double facet collector on it");
} else {
// add type filter if there is exact doc mapper associated with it
if (smartMappers.hasDocMapper()) {
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
}
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.DOUBLE) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of double type, can't run terms double facet collector on it");
}
this.indexFieldName = smartMappers.mapper().names().indexName();
this.fieldDataType = smartMappers.mapper().fieldDataType();
}
if (excluded == null || excluded.isEmpty()) {
this.excluded = null;
} else {
this.excluded = new TDoubleHashSet(excluded.size());
for (String s : excluded) {
this.excluded.add(Double.parseDouble(s));
}
}
// minCount is offset by -1
if (allTerms) {
minCount = -1;
} else {
minCount = 0;
}
this.aggregators = new ArrayList<ReaderAggregator>(context.searcher().subReaders().length);
}
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
if (current != null) {
missing += current.counts[0];
if (current.values.length > 1) {
aggregators.add(current);
}
}
fieldData = (DoubleFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
current = new ReaderAggregator(fieldData);
}
@Override protected void doCollect(int doc) throws IOException {
fieldData.forEachOrdinalInDoc(doc, current);
}
@Override public Facet facet() {
if (current != null) {
missing += current.counts[0];
// if we have values for this one, add it
if (current.values.length > 1) {
aggregators.add(current);
}
}
AggregatorPriorityQueue queue = new AggregatorPriorityQueue(aggregators.size());
for (ReaderAggregator aggregator : aggregators) {
CacheRecycler.pushIntArray(aggregator.counts); // release it back to the recycler here; we stay on the same thread, so it won't be corrupted while we keep reading it below
if (aggregator.nextPosition()) {
queue.add(aggregator);
}
}
// YUCK, we repeat the same logic, but once with an optimized priority queue for smaller sizes
if (size < EntryPriorityQueue.LIMIT) {
// optimized path: use a priority queue bounded to the requested size
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
double value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalDoubleTermsFacet.DoubleEntry entry = new InternalDoubleTermsFacet.DoubleEntry(value, count);
ordered.insertWithOverflow(entry);
}
}
}
InternalDoubleTermsFacet.DoubleEntry[] list = new InternalDoubleTermsFacet.DoubleEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalDoubleTermsFacet.DoubleEntry) ordered.pop();
}
return new InternalDoubleTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing);
}
BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry> ordered = new BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry>(comparatorType.comparator(), size);
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
double value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalDoubleTermsFacet.DoubleEntry entry = new InternalDoubleTermsFacet.DoubleEntry(value, count);
ordered.add(entry);
}
}
}
return new InternalDoubleTermsFacet(facetName, comparatorType, size, ordered, missing);
}
public static class ReaderAggregator implements FieldData.OrdinalInDocProc {
final double[] values;
final int[] counts;
int position = 0;
double current;
public ReaderAggregator(DoubleFieldData fieldData) {
this.values = fieldData.values();
this.counts = CacheRecycler.popIntArray(fieldData.values().length);
}
@Override public void onOrdinal(int docId, int ordinal) {
counts[ordinal]++;
}
public boolean nextPosition() {
if (++position >= values.length) {
return false;
}
current = values[position];
return true;
}
}
public static class AggregatorPriorityQueue extends PriorityQueue<ReaderAggregator> {
public AggregatorPriorityQueue(int size) {
initialize(size);
}
@Override protected boolean lessThan(ReaderAggregator a, ReaderAggregator b) {
return a.current < b.current;
}
}
}
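The facet() method above is the heart of the new ordinals-based collectors: each segment contributes a sorted array of distinct values plus a parallel count array, and a priority queue merges them into global per-value counts. The following standalone sketch shows just that merge in plain Java; the Segment class and every name in it are illustrative, not the Elasticsearch classes.

// Illustrative sketch (hypothetical names): merge per-segment (values, counts) arrays with a priority queue.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

public class SegmentCountMerger {

    // One entry per segment: distinct values in ascending order plus per-value counts.
    static final class Segment {
        final double[] values;
        final int[] counts;
        int pos = -1;

        Segment(double[] values, int[] counts) {
            this.values = values;
            this.counts = counts;
        }

        boolean next() {
            return ++pos < values.length;
        }

        double current() {
            return values[pos];
        }
    }

    // Merge all segments into a value -> total count map, walking each segment exactly once.
    static Map<Double, Integer> merge(List<Segment> segments) {
        PriorityQueue<Segment> queue =
                new PriorityQueue<>((a, b) -> Double.compare(a.current(), b.current()));
        for (Segment segment : segments) {
            if (segment.next()) {
                queue.add(segment);
            }
        }
        Map<Double, Integer> totals = new TreeMap<>();
        while (!queue.isEmpty()) {
            double value = queue.peek().current();
            int count = 0;
            // Drain every segment currently positioned on this value, advancing each one.
            while (!queue.isEmpty() && queue.peek().current() == value) {
                Segment segment = queue.poll();
                count += segment.counts[segment.pos];
                if (segment.next()) {
                    queue.add(segment);
                }
            }
            totals.put(value, count);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment(new double[]{1.0, 3.0, 5.0}, new int[]{2, 1, 4}));
        segments.add(new Segment(new double[]{1.0, 2.0, 5.0}, new int[]{1, 3, 1}));
        System.out.println(merge(segments)); // {1.0=3, 2.0=3, 3.0=1, 5.0=5}
    }
}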

View File

@ -19,11 +19,11 @@
package org.elasticsearch.search.facet.terms.floats;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TFloatIntIterator;
import org.elasticsearch.common.trove.map.hash.TFloatIntHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -169,19 +169,12 @@ public class InternalFloatTermsFacet extends InternalTermsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<TFloatIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TFloatIntHashMap>>() {
@Override protected ThreadLocals.CleanableValue<TFloatIntHashMap> initialValue() {
return new ThreadLocals.CleanableValue<TFloatIntHashMap>(new TFloatIntHashMap());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
return facets.get(0);
}
InternalFloatTermsFacet first = (InternalFloatTermsFacet) facets.get(0);
TFloatIntHashMap aggregated = aggregateCache.get().get();
TFloatIntHashMap aggregated = CacheRecycler.popFloatIntMap();
aggregated.clear();
long missing = 0;
@ -200,6 +193,9 @@ public class InternalFloatTermsFacet extends InternalTermsFacet {
}
first.entries = ordered;
first.missing = missing;
CacheRecycler.pushFloatIntMap(aggregated);
return first;
}
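The change above is representative of the whole commit: per-class ThreadLocal caches are replaced by a shared pop/push recycler, and every pop is paired with a push once the map has been folded into the result. Below is a minimal sketch of that pop/push idea, assuming a per-thread deque of reusable maps; it is not the real org.elasticsearch.common.CacheRecycler, and all names are made up.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the pop/push recycling pattern; not the real CacheRecycler.
public final class MapRecycler {

    // Each thread keeps its own deque of reusable maps, so no synchronization is needed.
    private static final ThreadLocal<Deque<Map<Long, Integer>>> POOL =
            ThreadLocal.withInitial(ArrayDeque::new);

    private MapRecycler() {
    }

    public static Map<Long, Integer> popLongIntMap() {
        Map<Long, Integer> map = POOL.get().pollFirst();
        return map != null ? map : new HashMap<>();
    }

    public static void pushLongIntMap(Map<Long, Integer> map) {
        map.clear();                  // always hand back an empty map
        POOL.get().addFirst(map);
    }

    public static void main(String[] args) {
        Map<Long, Integer> counts = popLongIntMap();
        counts.put(42L, 7);
        pushLongIntMap(counts);       // recycled once the facet has been reduced
        System.out.println(popLongIntMap().isEmpty()); // true: same instance, already cleared
    }
}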

View File

@ -22,10 +22,10 @@ package org.elasticsearch.search.facet.terms.floats;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TFloatIntIterator;
import org.elasticsearch.common.trove.map.hash.TFloatIntHashMap;
import org.elasticsearch.common.trove.set.hash.TFloatHashSet;
@ -38,11 +38,11 @@ import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
@ -51,12 +51,6 @@ import java.util.Set;
*/
public class TermsFloatFacetCollector extends AbstractFacetCollector {
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>>(new ArrayDeque<TFloatIntHashMap>());
}
};
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
@ -107,9 +101,9 @@ public class TermsFloatFacetCollector extends AbstractFacetCollector {
}
if (this.script == null && excluded.isEmpty()) {
aggregator = new StaticAggregatorValueProc(popFacets());
aggregator = new StaticAggregatorValueProc(CacheRecycler.popFloatIntMap());
} else {
aggregator = new AggregatorValueProc(popFacets(), excluded, this.script);
aggregator = new AggregatorValueProc(CacheRecycler.popFloatIntMap(), excluded, this.script);
}
if (allTerms) {
@ -144,35 +138,30 @@ public class TermsFloatFacetCollector extends AbstractFacetCollector {
@Override public Facet facet() {
TFloatIntHashMap facets = aggregator.facets();
if (facets.isEmpty()) {
pushFacets(facets);
CacheRecycler.pushFloatIntMap(facets);
return new InternalFloatTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalFloatTermsFacet.FloatEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalFloatTermsFacet.FloatEntry> ordered = new BoundedTreeSet<InternalFloatTermsFacet.FloatEntry>(comparatorType.comparator(), size * numberOfShards);
for (TFloatIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalFloatTermsFacet.FloatEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TFloatIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalFloatTermsFacet.FloatEntry(it.key(), it.value()));
}
InternalFloatTermsFacet.FloatEntry[] list = new InternalFloatTermsFacet.FloatEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalFloatTermsFacet.FloatEntry) ordered.pop();
}
CacheRecycler.pushFloatIntMap(facets);
return new InternalFloatTermsFacet(facetName, comparatorType, size, Arrays.asList(list), aggregator.missing());
} else {
BoundedTreeSet<InternalFloatTermsFacet.FloatEntry> ordered = new BoundedTreeSet<InternalFloatTermsFacet.FloatEntry>(comparatorType.comparator(), size);
for (TFloatIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalFloatTermsFacet.FloatEntry(it.key(), it.value()));
}
CacheRecycler.pushFloatIntMap(facets);
return new InternalFloatTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
pushFacets(facets);
return new InternalFloatTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}
static TFloatIntHashMap popFacets() {
Deque<TFloatIntHashMap> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TFloatIntHashMap());
}
TFloatIntHashMap facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TFloatIntHashMap facets) {
facets.clear();
Deque<TFloatIntHashMap> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}
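The facet() rewrite above picks between two strategies: when the requested size is small it streams the aggregated map through a size-bounded priority queue (insertWithOverflow), otherwise it falls back to a bounded tree set. A small, self-contained sketch of the bounded-heap idea follows; Entry and topN are hypothetical names, not Elasticsearch classes.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Illustrative sketch of "insert with overflow": keep only the N highest counts.
public class TopNCounts {

    static final class Entry {
        final long term;
        final int count;

        Entry(long term, int count) {
            this.term = term;
            this.count = count;
        }

        @Override public String toString() {
            return term + "=" + count;
        }
    }

    static List<Entry> topN(Map<Long, Integer> counts, int n) {
        // Min-heap on count: the top of the heap is always the weakest entry kept so far.
        PriorityQueue<Entry> heap = new PriorityQueue<>((a, b) -> Integer.compare(a.count, b.count));
        for (Map.Entry<Long, Integer> e : counts.entrySet()) {
            heap.add(new Entry(e.getKey(), e.getValue()));
            if (heap.size() > n) {
                heap.poll();          // overflow: drop the current minimum
            }
        }
        List<Entry> result = new ArrayList<>(heap);
        result.sort((a, b) -> Integer.compare(b.count, a.count)); // highest count first
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Integer> counts = new TreeMap<>();
        counts.put(1L, 5);
        counts.put(2L, 9);
        counts.put(3L, 1);
        counts.put(4L, 7);
        System.out.println(topN(counts, 2)); // [2=9, 4=7]
    }
}

For n entries out of m aggregated terms this is roughly O(m log n) instead of maintaining an m-sized ordered structure, which is presumably why the small-size path is worth the duplicated logic.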

View File

@ -0,0 +1,248 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.floats;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.trove.set.hash.TFloatHashSet;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.floats.FloatFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class TermsFloatOrdinalsFacetCollector extends AbstractFacetCollector {
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int numberOfShards;
private final int minCount;
private final FieldDataType fieldDataType;
private FloatFieldData fieldData;
private final List<ReaderAggregator> aggregators;
private ReaderAggregator current;
long missing;
private final TFloatHashSet excluded;
public TermsFloatOrdinalsFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<String> excluded) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();
this.size = size;
this.comparatorType = comparatorType;
this.numberOfShards = context.numberOfShards();
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
if (smartMappers == null || !smartMappers.hasMapper()) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms float facet collector on it");
} else {
// add type filter if there is exact doc mapper associated with it
if (smartMappers.hasDocMapper()) {
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
}
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.FLOAT) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of float type, can't run terms float facet collector on it");
}
this.indexFieldName = smartMappers.mapper().names().indexName();
this.fieldDataType = smartMappers.mapper().fieldDataType();
}
if (excluded == null || excluded.isEmpty()) {
this.excluded = null;
} else {
this.excluded = new TFloatHashSet(excluded.size());
for (String s : excluded) {
this.excluded.add(Float.parseFloat(s));
}
}
// minCount is offset by -1: with allTerms it is -1, so terms with a count of 0 still pass the count > minCount check
if (allTerms) {
minCount = -1;
} else {
minCount = 0;
}
this.aggregators = new ArrayList<ReaderAggregator>(context.searcher().subReaders().length);
}
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
if (current != null) {
missing += current.counts[0];
if (current.values.length > 1) {
aggregators.add(current);
}
}
fieldData = (FloatFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
current = new ReaderAggregator(fieldData);
}
@Override protected void doCollect(int doc) throws IOException {
fieldData.forEachOrdinalInDoc(doc, current);
}
@Override public Facet facet() {
if (current != null) {
missing += current.counts[0];
// if we have values for this one, add it
if (current.values.length > 1) {
aggregators.add(current);
}
}
AggregatorPriorityQueue queue = new AggregatorPriorityQueue(aggregators.size());
for (ReaderAggregator aggregator : aggregators) {
CacheRecycler.pushIntArray(aggregator.counts); // release it back to the recycler here; we stay on the same thread, so it won't be corrupted while we keep reading it below
if (aggregator.nextPosition()) {
queue.add(aggregator);
}
}
// YUCK, we repeat the same logic, but once with an optimized priority queue for smaller sizes
if (size < EntryPriorityQueue.LIMIT) {
// optimized path: use a priority queue bounded to the requested size
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
float value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalFloatTermsFacet.FloatEntry entry = new InternalFloatTermsFacet.FloatEntry(value, count);
ordered.insertWithOverflow(entry);
}
}
}
InternalFloatTermsFacet.FloatEntry[] list = new InternalFloatTermsFacet.FloatEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalFloatTermsFacet.FloatEntry) ordered.pop();
}
return new InternalFloatTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing);
}
BoundedTreeSet<InternalFloatTermsFacet.FloatEntry> ordered = new BoundedTreeSet<InternalFloatTermsFacet.FloatEntry>(comparatorType.comparator(), size);
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
float value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalFloatTermsFacet.FloatEntry entry = new InternalFloatTermsFacet.FloatEntry(value, count);
ordered.add(entry);
}
}
}
return new InternalFloatTermsFacet(facetName, comparatorType, size, ordered, missing);
}
public static class ReaderAggregator implements FieldData.OrdinalInDocProc {
final float[] values;
final int[] counts;
int position = 0;
float current;
public ReaderAggregator(FloatFieldData fieldData) {
this.values = fieldData.values();
this.counts = CacheRecycler.popIntArray(fieldData.values().length);
}
@Override public void onOrdinal(int docId, int ordinal) {
counts[ordinal]++;
}
public boolean nextPosition() {
if (++position >= values.length) {
return false;
}
current = values[position];
return true;
}
}
public static class AggregatorPriorityQueue extends PriorityQueue<ReaderAggregator> {
public AggregatorPriorityQueue(int size) {
initialize(size);
}
@Override protected boolean lessThan(ReaderAggregator a, ReaderAggregator b) {
return a.current < b.current;
}
}
}

View File

@ -19,11 +19,11 @@
package org.elasticsearch.search.facet.terms.ints;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TIntIntIterator;
import org.elasticsearch.common.trove.map.hash.TIntIntHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -166,19 +166,12 @@ public class InternalIntTermsFacet extends InternalTermsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<TIntIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TIntIntHashMap>>() {
@Override protected ThreadLocals.CleanableValue<TIntIntHashMap> initialValue() {
return new ThreadLocals.CleanableValue<TIntIntHashMap>(new TIntIntHashMap());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
return facets.get(0);
}
InternalIntTermsFacet first = (InternalIntTermsFacet) facets.get(0);
TIntIntHashMap aggregated = aggregateCache.get().get();
TIntIntHashMap aggregated = CacheRecycler.popIntIntMap();
aggregated.clear();
long missing = 0;
@ -197,6 +190,9 @@ public class InternalIntTermsFacet extends InternalTermsFacet {
}
first.entries = ordered;
first.missing = missing;
CacheRecycler.pushIntIntMap(aggregated);
return first;
}

View File

@ -22,10 +22,10 @@ package org.elasticsearch.search.facet.terms.ints;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TIntIntIterator;
import org.elasticsearch.common.trove.map.hash.TIntIntHashMap;
import org.elasticsearch.common.trove.set.hash.TIntHashSet;
@ -38,11 +38,11 @@ import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
@ -51,12 +51,6 @@ import java.util.Set;
*/
public class TermsIntFacetCollector extends AbstractFacetCollector {
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TIntIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TIntIntHashMap>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<TIntIntHashMap>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<TIntIntHashMap>>(new ArrayDeque<TIntIntHashMap>());
}
};
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
@ -107,9 +101,9 @@ public class TermsIntFacetCollector extends AbstractFacetCollector {
}
if (this.script == null && excluded.isEmpty()) {
aggregator = new StaticAggregatorValueProc(popFacets());
aggregator = new StaticAggregatorValueProc(CacheRecycler.popIntIntMap());
} else {
aggregator = new AggregatorValueProc(popFacets(), excluded, this.script);
aggregator = new AggregatorValueProc(CacheRecycler.popIntIntMap(), excluded, this.script);
}
if (allTerms) {
@ -144,35 +138,30 @@ public class TermsIntFacetCollector extends AbstractFacetCollector {
@Override public Facet facet() {
TIntIntHashMap facets = aggregator.facets();
if (facets.isEmpty()) {
pushFacets(facets);
CacheRecycler.pushIntIntMap(facets);
return new InternalIntTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalIntTermsFacet.IntEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalIntTermsFacet.IntEntry> ordered = new BoundedTreeSet<InternalIntTermsFacet.IntEntry>(comparatorType.comparator(), size * numberOfShards);
for (TIntIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalIntTermsFacet.IntEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TIntIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalIntTermsFacet.IntEntry(it.key(), it.value()));
}
InternalIntTermsFacet.IntEntry[] list = new InternalIntTermsFacet.IntEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalIntTermsFacet.IntEntry) ordered.pop();
}
CacheRecycler.pushIntIntMap(facets);
return new InternalIntTermsFacet(facetName, comparatorType, size, Arrays.asList(list), aggregator.missing());
} else {
BoundedTreeSet<InternalIntTermsFacet.IntEntry> ordered = new BoundedTreeSet<InternalIntTermsFacet.IntEntry>(comparatorType.comparator(), size);
for (TIntIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalIntTermsFacet.IntEntry(it.key(), it.value()));
}
CacheRecycler.pushIntIntMap(facets);
return new InternalIntTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
pushFacets(facets);
return new InternalIntTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}
static TIntIntHashMap popFacets() {
Deque<TIntIntHashMap> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TIntIntHashMap());
}
TIntIntHashMap facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TIntIntHashMap facets) {
facets.clear();
Deque<TIntIntHashMap> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.ints;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.trove.set.hash.TIntHashSet;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.ints.IntFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class TermsIntOrdinalsFacetCollector extends AbstractFacetCollector {
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int numberOfShards;
private final int minCount;
private final FieldDataType fieldDataType;
private IntFieldData fieldData;
private final List<ReaderAggregator> aggregators;
private ReaderAggregator current;
long missing;
private final TIntHashSet excluded;
public TermsIntOrdinalsFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<String> excluded) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();
this.size = size;
this.comparatorType = comparatorType;
this.numberOfShards = context.numberOfShards();
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
if (smartMappers == null || !smartMappers.hasMapper()) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms int facet collector on it");
} else {
// add type filter if there is exact doc mapper associated with it
if (smartMappers.hasDocMapper()) {
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
}
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.INT) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of int type, can't run terms int facet collector on it");
}
this.indexFieldName = smartMappers.mapper().names().indexName();
this.fieldDataType = smartMappers.mapper().fieldDataType();
}
if (excluded == null || excluded.isEmpty()) {
this.excluded = null;
} else {
this.excluded = new TIntHashSet(excluded.size());
for (String s : excluded) {
this.excluded.add(Integer.parseInt(s));
}
}
// minCount is offset by -1: with allTerms it is -1, so terms with a count of 0 still pass the count > minCount check
if (allTerms) {
minCount = -1;
} else {
minCount = 0;
}
this.aggregators = new ArrayList<ReaderAggregator>(context.searcher().subReaders().length);
}
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
if (current != null) {
missing += current.counts[0];
if (current.values.length > 1) {
aggregators.add(current);
}
}
fieldData = (IntFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
current = new ReaderAggregator(fieldData);
}
@Override protected void doCollect(int doc) throws IOException {
fieldData.forEachOrdinalInDoc(doc, current);
}
@Override public Facet facet() {
if (current != null) {
missing += current.counts[0];
// if we have values for this one, add it
if (current.values.length > 1) {
aggregators.add(current);
}
}
AggregatorPriorityQueue queue = new AggregatorPriorityQueue(aggregators.size());
for (ReaderAggregator aggregator : aggregators) {
CacheRecycler.pushIntArray(aggregator.counts); // release it back to the recycler here; we stay on the same thread, so it won't be corrupted while we keep reading it below
if (aggregator.nextPosition()) {
queue.add(aggregator);
}
}
// YUCK, we repeat the same logic, but once with an optimized priority queue for smaller sizes
if (size < EntryPriorityQueue.LIMIT) {
// optimized path: use a priority queue bounded to the requested size
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
int value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalIntTermsFacet.IntEntry entry = new InternalIntTermsFacet.IntEntry(value, count);
ordered.insertWithOverflow(entry);
}
}
}
InternalIntTermsFacet.IntEntry[] list = new InternalIntTermsFacet.IntEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalIntTermsFacet.IntEntry) ordered.pop();
}
return new InternalIntTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing);
}
BoundedTreeSet<InternalIntTermsFacet.IntEntry> ordered = new BoundedTreeSet<InternalIntTermsFacet.IntEntry>(comparatorType.comparator(), size);
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
int value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalIntTermsFacet.IntEntry entry = new InternalIntTermsFacet.IntEntry(value, count);
ordered.add(entry);
}
}
}
return new InternalIntTermsFacet(facetName, comparatorType, size, ordered, missing);
}
public static class ReaderAggregator implements FieldData.OrdinalInDocProc {
final int[] values;
final int[] counts;
int position = 0;
int current;
public ReaderAggregator(IntFieldData fieldData) {
this.values = fieldData.values();
this.counts = CacheRecycler.popIntArray(fieldData.values().length);
}
@Override public void onOrdinal(int docId, int ordinal) {
counts[ordinal]++;
}
public boolean nextPosition() {
if (++position >= values.length) {
return false;
}
current = values[position];
return true;
}
}
public static class AggregatorPriorityQueue extends PriorityQueue<ReaderAggregator> {
public AggregatorPriorityQueue(int size) {
initialize(size);
}
@Override protected boolean lessThan(ReaderAggregator a, ReaderAggregator b) {
return a.current < b.current;
}
}
}
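One convention that is easy to miss in the collectors above: ordinal 0 acts as the "no value" slot, which is why counts[0] is folded into the missing count and a segment is only kept when values.length > 1. A tiny sketch of that convention, with made-up data:

// Hypothetical data illustrating the ordinal-0-as-missing convention used above.
public class OrdinalCounting {
    public static void main(String[] args) {
        long[] values = {0L, 10L, 20L, 30L};      // values[0] is a placeholder; real terms start at ordinal 1
        int[] ordinalPerDoc = {1, 0, 3, 3, 2, 0}; // ordinal 0 means "document has no value for this field"

        int[] counts = new int[values.length];
        for (int ordinal : ordinalPerDoc) {
            counts[ordinal]++;                    // same increment as ReaderAggregator.onOrdinal(...)
        }

        System.out.println("missing docs: " + counts[0]); // 2
        for (int ord = 1; ord < values.length; ord++) {
            System.out.println(values[ord] + " -> " + counts[ord]); // 10 -> 1, 20 -> 1, 30 -> 2
        }
    }
}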

View File

@ -19,11 +19,11 @@
package org.elasticsearch.search.facet.terms.ip;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TLongIntIterator;
import org.elasticsearch.common.trove.map.hash.TLongIntHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -170,19 +170,12 @@ public class InternalIpTermsFacet extends InternalTermsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>>() {
@Override protected ThreadLocals.CleanableValue<TLongIntHashMap> initialValue() {
return new ThreadLocals.CleanableValue<TLongIntHashMap>(new TLongIntHashMap());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
return facets.get(0);
}
InternalIpTermsFacet first = (InternalIpTermsFacet) facets.get(0);
TLongIntHashMap aggregated = aggregateCache.get().get();
TLongIntHashMap aggregated = CacheRecycler.popLongIntMap();
aggregated.clear();
long missing = 0;
@ -201,6 +194,9 @@ public class InternalIpTermsFacet extends InternalTermsFacet {
}
first.entries = ordered;
first.missing = missing;
CacheRecycler.pushLongIntMap(aggregated);
return first;
}

View File

@ -22,9 +22,9 @@ package org.elasticsearch.search.facet.terms.ip;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TLongIntIterator;
import org.elasticsearch.common.trove.map.hash.TLongIntHashMap;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
@ -36,11 +36,11 @@ import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Arrays;
import java.util.Map;
/**
@ -48,13 +48,6 @@ import java.util.Map;
*/
public class TermsIpFacetCollector extends AbstractFacetCollector {
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<TLongIntHashMap>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>(new ArrayDeque<TLongIntHashMap>());
}
};
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
@ -105,9 +98,9 @@ public class TermsIpFacetCollector extends AbstractFacetCollector {
}
if (this.script == null) {
aggregator = new StaticAggregatorValueProc(popFacets());
aggregator = new StaticAggregatorValueProc(CacheRecycler.popLongIntMap());
} else {
aggregator = new AggregatorValueProc(popFacets(), this.script);
aggregator = new AggregatorValueProc(CacheRecycler.popLongIntMap(), this.script);
}
if (allTerms) {
@ -142,35 +135,30 @@ public class TermsIpFacetCollector extends AbstractFacetCollector {
@Override public Facet facet() {
TLongIntHashMap facets = aggregator.facets();
if (facets.isEmpty()) {
pushFacets(facets);
CacheRecycler.pushLongIntMap(facets);
return new InternalIpTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalIpTermsFacet.LongEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalIpTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalIpTermsFacet.LongEntry>(comparatorType.comparator(), size * numberOfShards);
for (TLongIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalIpTermsFacet.LongEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TLongIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalIpTermsFacet.LongEntry(it.key(), it.value()));
}
InternalIpTermsFacet.LongEntry[] list = new InternalIpTermsFacet.LongEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalIpTermsFacet.LongEntry) ordered.pop();
}
CacheRecycler.pushLongIntMap(facets);
return new InternalIpTermsFacet(facetName, comparatorType, size, Arrays.asList(list), aggregator.missing());
} else {
BoundedTreeSet<InternalIpTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalIpTermsFacet.LongEntry>(comparatorType.comparator(), size);
for (TLongIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalIpTermsFacet.LongEntry(it.key(), it.value()));
}
CacheRecycler.pushLongIntMap(facets);
return new InternalIpTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
pushFacets(facets);
return new InternalIpTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}
static TLongIntHashMap popFacets() {
Deque<TLongIntHashMap> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TLongIntHashMap());
}
TLongIntHashMap facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TLongIntHashMap facets) {
facets.clear();
Deque<TLongIntHashMap> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.ip;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.trove.set.hash.TLongHashSet;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.longs.LongFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class TermsIpOrdinalsFacetCollector extends AbstractFacetCollector {
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int numberOfShards;
private final int minCount;
private final FieldDataType fieldDataType;
private LongFieldData fieldData;
private final List<ReaderAggregator> aggregators;
private ReaderAggregator current;
long missing;
private final TLongHashSet excluded;
public TermsIpOrdinalsFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<String> excluded) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();
this.size = size;
this.comparatorType = comparatorType;
this.numberOfShards = context.numberOfShards();
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
if (smartMappers == null || !smartMappers.hasMapper()) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms ip facet collector on it");
} else {
// add type filter if there is exact doc mapper associated with it
if (smartMappers.hasDocMapper()) {
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
}
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.LONG) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of long type, can't run terms ip facet collector on it");
}
this.indexFieldName = smartMappers.mapper().names().indexName();
this.fieldDataType = smartMappers.mapper().fieldDataType();
}
if (excluded == null || excluded.isEmpty()) {
this.excluded = null;
} else {
this.excluded = new TLongHashSet(excluded.size());
for (String s : excluded) {
this.excluded.add(Long.parseLong(s));
}
}
// minCount is offset by -1: with allTerms it is -1, so terms with a count of 0 still pass the count > minCount check
if (allTerms) {
minCount = -1;
} else {
minCount = 0;
}
this.aggregators = new ArrayList<ReaderAggregator>(context.searcher().subReaders().length);
}
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
if (current != null) {
missing += current.counts[0];
if (current.values.length > 1) {
aggregators.add(current);
}
}
fieldData = (LongFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
current = new ReaderAggregator(fieldData);
}
@Override protected void doCollect(int doc) throws IOException {
fieldData.forEachOrdinalInDoc(doc, current);
}
@Override public Facet facet() {
if (current != null) {
missing += current.counts[0];
// if we have values for this one, add it
if (current.values.length > 1) {
aggregators.add(current);
}
}
AggregatorPriorityQueue queue = new AggregatorPriorityQueue(aggregators.size());
for (ReaderAggregator aggregator : aggregators) {
CacheRecycler.pushIntArray(aggregator.counts); // release it back to the recycler here; we stay on the same thread, so it won't be corrupted while we keep reading it below
if (aggregator.nextPosition()) {
queue.add(aggregator);
}
}
// YUCK, we repeat the same logic, but once with an optimized priority queue for smaller sizes
if (size < EntryPriorityQueue.LIMIT) {
// optimized path: use a priority queue bounded to the requested size
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
long value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalIpTermsFacet.LongEntry entry = new InternalIpTermsFacet.LongEntry(value, count);
ordered.insertWithOverflow(entry);
}
}
}
InternalIpTermsFacet.LongEntry[] list = new InternalIpTermsFacet.LongEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalIpTermsFacet.LongEntry) ordered.pop();
}
return new InternalIpTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing);
}
BoundedTreeSet<InternalIpTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalIpTermsFacet.LongEntry>(comparatorType.comparator(), size);
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
long value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalIpTermsFacet.LongEntry entry = new InternalIpTermsFacet.LongEntry(value, count);
ordered.add(entry);
}
}
}
return new InternalIpTermsFacet(facetName, comparatorType, size, ordered, missing);
}
public static class ReaderAggregator implements FieldData.OrdinalInDocProc {
final long[] values;
final int[] counts;
int position = 0;
long current = Integer.MIN_VALUE;
public ReaderAggregator(LongFieldData fieldData) {
this.values = fieldData.values();
this.counts = CacheRecycler.popIntArray(fieldData.values().length);
}
@Override public void onOrdinal(int docId, int ordinal) {
counts[ordinal]++;
}
public boolean nextPosition() {
if (++position >= values.length) {
return false;
}
current = values[position];
return true;
}
}
public static class AggregatorPriorityQueue extends PriorityQueue<ReaderAggregator> {
public AggregatorPriorityQueue(int size) {
initialize(size);
}
@Override protected boolean lessThan(ReaderAggregator a, ReaderAggregator b) {
return a.current < b.current;
}
}
}

View File

@ -19,11 +19,11 @@
package org.elasticsearch.search.facet.terms.longs;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TLongIntIterator;
import org.elasticsearch.common.trove.map.hash.TLongIntHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -169,20 +169,12 @@ public class InternalLongTermsFacet extends InternalTermsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>>() {
@Override protected ThreadLocals.CleanableValue<TLongIntHashMap> initialValue() {
return new ThreadLocals.CleanableValue<TLongIntHashMap>(new TLongIntHashMap());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
return facets.get(0);
}
InternalLongTermsFacet first = (InternalLongTermsFacet) facets.get(0);
TLongIntHashMap aggregated = aggregateCache.get().get();
aggregated.clear();
TLongIntHashMap aggregated = CacheRecycler.popLongIntMap();
long missing = 0;
for (Facet facet : facets) {
@ -200,6 +192,9 @@ public class InternalLongTermsFacet extends InternalTermsFacet {
}
first.entries = ordered;
first.missing = missing;
CacheRecycler.pushLongIntMap(aggregated);
return first;
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search.facet.terms.longs;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableSet;
@ -38,13 +39,11 @@ import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Set;
import java.util.*;
/**
* @author kimchy (shay.banon)
@ -108,9 +107,9 @@ public class TermsLongFacetCollector extends AbstractFacetCollector {
}
if (this.script == null && excluded.isEmpty()) {
aggregator = new StaticAggregatorValueProc(popFacets());
aggregator = new StaticAggregatorValueProc(CacheRecycler.popLongIntMap());
} else {
aggregator = new AggregatorValueProc(popFacets(), excluded, this.script);
aggregator = new AggregatorValueProc(CacheRecycler.popLongIntMap(), excluded, this.script);
}
if (allTerms) {
@ -145,35 +144,30 @@ public class TermsLongFacetCollector extends AbstractFacetCollector {
@Override public Facet facet() {
TLongIntHashMap facets = aggregator.facets();
if (facets.isEmpty()) {
pushFacets(facets);
CacheRecycler.pushLongIntMap(facets);
return new InternalLongTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalLongTermsFacet.LongEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalLongTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalLongTermsFacet.LongEntry>(comparatorType.comparator(), size * numberOfShards);
for (TLongIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalLongTermsFacet.LongEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TLongIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalLongTermsFacet.LongEntry(it.key(), it.value()));
}
InternalLongTermsFacet.LongEntry[] list = new InternalLongTermsFacet.LongEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalLongTermsFacet.LongEntry) ordered.pop();
}
CacheRecycler.pushLongIntMap(facets);
return new InternalLongTermsFacet(facetName, comparatorType, size, Arrays.asList(list), aggregator.missing());
} else {
BoundedTreeSet<InternalLongTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalLongTermsFacet.LongEntry>(comparatorType.comparator(), size);
for (TLongIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalLongTermsFacet.LongEntry(it.key(), it.value()));
}
CacheRecycler.pushLongIntMap(facets);
return new InternalLongTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
pushFacets(facets);
return new InternalLongTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}
static TLongIntHashMap popFacets() {
Deque<TLongIntHashMap> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TLongIntHashMap());
}
TLongIntHashMap facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TLongIntHashMap facets) {
facets.clear();
Deque<TLongIntHashMap> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.longs;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.trove.set.hash.TLongHashSet;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.longs.LongFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class TermsLongOrdinalsFacetCollector extends AbstractFacetCollector {
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int numberOfShards;
private final int minCount;
private final FieldDataType fieldDataType;
private LongFieldData fieldData;
private final List<ReaderAggregator> aggregators;
private ReaderAggregator current;
long missing;
private final TLongHashSet excluded;
public TermsLongOrdinalsFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<String> excluded) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();
this.size = size;
this.comparatorType = comparatorType;
this.numberOfShards = context.numberOfShards();
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
if (smartMappers == null || !smartMappers.hasMapper()) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms long facet collector on it");
} else {
// add type filter if there is exact doc mapper associated with it
if (smartMappers.hasDocMapper()) {
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
}
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.LONG) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of long type, can't run terms long facet collector on it");
}
this.indexFieldName = smartMappers.mapper().names().indexName();
this.fieldDataType = smartMappers.mapper().fieldDataType();
}
if (excluded == null || excluded.isEmpty()) {
this.excluded = null;
} else {
this.excluded = new TLongHashSet(excluded.size());
for (String s : excluded) {
this.excluded.add(Long.parseLong(s));
}
}
// minCount is offset by -1: with allTerms it is -1, so terms with a count of 0 still pass the count > minCount check
if (allTerms) {
minCount = -1;
} else {
minCount = 0;
}
this.aggregators = new ArrayList<ReaderAggregator>(context.searcher().subReaders().length);
}
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
if (current != null) {
missing += current.counts[0];
if (current.values.length > 1) {
aggregators.add(current);
}
}
fieldData = (LongFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
current = new ReaderAggregator(fieldData);
}
@Override protected void doCollect(int doc) throws IOException {
fieldData.forEachOrdinalInDoc(doc, current);
}
@Override public Facet facet() {
if (current != null) {
missing += current.counts[0];
// if we have values for this one, add it
if (current.values.length > 1) {
aggregators.add(current);
}
}
AggregatorPriorityQueue queue = new AggregatorPriorityQueue(aggregators.size());
for (ReaderAggregator aggregator : aggregators) {
CacheRecycler.pushIntArray(aggregator.counts); // release it back to the recycler here; we stay on the same thread, so it won't be corrupted while we keep reading it below
if (aggregator.nextPosition()) {
queue.add(aggregator);
}
}
// YUCK, we repeat the same logic, but once with an optimized priority queue for smaller sizes
if (size < EntryPriorityQueue.LIMIT) {
// optimized path: use a priority queue bounded to the requested size
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
long value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalLongTermsFacet.LongEntry entry = new InternalLongTermsFacet.LongEntry(value, count);
ordered.insertWithOverflow(entry);
}
}
}
InternalLongTermsFacet.LongEntry[] list = new InternalLongTermsFacet.LongEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalLongTermsFacet.LongEntry) ordered.pop();
}
return new InternalLongTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing);
}
BoundedTreeSet<InternalLongTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalLongTermsFacet.LongEntry>(comparatorType.comparator(), size);
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
long value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalLongTermsFacet.LongEntry entry = new InternalLongTermsFacet.LongEntry(value, count);
ordered.add(entry);
}
}
}
return new InternalLongTermsFacet(facetName, comparatorType, size, ordered, missing);
}
public static class ReaderAggregator implements FieldData.OrdinalInDocProc {
final long[] values;
final int[] counts;
int position = 0;
long current = Long.MIN_VALUE;
public ReaderAggregator(LongFieldData fieldData) {
this.values = fieldData.values();
this.counts = CacheRecycler.popIntArray(fieldData.values().length);
}
@Override public void onOrdinal(int docId, int ordinal) {
counts[ordinal]++;
}
public boolean nextPosition() {
if (++position >= values.length) {
return false;
}
current = values[position];
return true;
}
}
public static class AggregatorPriorityQueue extends PriorityQueue<ReaderAggregator> {
public AggregatorPriorityQueue(int size) {
initialize(size);
}
@Override protected boolean lessThan(ReaderAggregator a, ReaderAggregator b) {
return a.current < b.current;
}
}
}
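
A minimal, self-contained sketch of the merge these ordinals collectors perform (hypothetical class and method names, plain java.util.PriorityQueue instead of Lucene's, modern Java for brevity; not the ES implementation): each segment contributes a sorted value array with a parallel counts array whose slot 0 holds the missing count, and a priority queue merges equal values across segments into global totals.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the per-segment ordinal merge; not the ES implementation.
public class OrdinalMergeSketch {

    static final class SegmentCounts {
        final long[] values; // sorted distinct values; values[0] is the "missing" slot
        final int[] counts;  // counts[i] = docs in this segment carrying values[i]
        int position;        // merge cursor, starts before 1 (slot 0 is skipped)

        SegmentCounts(long[] values, int[] counts) {
            this.values = values;
            this.counts = counts;
        }

        boolean nextPosition() {
            return ++position < values.length;
        }

        long current() {
            return values[position];
        }
    }

    // Merge all segments into value -> total count, ascending by value.
    static LinkedHashMap<Long, Integer> merge(List<SegmentCounts> segments) {
        PriorityQueue<SegmentCounts> queue =
                new PriorityQueue<>(Comparator.comparingLong(SegmentCounts::current));
        for (SegmentCounts segment : segments) {
            if (segment.nextPosition()) { // skip segments with no real values
                queue.add(segment);
            }
        }
        LinkedHashMap<Long, Integer> totals = new LinkedHashMap<>();
        while (!queue.isEmpty()) {
            long value = queue.peek().current();
            int count = 0;
            // Drain every segment currently positioned on the same value.
            while (!queue.isEmpty() && queue.peek().current() == value) {
                SegmentCounts segment = queue.poll();
                count += segment.counts[segment.position];
                if (segment.nextPosition()) {
                    queue.add(segment); // re-insert at its next value
                }
            }
            totals.put(value, count);
        }
        return totals;
    }

    public static void main(String[] args) {
        SegmentCounts s1 = new SegmentCounts(new long[]{0, 1, 3}, new int[]{2, 5, 1});
        SegmentCounts s2 = new SegmentCounts(new long[]{0, 1, 2}, new int[]{0, 3, 4});
        List<SegmentCounts> segments = new ArrayList<>();
        segments.add(s1);
        segments.add(s2);
        System.out.println(merge(segments)); // prints {1=8, 2=4, 3=1}
    }
}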

View File

@ -19,11 +19,11 @@
package org.elasticsearch.search.facet.terms.shorts;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TShortIntIterator;
import org.elasticsearch.common.trove.map.hash.TShortIntHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -166,19 +166,12 @@ public class InternalShortTermsFacet extends InternalTermsFacet {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<TShortIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TShortIntHashMap>>() {
@Override protected ThreadLocals.CleanableValue<TShortIntHashMap> initialValue() {
return new ThreadLocals.CleanableValue<TShortIntHashMap>(new TShortIntHashMap());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
return facets.get(0);
}
InternalShortTermsFacet first = (InternalShortTermsFacet) facets.get(0);
TShortIntHashMap aggregated = aggregateCache.get().get();
TShortIntHashMap aggregated = CacheRecycler.popShortIntMap();
aggregated.clear();
long missing = 0;
for (Facet facet : facets) {
@ -196,6 +189,9 @@ public class InternalShortTermsFacet extends InternalTermsFacet {
}
first.entries = ordered;
first.missing = missing;
CacheRecycler.pushShortIntMap(aggregated);
return first;
}
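
A rough sketch of what this reduce step does across shard results, using plain java.util collections and hypothetical names (the real code uses Trove maps, reuses the first facet instance, and recycles the scratch map via CacheRecycler; modern Java for brevity): each shard's entries are summed into one map, then re-sorted and trimmed to the requested size.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-shard reduce; not the ES implementation.
public class ReduceSketch {

    static List<Map.Entry<Short, Integer>> reduce(List<Map<Short, Integer>> shardFacets, int size) {
        Map<Short, Integer> aggregated = new HashMap<>();
        for (Map<Short, Integer> shard : shardFacets) {
            // Sum the per-shard counts for each term into one global map.
            shard.forEach((term, count) -> aggregated.merge(term, count, Integer::sum));
        }
        List<Map.Entry<Short, Integer>> ordered = new ArrayList<>(aggregated.entrySet());
        ordered.sort(Map.Entry.<Short, Integer>comparingByValue().reversed()); // count descending
        return ordered.subList(0, Math.min(size, ordered.size()));             // trim to requested size
    }

    public static void main(String[] args) {
        Map<Short, Integer> shard1 = Map.of((short) 1, 5, (short) 2, 2);
        Map<Short, Integer> shard2 = Map.of((short) 1, 3, (short) 3, 7);
        System.out.println(reduce(List.of(shard1, shard2), 2)); // prints [1=8, 3=7]
    }
}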

View File

@ -22,10 +22,10 @@ package org.elasticsearch.search.facet.terms.shorts;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TShortIntIterator;
import org.elasticsearch.common.trove.map.hash.TShortIntHashMap;
import org.elasticsearch.common.trove.set.hash.TShortHashSet;
@ -38,11 +38,11 @@ import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
@ -51,12 +51,6 @@ import java.util.Set;
*/
public class TermsShortFacetCollector extends AbstractFacetCollector {
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TShortIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TShortIntHashMap>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<TShortIntHashMap>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<TShortIntHashMap>>(new ArrayDeque<TShortIntHashMap>());
}
};
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
@ -107,9 +101,9 @@ public class TermsShortFacetCollector extends AbstractFacetCollector {
}
if (this.script == null && excluded.isEmpty()) {
aggregator = new StaticAggregatorValueProc(popFacets());
aggregator = new StaticAggregatorValueProc(CacheRecycler.popShortIntMap());
} else {
aggregator = new AggregatorValueProc(popFacets(), excluded, this.script);
aggregator = new AggregatorValueProc(CacheRecycler.popShortIntMap(), excluded, this.script);
}
if (allTerms) {
@ -144,35 +138,30 @@ public class TermsShortFacetCollector extends AbstractFacetCollector {
@Override public Facet facet() {
TShortIntHashMap facets = aggregator.facets();
if (facets.isEmpty()) {
pushFacets(facets);
CacheRecycler.pushShortIntMap(facets);
return new InternalShortTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalShortTermsFacet.ShortEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalShortTermsFacet.ShortEntry> ordered = new BoundedTreeSet<InternalShortTermsFacet.ShortEntry>(comparatorType.comparator(), size * numberOfShards);
for (TShortIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalShortTermsFacet.ShortEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TShortIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalShortTermsFacet.ShortEntry(it.key(), it.value()));
}
InternalShortTermsFacet.ShortEntry[] list = new InternalShortTermsFacet.ShortEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalShortTermsFacet.ShortEntry) ordered.pop();
}
CacheRecycler.pushShortIntMap(facets);
return new InternalShortTermsFacet(facetName, comparatorType, size, Arrays.asList(list), aggregator.missing());
} else {
BoundedTreeSet<InternalShortTermsFacet.ShortEntry> ordered = new BoundedTreeSet<InternalShortTermsFacet.ShortEntry>(comparatorType.comparator(), size);
for (TShortIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalShortTermsFacet.ShortEntry(it.key(), it.value()));
}
CacheRecycler.pushShortIntMap(facets);
return new InternalShortTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
pushFacets(facets);
return new InternalShortTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}
static TShortIntHashMap popFacets() {
Deque<TShortIntHashMap> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TShortIntHashMap());
}
TShortIntHashMap facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TShortIntHashMap facets) {
facets.clear();
Deque<TShortIntHashMap> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}
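
The popFacets/pushFacets helpers removed above are one instance of the pooling pattern this commit centralizes in CacheRecycler. A minimal, hypothetical sketch of that pop/push contract (plain HashMap instead of Trove, hypothetical class name, not the CacheRecycler source): pop hands out an empty map, push clears it and returns it to a thread-local pool for reuse.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the pop/push recycling contract; not the CacheRecycler source.
public final class MapRecyclerSketch {

    private static final ThreadLocal<Deque<Map<String, Integer>>> POOL =
            ThreadLocal.withInitial(ArrayDeque::new);

    private MapRecyclerSketch() {
    }

    public static Map<String, Integer> popObjectIntMap() {
        Map<String, Integer> map = POOL.get().pollFirst();
        return map != null ? map : new HashMap<>(); // always handed out empty
    }

    public static void pushObjectIntMap(Map<String, Integer> map) {
        map.clear(); // clear on push so the next pop starts clean
        POOL.get().addFirst(map);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = popObjectIntMap();
        counts.merge("foo", 1, Integer::sum);
        System.out.println(counts); // {foo=1}
        pushObjectIntMap(counts);   // recycled for the next facet on this thread
    }
}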

View File

@ -0,0 +1,248 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.shorts;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.trove.set.hash.TShortHashSet;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.shorts.ShortFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class TermsShortOrdinalsFacetCollector extends AbstractFacetCollector {
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int numberOfShards;
private final int minCount;
private final FieldDataType fieldDataType;
private ShortFieldData fieldData;
private final List<ReaderAggregator> aggregators;
private ReaderAggregator current;
long missing;
private final TShortHashSet excluded;
public TermsShortOrdinalsFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<String> excluded) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();
this.size = size;
this.comparatorType = comparatorType;
this.numberOfShards = context.numberOfShards();
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
if (smartMappers == null || !smartMappers.hasMapper()) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms short facet collector on it");
} else {
// add type filter if there is exact doc mapper associated with it
if (smartMappers.hasDocMapper()) {
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
}
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.SHORT) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of short type, can't run terms short facet collector on it");
}
this.indexFieldName = smartMappers.mapper().names().indexName();
this.fieldDataType = smartMappers.mapper().fieldDataType();
}
if (excluded == null || excluded.isEmpty()) {
this.excluded = null;
} else {
this.excluded = new TShortHashSet(excluded.size());
for (String s : excluded) {
this.excluded.add(Short.parseShort(s));
}
}
// minCount is offset by -1 so that, with allTerms, the count > minCount check below also keeps zero-count terms
if (allTerms) {
minCount = -1;
} else {
minCount = 0;
}
this.aggregators = new ArrayList<ReaderAggregator>(context.searcher().subReaders().length);
}
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
if (current != null) {
missing += current.counts[0];
if (current.values.length > 1) {
aggregators.add(current);
}
}
fieldData = (ShortFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
current = new ReaderAggregator(fieldData);
}
@Override protected void doCollect(int doc) throws IOException {
fieldData.forEachOrdinalInDoc(doc, current);
}
@Override public Facet facet() {
if (current != null) {
missing += current.counts[0];
// if we have values for this one, add it
if (current.values.length > 1) {
aggregators.add(current);
}
}
AggregatorPriorityQueue queue = new AggregatorPriorityQueue(aggregators.size());
for (ReaderAggregator aggregator : aggregators) {
CacheRecycler.pushIntArray(aggregator.counts); // release it back to the recycler here; we are still on the same thread, so it won't be reused before the merge below finishes
if (aggregator.nextPosition()) {
queue.add(aggregator);
}
}
// YACK, we repeat the same logic, but once with an optimized priority queue for smaller sizes
if (size < EntryPriorityQueue.LIMIT) {
// optimize to use a bounded priority queue when the requested size is small
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
short value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalShortTermsFacet.ShortEntry entry = new InternalShortTermsFacet.ShortEntry(value, count);
ordered.insertWithOverflow(entry);
}
}
}
InternalShortTermsFacet.ShortEntry[] list = new InternalShortTermsFacet.ShortEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalShortTermsFacet.ShortEntry) ordered.pop();
}
return new InternalShortTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing);
}
BoundedTreeSet<InternalShortTermsFacet.ShortEntry> ordered = new BoundedTreeSet<InternalShortTermsFacet.ShortEntry>(comparatorType.comparator(), size);
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
short value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value == agg.current);
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalShortTermsFacet.ShortEntry entry = new InternalShortTermsFacet.ShortEntry(value, count);
ordered.add(entry);
}
}
}
return new InternalShortTermsFacet(facetName, comparatorType, size, ordered, missing);
}
public static class ReaderAggregator implements FieldData.OrdinalInDocProc {
final short[] values;
final int[] counts;
int position = 0;
short current;
public ReaderAggregator(ShortFieldData fieldData) {
this.values = fieldData.values();
this.counts = CacheRecycler.popIntArray(fieldData.values().length);
}
@Override public void onOrdinal(int docId, int ordinal) {
counts[ordinal]++;
}
public boolean nextPosition() {
if (++position >= values.length) {
return false;
}
current = values[position];
return true;
}
}
public static class AggregatorPriorityQueue extends PriorityQueue<ReaderAggregator> {
public AggregatorPriorityQueue(int size) {
initialize(size);
}
@Override protected boolean lessThan(ReaderAggregator a, ReaderAggregator b) {
return a.current < b.current;
}
}
}
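
The ReaderAggregator above leans on popIntArray/pushIntArray for its per-segment counts buffer. A hypothetical sketch of that contract (a thread-local pool of zeroed int[] buffers, hypothetical class name, not the actual CacheRecycler code): pop returns a zeroed buffer of at least the requested size, push returns it to the pool.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical sketch of the int[] recycling contract; not the CacheRecycler source.
public final class IntArrayRecyclerSketch {

    private static final ThreadLocal<Deque<int[]>> POOL =
            ThreadLocal.withInitial(ArrayDeque::new);

    private IntArrayRecyclerSketch() {
    }

    public static int[] popIntArray(int size) {
        int[] buffer = POOL.get().pollFirst();
        if (buffer == null || buffer.length < size) {
            return new int[size];        // pool miss, or pooled buffer too small
        }
        Arrays.fill(buffer, 0, size, 0); // hand out a zeroed buffer
        return buffer;
    }

    public static void pushIntArray(int[] buffer) {
        POOL.get().addFirst(buffer);
    }

    public static void main(String[] args) {
        int[] counts = popIntArray(8);
        counts[3]++;                     // tally an ordinal
        System.out.println(Arrays.toString(counts));
        pushIntArray(counts);            // recycled for the next segment/request
    }
}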

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.facet.terms.strings;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableSet;
@ -34,9 +35,11 @@ import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -95,9 +98,9 @@ public class FieldsTermsStringFacetCollector extends AbstractFacetCollector {
}
if (excluded.isEmpty() && pattern == null && this.script == null) {
aggregator = new StaticAggregatorValueProc(TermsStringFacetCollector.popFacets());
aggregator = new StaticAggregatorValueProc(CacheRecycler.<String>popObjectIntMap());
} else {
aggregator = new AggregatorValueProc(TermsStringFacetCollector.popFacets(), excluded, pattern, this.script);
aggregator = new AggregatorValueProc(CacheRecycler.<String>popObjectIntMap(), excluded, pattern, this.script);
}
if (allTerms) {
@ -138,17 +141,30 @@ public class FieldsTermsStringFacetCollector extends AbstractFacetCollector {
@Override public Facet facet() {
TObjectIntHashMap<String> facets = aggregator.facets();
if (facets.isEmpty()) {
TermsStringFacetCollector.pushFacets(facets);
CacheRecycler.pushObjectIntMap(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalStringTermsFacet.StringEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size * numberOfShards);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
}
InternalStringTermsFacet.StringEntry[] list = new InternalStringTermsFacet.StringEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ((InternalStringTermsFacet.StringEntry) ordered.pop());
}
CacheRecycler.pushObjectIntMap(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), aggregator.missing());
} else {
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
}
CacheRecycler.pushObjectIntMap(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
TermsStringFacetCollector.pushFacets(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.facet.terms.strings;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableSet;
@ -29,9 +30,11 @@ import org.elasticsearch.common.trove.map.hash.TObjectIntHashMap;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -68,7 +71,7 @@ public class ScriptTermsStringFieldFacetCollector extends AbstractFacetCollector
this.excluded = excluded;
this.matcher = pattern != null ? pattern.matcher("") : null;
this.facets = TermsStringFacetCollector.popFacets();
this.facets = CacheRecycler.popObjectIntMap();
}
@Override public void setScorer(Scorer scorer) throws IOException {
@ -132,17 +135,30 @@ public class ScriptTermsStringFieldFacetCollector extends AbstractFacetCollector
@Override public Facet facet() {
if (facets.isEmpty()) {
TermsStringFacetCollector.pushFacets(facets);
CacheRecycler.pushObjectIntMap(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalStringTermsFacet.StringEntry>of(), missing);
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size * numberOfShards);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
}
InternalStringTermsFacet.StringEntry[] list = new InternalStringTermsFacet.StringEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ((InternalStringTermsFacet.StringEntry) ordered.pop());
}
CacheRecycler.pushObjectIntMap(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing);
} else {
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
}
CacheRecycler.pushObjectIntMap(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, ordered, missing);
}
TermsStringFacetCollector.pushFacets(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, ordered, missing);
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.facet.terms.strings;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableSet;
@ -36,10 +37,12 @@ import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Map;
import java.util.regex.Matcher;
@ -104,9 +107,9 @@ public class TermsStringFacetCollector extends AbstractFacetCollector {
}
if (excluded.isEmpty() && pattern == null && this.script == null) {
aggregator = new StaticAggregatorValueProc(popFacets());
aggregator = new StaticAggregatorValueProc(CacheRecycler.<String>popObjectIntMap());
} else {
aggregator = new AggregatorValueProc(popFacets(), excluded, pattern, this.script);
aggregator = new AggregatorValueProc(CacheRecycler.<String>popObjectIntMap(), excluded, pattern, this.script);
}
if (allTerms) {
@ -141,35 +144,30 @@ public class TermsStringFacetCollector extends AbstractFacetCollector {
@Override public Facet facet() {
TObjectIntHashMap<String> facets = aggregator.facets();
if (facets.isEmpty()) {
pushFacets(facets);
CacheRecycler.pushObjectIntMap(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalStringTermsFacet.StringEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size * numberOfShards);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.insertWithOverflow(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
}
InternalStringTermsFacet.StringEntry[] list = new InternalStringTermsFacet.StringEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ((InternalStringTermsFacet.StringEntry) ordered.pop());
}
CacheRecycler.pushObjectIntMap(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), aggregator.missing());
} else {
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
}
CacheRecycler.pushObjectIntMap(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
pushFacets(facets);
return new InternalStringTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}
static TObjectIntHashMap<String> popFacets() {
Deque<TObjectIntHashMap<String>> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TObjectIntHashMap<String>());
}
TObjectIntHashMap<String> facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TObjectIntHashMap<String> facets) {
facets.clear();
Deque<TObjectIntHashMap<String>> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}

View File

@ -0,0 +1,244 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.strings;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.strings.StringFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.support.EntryPriorityQueue;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class TermsStringOrdinalsFacetCollector extends AbstractFacetCollector {
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int numberOfShards;
private final int minCount;
private final FieldDataType fieldDataType;
private StringFieldData fieldData;
private final List<ReaderAggregator> aggregators;
private ReaderAggregator current;
long missing;
private final ImmutableSet<String> excluded;
public TermsStringOrdinalsFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<String> excluded) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();
this.size = size;
this.comparatorType = comparatorType;
this.numberOfShards = context.numberOfShards();
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
if (smartMappers == null || !smartMappers.hasMapper()) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms string facet collector on it");
} else {
// add type filter if there is exact doc mapper associated with it
if (smartMappers.hasDocMapper()) {
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
}
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.STRING) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of string type, can't run terms string facet collector on it");
}
this.indexFieldName = smartMappers.mapper().names().indexName();
this.fieldDataType = smartMappers.mapper().fieldDataType();
}
if (excluded == null || excluded.isEmpty()) {
this.excluded = null;
} else {
this.excluded = excluded;
}
// minCount is offset by -1 so that, with allTerms, the count > minCount check below also keeps zero-count terms
if (allTerms) {
minCount = -1;
} else {
minCount = 0;
}
this.aggregators = new ArrayList<ReaderAggregator>(context.searcher().subReaders().length);
}
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
if (current != null) {
missing += current.counts[0];
if (current.values.length > 1) {
aggregators.add(current);
}
}
fieldData = (StringFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
current = new ReaderAggregator(fieldData);
}
@Override protected void doCollect(int doc) throws IOException {
fieldData.forEachOrdinalInDoc(doc, current);
}
@Override public Facet facet() {
if (current != null) {
missing += current.counts[0];
// if we have values for this one, add it
if (current.values.length > 1) {
aggregators.add(current);
}
}
AggregatorPriorityQueue queue = new AggregatorPriorityQueue(aggregators.size());
for (ReaderAggregator aggregator : aggregators) {
CacheRecycler.pushIntArray(aggregator.counts); // release it back to the recycler here; we are still on the same thread, so it won't be reused before the merge below finishes
if (aggregator.nextPosition()) {
queue.add(aggregator);
}
}
// YACK, we repeat the same logic, but once with an optimized priority queue for smaller sizes
if (size < EntryPriorityQueue.LIMIT) {
// optimize to use a bounded priority queue when the requested size is small
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
String value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value.equals(agg.current));
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalStringTermsFacet.StringEntry entry = new InternalStringTermsFacet.StringEntry(value, count);
ordered.insertWithOverflow(entry);
}
}
}
InternalStringTermsFacet.StringEntry[] list = new InternalStringTermsFacet.StringEntry[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (InternalStringTermsFacet.StringEntry) ordered.pop();
}
return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing);
}
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size);
while (queue.size() > 0) {
ReaderAggregator agg = queue.top();
String value = agg.current;
int count = 0;
do {
count += agg.counts[agg.position];
if (agg.nextPosition()) {
agg = queue.updateTop();
} else {
// we are done with this reader
queue.pop();
agg = queue.top();
}
} while (agg != null && value.equals(agg.current));
if (count > minCount) {
if (excluded == null || !excluded.contains(value)) {
InternalStringTermsFacet.StringEntry entry = new InternalStringTermsFacet.StringEntry(value, count);
ordered.add(entry);
}
}
}
return new InternalStringTermsFacet(facetName, comparatorType, size, ordered, missing);
}
public static class ReaderAggregator implements FieldData.OrdinalInDocProc {
final String[] values;
final int[] counts;
int position = 0;
String current;
public ReaderAggregator(StringFieldData fieldData) {
this.values = fieldData.values();
this.counts = CacheRecycler.popIntArray(fieldData.values().length);
}
@Override public void onOrdinal(int docId, int ordinal) {
counts[ordinal]++;
}
public boolean nextPosition() {
if (++position >= values.length) {
return false;
}
current = values[position];
return true;
}
}
public static class AggregatorPriorityQueue extends PriorityQueue<ReaderAggregator> {
public AggregatorPriorityQueue(int size) {
initialize(size);
}
@Override protected boolean lessThan(ReaderAggregator a, ReaderAggregator b) {
return a.current.compareTo(b.current) < 0;
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.support;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.search.facet.terms.TermsFacet;
import java.util.Comparator;
public class EntryPriorityQueue extends PriorityQueue<TermsFacet.Entry> {
public static final int LIMIT = 5000;
private final Comparator<TermsFacet.Entry> comparator;
public EntryPriorityQueue(int size, Comparator<TermsFacet.Entry> comparator) {
initialize(size);
this.comparator = comparator;
}
@Override protected boolean lessThan(TermsFacet.Entry a, TermsFacet.Entry b) {
return comparator.compare(a, b) > 0; // reverse, since we reverse again when adding to a list
}
}
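
Why lessThan is reversed: the queue keeps the size best entries with the weakest one at the top, so insertWithOverflow can evict it cheaply, and the callers above pop into an array from the back to restore the requested order. A small self-contained illustration of the same idea with java.util.PriorityQueue (hypothetical names, ints instead of facet entries, modern Java for brevity):

import java.util.Arrays;
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical illustration of the bounded top-N trick used by EntryPriorityQueue.
public class TopNSketch {

    // Keep the top n values under "order", returned in that order.
    static int[] topN(int[] values, int n, Comparator<Integer> order) {
        // Heap head = the "least desirable" retained value under the requested order.
        PriorityQueue<Integer> heap = new PriorityQueue<>(n, order.reversed());
        for (int v : values) {
            if (heap.size() < n) {
                heap.add(v);
            } else if (order.reversed().compare(v, heap.peek()) > 0) {
                heap.poll();             // evict the current weakest value
                heap.add(v);
            }
        }
        int[] result = new int[heap.size()];
        for (int i = result.length - 1; i >= 0; i--) {
            result[i] = heap.poll();     // popping weakest-first fills the array backwards
        }
        return result;
    }

    public static void main(String[] args) {
        int[] counts = {7, 42, 3, 19, 25};
        // Top 3 by descending count: [42, 25, 19]
        System.out.println(Arrays.toString(topN(counts, 3, Comparator.<Integer>reverseOrder())));
    }
}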

View File

@ -742,7 +742,7 @@ public class SimpleFacetsTests extends AbstractNodesTests {
SearchResponse searchResponse = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("facet1").field("text").size(3))
.addFacet(termsFacet("facet1").field("text").size(10))
.execute().actionGet();
TermsFacet facet = searchResponse.facets().facet("facet1");
@ -752,32 +752,6 @@ public class SimpleFacetsTests extends AbstractNodesTests {
assertThat(facet.entries().get(i).term(), anyOf(equalTo("foo"), equalTo("bar"), equalTo("baz")));
assertThat(facet.entries().get(i).count(), equalTo(10));
}
searchResponse = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("facet1").field("text").size(2))
.execute().actionGet();
facet = searchResponse.facets().facet("facet1");
assertThat(facet.name(), equalTo("facet1"));
assertThat(facet.entries().size(), equalTo(2));
for (int i = 0; i < 2; i++) {
assertThat(facet.entries().get(i).term(), anyOf(equalTo("foo"), equalTo("bar"), equalTo("baz")));
assertThat(facet.entries().get(i).count(), equalTo(10));
}
searchResponse = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("facet1").field("text").size(1))
.execute().actionGet();
facet = searchResponse.facets().facet("facet1");
assertThat(facet.name(), equalTo("facet1"));
assertThat(facet.entries().size(), equalTo(1));
for (int i = 0; i < 1; i++) {
assertThat(facet.entries().get(i).term(), anyOf(equalTo("foo"), equalTo("bar"), equalTo("baz")));
assertThat(facet.entries().get(i).count(), equalTo(10));
}
}
@Test public void testStatsFacets() throws Exception {