diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index 2f9f80926c1..5578def548f 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -82,6 +82,8 @@ linefeeds lons loopback + lstag + ltag lucene mcast memcached diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/search/facet/terms/TermsFacetSearchBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/search/facet/terms/TermsFacetSearchBenchmark.java new file mode 100644 index 00000000000..f5902c8a4ff --- /dev/null +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/search/facet/terms/TermsFacetSearchBenchmark.java @@ -0,0 +1,168 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.benchmark.search.facet.terms; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.action.bulk.BulkRequestBuilder; +import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.SizeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.node.Node; + +import java.io.IOException; + +import static org.elasticsearch.client.Requests.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.elasticsearch.common.xcontent.XContentFactory.*; +import static org.elasticsearch.index.query.xcontent.QueryBuilders.*; +import static org.elasticsearch.node.NodeBuilder.*; +import static org.elasticsearch.search.facet.FacetBuilders.*; + +/** + * @author kimchy (shay.banon) + */ +public class TermsFacetSearchBenchmark { + + public static void main(String[] args) throws Exception { + Settings settings = settingsBuilder() + .put("index.engine.robin.refreshInterval", "-1") + .put("gateway.type", "local") + .put(SETTING_NUMBER_OF_SHARDS, 2) + .put(SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + + Node node1 = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "node1")).node(); + Node node2 = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "node2")).node(); + + Node clientNode = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "client")).client(true).node(); + + Client client = clientNode.client(); + + long COUNT = SizeValue.parseSizeValue("1m").singles(); + int BATCH = 100; + int QUERY_WARMUP = 20; + int QUERY_COUNT = 200; + int NUMBER_OF_TERMS = 10; + + 
long[] lValues = new long[NUMBER_OF_TERMS]; + for (int i = 0; i < NUMBER_OF_TERMS; i++) { + lValues[i] = i; + } + String[] sValues = new String[NUMBER_OF_TERMS]; + for (int i = 0; i < NUMBER_OF_TERMS; i++) { + sValues[i] = Integer.toString(i); + } + + Thread.sleep(10000); + try { + client.admin().indices().create(createIndexRequest("test")).actionGet(); + + StopWatch stopWatch = new StopWatch().start(); + + System.out.println("--> Indexing [" + COUNT + "] ..."); + long ITERS = COUNT / BATCH; + long i = 1; + int counter = 0; + for (; i <= ITERS; i++) { + BulkRequestBuilder request = client.prepareBulk(); + for (int j = 0; j < BATCH; j++) { + counter++; + request.add(Requests.indexRequest("test").type("type1").id(Integer.toString(counter)) + .source(source(Integer.toString(counter), sValues[counter % sValues.length], lValues[counter % lValues.length]))); + } + BulkResponse response = request.execute().actionGet(); + if (response.hasFailures()) { + System.err.println("--> failures..."); + } + if (((i * BATCH) % 10000) == 0) { + System.out.println("--> Indexed " + (i * BATCH) + " took " + stopWatch.stop().lastTaskTime()); + stopWatch.start(); + } + } + System.out.println("--> Indexing took " + stopWatch.totalTime() + ", TPS " + (((double) (COUNT)) / stopWatch.totalTime().secondsFrac())); + } catch (Exception e) { + System.out.println("--> Index already exists, ignoring indexing phase, waiting for green"); + ClusterHealthResponse clusterHealthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet(); + if (clusterHealthResponse.timedOut()) { + System.err.println("--> Timed out waiting for cluster health"); + } + } + client.admin().indices().prepareRefresh().execute().actionGet(); + System.out.println("--> Number of docs in index: " + client.prepareCount().setQuery(matchAllQuery()).execute().actionGet().count()); + + System.out.println("--> Warmup..."); + // run just the child query, warm up first + for (int j = 
0; j < QUERY_WARMUP; j++) { + SearchResponse searchResponse = client.prepareSearch() + .setQuery(matchAllQuery()) + .addFacet(termsFacet("s_value").field("s_value")) + .addFacet(termsFacet("l_value").field("l_value")) + .execute().actionGet(); + if (j == 0) { + System.out.println("--> Warmup took: " + searchResponse.took()); + } + if (searchResponse.hits().totalHits() != COUNT) { + System.err.println("--> mismatch on hits"); + } + } + + long totalQueryTime = 0; + for (int j = 0; j < QUERY_COUNT; j++) { + SearchResponse searchResponse = client.prepareSearch() + .setQuery(matchAllQuery()) + .addFacet(termsFacet("s_value").field("s_value")) + .execute().actionGet(); + if (searchResponse.hits().totalHits() != COUNT) { + System.err.println("--> mismatch on hits"); + } + totalQueryTime += searchResponse.tookInMillis(); + } + System.out.println("--> Terms Facet (s_value) " + (totalQueryTime / QUERY_COUNT) + "ms"); + + totalQueryTime = 0; + for (int j = 0; j < QUERY_COUNT; j++) { + SearchResponse searchResponse = client.prepareSearch() + .setQuery(matchAllQuery()) + .addFacet(termsFacet("l_value").field("l_value")) + .execute().actionGet(); + if (searchResponse.hits().totalHits() != COUNT) { + System.err.println("--> mismatch on hits"); + } + totalQueryTime += searchResponse.tookInMillis(); + } + System.out.println("--> Terms Facet (l_value) " + (totalQueryTime / QUERY_COUNT) + "ms"); + + + clientNode.close(); + + node1.close(); + node2.close(); + } + + private static XContentBuilder source(String id, String sValue, long lValue) throws IOException { + return jsonBuilder().startObject().field("id", id).field("s_value", sValue).field("l_value", lValue).endObject(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/field/data/doubles/DoubleFieldData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/field/data/doubles/DoubleFieldData.java index 3b91193af51..2a13204dc8a 100644 --- 
a/modules/elasticsearch/src/main/java/org/elasticsearch/index/field/data/doubles/DoubleFieldData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/field/data/doubles/DoubleFieldData.java @@ -107,6 +107,11 @@ public abstract class DoubleFieldData extends NumericFieldData facets); + public static void registerStreams() { + InternalStringTermsFacet.registerStream(); + InternalLongTermsFacet.registerStream(); + InternalDoubleTermsFacet.registerStream(); + InternalIntTermsFacet.registerStream(); + InternalFloatTermsFacet.registerStream(); + InternalShortTermsFacet.registerStream(); + } + + public abstract Facet reduce(String name, List facets); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacet.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacet.java index 504dda6179d..491a53660f5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacet.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacet.java @@ -43,6 +43,10 @@ public interface TermsFacet extends Facet, Iterable { String getTerm(); + Number termAsNumber(); + + Number getTermAsNumber(); + int count(); int getCount(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetProcessor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetProcessor.java index aa35260d9d7..bae3fba9c6c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetProcessor.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetProcessor.java @@ -26,13 +26,18 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; +import 
org.elasticsearch.index.field.data.FieldDataType; +import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.search.facet.Facet; import org.elasticsearch.search.facet.FacetCollector; import org.elasticsearch.search.facet.FacetProcessor; -import org.elasticsearch.search.facet.InternalFacet; +import org.elasticsearch.search.facet.terms.doubles.TermsDoubleFacetCollector; +import org.elasticsearch.search.facet.terms.floats.TermsFloatFacetCollector; import org.elasticsearch.search.facet.terms.index.IndexNameFacetCollector; +import org.elasticsearch.search.facet.terms.ints.TermsIntFacetCollector; +import org.elasticsearch.search.facet.terms.longs.TermsLongFacetCollector; +import org.elasticsearch.search.facet.terms.shorts.TermsShortFacetCollector; import org.elasticsearch.search.facet.terms.strings.FieldsTermsStringFacetCollector; -import org.elasticsearch.search.facet.terms.strings.InternalStringTermsFacet; import org.elasticsearch.search.facet.terms.strings.ScriptTermsStringFieldFacetCollector; import org.elasticsearch.search.facet.terms.strings.TermsStringFacetCollector; import org.elasticsearch.search.internal.SearchContext; @@ -49,7 +54,7 @@ public class TermsFacetProcessor extends AbstractComponent implements FacetProce @Inject public TermsFacetProcessor(Settings settings) { super(settings); - InternalFacet.Streams.registerStream(InternalStringTermsFacet.STREAM, InternalStringTermsFacet.TYPE); + InternalTermsFacet.registerStreams(); } @Override public String[] types() { @@ -127,6 +132,21 @@ public class TermsFacetProcessor extends AbstractComponent implements FacetProce if (field == null && fieldsNames == null && script != null) { return new ScriptTermsStringFieldFacetCollector(facetName, size, comparatorType, context, excluded, pattern, scriptLang, script, params); } + + FieldMapper fieldMapper = context.mapperService().smartNameFieldMapper(field); + if (fieldMapper != null) { + if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.LONG) 
{ + return new TermsLongFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params); + } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.DOUBLE) { + return new TermsDoubleFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params); + } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.INT) { + return new TermsIntFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params); + } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.FLOAT) { + return new TermsFloatFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params); + } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.SHORT) { + return new TermsShortFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params); + } + } return new TermsStringFacetCollector(facetName, field, size, comparatorType, context, excluded, pattern, scriptLang, script, params); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java new file mode 100644 index 00000000000..830fa7ca366 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java @@ -0,0 +1,267 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.facet.terms.doubles; + +import org.elasticsearch.common.collect.BoundedTreeSet; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.thread.ThreadLocals; +import org.elasticsearch.common.trove.TDoubleIntHashMap; +import org.elasticsearch.common.trove.TDoubleIntIterator; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.search.facet.Facet; +import org.elasticsearch.search.facet.terms.InternalTermsFacet; +import org.elasticsearch.search.facet.terms.TermsFacet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +/** + * @author kimchy (shay.banon) + */ +public class InternalDoubleTermsFacet extends InternalTermsFacet { + + private static final String STREAM_TYPE = "dTerms"; + + public static void registerStream() { + Streams.registerStream(STREAM, STREAM_TYPE); + } + + static Stream STREAM = new Stream() { + @Override public Facet readFacet(String type, StreamInput in) throws IOException { + return readTermsFacet(in); + } + }; + + @Override public String streamType() { + return STREAM_TYPE; + } + + public static class DoubleEntry implements Entry { + + double term; + int count; + + public DoubleEntry(double term, int count) { + this.term = term; + this.count = count; + } + + 
public String term() { + return Double.toString(term); + } + + public String getTerm() { + return term(); + } + + @Override public Number termAsNumber() { + return term; + } + + @Override public Number getTermAsNumber() { + return termAsNumber(); + } + + public int count() { + return count; + } + + public int getCount() { + return count(); + } + + @Override public int compareTo(Entry o) { + double anotherVal = ((DoubleEntry) o).term; + if (term < anotherVal) { + return -1; + } + if (term == anotherVal) { + int i = count - o.count(); + if (i == 0) { + i = System.identityHashCode(this) - System.identityHashCode(o); + } + return i; + } + return 1; + } + } + + private String name; + + private String fieldName; + + int requiredSize; + + Collection entries = ImmutableList.of(); + + private ComparatorType comparatorType; + + InternalDoubleTermsFacet() { + } + + public InternalDoubleTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection entries) { + this.name = name; + this.fieldName = fieldName; + this.comparatorType = comparatorType; + this.requiredSize = requiredSize; + this.entries = entries; + } + + @Override public String name() { + return this.name; + } + + @Override public String getName() { + return this.name; + } + + @Override public String fieldName() { + return this.fieldName; + } + + @Override public String getFieldName() { + return fieldName(); + } + + @Override public String type() { + return TYPE; + } + + @Override public String getType() { + return type(); + } + + @Override public ComparatorType comparatorType() { + return comparatorType; + } + + @Override public ComparatorType getComparatorType() { + return comparatorType(); + } + + @Override public List entries() { + if (!(entries instanceof List)) { + entries = ImmutableList.copyOf(entries); + } + return (List) entries; + } + + @Override public List getEntries() { + return entries(); + } + + @SuppressWarnings({"unchecked"}) @Override public Iterator 
iterator() { + return (Iterator) entries.iterator(); + } + + + private static ThreadLocal> aggregateCache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new TDoubleIntHashMap()); + } + }; + + + @Override public Facet reduce(String name, List facets) { + if (facets.size() == 1) { + return facets.get(0); + } + InternalDoubleTermsFacet first = (InternalDoubleTermsFacet) facets.get(0); + TDoubleIntHashMap aggregated = aggregateCache.get().get(); + aggregated.clear(); + + for (Facet facet : facets) { + InternalDoubleTermsFacet mFacet = (InternalDoubleTermsFacet) facet; + for (DoubleEntry entry : mFacet.entries) { + aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count()); + } + } + + BoundedTreeSet ordered = new BoundedTreeSet(first.comparatorType().comparator(), first.requiredSize); + for (TDoubleIntIterator it = aggregated.iterator(); it.hasNext();) { + it.advance(); + ordered.add(new DoubleEntry(it.key(), it.value())); + } + first.entries = ordered; + return first; + } + + static final class Fields { + static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString _FIELD = new XContentBuilderString("_field"); + static final XContentBuilderString TERMS = new XContentBuilderString("terms"); + static final XContentBuilderString TERM = new XContentBuilderString("term"); + static final XContentBuilderString COUNT = new XContentBuilderString("count"); + } + + @Override public void toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(name); + builder.field(Fields._TYPE, TermsFacet.TYPE); + builder.field(Fields._FIELD, fieldName); + builder.startArray(Fields.TERMS); + for (DoubleEntry entry : entries) { + builder.startObject(); + builder.field(Fields.TERM, entry.term); + builder.field(Fields.COUNT, entry.count()); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + 
} + + public static InternalDoubleTermsFacet readTermsFacet(StreamInput in) throws IOException { + InternalDoubleTermsFacet facet = new InternalDoubleTermsFacet(); + facet.readFrom(in); + return facet; + } + + @Override public void readFrom(StreamInput in) throws IOException { + name = in.readUTF(); + fieldName = in.readUTF(); + comparatorType = ComparatorType.fromId(in.readByte()); + requiredSize = in.readVInt(); + + int size = in.readVInt(); + entries = new ArrayList(size); + for (int i = 0; i < size; i++) { + entries.add(new DoubleEntry(in.readDouble(), in.readVInt())); + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(name); + out.writeUTF(fieldName); + out.writeByte(comparatorType.id()); + + out.writeVInt(requiredSize); + + out.writeVInt(entries.size()); + for (DoubleEntry entry : entries) { + out.writeDouble(entry.term); + out.writeVInt(entry.count()); + } + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetCollector.java new file mode 100644 index 00000000000..450a5a3bd89 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetCollector.java @@ -0,0 +1,213 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.facet.terms.doubles; + +import org.apache.lucene.index.IndexReader; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.common.collect.BoundedTreeSet; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.thread.ThreadLocals; +import org.elasticsearch.common.trove.TDoubleIntHashMap; +import org.elasticsearch.common.trove.TDoubleIntIterator; +import org.elasticsearch.index.cache.field.data.FieldDataCache; +import org.elasticsearch.index.field.data.FieldDataType; +import org.elasticsearch.index.field.data.doubles.DoubleFieldData; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.script.search.SearchScript; +import org.elasticsearch.search.facet.AbstractFacetCollector; +import org.elasticsearch.search.facet.Facet; +import org.elasticsearch.search.facet.terms.TermsFacet; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Map; + +/** + * @author kimchy (shay.banon) + */ +public class TermsDoubleFacetCollector extends AbstractFacetCollector { + + static ThreadLocal>> cache = new ThreadLocal>>() { + @Override protected ThreadLocals.CleanableValue> initialValue() { + return new ThreadLocals.CleanableValue>(new ArrayDeque()); + } + }; + + private final FieldDataCache fieldDataCache; + + private final String fieldName; + + private final String indexFieldName; + 
+ private final TermsFacet.ComparatorType comparatorType; + + private final int size; + + private final int numberOfShards; + + private final FieldDataType fieldDataType; + + private DoubleFieldData fieldData; + + private final StaticAggregatorValueProc aggregator; + + private final SearchScript script; + + public TermsDoubleFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context, + String scriptLang, String script, Map params) { + super(facetName); + this.fieldDataCache = context.fieldDataCache(); + this.size = size; + this.comparatorType = comparatorType; + this.numberOfShards = context.numberOfShards(); + + this.fieldName = fieldName; + + MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName); + if (smartMappers == null || !smartMappers.hasMapper()) { + throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms double facet collector on it"); + } else { + // add type filter if there is exact doc mapper associated with it + if (smartMappers.hasDocMapper()) { + setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter())); + } + + if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.DOUBLE) { + throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of double type, can't run terms double facet collector on it"); + } + + this.indexFieldName = smartMappers.mapper().names().indexName(); + this.fieldDataType = smartMappers.mapper().fieldDataType(); + } + + if (script != null) { + this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService()); + } else { + this.script = null; + } + + if (this.script == null) { + aggregator = new StaticAggregatorValueProc(popFacets()); + } else { + aggregator = new AggregatorValueProc(popFacets(), this.script); + } + } + + @Override protected void doSetNextReader(IndexReader 
reader, int docBase) throws IOException { + fieldData = (DoubleFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName); + if (script != null) { + script.setNextReader(reader); + } + } + + @Override protected void doCollect(int doc) throws IOException { + fieldData.forEachValueInDoc(doc, aggregator); + } + + @Override public Facet facet() { + TDoubleIntHashMap facets = aggregator.facets(); + if (facets.isEmpty()) { + pushFacets(facets); + return new InternalDoubleTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.of()); + } else { + // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size * numberOfShards); + for (TDoubleIntIterator it = facets.iterator(); it.hasNext();) { + it.advance(); + ordered.add(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value())); + } + pushFacets(facets); + return new InternalDoubleTermsFacet(facetName, fieldName, comparatorType, size, ordered); + } + } + + static TDoubleIntHashMap popFacets() { + Deque deque = cache.get().get(); + if (deque.isEmpty()) { + deque.add(new TDoubleIntHashMap()); + } + TDoubleIntHashMap facets = deque.pollFirst(); + facets.clear(); + return facets; + } + + static void pushFacets(TDoubleIntHashMap facets) { + facets.clear(); + Deque deque = cache.get().get(); + if (deque != null) { + deque.add(facets); + } + } + + public static class AggregatorValueProc extends StaticAggregatorValueProc { + + private final SearchScript script; + + private final Map scriptParams; + + public AggregatorValueProc(TDoubleIntHashMap facets, SearchScript script) { + super(facets); + this.script = script; + if (script != null) { + scriptParams = Maps.newHashMapWithExpectedSize(4); + } else { + scriptParams = null; + } + } + + @Override public void onValue(int docId, double value) { + if (script != null) { + scriptParams.put("term", value); + Object 
scriptValue = script.execute(docId, scriptParams); + if (scriptValue == null) { + return; + } + if (scriptValue instanceof Boolean) { + if (!((Boolean) scriptValue)) { + return; + } + } else { + value = ((Number) scriptValue).doubleValue(); + } + } + super.onValue(docId, value); + } + } + + public static class StaticAggregatorValueProc implements DoubleFieldData.ValueInDocProc { + + private final TDoubleIntHashMap facets; + + public StaticAggregatorValueProc(TDoubleIntHashMap facets) { + this.facets = facets; + } + + @Override public void onValue(int docId, double value) { + facets.adjustOrPutValue(value, 1, 1); + } + + public final TDoubleIntHashMap facets() { + return facets; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/floats/InternalFloatTermsFacet.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/floats/InternalFloatTermsFacet.java new file mode 100644 index 00000000000..a559883d70e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/floats/InternalFloatTermsFacet.java @@ -0,0 +1,267 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.search.facet.terms.floats;
+
+import org.elasticsearch.common.collect.BoundedTreeSet;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.thread.ThreadLocals;
+import org.elasticsearch.common.trove.TFloatIntHashMap;
+import org.elasticsearch.common.trove.TFloatIntIterator;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.terms.InternalTermsFacet;
+import org.elasticsearch.search.facet.terms.TermsFacet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Terms facet result whose terms are {@code float} values, each paired with a count.
+ * Instances are serialized with {@link #writeTo(StreamOutput)} / {@link #readFrom(StreamInput)}
+ * under the stream type {@code "fTerms"} and merged with {@link #reduce(String, List)}.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class InternalFloatTermsFacet extends InternalTermsFacet {
+
+    private static final String STREAM_TYPE = "fTerms";
+
+    /**
+     * Registers {@link #STREAM} under the {@code "fTerms"} stream type.
+     */
+    public static void registerStream() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
+        @Override public Facet readFacet(String type, StreamInput in) throws IOException {
+            return readTermsFacet(in);
+        }
+    };
+
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+
+    /**
+     * A single facet entry: a float term and the count recorded for it.
+     */
+    public static class FloatEntry implements Entry {
+
+        float term;
+        int count;
+
+        public FloatEntry(float term, int count) {
+            this.term = term;
+            this.count = count;
+        }
+
+        /**
+         * Returns the term formatted with {@link Float#toString(float)}.
+         */
+        public String term() {
+            return Float.toString(term);
+        }
+
+        public String getTerm() {
+            return term();
+        }
+
+        @Override public Number termAsNumber() {
+            return term;
+        }
+
+        @Override public Number getTermAsNumber() {
+            return termAsNumber();
+        }
+
+        public int count() {
+            return count;
+        }
+
+        public int getCount() {
+            return count();
+        }
+
+        /**
+         * Orders entries by term, then by count, then by identity hash code.
+         *
+         * @param o the entry to compare with; must be a {@code FloatEntry}
+         * @return a negative, zero or positive value as this entry sorts before, with or after {@code o}
+         * @throws ClassCastException if {@code o} is not a {@code FloatEntry}
+         */
+        @Override public int compareTo(Entry o) {
+            float anotherVal = ((FloatEntry) o).term;
+            // Float.compare is a total order. The previous "<" / "==" checks answered
+            // "greater" in both directions as soon as either term was NaN.
+            int i = Float.compare(term, anotherVal);
+            if (i == 0) {
+                i = compareInts(count, o.count());
+                if (i == 0) {
+                    // last resort so that two distinct entries rarely compare as equal
+                    i = compareInts(System.identityHashCode(this), System.identityHashCode(o));
+                }
+            }
+            return i;
+        }
+
+        /**
+         * Three-way int comparison that cannot overflow the way a subtraction can.
+         */
+        private static int compareInts(int a, int b) {
+            return a < b ? -1 : (a == b ? 0 : 1);
+        }
+    }
+
+    private String name;
+
+    private String fieldName;
+
+    int requiredSize;
+
+    Collection<FloatEntry> entries = ImmutableList.of();
+
+    private ComparatorType comparatorType;
+
+    /**
+     * Creates an empty facet whose state is filled in by {@link #readFrom(StreamInput)}.
+     */
+    InternalFloatTermsFacet() {
+    }
+
+    /**
+     * @param name           the facet name
+     * @param fieldName      the name of the field the terms come from
+     * @param comparatorType supplies the comparator used when entries are ordered in {@link #reduce}
+     * @param requiredSize   the size passed to the bounded set built in {@link #reduce}
+     * @param entries        the entries of this facet
+     */
+    public InternalFloatTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection<FloatEntry> entries) {
+        this.name = name;
+        this.fieldName = fieldName;
+        this.comparatorType = comparatorType;
+        this.requiredSize = requiredSize;
+        this.entries = entries;
+    }
+
+    @Override public String name() {
+        return this.name;
+    }
+
+    @Override public String getName() {
+        return this.name;
+    }
+
+    @Override public String fieldName() {
+        return this.fieldName;
+    }
+
+    @Override public String getFieldName() {
+        return fieldName();
+    }
+
+    @Override public String type() {
+        return TYPE;
+    }
+
+    @Override public String getType() {
+        return type();
+    }
+
+    @Override public ComparatorType comparatorType() {
+        return comparatorType;
+    }
+
+    @Override public ComparatorType getComparatorType() {
+        return comparatorType();
+    }
+
+    /**
+     * Returns the entries as a list. When the entries are held in a non-list collection they
+     * are first copied into an immutable list, which then replaces the held collection.
+     */
+    @Override public List<FloatEntry> entries() {
+        if (!(entries instanceof List)) {
+            entries = ImmutableList.copyOf(entries);
+        }
+        return (List<FloatEntry>) entries;
+    }
+
+    @Override public List<FloatEntry> getEntries() {
+        return entries();
+    }
+
+    @SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
+        return (Iterator) entries.iterator();
+    }
+
+
+    // per-thread scratch map reused by reduce(); it is cleared before every use
+    private static ThreadLocal<ThreadLocals.CleanableValue<TFloatIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TFloatIntHashMap>>() {
+        @Override protected ThreadLocals.CleanableValue<TFloatIntHashMap> initialValue() {
+            return new ThreadLocals.CleanableValue<TFloatIntHashMap>(new TFloatIntHashMap());
+        }
+    };
+
+
+    /**
+     * Merges several float terms facets into one by summing the counts of equal terms.
+     * A single-element list is returned unchanged. Otherwise the first facet in the list is
+     * modified in place (its entries are replaced by the merged, ordered set) and returned.
+     *
+     * @param name   the facet name (not used by this implementation)
+     * @param facets the facets to merge; every element must be an {@code InternalFloatTermsFacet}
+     * @return the merged facet
+     */
+    @Override public Facet reduce(String name, List<Facet> facets) {
+        if (facets.size() == 1) {
+            return facets.get(0);
+        }
+        InternalFloatTermsFacet first = (InternalFloatTermsFacet) facets.get(0);
+        TFloatIntHashMap aggregated = aggregateCache.get().get();
+        aggregated.clear();
+
+        for (Facet facet : facets) {
+            InternalFloatTermsFacet mFacet = (InternalFloatTermsFacet) facet;
+            for (FloatEntry entry : mFacet.entries) {
+                aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
+            }
+        }
+
+        BoundedTreeSet<FloatEntry> ordered = new BoundedTreeSet<FloatEntry>(first.comparatorType().comparator(), first.requiredSize);
+        for (TFloatIntIterator it = aggregated.iterator(); it.hasNext();) {
+            it.advance();
+            ordered.add(new FloatEntry(it.key(), it.value()));
+        }
+        first.entries = ordered;
+        return first;
+    }
+
+    static final class Fields {
+        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
+        static final XContentBuilderString _FIELD = new XContentBuilderString("_field");
+        static final XContentBuilderString TERMS = new XContentBuilderString("terms");
+        static final XContentBuilderString TERM = new XContentBuilderString("term");
+        static final XContentBuilderString COUNT = new XContentBuilderString("count");
+    }
+
+    /**
+     * Writes this facet as an object named after the facet, holding {@code _type},
+     * {@code _field} and a {@code terms} array of {@code term} / {@code count} objects.
+     */
+    @Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(name);
+        builder.field(Fields._TYPE, TermsFacet.TYPE);
+        builder.field(Fields._FIELD, fieldName);
+        builder.startArray(Fields.TERMS);
+        for (FloatEntry entry : entries) {
+            builder.startObject();
+            builder.field(Fields.TERM, entry.term);
+            builder.field(Fields.COUNT, entry.count());
+            builder.endObject();
+        }
+        builder.endArray();
+        builder.endObject();
+    }
+
+    /**
+     * Creates a facet and populates it from the given stream.
+     */
+    public static InternalFloatTermsFacet readTermsFacet(StreamInput in) throws IOException {
+        InternalFloatTermsFacet facet = new InternalFloatTermsFacet();
+        facet.readFrom(in);
+        return facet;
+    }
+
+    /**
+     * Reads the fields in the order {@link #writeTo(StreamOutput)} writes them.
+     */
+    @Override public void readFrom(StreamInput in) throws IOException {
+        name = in.readUTF();
+        fieldName = in.readUTF();
+        comparatorType = ComparatorType.fromId(in.readByte());
+        requiredSize = in.readVInt();
+
+        int size = in.readVInt();
+        entries = new ArrayList<FloatEntry>(size);
+        for (int i = 0; i < size; i++) {
+            entries.add(new FloatEntry(in.readFloat(), in.readVInt()));
+        }
+    }
+
+    /**
+     * Writes name, field name, comparator id, required size, entry count and then each
+     * entry as a float term followed by its count.
+     */
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        out.writeUTF(name);
+        out.writeUTF(fieldName);
+        out.writeByte(comparatorType.id());
+
+        out.writeVInt(requiredSize);
+
+        out.writeVInt(entries.size());
+        for (FloatEntry entry : entries) {
+            out.writeFloat(entry.term);
+            out.writeVInt(entry.count());
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/floats/TermsFloatFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/floats/TermsFloatFacetCollector.java
new file mode 100644
index 00000000000..103a070248f
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/floats/TermsFloatFacetCollector.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.facet.terms.floats;
+
+import org.apache.lucene.index.IndexReader;
+import org.elasticsearch.ElasticSearchIllegalArgumentException;
+import org.elasticsearch.common.collect.BoundedTreeSet;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.thread.ThreadLocals;
+import org.elasticsearch.common.trove.TFloatIntHashMap;
+import org.elasticsearch.common.trove.TFloatIntIterator;
+import org.elasticsearch.index.cache.field.data.FieldDataCache;
+import org.elasticsearch.index.field.data.FieldDataType;
+import org.elasticsearch.index.field.data.floats.FloatFieldData;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.script.search.SearchScript;
+import org.elasticsearch.search.facet.AbstractFacetCollector;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.terms.TermsFacet;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+
+/**
+ * Facet collector that counts the float values of one field across the collected documents
+ * and turns the counts into an {@link InternalFloatTermsFacet}. An optional script can
+ * filter or rewrite each value before it is counted.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class TermsFloatFacetCollector extends AbstractFacetCollector {
+
+    // per-thread pool of count maps, handed out by popFacets() and returned by pushFacets()
+    static ThreadLocal<ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>>>() {
+        @Override protected ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>> initialValue() {
+            return new ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>>(new ArrayDeque<TFloatIntHashMap>());
+        }
+    };
+
+    private final FieldDataCache fieldDataCache;
+
+    private final String fieldName;
+
+    private final String indexFieldName;
+
+    private final TermsFacet.ComparatorType comparatorType;
+
+    private final int size;
+
+    private final int numberOfShards;
+
+    private final FieldDataType fieldDataType;
+
+    private FloatFieldData fieldData;
+
+    private final StaticAggregatorValueProc aggregator;
+
+    private final SearchScript script;
+
+    /**
+     * @param facetName      the facet name
+     * @param fieldName      the field whose values are counted; it must resolve to a mapper of float type
+     * @param size           the requested number of entries
+     * @param comparatorType supplies the comparator used to order the entries
+     * @param context        the search context providing caches, mappings and script services
+     * @param scriptLang     the script language, passed to {@link SearchScript}
+     * @param script         the optional script source; {@code null} disables scripting
+     * @param params         the script parameters, passed to {@link SearchScript}
+     * @throws ElasticSearchIllegalArgumentException if the field has no mapper or is not of float type
+     */
+    public TermsFloatFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context,
+                                    String scriptLang, String script, Map<String, Object> params) {
+        super(facetName);
+        this.fieldDataCache = context.fieldDataCache();
+        this.size = size;
+        this.comparatorType = comparatorType;
+        this.numberOfShards = context.numberOfShards();
+
+        this.fieldName = fieldName;
+
+        MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
+        if (smartMappers == null || !smartMappers.hasMapper()) {
+            throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms float facet collector on it");
+        }
+        // add type filter if there is exact doc mapper associated with it
+        if (smartMappers.hasDocMapper()) {
+            setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
+        }
+
+        if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.FLOAT) {
+            // message previously read "doesn't is not of float type"
+            throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of float type, can't run terms float facet collector on it");
+        }
+
+        this.indexFieldName = smartMappers.mapper().names().indexName();
+        this.fieldDataType = smartMappers.mapper().fieldDataType();
+
+        if (script != null) {
+            this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService());
+        } else {
+            this.script = null;
+        }
+
+        if (this.script == null) {
+            aggregator = new StaticAggregatorValueProc(popFacets());
+        } else {
+            aggregator = new AggregatorValueProc(popFacets(), this.script);
+        }
+    }
+
+    /**
+     * Loads the float field data for the new reader and forwards the reader to the script, if any.
+     */
+    @Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
+        fieldData = (FloatFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
+        if (script != null) {
+            script.setNextReader(reader);
+        }
+    }
+
+    /**
+     * Feeds every value the document has for the field to the aggregator.
+     */
+    @Override protected void doCollect(int doc) throws IOException {
+        fieldData.forEachValueInDoc(doc, aggregator);
+    }
+
+    /**
+     * Builds the facet from the collected counts and hands the count map back to the
+     * per-thread pool. Returns a facet with no entries when nothing was counted.
+     */
+    @Override public Facet facet() {
+        TFloatIntHashMap facets = aggregator.facets();
+        if (facets.isEmpty()) {
+            pushFacets(facets);
+            return new InternalFloatTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.<InternalFloatTermsFacet.FloatEntry>of());
+        } else {
+            // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
+            BoundedTreeSet<InternalFloatTermsFacet.FloatEntry> ordered = new BoundedTreeSet<InternalFloatTermsFacet.FloatEntry>(comparatorType.comparator(), size * numberOfShards);
+            for (TFloatIntIterator it = facets.iterator(); it.hasNext();) {
+                it.advance();
+                ordered.add(new InternalFloatTermsFacet.FloatEntry(it.key(), it.value()));
+            }
+            pushFacets(facets);
+            return new InternalFloatTermsFacet(facetName, fieldName, comparatorType, size, ordered);
+        }
+    }
+
+    /**
+     * Takes a cleared count map from the per-thread pool, creating a new one when the pool
+     * has none to offer.
+     */
+    static TFloatIntHashMap popFacets() {
+        Deque<TFloatIntHashMap> deque = cache.get().get();
+        // pushFacets() tolerates a missing deque, so be equally defensive here
+        TFloatIntHashMap facets = deque == null ? null : deque.pollFirst();
+        if (facets == null) {
+            return new TFloatIntHashMap();
+        }
+        facets.clear();
+        return facets;
+    }
+
+    /**
+     * Clears the map and returns it to the per-thread pool when the pool is available.
+     */
+    static void pushFacets(TFloatIntHashMap facets) {
+        facets.clear();
+        Deque<TFloatIntHashMap> deque = cache.get().get();
+        if (deque != null) {
+            deque.add(facets);
+        }
+    }
+
+    /**
+     * Aggregator that runs a script on every value before counting it. A {@code null} or
+     * {@code Boolean.FALSE} script result skips the value, {@code Boolean.TRUE} counts the
+     * value unchanged, and any other result is cast to {@link Number} and its float value is
+     * counted instead.
+     */
+    public static class AggregatorValueProc extends StaticAggregatorValueProc {
+
+        private final SearchScript script;
+
+        private final Map<String, Object> scriptParams;
+
+        public AggregatorValueProc(TFloatIntHashMap facets, SearchScript script) {
+            super(facets);
+            this.script = script;
+            if (script != null) {
+                scriptParams = Maps.newHashMapWithExpectedSize(4);
+            } else {
+                scriptParams = null;
+            }
+        }
+
+        @Override public void onValue(int docId, float value) {
+            if (script != null) {
+                // the current value is exposed to the script as the "term" parameter
+                scriptParams.put("term", value);
+                Object scriptValue = script.execute(docId, scriptParams);
+                if (scriptValue == null) {
+                    return;
+                }
+                if (scriptValue instanceof Boolean) {
+                    if (!((Boolean) scriptValue)) {
+                        return;
+                    }
+                } else {
+                    value = ((Number) scriptValue).floatValue();
+                }
+            }
+            super.onValue(docId, value);
+        }
+    }
+
+    /**
+     * Aggregator that increments the count of every value it is given.
+     */
+    public static class StaticAggregatorValueProc implements FloatFieldData.ValueInDocProc {
+
+        private final TFloatIntHashMap facets;
+
+        public StaticAggregatorValueProc(TFloatIntHashMap facets) {
+            this.facets = facets;
+        }
+
+        @Override public void onValue(int docId, float value) {
+            facets.adjustOrPutValue(value, 1, 1);
+        }
+
+        public final TFloatIntHashMap facets() {
+            return facets;
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ints/InternalIntTermsFacet.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ints/InternalIntTermsFacet.java
new file mode 100644
index 00000000000..80172f7c625
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ints/InternalIntTermsFacet.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.facet.terms.ints;
+
+import org.elasticsearch.common.collect.BoundedTreeSet;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.thread.ThreadLocals;
+import org.elasticsearch.common.trove.TIntIntHashMap;
+import org.elasticsearch.common.trove.TIntIntIterator;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.terms.InternalTermsFacet;
+import org.elasticsearch.search.facet.terms.TermsFacet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Terms facet result whose terms are {@code int} values, each paired with a count.
+ * Instances are serialized with {@link #writeTo(StreamOutput)} / {@link #readFrom(StreamInput)}
+ * under the stream type {@code "iTerms"} and merged with {@link #reduce(String, List)}.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class InternalIntTermsFacet extends InternalTermsFacet {
+
+    private static final String STREAM_TYPE = "iTerms";
+
+    /**
+     * Registers {@link #STREAM} under the {@code "iTerms"} stream type.
+     */
+    public static void registerStream() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
+        @Override public Facet readFacet(String type, StreamInput in) throws IOException {
+            return readTermsFacet(in);
+        }
+    };
+
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+
+    /**
+     * A single facet entry: an int term and the count recorded for it.
+     */
+    public static class IntEntry implements Entry {
+
+        int term;
+        int count;
+
+        public IntEntry(int term, int count) {
+            this.term = term;
+            this.count = count;
+        }
+
+        /**
+         * Returns the term formatted with {@link Integer#toString(int)}.
+         */
+        public String term() {
+            return Integer.toString(term);
+        }
+
+        public String getTerm() {
+            return term();
+        }
+
+        @Override public Number termAsNumber() {
+            return term;
+        }
+
+        @Override public Number getTermAsNumber() {
+            return termAsNumber();
+        }
+
+        public int count() {
+            return count;
+        }
+
+        public int getCount() {
+            return count();
+        }
+
+        /**
+         * Orders entries by term, then by count, then by identity hash code.
+         *
+         * @param o the entry to compare with; must be an {@code IntEntry}
+         * @return a negative, zero or positive value as this entry sorts before, with or after {@code o}
+         * @throws ClassCastException if {@code o} is not an {@code IntEntry}
+         */
+        @Override public int compareTo(Entry o) {
+            int anotherVal = ((IntEntry) o).term;
+            // "term - anotherVal" overflows when the operands have opposite signs and are far
+            // apart (e.g. Integer.MIN_VALUE vs 1), which flipped the resulting order
+            int i = compareInts(term, anotherVal);
+            if (i == 0) {
+                i = compareInts(count, o.count());
+                if (i == 0) {
+                    // last resort so that two distinct entries rarely compare as equal
+                    i = compareInts(System.identityHashCode(this), System.identityHashCode(o));
+                }
+            }
+            return i;
+        }
+
+        /**
+         * Three-way int comparison that cannot overflow the way a subtraction can.
+         */
+        private static int compareInts(int a, int b) {
+            return a < b ? -1 : (a == b ? 0 : 1);
+        }
+    }
+
+    private String name;
+
+    private String fieldName;
+
+    int requiredSize;
+
+    Collection<IntEntry> entries = ImmutableList.of();
+
+    private ComparatorType comparatorType;
+
+    /**
+     * Creates an empty facet whose state is filled in by {@link #readFrom(StreamInput)}.
+     */
+    InternalIntTermsFacet() {
+    }
+
+    /**
+     * @param name           the facet name
+     * @param fieldName      the name of the field the terms come from
+     * @param comparatorType supplies the comparator used when entries are ordered in {@link #reduce}
+     * @param requiredSize   the size passed to the bounded set built in {@link #reduce}
+     * @param entries        the entries of this facet
+     */
+    public InternalIntTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection<IntEntry> entries) {
+        this.name = name;
+        this.fieldName = fieldName;
+        this.comparatorType = comparatorType;
+        this.requiredSize = requiredSize;
+        this.entries = entries;
+    }
+
+    @Override public String name() {
+        return this.name;
+    }
+
+    @Override public String getName() {
+        return this.name;
+    }
+
+    @Override public String fieldName() {
+        return this.fieldName;
+    }
+
+    @Override public String getFieldName() {
+        return fieldName();
+    }
+
+    @Override public String type() {
+        return TYPE;
+    }
+
+    @Override public String getType() {
+        return type();
+    }
+
+    @Override public ComparatorType comparatorType() {
+        return comparatorType;
+    }
+
+    @Override public ComparatorType getComparatorType() {
+        return comparatorType();
+    }
+
+    /**
+     * Returns the entries as a list. When the entries are held in a non-list collection they
+     * are first copied into an immutable list, which then replaces the held collection.
+     */
+    @Override public List<IntEntry> entries() {
+        if (!(entries instanceof List)) {
+            entries = ImmutableList.copyOf(entries);
+        }
+        return (List<IntEntry>) entries;
+    }
+
+    @Override public List<IntEntry> getEntries() {
+        return entries();
+    }
+
+    @SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
+        return (Iterator) entries.iterator();
+    }
+
+
+    // per-thread scratch map reused by reduce(); it is cleared before every use
+    private static ThreadLocal<ThreadLocals.CleanableValue<TIntIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TIntIntHashMap>>() {
+        @Override protected ThreadLocals.CleanableValue<TIntIntHashMap> initialValue() {
+            return new ThreadLocals.CleanableValue<TIntIntHashMap>(new TIntIntHashMap());
+        }
+    };
+
+
+    /**
+     * Merges several int terms facets into one by summing the counts of equal terms.
+     * A single-element list is returned unchanged. Otherwise the first facet in the list is
+     * modified in place (its entries are replaced by the merged, ordered set) and returned.
+     *
+     * @param name   the facet name (not used by this implementation)
+     * @param facets the facets to merge; every element must be an {@code InternalIntTermsFacet}
+     * @return the merged facet
+     */
+    @Override public Facet reduce(String name, List<Facet> facets) {
+        if (facets.size() == 1) {
+            return facets.get(0);
+        }
+        InternalIntTermsFacet first = (InternalIntTermsFacet) facets.get(0);
+        TIntIntHashMap aggregated = aggregateCache.get().get();
+        aggregated.clear();
+
+        for (Facet facet : facets) {
+            InternalIntTermsFacet mFacet = (InternalIntTermsFacet) facet;
+            for (IntEntry entry : mFacet.entries) {
+                aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
+            }
+        }
+
+        BoundedTreeSet<IntEntry> ordered = new BoundedTreeSet<IntEntry>(first.comparatorType().comparator(), first.requiredSize);
+        for (TIntIntIterator it = aggregated.iterator(); it.hasNext();) {
+            it.advance();
+            ordered.add(new IntEntry(it.key(), it.value()));
+        }
+        first.entries = ordered;
+        return first;
+    }
+
+    static final class Fields {
+        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
+        static final XContentBuilderString _FIELD = new XContentBuilderString("_field");
+        static final XContentBuilderString TERMS = new XContentBuilderString("terms");
+        static final XContentBuilderString TERM = new XContentBuilderString("term");
+        static final XContentBuilderString COUNT = new XContentBuilderString("count");
+    }
+
+    /**
+     * Writes this facet as an object named after the facet, holding {@code _type},
+     * {@code _field} and a {@code terms} array of {@code term} / {@code count} objects.
+     */
+    @Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(name);
+        builder.field(Fields._TYPE, TermsFacet.TYPE);
+        builder.field(Fields._FIELD, fieldName);
+        builder.startArray(Fields.TERMS);
+        for (IntEntry entry : entries) {
+            builder.startObject();
+            builder.field(Fields.TERM, entry.term);
+            builder.field(Fields.COUNT, entry.count());
+            builder.endObject();
+        }
+        builder.endArray();
+        builder.endObject();
+    }
+
+    /**
+     * Creates a facet and populates it from the given stream.
+     */
+    public static InternalIntTermsFacet readTermsFacet(StreamInput in) throws IOException {
+        InternalIntTermsFacet facet = new InternalIntTermsFacet();
+        facet.readFrom(in);
+        return facet;
+    }
+
+    /**
+     * Reads the fields in the order {@link #writeTo(StreamOutput)} writes them.
+     */
+    @Override public void readFrom(StreamInput in) throws IOException {
+        name = in.readUTF();
+        fieldName = in.readUTF();
+        comparatorType = ComparatorType.fromId(in.readByte());
+        requiredSize = in.readVInt();
+
+        int size = in.readVInt();
+        entries = new ArrayList<IntEntry>(size);
+        for (int i = 0; i < size; i++) {
+            entries.add(new IntEntry(in.readInt(), in.readVInt()));
+        }
+    }
+
+    /**
+     * Writes name, field name, comparator id, required size, entry count and then each
+     * entry as an int term followed by its count.
+     */
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        out.writeUTF(name);
+        out.writeUTF(fieldName);
+        out.writeByte(comparatorType.id());
+
+        out.writeVInt(requiredSize);
+
+        out.writeVInt(entries.size());
+        for (IntEntry entry : entries) {
+            out.writeInt(entry.term);
+            out.writeVInt(entry.count());
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ints/TermsIntFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ints/TermsIntFacetCollector.java
new file mode 100644
index 00000000000..99722d30d34
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ints/TermsIntFacetCollector.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.facet.terms.ints;
+
+import org.apache.lucene.index.IndexReader;
+import org.elasticsearch.ElasticSearchIllegalArgumentException;
+import org.elasticsearch.common.collect.BoundedTreeSet;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.thread.ThreadLocals;
+import org.elasticsearch.common.trove.TIntIntHashMap;
+import org.elasticsearch.common.trove.TIntIntIterator;
+import org.elasticsearch.index.cache.field.data.FieldDataCache;
+import org.elasticsearch.index.field.data.FieldDataType;
+import org.elasticsearch.index.field.data.ints.IntFieldData;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.script.search.SearchScript;
+import org.elasticsearch.search.facet.AbstractFacetCollector;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.terms.TermsFacet;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+
+/**
+ * Facet collector that counts the int values of one field across the collected documents
+ * and turns the counts into an {@link InternalIntTermsFacet}. An optional script can
+ * filter or rewrite each value before it is counted.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class TermsIntFacetCollector extends AbstractFacetCollector {
+
+    // per-thread pool of count maps, handed out by popFacets() and returned by pushFacets()
+    static ThreadLocal<ThreadLocals.CleanableValue<Deque<TIntIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TIntIntHashMap>>>() {
+        @Override protected ThreadLocals.CleanableValue<Deque<TIntIntHashMap>> initialValue() {
+            return new ThreadLocals.CleanableValue<Deque<TIntIntHashMap>>(new ArrayDeque<TIntIntHashMap>());
+        }
+    };
+
+    private final FieldDataCache fieldDataCache;
+
+    private final String fieldName;
+
+    private final String indexFieldName;
+
+    private final TermsFacet.ComparatorType comparatorType;
+
+    private final int size;
+
+    private final int numberOfShards;
+
+    private final FieldDataType fieldDataType;
+
+    private IntFieldData fieldData;
+
+    private final StaticAggregatorValueProc aggregator;
+
+    private final SearchScript script;
+
+    /**
+     * @param facetName      the facet name
+     * @param fieldName      the field whose values are counted; it must resolve to a mapper of int type
+     * @param size           the requested number of entries
+     * @param comparatorType supplies the comparator used to order the entries
+     * @param context        the search context providing caches, mappings and script services
+     * @param scriptLang     the script language, passed to {@link SearchScript}
+     * @param script         the optional script source; {@code null} disables scripting
+     * @param params         the script parameters, passed to {@link SearchScript}
+     * @throws ElasticSearchIllegalArgumentException if the field has no mapper or is not of int type
+     */
+    public TermsIntFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context,
+                                  String scriptLang, String script, Map<String, Object> params) {
+        super(facetName);
+        this.fieldDataCache = context.fieldDataCache();
+        this.size = size;
+        this.comparatorType = comparatorType;
+        this.numberOfShards = context.numberOfShards();
+
+        this.fieldName = fieldName;
+
+        MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
+        if (smartMappers == null || !smartMappers.hasMapper()) {
+            throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms int facet collector on it");
+        }
+        // add type filter if there is exact doc mapper associated with it
+        if (smartMappers.hasDocMapper()) {
+            setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
+        }
+
+        if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.INT) {
+            throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of int type, can't run terms int facet collector on it");
+        }
+
+        this.indexFieldName = smartMappers.mapper().names().indexName();
+        this.fieldDataType = smartMappers.mapper().fieldDataType();
+
+        if (script != null) {
+            this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService());
+        } else {
+            this.script = null;
+        }
+
+        if (this.script == null) {
+            aggregator = new StaticAggregatorValueProc(popFacets());
+        } else {
+            aggregator = new AggregatorValueProc(popFacets(), this.script);
+        }
+    }
+
+    /**
+     * Loads the int field data for the new reader and forwards the reader to the script, if any.
+     */
+    @Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
+        fieldData = (IntFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
+        if (script != null) {
+            script.setNextReader(reader);
+        }
+    }
+
+    /**
+     * Feeds every value the document has for the field to the aggregator.
+     */
+    @Override protected void doCollect(int doc) throws IOException {
+        fieldData.forEachValueInDoc(doc, aggregator);
+    }
+
+    /**
+     * Builds the facet from the collected counts and hands the count map back to the
+     * per-thread pool. Returns a facet with no entries when nothing was counted.
+     */
+    @Override public Facet facet() {
+        TIntIntHashMap facets = aggregator.facets();
+        if (facets.isEmpty()) {
+            pushFacets(facets);
+            return new InternalIntTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.<InternalIntTermsFacet.IntEntry>of());
+        } else {
+            // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
+            BoundedTreeSet<InternalIntTermsFacet.IntEntry> ordered = new BoundedTreeSet<InternalIntTermsFacet.IntEntry>(comparatorType.comparator(), size * numberOfShards);
+            for (TIntIntIterator it = facets.iterator(); it.hasNext();) {
+                it.advance();
+                ordered.add(new InternalIntTermsFacet.IntEntry(it.key(), it.value()));
+            }
+            pushFacets(facets);
+            return new InternalIntTermsFacet(facetName, fieldName, comparatorType, size, ordered);
+        }
+    }
+
+    /**
+     * Takes a cleared count map from the per-thread pool, creating a new one when the pool
+     * has none to offer.
+     */
+    static TIntIntHashMap popFacets() {
+        Deque<TIntIntHashMap> deque = cache.get().get();
+        // pushFacets() tolerates a missing deque, so be equally defensive here
+        TIntIntHashMap facets = deque == null ? null : deque.pollFirst();
+        if (facets == null) {
+            return new TIntIntHashMap();
+        }
+        facets.clear();
+        return facets;
+    }
+
+    /**
+     * Clears the map and returns it to the per-thread pool when the pool is available.
+     */
+    static void pushFacets(TIntIntHashMap facets) {
+        facets.clear();
+        Deque<TIntIntHashMap> deque = cache.get().get();
+        if (deque != null) {
+            deque.add(facets);
+        }
+    }
+
+    /**
+     * Aggregator that runs a script on every value before counting it. A {@code null} or
+     * {@code Boolean.FALSE} script result skips the value, {@code Boolean.TRUE} counts the
+     * value unchanged, and any other result is cast to {@link Number} and its int value is
+     * counted instead.
+     */
+    public static class AggregatorValueProc extends StaticAggregatorValueProc {
+
+        private final SearchScript script;
+
+        private final Map<String, Object> scriptParams;
+
+        public AggregatorValueProc(TIntIntHashMap facets, SearchScript script) {
+            super(facets);
+            this.script = script;
+            if (script != null) {
+                scriptParams = Maps.newHashMapWithExpectedSize(4);
+            } else {
+                scriptParams = null;
+            }
+        }
+
+        @Override public void onValue(int docId, int value) {
+            if (script != null) {
+                // the current value is exposed to the script as the "term" parameter
+                scriptParams.put("term", value);
+                Object scriptValue = script.execute(docId, scriptParams);
+                if (scriptValue == null) {
+                    return;
+                }
+                if (scriptValue instanceof Boolean) {
+                    if (!((Boolean) scriptValue)) {
+                        return;
+                    }
+                } else {
+                    value = ((Number) scriptValue).intValue();
+                }
+            }
+            super.onValue(docId, value);
+        }
+    }
+
+    /**
+     * Aggregator that increments the count of every value it is given.
+     */
+    public static class StaticAggregatorValueProc implements IntFieldData.ValueInDocProc {
+
+        private final TIntIntHashMap facets;
+
+        public StaticAggregatorValueProc(TIntIntHashMap facets) {
+            this.facets = facets;
+        }
+
+        @Override public void onValue(int docId, int value) {
+            facets.adjustOrPutValue(value, 1, 1);
+        }
+
+        public final TIntIntHashMap facets() {
+            return facets;
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java
new file mode 100644
index 00000000000..e113ef9180c
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.facet.terms.longs;
+
+import org.elasticsearch.common.collect.BoundedTreeSet;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.thread.ThreadLocals;
+import org.elasticsearch.common.trove.TLongIntHashMap;
+import org.elasticsearch.common.trove.TLongIntIterator;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.terms.InternalTermsFacet;
+import org.elasticsearch.search.facet.terms.TermsFacet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Terms facet result whose terms are {@code long} values, each paired with a count.
+ * Instances are serialized with {@link #writeTo(StreamOutput)} / {@link #readFrom(StreamInput)}
+ * under the stream type {@code "lTerms"} and merged with {@link #reduce(String, List)}.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class InternalLongTermsFacet extends InternalTermsFacet {
+
+    private static final String STREAM_TYPE = "lTerms";
+
+    /**
+     * Registers {@link #STREAM} under the {@code "lTerms"} stream type.
+     */
+    public static void registerStream() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
+        @Override public Facet readFacet(String type, StreamInput in) throws IOException {
+            return readTermsFacet(in);
+        }
+    };
+
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+
+    /**
+     * A single facet entry: a long term and the count recorded for it.
+     */
+    public static class LongEntry implements Entry {
+
+        long term;
+        int count;
+
+        public LongEntry(long term, int count) {
+            this.term = term;
+            this.count = count;
+        }
+
+        /**
+         * Returns the term formatted with {@link Long#toString(long)}.
+         */
+        public String term() {
+            return Long.toString(term);
+        }
+
+        public String getTerm() {
+            return term();
+        }
+
+        @Override public Number termAsNumber() {
+            return term;
+        }
+
+        @Override public Number getTermAsNumber() {
+            return termAsNumber();
+        }
+
+        public int count() {
+            return count;
+        }
+
+        public int getCount() {
+            return count();
+        }
+
+        /**
+         * Orders entries by term, then by count, then by identity hash code.
+         *
+         * @param o the entry to compare with; must be a {@code LongEntry}
+         * @return a negative, zero or positive value as this entry sorts before, with or after {@code o}
+         * @throws ClassCastException if {@code o} is not a {@code LongEntry}
+         */
+        @Override public int compareTo(Entry o) {
+            long anotherVal = ((LongEntry) o).term;
+            if (term < anotherVal) {
+                return -1;
+            }
+            if (term > anotherVal) {
+                return 1;
+            }
+            // equal terms: fall back to count, then identity, without subtracting ints
+            int i = compareInts(count, o.count());
+            if (i == 0) {
+                // last resort so that two distinct entries rarely compare as equal
+                i = compareInts(System.identityHashCode(this), System.identityHashCode(o));
+            }
+            return i;
+        }
+
+        /**
+         * Three-way int comparison that cannot overflow the way a subtraction can.
+         */
+        private static int compareInts(int a, int b) {
+            return a < b ? -1 : (a == b ? 0 : 1);
+        }
+    }
+
+    private String name;
+
+    private String fieldName;
+
+    int requiredSize;
+
+    Collection<LongEntry> entries = ImmutableList.of();
+
+    private ComparatorType comparatorType;
+
+    /**
+     * Creates an empty facet whose state is filled in by {@link #readFrom(StreamInput)}.
+     */
+    InternalLongTermsFacet() {
+    }
+
+    /**
+     * @param name           the facet name
+     * @param fieldName      the name of the field the terms come from
+     * @param comparatorType supplies the comparator used when entries are ordered in {@link #reduce}
+     * @param requiredSize   the size passed to the bounded set built in {@link #reduce}
+     * @param entries        the entries of this facet
+     */
+    public InternalLongTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection<LongEntry> entries) {
+        this.name = name;
+        this.fieldName = fieldName;
+        this.comparatorType = comparatorType;
+        this.requiredSize = requiredSize;
+        this.entries = entries;
+    }
+
+    @Override public String name() {
+        return this.name;
+    }
+
+    @Override public String getName() {
+        return this.name;
+    }
+
+    @Override public String fieldName() {
+        return this.fieldName;
+    }
+
+    @Override public String getFieldName() {
+        return fieldName();
+    }
+
+    @Override public String type() {
+        return TYPE;
+    }
+
+    @Override public String getType() {
+        return type();
+    }
+
+    @Override public ComparatorType comparatorType() {
+        return comparatorType;
+    }
+
+    @Override public ComparatorType getComparatorType() {
+        return comparatorType();
+    }
+
+    /**
+     * Returns the entries as a list. When the entries are held in a non-list collection they
+     * are first copied into an immutable list, which then replaces the held collection.
+     */
+    @Override public List<LongEntry> entries() {
+        if (!(entries instanceof List)) {
+            entries = ImmutableList.copyOf(entries);
+        }
+        return (List<LongEntry>) entries;
+    }
+
+    @Override public List<LongEntry> getEntries() {
+        return entries();
+    }
+
+    @SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
+        return (Iterator) entries.iterator();
+    }
+
+
+    // per-thread scratch map reused by reduce(); it is cleared before every use
+    private static ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>>() {
+        @Override protected ThreadLocals.CleanableValue<TLongIntHashMap> initialValue() {
+            return new ThreadLocals.CleanableValue<TLongIntHashMap>(new TLongIntHashMap());
+        }
+    };
+
+
+    /**
+     * Merges several long terms facets into one by summing the counts of equal terms.
+     * A single-element list is returned unchanged. Otherwise the first facet in the list is
+     * modified in place (its entries are replaced by the merged, ordered set) and returned.
+     *
+     * @param name   the facet name (not used by this implementation)
+     * @param facets the facets to merge; every element must be an {@code InternalLongTermsFacet}
+     * @return the merged facet
+     */
+    @Override public Facet reduce(String name, List<Facet> facets) {
+        if (facets.size() == 1) {
+            return facets.get(0);
+        }
+        InternalLongTermsFacet first = (InternalLongTermsFacet) facets.get(0);
+        TLongIntHashMap aggregated = aggregateCache.get().get();
+        aggregated.clear();
+
+        for (Facet facet : facets) {
+            InternalLongTermsFacet mFacet = (InternalLongTermsFacet) facet;
+            for (LongEntry entry : mFacet.entries) {
+                aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
+            }
+        }
+
+        BoundedTreeSet<LongEntry> ordered = new BoundedTreeSet<LongEntry>(first.comparatorType().comparator(), first.requiredSize);
+        for (TLongIntIterator it = aggregated.iterator(); it.hasNext();) {
+            it.advance();
+            ordered.add(new LongEntry(it.key(), it.value()));
+        }
+        first.entries = ordered;
+        return first;
+    }
+
+    static final class Fields {
+        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
+        static final XContentBuilderString _FIELD = new XContentBuilderString("_field");
+        static final XContentBuilderString TERMS = new XContentBuilderString("terms");
+        static final XContentBuilderString TERM = new XContentBuilderString("term");
+        static final XContentBuilderString COUNT = new XContentBuilderString("count");
+    }
+
+    /**
+     * Writes this facet as an object named after the facet, holding {@code _type},
+     * {@code _field} and a {@code terms} array of {@code term} / {@code count} objects.
+     */
+    @Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(name);
+        builder.field(Fields._TYPE, TermsFacet.TYPE);
+        builder.field(Fields._FIELD, fieldName);
+        builder.startArray(Fields.TERMS);
+        for (LongEntry entry : entries) {
+            builder.startObject();
+            builder.field(Fields.TERM, entry.term);
+            builder.field(Fields.COUNT, entry.count());
+            builder.endObject();
+        }
+        builder.endArray();
+        builder.endObject();
+    }
+
+    /**
+     * Creates a facet and populates it from the given stream.
+     */
+    public static InternalLongTermsFacet readTermsFacet(StreamInput in) throws IOException {
+        InternalLongTermsFacet facet = new InternalLongTermsFacet();
+        facet.readFrom(in);
+        return facet;
+    }
+
+    /**
+     * Reads the fields in the order {@link #writeTo(StreamOutput)} writes them.
+     */
+    @Override public void readFrom(StreamInput in) throws IOException {
+        name = in.readUTF();
+        fieldName = in.readUTF();
+        comparatorType = ComparatorType.fromId(in.readByte());
+        requiredSize = in.readVInt();
+
+        int size = in.readVInt();
+        entries = new ArrayList<LongEntry>(size);
+        for (int i = 0; i < size; i++) {
+            entries.add(new LongEntry(in.readLong(), in.readVInt()));
+        }
+    }
+
+    /**
+     * Writes name, field name, comparator id, required size, entry count and then each
+     * entry as a long term followed by its count.
+     */
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        out.writeUTF(name);
+        out.writeUTF(fieldName);
+        out.writeByte(comparatorType.id());
+
+        out.writeVInt(requiredSize);
+
+        out.writeVInt(entries.size());
+        for (LongEntry entry : entries) {
+            out.writeLong(entry.term);
+            out.writeVInt(entry.count());
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetCollector.java
new file mode 100644
index 00000000000..ebf3daeb90f
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetCollector.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.elasticsearch.search.facet.terms.longs; + +import org.apache.lucene.index.IndexReader; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.common.collect.BoundedTreeSet; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.thread.ThreadLocals; +import org.elasticsearch.common.trove.TLongIntHashMap; +import org.elasticsearch.common.trove.TLongIntIterator; +import org.elasticsearch.index.cache.field.data.FieldDataCache; +import org.elasticsearch.index.field.data.FieldDataType; +import org.elasticsearch.index.field.data.longs.LongFieldData; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.script.search.SearchScript; +import org.elasticsearch.search.facet.AbstractFacetCollector; +import org.elasticsearch.search.facet.Facet; +import org.elasticsearch.search.facet.terms.TermsFacet; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Map; + +/** + * @author kimchy (shay.banon) + */ +public class TermsLongFacetCollector extends AbstractFacetCollector { + + static ThreadLocal>> cache = new ThreadLocal>>() { + @Override protected ThreadLocals.CleanableValue> initialValue() { + return new ThreadLocals.CleanableValue>(new ArrayDeque()); + } + }; + + + private final FieldDataCache fieldDataCache; + + private final String fieldName; + + private final String indexFieldName; + + private final TermsFacet.ComparatorType comparatorType; + + private final int size; + + private final int numberOfShards; + + private final FieldDataType fieldDataType; + + private LongFieldData fieldData; + + private final StaticAggregatorValueProc aggregator; + + private final SearchScript script; + + public TermsLongFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, 
SearchContext context, + String scriptLang, String script, Map params) { + super(facetName); + this.fieldDataCache = context.fieldDataCache(); + this.size = size; + this.comparatorType = comparatorType; + this.numberOfShards = context.numberOfShards(); + + this.fieldName = fieldName; + + MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName); + if (smartMappers == null || !smartMappers.hasMapper()) { + throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms long facet collector on it"); + } else { + // add type filter if there is exact doc mapper associated with it + if (smartMappers.hasDocMapper()) { + setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter())); + } + + if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.LONG) { + throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of long type, can't run terms long facet collector on it"); + } + + this.indexFieldName = smartMappers.mapper().names().indexName(); + this.fieldDataType = smartMappers.mapper().fieldDataType(); + } + + if (script != null) { + this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService()); + } else { + this.script = null; + } + + if (this.script == null) { + aggregator = new StaticAggregatorValueProc(popFacets()); + } else { + aggregator = new AggregatorValueProc(popFacets(), this.script); + } + } + + @Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException { + fieldData = (LongFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName); + if (script != null) { + script.setNextReader(reader); + } + } + + @Override protected void doCollect(int doc) throws IOException { + fieldData.forEachValueInDoc(doc, aggregator); + } + + @Override public Facet facet() { + TLongIntHashMap facets = aggregator.facets(); + if (facets.isEmpty()) { + 
pushFacets(facets); + return new InternalLongTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.of()); + } else { + // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size * numberOfShards); + for (TLongIntIterator it = facets.iterator(); it.hasNext();) { + it.advance(); + ordered.add(new InternalLongTermsFacet.LongEntry(it.key(), it.value())); + } + pushFacets(facets); + return new InternalLongTermsFacet(facetName, fieldName, comparatorType, size, ordered); + } + } + + static TLongIntHashMap popFacets() { + Deque deque = cache.get().get(); + if (deque.isEmpty()) { + deque.add(new TLongIntHashMap()); + } + TLongIntHashMap facets = deque.pollFirst(); + facets.clear(); + return facets; + } + + static void pushFacets(TLongIntHashMap facets) { + facets.clear(); + Deque deque = cache.get().get(); + if (deque != null) { + deque.add(facets); + } + } + + public static class AggregatorValueProc extends StaticAggregatorValueProc { + + private final SearchScript script; + + private final Map scriptParams; + + public AggregatorValueProc(TLongIntHashMap facets, SearchScript script) { + super(facets); + this.script = script; + if (script != null) { + scriptParams = Maps.newHashMapWithExpectedSize(4); + } else { + scriptParams = null; + } + } + + @Override public void onValue(int docId, long value) { + if (script != null) { + scriptParams.put("term", value); + Object scriptValue = script.execute(docId, scriptParams); + if (scriptValue == null) { + return; + } + if (scriptValue instanceof Boolean) { + if (!((Boolean) scriptValue)) { + return; + } + } else { + value = ((Number) scriptValue).longValue(); + } + } + super.onValue(docId, value); + } + } + + public static class StaticAggregatorValueProc implements LongFieldData.ValueInDocProc { + + private final TLongIntHashMap facets; + + public 
StaticAggregatorValueProc(TLongIntHashMap facets) { + this.facets = facets; + } + + @Override public void onValue(int docId, long value) { + facets.adjustOrPutValue(value, 1, 1); + } + + public final TLongIntHashMap facets() { + return facets; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/shorts/InternalShortTermsFacet.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/shorts/InternalShortTermsFacet.java new file mode 100644 index 00000000000..5ad7040bbdd --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/shorts/InternalShortTermsFacet.java @@ -0,0 +1,264 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.facet.terms.shorts; + +import org.elasticsearch.common.collect.BoundedTreeSet; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.thread.ThreadLocals; +import org.elasticsearch.common.trove.TShortIntHashMap; +import org.elasticsearch.common.trove.TShortIntIterator; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.search.facet.Facet; +import org.elasticsearch.search.facet.terms.InternalTermsFacet; +import org.elasticsearch.search.facet.terms.TermsFacet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +/** + * @author kimchy (shay.banon) + */ +public class InternalShortTermsFacet extends InternalTermsFacet { + + private static final String STREAM_TYPE = "sTerms"; + + public static void registerStream() { + Streams.registerStream(STREAM, STREAM_TYPE); + } + + static Stream STREAM = new Stream() { + @Override public Facet readFacet(String type, StreamInput in) throws IOException { + return readTermsFacet(in); + } + }; + + @Override public String streamType() { + return STREAM_TYPE; + } + + public static class ShortEntry implements Entry { + + short term; + int count; + + public ShortEntry(short term, int count) { + this.term = term; + this.count = count; + } + + public String term() { + return Short.toString(term); + } + + public String getTerm() { + return term(); + } + + @Override public Number termAsNumber() { + return term; + } + + @Override public Number getTermAsNumber() { + return termAsNumber(); + } + + public int count() { + return count; + } + + public int getCount() { + return count(); + } + + @Override public int compareTo(Entry o) { + short anotherVal = ((ShortEntry) 
o).term; + int i = term - anotherVal; + if (i == 0) { + i = count - o.count(); + if (i == 0) { + i = System.identityHashCode(this) - System.identityHashCode(o); + } + } + return i; + } + } + + private String name; + + private String fieldName; + + int requiredSize; + + Collection entries = ImmutableList.of(); + + private ComparatorType comparatorType; + + InternalShortTermsFacet() { + } + + public InternalShortTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection entries) { + this.name = name; + this.fieldName = fieldName; + this.comparatorType = comparatorType; + this.requiredSize = requiredSize; + this.entries = entries; + } + + @Override public String name() { + return this.name; + } + + @Override public String getName() { + return this.name; + } + + @Override public String fieldName() { + return this.fieldName; + } + + @Override public String getFieldName() { + return fieldName(); + } + + @Override public String type() { + return TYPE; + } + + @Override public String getType() { + return type(); + } + + @Override public ComparatorType comparatorType() { + return comparatorType; + } + + @Override public ComparatorType getComparatorType() { + return comparatorType(); + } + + @Override public List entries() { + if (!(entries instanceof List)) { + entries = ImmutableList.copyOf(entries); + } + return (List) entries; + } + + @Override public List getEntries() { + return entries(); + } + + @SuppressWarnings({"unchecked"}) @Override public Iterator iterator() { + return (Iterator) entries.iterator(); + } + + + private static ThreadLocal> aggregateCache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new TShortIntHashMap()); + } + }; + + + @Override public Facet reduce(String name, List facets) { + if (facets.size() == 1) { + return facets.get(0); + } + InternalShortTermsFacet first = (InternalShortTermsFacet) facets.get(0); + 
TShortIntHashMap aggregated = aggregateCache.get().get(); + aggregated.clear(); + + for (Facet facet : facets) { + InternalShortTermsFacet mFacet = (InternalShortTermsFacet) facet; + for (ShortEntry entry : mFacet.entries) { + aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count()); + } + } + + BoundedTreeSet ordered = new BoundedTreeSet(first.comparatorType().comparator(), first.requiredSize); + for (TShortIntIterator it = aggregated.iterator(); it.hasNext();) { + it.advance(); + ordered.add(new ShortEntry(it.key(), it.value())); + } + first.entries = ordered; + return first; + } + + static final class Fields { + static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString _FIELD = new XContentBuilderString("_field"); + static final XContentBuilderString TERMS = new XContentBuilderString("terms"); + static final XContentBuilderString TERM = new XContentBuilderString("term"); + static final XContentBuilderString COUNT = new XContentBuilderString("count"); + } + + @Override public void toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(name); + builder.field(Fields._TYPE, TermsFacet.TYPE); + builder.field(Fields._FIELD, fieldName); + builder.startArray(Fields.TERMS); + for (ShortEntry entry : entries) { + builder.startObject(); + builder.field(Fields.TERM, entry.term); + builder.field(Fields.COUNT, entry.count()); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + } + + public static InternalShortTermsFacet readTermsFacet(StreamInput in) throws IOException { + InternalShortTermsFacet facet = new InternalShortTermsFacet(); + facet.readFrom(in); + return facet; + } + + @Override public void readFrom(StreamInput in) throws IOException { + name = in.readUTF(); + fieldName = in.readUTF(); + comparatorType = ComparatorType.fromId(in.readByte()); + requiredSize = in.readVInt(); + + int size = in.readVInt(); + entries = new ArrayList(size); 
+ for (int i = 0; i < size; i++) { + entries.add(new ShortEntry(in.readShort(), in.readVInt())); + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(name); + out.writeUTF(fieldName); + out.writeByte(comparatorType.id()); + + out.writeVInt(requiredSize); + + out.writeVInt(entries.size()); + for (ShortEntry entry : entries) { + out.writeShort(entry.term); + out.writeVInt(entry.count()); + } + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/shorts/TermsShortFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/shorts/TermsShortFacetCollector.java new file mode 100644 index 00000000000..834334b631b --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/shorts/TermsShortFacetCollector.java @@ -0,0 +1,213 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.facet.terms.shorts; + +import org.apache.lucene.index.IndexReader; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.common.collect.BoundedTreeSet; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.thread.ThreadLocals; +import org.elasticsearch.common.trove.TShortIntHashMap; +import org.elasticsearch.common.trove.TShortIntIterator; +import org.elasticsearch.index.cache.field.data.FieldDataCache; +import org.elasticsearch.index.field.data.FieldDataType; +import org.elasticsearch.index.field.data.shorts.ShortFieldData; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.script.search.SearchScript; +import org.elasticsearch.search.facet.AbstractFacetCollector; +import org.elasticsearch.search.facet.Facet; +import org.elasticsearch.search.facet.terms.TermsFacet; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Map; + +/** + * @author kimchy (shay.banon) + */ +public class TermsShortFacetCollector extends AbstractFacetCollector { + + static ThreadLocal>> cache = new ThreadLocal>>() { + @Override protected ThreadLocals.CleanableValue> initialValue() { + return new ThreadLocals.CleanableValue>(new ArrayDeque()); + } + }; + + private final FieldDataCache fieldDataCache; + + private final String fieldName; + + private final String indexFieldName; + + private final TermsFacet.ComparatorType comparatorType; + + private final int size; + + private final int numberOfShards; + + private final FieldDataType fieldDataType; + + private ShortFieldData fieldData; + + private final StaticAggregatorValueProc aggregator; + + private final SearchScript script; + + public TermsShortFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, 
SearchContext context, + String scriptLang, String script, Map params) { + super(facetName); + this.fieldDataCache = context.fieldDataCache(); + this.size = size; + this.comparatorType = comparatorType; + this.numberOfShards = context.numberOfShards(); + + this.fieldName = fieldName; + + MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName); + if (smartMappers == null || !smartMappers.hasMapper()) { + throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms short facet collector on it"); + } else { + // add type filter if there is exact doc mapper associated with it + if (smartMappers.hasDocMapper()) { + setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter())); + } + + if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.SHORT) { + throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of short type, can't run terms short facet collector on it"); + } + + this.indexFieldName = smartMappers.mapper().names().indexName(); + this.fieldDataType = smartMappers.mapper().fieldDataType(); + } + + if (script != null) { + this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService()); + } else { + this.script = null; + } + + if (this.script == null) { + aggregator = new StaticAggregatorValueProc(popFacets()); + } else { + aggregator = new AggregatorValueProc(popFacets(), this.script); + } + } + + @Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException { + fieldData = (ShortFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName); + if (script != null) { + script.setNextReader(reader); + } + } + + @Override protected void doCollect(int doc) throws IOException { + fieldData.forEachValueInDoc(doc, aggregator); + } + + @Override public Facet facet() { + TShortIntHashMap facets = aggregator.facets(); + if (facets.isEmpty()) { + 
pushFacets(facets); + return new InternalShortTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.of()); + } else { + // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size * numberOfShards); + for (TShortIntIterator it = facets.iterator(); it.hasNext();) { + it.advance(); + ordered.add(new InternalShortTermsFacet.ShortEntry(it.key(), it.value())); + } + pushFacets(facets); + return new InternalShortTermsFacet(facetName, fieldName, comparatorType, size, ordered); + } + } + + static TShortIntHashMap popFacets() { + Deque deque = cache.get().get(); + if (deque.isEmpty()) { + deque.add(new TShortIntHashMap()); + } + TShortIntHashMap facets = deque.pollFirst(); + facets.clear(); + return facets; + } + + static void pushFacets(TShortIntHashMap facets) { + facets.clear(); + Deque deque = cache.get().get(); + if (deque != null) { + deque.add(facets); + } + } + + public static class AggregatorValueProc extends StaticAggregatorValueProc { + + private final SearchScript script; + + private final Map scriptParams; + + public AggregatorValueProc(TShortIntHashMap facets, SearchScript script) { + super(facets); + this.script = script; + if (script != null) { + scriptParams = Maps.newHashMapWithExpectedSize(4); + } else { + scriptParams = null; + } + } + + @Override public void onValue(int docId, short value) { + if (script != null) { + scriptParams.put("term", value); + Object scriptValue = script.execute(docId, scriptParams); + if (scriptValue == null) { + return; + } + if (scriptValue instanceof Boolean) { + if (!((Boolean) scriptValue)) { + return; + } + } else { + value = ((Number) scriptValue).shortValue(); + } + } + super.onValue(docId, value); + } + } + + public static class StaticAggregatorValueProc implements ShortFieldData.ValueInDocProc { + + private final TShortIntHashMap facets; + + public 
StaticAggregatorValueProc(TShortIntHashMap facets) { + this.facets = facets; + } + + @Override public void onValue(int docId, short value) { + facets.adjustOrPutValue(value, 1, 1); + } + + public final TShortIntHashMap facets() { + return facets; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetCollector.java index 91928e71928..7230007bf2a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetCollector.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetCollector.java @@ -128,7 +128,7 @@ public class FieldsTermsStringFacetCollector extends AbstractFacetCollector { return new InternalStringTermsFacet(facetName, arrayToCommaDelimitedString(fieldsNames), comparatorType, size, ImmutableList.of()); } else { // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards - BoundedTreeSet ordered = new BoundedTreeSet(InternalStringTermsFacet.ComparatorType.COUNT.comparator(), size * numberOfShards); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size * numberOfShards); for (TObjectIntIterator it = facets.iterator(); it.hasNext();) { it.advance(); ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value())); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java index db3f1860a57..ffc6b9c4ff4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java +++ 
b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.trove.TObjectIntIterator; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.search.facet.Facet; -import org.elasticsearch.search.facet.InternalFacet; import org.elasticsearch.search.facet.terms.InternalTermsFacet; import org.elasticsearch.search.facet.terms.TermsFacet; @@ -42,14 +41,24 @@ import java.util.List; /** * @author kimchy (shay.banon) */ -public class InternalStringTermsFacet implements InternalFacet, InternalTermsFacet { +public class InternalStringTermsFacet extends InternalTermsFacet { - public static Stream STREAM = new Stream() { + private static final String STREAM_TYPE = "tTerms"; + + public static void registerStream() { + Streams.registerStream(STREAM, STREAM_TYPE); + } + + static Stream STREAM = new Stream() { @Override public Facet readFacet(String type, StreamInput in) throws IOException { return readTermsFacet(in); } }; + @Override public String streamType() { + return STREAM_TYPE; + } + public static class StringEntry implements Entry { private String term; @@ -68,6 +77,14 @@ public class InternalStringTermsFacet implements InternalFacet, InternalTermsFac return term; } + @Override public Number termAsNumber() { + return Double.parseDouble(term); + } + + @Override public Number getTermAsNumber() { + return termAsNumber(); + } + public int count() { return count; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetCollector.java index 379c4a23027..4d91af554a9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetCollector.java 
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetCollector.java @@ -118,7 +118,7 @@ public class ScriptTermsStringFieldFacetCollector extends AbstractFacetCollector return new InternalStringTermsFacet(facetName, sScript, comparatorType, size, ImmutableList.of()); } else { // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards - BoundedTreeSet ordered = new BoundedTreeSet(InternalStringTermsFacet.ComparatorType.COUNT.comparator(), size * numberOfShards); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size * numberOfShards); for (TObjectIntIterator it = facets.iterator(); it.hasNext();) { it.advance(); ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value())); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetCollector.java index 63bac26d2d2..f5466d19457 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetCollector.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetCollector.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.script.search.SearchScript; import org.elasticsearch.search.facet.AbstractFacetCollector; import org.elasticsearch.search.facet.Facet; +import org.elasticsearch.search.facet.terms.TermsFacet; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; @@ -61,7 +62,7 @@ public class TermsStringFacetCollector extends AbstractFacetCollector { private final String indexFieldName; - private final InternalStringTermsFacet.ComparatorType comparatorType; + private final TermsFacet.ComparatorType comparatorType; private final int 
size; @@ -75,7 +76,7 @@ public class TermsStringFacetCollector extends AbstractFacetCollector { private final SearchScript script; - public TermsStringFacetCollector(String facetName, String fieldName, int size, InternalStringTermsFacet.ComparatorType comparatorType, SearchContext context, + public TermsStringFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context, ImmutableSet excluded, Pattern pattern, String scriptLang, String script, Map params) { super(facetName); this.fieldDataCache = context.fieldDataCache(); @@ -130,7 +131,7 @@ public class TermsStringFacetCollector extends AbstractFacetCollector { return new InternalStringTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.of()); } else { // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards - BoundedTreeSet ordered = new BoundedTreeSet(InternalStringTermsFacet.ComparatorType.COUNT.comparator(), size * numberOfShards); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size * numberOfShards); for (TObjectIntIterator it = facets.iterator(); it.hasNext();) { it.advance(); ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value())); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/facet/SimpleFacetsTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/facet/SimpleFacetsTests.java index d0fe940bea2..4c0e5046a4a 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/facet/SimpleFacetsTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/facet/SimpleFacetsTests.java @@ -78,13 +78,17 @@ public class SimpleFacetsTests extends AbstractNodesTests { client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject() .field("stag", "111") + .field("lstag", 111) 
.startArray("tag").value("xxx").value("yyy").endArray() + .startArray("ltag").value(1000l).value(2000l).endArray() .endObject()).execute().actionGet(); client.admin().indices().prepareFlush().setRefresh(true).execute().actionGet(); client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject() .field("stag", "111") + .field("lstag", 111) .startArray("tag").value("zzz").value("yyy").endArray() + .startArray("ltag").value(3000l).value(2000l).endArray() .endObject()).execute().actionGet(); client.admin().indices().prepareRefresh().execute().actionGet(); @@ -207,13 +211,21 @@ public class SimpleFacetsTests extends AbstractNodesTests { client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject() .field("stag", "111") + .field("lstag", 111) + .field("dstag", 111.1) .startArray("tag").value("xxx").value("yyy").endArray() + .startArray("ltag").value(1000l).value(2000l).endArray() + .startArray("dtag").value(1000.1).value(2000.1).endArray() .endObject()).execute().actionGet(); client.admin().indices().prepareFlush().setRefresh(true).execute().actionGet(); client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject() .field("stag", "111") + .field("lstag", 111) + .field("dstag", 111.1) .startArray("tag").value("zzz").value("yyy").endArray() + .startArray("ltag").value(3000l).value(2000l).endArray() + .startArray("dtag").value(3000.1).value(2000.1).endArray() .endObject()).execute().actionGet(); client.admin().indices().prepareRefresh().execute().actionGet(); @@ -236,6 +248,52 @@ public class SimpleFacetsTests extends AbstractNodesTests { assertThat(facet.entries().get(0).term(), equalTo("yyy")); assertThat(facet.entries().get(0).count(), equalTo(2)); + // Numeric + + searchResponse = client.prepareSearch() + .setQuery(termQuery("stag", "111")) + .addFacet(termsFacet("facet1").field("lstag").size(10)) + .addFacet(termsFacet("facet2").field("ltag").size(10)) + .execute().actionGet(); + + facet = searchResponse.facets().facet("facet1"); + 
assertThat(facet.name(), equalTo("facet1")); + assertThat(facet.entries().size(), equalTo(1)); + assertThat(facet.entries().get(0).term(), equalTo("111")); + assertThat(facet.entries().get(0).count(), equalTo(2)); + + facet = searchResponse.facets().facet("facet2"); + assertThat(facet.name(), equalTo("facet2")); + assertThat(facet.entries().size(), equalTo(3)); + assertThat(facet.entries().get(0).term(), equalTo("2000")); + assertThat(facet.entries().get(0).count(), equalTo(2)); + assertThat(facet.entries().get(1).term(), anyOf(equalTo("1000"), equalTo("3000"))); + assertThat(facet.entries().get(1).count(), equalTo(1)); + assertThat(facet.entries().get(2).term(), anyOf(equalTo("1000"), equalTo("3000"))); + assertThat(facet.entries().get(2).count(), equalTo(1)); + + searchResponse = client.prepareSearch() + .setQuery(termQuery("stag", "111")) + .addFacet(termsFacet("facet1").field("dstag").size(10)) + .addFacet(termsFacet("facet2").field("dtag").size(10)) + .execute().actionGet(); + + facet = searchResponse.facets().facet("facet1"); + assertThat(facet.name(), equalTo("facet1")); + assertThat(facet.entries().size(), equalTo(1)); + assertThat(facet.entries().get(0).term(), equalTo("111.1")); + assertThat(facet.entries().get(0).count(), equalTo(2)); + + facet = searchResponse.facets().facet("facet2"); + assertThat(facet.name(), equalTo("facet2")); + assertThat(facet.entries().size(), equalTo(3)); + assertThat(facet.entries().get(0).term(), equalTo("2000.1")); + assertThat(facet.entries().get(0).count(), equalTo(2)); + assertThat(facet.entries().get(1).term(), anyOf(equalTo("1000.1"), equalTo("3000.1"))); + assertThat(facet.entries().get(1).count(), equalTo(1)); + assertThat(facet.entries().get(2).term(), anyOf(equalTo("1000.1"), equalTo("3000.1"))); + assertThat(facet.entries().get(2).count(), equalTo(1)); + // Test Facet Filter searchResponse = client.prepareSearch()