Terms Facet: Performance and memory improvements when faceting numeric fields, closes #583.

commit 82298d890c (parent 019359a896)

.idea/dictionaries/kimchy.xml (generated, +2)
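
The core of the change: terms facets on numeric fields now aggregate counts in primitive-keyed hash maps (Trove's TDoubleIntHashMap and friends) via dedicated per-type collectors, instead of routing every value through the string terms path. A rough, JDK-only sketch of the per-value aggregation these collectors perform — illustrative only, and using the boxed keys that the patch specifically avoids by switching to primitive Trove maps:

import java.util.HashMap;
import java.util.Map;

// Sketch of the per-segment aggregation a numeric terms collector performs:
// for each document value, bump a (term -> count) entry. The patch uses
// Trove's primitive TDoubleIntHashMap to avoid the boxing shown here.
public class NumericTermsCountSketch {

    public static void main(String[] args) {
        double[] docValues = {1.0, 2.5, 1.0, 3.0, 2.5, 1.0};

        Map<Double, Integer> counts = new HashMap<>();
        for (double value : docValues) {
            counts.merge(value, 1, Integer::sum); // boxes the key on every call
        }

        counts.forEach((term, count) ->
                System.out.println("term " + term + " -> " + count));
    }
}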
@@ -82,6 +82,8 @@
       <w>linefeeds</w>
       <w>lons</w>
       <w>loopback</w>
+      <w>lstag</w>
+      <w>ltag</w>
       <w>lucene</w>
       <w>mcast</w>
       <w>memcached</w>
TermsFacetSearchBenchmark.java (new file)
@@ -0,0 +1,168 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.benchmark.search.facet.terms;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.Node;

import java.io.IOException;

import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
import static org.elasticsearch.node.NodeBuilder.*;
import static org.elasticsearch.search.facet.FacetBuilders.*;

/**
 * @author kimchy (shay.banon)
 */
public class TermsFacetSearchBenchmark {

    public static void main(String[] args) throws Exception {
        Settings settings = settingsBuilder()
                .put("index.engine.robin.refreshInterval", "-1")
                .put("gateway.type", "local")
                .put(SETTING_NUMBER_OF_SHARDS, 2)
                .put(SETTING_NUMBER_OF_REPLICAS, 1)
                .build();

        Node node1 = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "node1")).node();
        Node node2 = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "node2")).node();

        Node clientNode = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "client")).client(true).node();

        Client client = clientNode.client();

        long COUNT = SizeValue.parseSizeValue("1m").singles();
        int BATCH = 100;
        int QUERY_WARMUP = 20;
        int QUERY_COUNT = 200;
        int NUMBER_OF_TERMS = 10;

        long[] lValues = new long[NUMBER_OF_TERMS];
        for (int i = 0; i < NUMBER_OF_TERMS; i++) {
            lValues[i] = i;
        }
        String[] sValues = new String[NUMBER_OF_TERMS];
        for (int i = 0; i < NUMBER_OF_TERMS; i++) {
            sValues[i] = Integer.toString(i);
        }

        Thread.sleep(10000);
        try {
            client.admin().indices().create(createIndexRequest("test")).actionGet();

            StopWatch stopWatch = new StopWatch().start();

            System.out.println("--> Indexing [" + COUNT + "] ...");
            long ITERS = COUNT / BATCH;
            long i = 1;
            int counter = 0;
            for (; i <= ITERS; i++) {
                BulkRequestBuilder request = client.prepareBulk();
                for (int j = 0; j < BATCH; j++) {
                    counter++;
                    request.add(Requests.indexRequest("test").type("type1").id(Integer.toString(counter))
                            .source(source(Integer.toString(counter), sValues[counter % sValues.length], lValues[counter % lValues.length])));
                }
                BulkResponse response = request.execute().actionGet();
                if (response.hasFailures()) {
                    System.err.println("--> failures...");
                }
                if (((i * BATCH) % 10000) == 0) {
                    System.out.println("--> Indexed " + (i * BATCH) + " took " + stopWatch.stop().lastTaskTime());
                    stopWatch.start();
                }
            }
            System.out.println("--> Indexing took " + stopWatch.totalTime() + ", TPS " + (((double) (COUNT)) / stopWatch.totalTime().secondsFrac()));
        } catch (Exception e) {
            System.out.println("--> Index already exists, ignoring indexing phase, waiting for green");
            ClusterHealthResponse clusterHealthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet();
            if (clusterHealthResponse.timedOut()) {
                System.err.println("--> Timed out waiting for cluster health");
            }
        }
        client.admin().indices().prepareRefresh().execute().actionGet();
        System.out.println("--> Number of docs in index: " + client.prepareCount().setQuery(matchAllQuery()).execute().actionGet().count());

        System.out.println("--> Warmup...");
        // warm up both facets before measuring
        for (int j = 0; j < QUERY_WARMUP; j++) {
            SearchResponse searchResponse = client.prepareSearch()
                    .setQuery(matchAllQuery())
                    .addFacet(termsFacet("s_value").field("s_value"))
                    .addFacet(termsFacet("l_value").field("l_value"))
                    .execute().actionGet();
            if (j == 0) {
                System.out.println("--> Warmup took: " + searchResponse.took());
            }
            if (searchResponse.hits().totalHits() != COUNT) {
                System.err.println("--> mismatch on hits");
            }
        }

        long totalQueryTime = 0;
        for (int j = 0; j < QUERY_COUNT; j++) {
            SearchResponse searchResponse = client.prepareSearch()
                    .setQuery(matchAllQuery())
                    .addFacet(termsFacet("s_value").field("s_value"))
                    .execute().actionGet();
            if (searchResponse.hits().totalHits() != COUNT) {
                System.err.println("--> mismatch on hits");
            }
            totalQueryTime += searchResponse.tookInMillis();
        }
        System.out.println("--> Terms Facet (s_value) " + (totalQueryTime / QUERY_COUNT) + "ms");

        totalQueryTime = 0;
        for (int j = 0; j < QUERY_COUNT; j++) {
            SearchResponse searchResponse = client.prepareSearch()
                    .setQuery(matchAllQuery())
                    .addFacet(termsFacet("l_value").field("l_value"))
                    .execute().actionGet();
            if (searchResponse.hits().totalHits() != COUNT) {
                System.err.println("--> mismatch on hits");
            }
            totalQueryTime += searchResponse.tookInMillis();
        }
        System.out.println("--> Terms Facet (l_value) " + (totalQueryTime / QUERY_COUNT) + "ms");

        clientNode.close();

        node1.close();
        node2.close();
    }

    private static XContentBuilder source(String id, String sValue, long lValue) throws IOException {
        return jsonBuilder().startObject().field("id", id).field("s_value", sValue).field("l_value", lValue).endObject();
    }
}
@@ -107,6 +107,11 @@ public abstract class DoubleFieldData extends NumericFieldData<DoubleDocFieldData> {
         void onValue(double value);
     }
 
+    public abstract void forEachValueInDoc(int docId, ValueInDocProc proc);
+
+    public static interface ValueInDocProc {
+        void onValue(int docId, double value);
+    }
+
     public static DoubleFieldData load(IndexReader reader, String field) throws IOException {
         return FieldDataLoader.load(reader, field, new DoubleTypeLoader());
@@ -84,6 +84,16 @@ public class MultiValueDoubleFieldData extends DoubleFieldData {
         }
     }
 
+    @Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
+        int[] docOrders = ordinals[docId];
+        if (docOrders == null) {
+            return;
+        }
+        for (int docOrder : docOrders) {
+            proc.onValue(docId, values[docOrder]);
+        }
+    }
+
     @Override public double[] doubleValues(int docId) {
         return values(docId);
     }
@@ -70,6 +70,14 @@ public class SingleValueDoubleFieldData extends DoubleFieldData {
         proc.onValue(docId, values[loc]);
     }
 
+    @Override public void forEachValueInDoc(int docId, ValueInDocProc proc) {
+        int loc = ordinals[docId];
+        if (loc == 0) {
+            return;
+        }
+        proc.onValue(docId, values[loc]);
+    }
+
     @Override public double[] doubleValues(int docId) {
         return values(docId);
     }
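
The forEachValueInDoc hook added above pushes each of a document's values into a callback rather than materializing a double[] per document. A minimal, self-contained sketch of the same callback pattern, with illustrative names (not the actual field-data classes):

// Sketch of the per-doc value visitor pattern used above (hypothetical names).
public class ValueVisitorSketch {

    interface ValueInDocProc {
        void onValue(int docId, double value);
    }

    // Single-valued layout: ordinals[docId] points into values[]; 0 means "no value".
    static void forEachValueInDoc(int[] ordinals, double[] values, int docId, ValueInDocProc proc) {
        int loc = ordinals[docId];
        if (loc == 0) {
            return; // document has no value for this field
        }
        proc.onValue(docId, values[loc]);
    }

    public static void main(String[] args) {
        double[] values = {Double.NaN, 1.5, 2.5}; // slot 0 unused, acts as the "missing" marker
        int[] ordinals = {1, 0, 2};               // doc 1 has no value
        for (int docId = 0; docId < ordinals.length; docId++) {
            forEachValueInDoc(ordinals, values, docId,
                    (doc, v) -> System.out.println("doc " + doc + " -> " + v));
        }
    }
}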
@@ -32,6 +32,8 @@ import java.io.IOException;
  */
 public interface InternalFacet extends Facet, Streamable, ToXContent {
 
+    String streamType();
+
     public static interface Stream {
         Facet readFacet(String type, StreamInput in) throws IOException;
     }
@@ -142,8 +142,9 @@ public class InternalFacets implements Facets, Streamable, ToXContent, Iterable<Facet> {
     @Override public void writeTo(StreamOutput out) throws IOException {
         out.writeVInt(facets.size());
         for (Facet facet : facets) {
-            out.writeUTF(facet.type());
-            ((InternalFacet) facet).writeTo(out);
+            InternalFacet internalFacet = (InternalFacet) facet;
+            out.writeUTF(internalFacet.streamType());
+            internalFacet.writeTo(out);
         }
     }
 }
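
Writing streamType() instead of the display type() means each facet implementation can register its reader under a compact key of its own. A minimal sketch of such a keyed reader registry — hypothetical names, not the actual InternalFacet.Streams API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a stream-type -> reader registry.
public class StreamRegistrySketch {

    interface Reader {
        Object read(String payload); // stand-in for readFacet(StreamInput)
    }

    private static final Map<String, Reader> STREAMS = new ConcurrentHashMap<>();

    static void registerStream(String streamType, Reader reader) {
        STREAMS.put(streamType, reader);
    }

    static Object readFacet(String streamType, String payload) {
        Reader reader = STREAMS.get(streamType);
        if (reader == null) {
            throw new IllegalArgumentException("No stream registered for [" + streamType + "]");
        }
        return reader.read(payload);
    }

    public static void main(String[] args) {
        registerStream("dTerms", payload -> "double terms facet from: " + payload);
        System.out.println(readFacet("dTerms", "wire-bytes"));
    }
}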
@@ -26,7 +26,7 @@ import org.elasticsearch.search.facet.histogram.InternalHistogramFacet;
 import org.elasticsearch.search.facet.query.InternalQueryFacet;
 import org.elasticsearch.search.facet.range.InternalRangeFacet;
 import org.elasticsearch.search.facet.statistical.InternalStatisticalFacet;
-import org.elasticsearch.search.facet.terms.strings.InternalStringTermsFacet;
+import org.elasticsearch.search.facet.terms.InternalTermsFacet;
 
 /**
  * @author kimchy (shay.banon)
@@ -34,12 +34,12 @@ import org.elasticsearch.search.facet.terms.strings.InternalStringTermsFacet;
 public class TransportFacetModule extends AbstractModule {
 
     @Override protected void configure() {
-        InternalFacet.Streams.registerStream(InternalFilterFacet.STREAM, InternalFilterFacet.TYPE);
-        InternalFacet.Streams.registerStream(InternalQueryFacet.STREAM, InternalQueryFacet.TYPE);
-        InternalFacet.Streams.registerStream(InternalGeoDistanceFacet.STREAM, InternalGeoDistanceFacet.TYPE);
-        InternalFacet.Streams.registerStream(InternalHistogramFacet.STREAM, InternalHistogramFacet.TYPE);
-        InternalFacet.Streams.registerStream(InternalRangeFacet.STREAM, InternalRangeFacet.TYPE);
-        InternalFacet.Streams.registerStream(InternalStatisticalFacet.STREAM, InternalStatisticalFacet.TYPE);
-        InternalFacet.Streams.registerStream(InternalStringTermsFacet.STREAM, InternalStringTermsFacet.TYPE);
+        InternalFilterFacet.registerStreams();
+        InternalQueryFacet.registerStreams();
+        InternalGeoDistanceFacet.registerStreams();
+        InternalHistogramFacet.registerStreams();
+        InternalRangeFacet.registerStreams();
+        InternalStatisticalFacet.registerStreams();
+        InternalTermsFacet.registerStreams();
     }
 }
@@ -28,7 +28,6 @@ import org.elasticsearch.index.query.xcontent.XContentIndexQueryParser;
 import org.elasticsearch.search.facet.Facet;
 import org.elasticsearch.search.facet.FacetCollector;
 import org.elasticsearch.search.facet.FacetProcessor;
-import org.elasticsearch.search.facet.InternalFacet;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -41,7 +40,7 @@ public class FilterFacetProcessor extends AbstractComponent implements FacetProcessor {
 
     @Inject public FilterFacetProcessor(Settings settings) {
         super(settings);
-        InternalFacet.Streams.registerStream(InternalFilterFacet.STREAM, InternalFilterFacet.TYPE);
+        InternalFilterFacet.registerStreams();
     }
 
     @Override public String[] types() {
@@ -33,12 +33,22 @@ import java.io.IOException;
  */
 public class InternalFilterFacet implements FilterFacet, InternalFacet {
 
-    public static Stream STREAM = new Stream() {
+    private static final String STREAM_TYPE = "filter";
+
+    public static void registerStreams() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
         @Override public Facet readFacet(String type, StreamInput in) throws IOException {
             return readFilterFacet(in);
         }
     };
 
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+
     private String name;
 
     private long count;
@@ -28,7 +28,10 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.mapper.xcontent.geo.GeoPointFieldMapper;
 import org.elasticsearch.index.search.geo.GeoDistance;
 import org.elasticsearch.index.search.geo.GeoHashUtils;
-import org.elasticsearch.search.facet.*;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.FacetCollector;
+import org.elasticsearch.search.facet.FacetPhaseExecutionException;
+import org.elasticsearch.search.facet.FacetProcessor;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -42,7 +45,7 @@ public class GeoDistanceFacetProcessor extends AbstractComponent implements FacetProcessor {
 
     @Inject public GeoDistanceFacetProcessor(Settings settings) {
         super(settings);
-        InternalFacet.Streams.registerStream(InternalGeoDistanceFacet.STREAM, InternalGeoDistanceFacet.TYPE);
+        InternalGeoDistanceFacet.registerStreams();
     }
 
     @Override public String[] types() {
@@ -37,12 +37,22 @@ import java.util.List;
  */
 public class InternalGeoDistanceFacet implements GeoDistanceFacet, InternalFacet {
 
-    public static Stream STREAM = new Stream() {
+    private static final String STREAM_TYPE = "geoDistance";
+
+    public static void registerStreams() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
         @Override public Facet readFacet(String type, StreamInput in) throws IOException {
             return readGeoDistanceFacet(in);
         }
     };
 
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+
     private String name;
 
     private String fieldName;
@@ -28,7 +28,10 @@ import org.elasticsearch.common.trove.TLongLongHashMap;
 import org.elasticsearch.common.trove.TLongLongIterator;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.search.facet.*;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.FacetCollector;
+import org.elasticsearch.search.facet.FacetPhaseExecutionException;
+import org.elasticsearch.search.facet.FacetProcessor;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -42,7 +45,7 @@ public class HistogramFacetProcessor extends AbstractComponent implements FacetProcessor {
 
     @Inject public HistogramFacetProcessor(Settings settings) {
         super(settings);
-        InternalFacet.Streams.registerStream(InternalHistogramFacet.STREAM, InternalHistogramFacet.TYPE);
+        InternalHistogramFacet.registerStreams();
     }
 
     @Override public String[] types() {
@@ -41,12 +41,22 @@ import java.util.TreeSet;
  */
 public class InternalHistogramFacet implements HistogramFacet, InternalFacet {
 
-    public static Stream STREAM = new Stream() {
+    private static final String STREAM_TYPE = "histogram";
+
+    public static void registerStreams() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
         @Override public Facet readFacet(String type, StreamInput in) throws IOException {
             return readHistogramFacet(in);
         }
     };
 
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+
     private String name;
 
     private String keyFieldName;
@@ -33,12 +33,22 @@ import java.io.IOException;
  */
 public class InternalQueryFacet implements QueryFacet, InternalFacet {
 
-    public static Stream STREAM = new Stream() {
+    private static final String STREAM_TYPE = "query";
+
+    public static void registerStreams() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
         @Override public Facet readFacet(String type, StreamInput in) throws IOException {
             return readQueryFacet(in);
         }
     };
 
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+
     private String name;
 
     private long count;
@@ -28,7 +28,6 @@ import org.elasticsearch.index.query.xcontent.XContentIndexQueryParser;
 import org.elasticsearch.search.facet.Facet;
 import org.elasticsearch.search.facet.FacetCollector;
 import org.elasticsearch.search.facet.FacetProcessor;
-import org.elasticsearch.search.facet.InternalFacet;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -41,7 +40,7 @@ public class QueryFacetProcessor extends AbstractComponent implements FacetProcessor {
 
     @Inject public QueryFacetProcessor(Settings settings) {
         super(settings);
-        InternalFacet.Streams.registerStream(InternalQueryFacet.STREAM, InternalQueryFacet.TYPE);
+        InternalQueryFacet.registerStreams();
     }
 
     @Override public String[] types() {
@@ -36,12 +36,22 @@ import java.util.List;
  */
 public class InternalRangeFacet implements RangeFacet, InternalFacet {
 
-    public static Stream STREAM = new Stream() {
+    private static final String STREAM_TYPE = "range";
+
+    public static void registerStreams() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
         @Override public Facet readFacet(String type, StreamInput in) throws IOException {
             return readRangeFacet(in);
         }
     };
 
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+
     private String name;
 
     private String keyFieldName;
@@ -25,7 +25,10 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.mapper.FieldMapper;
-import org.elasticsearch.search.facet.*;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.FacetCollector;
+import org.elasticsearch.search.facet.FacetPhaseExecutionException;
+import org.elasticsearch.search.facet.FacetProcessor;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -39,7 +42,7 @@ public class RangeFacetProcessor extends AbstractComponent implements FacetProcessor {
 
     @Inject public RangeFacetProcessor(Settings settings) {
         super(settings);
-        InternalFacet.Streams.registerStream(InternalRangeFacet.STREAM, InternalRangeFacet.TYPE);
+        InternalRangeFacet.registerStreams();
     }
 
     @Override public String[] types() {
@@ -33,12 +33,22 @@ import java.io.IOException;
  */
 public class InternalStatisticalFacet implements StatisticalFacet, InternalFacet {
 
-    public static Stream STREAM = new Stream() {
+    private static final String STREAM_TYPE = "statistical";
+
+    public static void registerStreams() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
         @Override public Facet readFacet(String type, StreamInput in) throws IOException {
             return readStatisticalFacet(in);
         }
     };
 
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+
     private String name;
 
     private String fieldName;
@@ -24,7 +24,10 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.search.facet.*;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.FacetCollector;
+import org.elasticsearch.search.facet.FacetPhaseExecutionException;
+import org.elasticsearch.search.facet.FacetProcessor;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -38,7 +41,7 @@ public class StatisticalFacetProcessor extends AbstractComponent implements FacetProcessor {
 
     @Inject public StatisticalFacetProcessor(Settings settings) {
         super(settings);
-        InternalFacet.Streams.registerStream(InternalStatisticalFacet.STREAM, InternalStatisticalFacet.TYPE);
+        InternalStatisticalFacet.registerStreams();
     }
 
     @Override public String[] types() {
@@ -20,13 +20,29 @@
 package org.elasticsearch.search.facet.terms;
 
 import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.InternalFacet;
+import org.elasticsearch.search.facet.terms.doubles.InternalDoubleTermsFacet;
+import org.elasticsearch.search.facet.terms.floats.InternalFloatTermsFacet;
+import org.elasticsearch.search.facet.terms.ints.InternalIntTermsFacet;
+import org.elasticsearch.search.facet.terms.longs.InternalLongTermsFacet;
+import org.elasticsearch.search.facet.terms.shorts.InternalShortTermsFacet;
+import org.elasticsearch.search.facet.terms.strings.InternalStringTermsFacet;
 
 import java.util.List;
 
 /**
  * @author kimchy (shay.banon)
  */
-public interface InternalTermsFacet extends TermsFacet {
+public abstract class InternalTermsFacet implements TermsFacet, InternalFacet {
 
-    Facet reduce(String name, List<Facet> facets);
+    public static void registerStreams() {
+        InternalStringTermsFacet.registerStream();
+        InternalLongTermsFacet.registerStream();
+        InternalDoubleTermsFacet.registerStream();
+        InternalIntTermsFacet.registerStream();
+        InternalFloatTermsFacet.registerStream();
+        InternalShortTermsFacet.registerStream();
+    }
+
+    public abstract Facet reduce(String name, List<Facet> facets);
 }
@@ -43,6 +43,10 @@ public interface TermsFacet extends Facet, Iterable<TermsFacet.Entry> {
 
     String getTerm();
 
+    Number termAsNumber();
+
+    Number getTermAsNumber();
+
     int count();
 
     int getCount();
@@ -26,13 +26,18 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.field.data.FieldDataType;
+import org.elasticsearch.index.mapper.FieldMapper;
 import org.elasticsearch.search.facet.Facet;
 import org.elasticsearch.search.facet.FacetCollector;
 import org.elasticsearch.search.facet.FacetProcessor;
-import org.elasticsearch.search.facet.InternalFacet;
+import org.elasticsearch.search.facet.terms.doubles.TermsDoubleFacetCollector;
+import org.elasticsearch.search.facet.terms.floats.TermsFloatFacetCollector;
 import org.elasticsearch.search.facet.terms.index.IndexNameFacetCollector;
+import org.elasticsearch.search.facet.terms.ints.TermsIntFacetCollector;
+import org.elasticsearch.search.facet.terms.longs.TermsLongFacetCollector;
+import org.elasticsearch.search.facet.terms.shorts.TermsShortFacetCollector;
 import org.elasticsearch.search.facet.terms.strings.FieldsTermsStringFacetCollector;
-import org.elasticsearch.search.facet.terms.strings.InternalStringTermsFacet;
 import org.elasticsearch.search.facet.terms.strings.ScriptTermsStringFieldFacetCollector;
 import org.elasticsearch.search.facet.terms.strings.TermsStringFacetCollector;
 import org.elasticsearch.search.internal.SearchContext;
@@ -49,7 +54,7 @@ public class TermsFacetProcessor extends AbstractComponent implements FacetProcessor {
 
     @Inject public TermsFacetProcessor(Settings settings) {
         super(settings);
-        InternalFacet.Streams.registerStream(InternalStringTermsFacet.STREAM, InternalStringTermsFacet.TYPE);
+        InternalTermsFacet.registerStreams();
     }
 
     @Override public String[] types() {
@@ -127,6 +132,21 @@ public class TermsFacetProcessor extends AbstractComponent implements FacetProcessor {
         if (field == null && fieldsNames == null && script != null) {
             return new ScriptTermsStringFieldFacetCollector(facetName, size, comparatorType, context, excluded, pattern, scriptLang, script, params);
         }
+
+        FieldMapper fieldMapper = context.mapperService().smartNameFieldMapper(field);
+        if (fieldMapper != null) {
+            if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.LONG) {
+                return new TermsLongFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params);
+            } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.DOUBLE) {
+                return new TermsDoubleFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params);
+            } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.INT) {
+                return new TermsIntFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params);
+            } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.FLOAT) {
+                return new TermsFloatFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params);
+            } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.SHORT) {
+                return new TermsShortFacetCollector(facetName, field, size, comparatorType, context, scriptLang, script, params);
+            }
+        }
         return new TermsStringFacetCollector(facetName, field, size, comparatorType, context, excluded, pattern, scriptLang, script, params);
     }
InternalDoubleTermsFacet.java (new file)
@@ -0,0 +1,267 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.facet.terms.doubles;

import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TDoubleIntHashMap;
import org.elasticsearch.common.trove.TDoubleIntIterator;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.InternalTermsFacet;
import org.elasticsearch.search.facet.terms.TermsFacet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * @author kimchy (shay.banon)
 */
public class InternalDoubleTermsFacet extends InternalTermsFacet {

    private static final String STREAM_TYPE = "dTerms";

    public static void registerStream() {
        Streams.registerStream(STREAM, STREAM_TYPE);
    }

    static Stream STREAM = new Stream() {
        @Override public Facet readFacet(String type, StreamInput in) throws IOException {
            return readTermsFacet(in);
        }
    };

    @Override public String streamType() {
        return STREAM_TYPE;
    }

    public static class DoubleEntry implements Entry {

        double term;
        int count;

        public DoubleEntry(double term, int count) {
            this.term = term;
            this.count = count;
        }

        public String term() {
            return Double.toString(term);
        }

        public String getTerm() {
            return term();
        }

        @Override public Number termAsNumber() {
            return term;
        }

        @Override public Number getTermAsNumber() {
            return termAsNumber();
        }

        public int count() {
            return count;
        }

        public int getCount() {
            return count();
        }

        @Override public int compareTo(Entry o) {
            double anotherVal = ((DoubleEntry) o).term;
            if (term < anotherVal) {
                return -1;
            }
            if (term == anotherVal) {
                int i = count - o.count();
                if (i == 0) {
                    i = System.identityHashCode(this) - System.identityHashCode(o);
                }
                return i;
            }
            return 1;
        }
    }

    private String name;

    private String fieldName;

    int requiredSize;

    Collection<DoubleEntry> entries = ImmutableList.of();

    private ComparatorType comparatorType;

    InternalDoubleTermsFacet() {
    }

    public InternalDoubleTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection<DoubleEntry> entries) {
        this.name = name;
        this.fieldName = fieldName;
        this.comparatorType = comparatorType;
        this.requiredSize = requiredSize;
        this.entries = entries;
    }

    @Override public String name() {
        return this.name;
    }

    @Override public String getName() {
        return this.name;
    }

    @Override public String fieldName() {
        return this.fieldName;
    }

    @Override public String getFieldName() {
        return fieldName();
    }

    @Override public String type() {
        return TYPE;
    }

    @Override public String getType() {
        return type();
    }

    @Override public ComparatorType comparatorType() {
        return comparatorType;
    }

    @Override public ComparatorType getComparatorType() {
        return comparatorType();
    }

    @Override public List<DoubleEntry> entries() {
        if (!(entries instanceof List)) {
            entries = ImmutableList.copyOf(entries);
        }
        return (List<DoubleEntry>) entries;
    }

    @Override public List<DoubleEntry> getEntries() {
        return entries();
    }

    @SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
        return (Iterator) entries.iterator();
    }

    private static ThreadLocal<ThreadLocals.CleanableValue<TDoubleIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TDoubleIntHashMap>>() {
        @Override protected ThreadLocals.CleanableValue<TDoubleIntHashMap> initialValue() {
            return new ThreadLocals.CleanableValue<TDoubleIntHashMap>(new TDoubleIntHashMap());
        }
    };

    @Override public Facet reduce(String name, List<Facet> facets) {
        if (facets.size() == 1) {
            return facets.get(0);
        }
        InternalDoubleTermsFacet first = (InternalDoubleTermsFacet) facets.get(0);
        TDoubleIntHashMap aggregated = aggregateCache.get().get();
        aggregated.clear();

        for (Facet facet : facets) {
            InternalDoubleTermsFacet mFacet = (InternalDoubleTermsFacet) facet;
            for (DoubleEntry entry : mFacet.entries) {
                aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
            }
        }

        BoundedTreeSet<DoubleEntry> ordered = new BoundedTreeSet<DoubleEntry>(first.comparatorType().comparator(), first.requiredSize);
        for (TDoubleIntIterator it = aggregated.iterator(); it.hasNext();) {
            it.advance();
            ordered.add(new DoubleEntry(it.key(), it.value()));
        }
        first.entries = ordered;
        return first;
    }

    static final class Fields {
        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
        static final XContentBuilderString _FIELD = new XContentBuilderString("_field");
        static final XContentBuilderString TERMS = new XContentBuilderString("terms");
        static final XContentBuilderString TERM = new XContentBuilderString("term");
        static final XContentBuilderString COUNT = new XContentBuilderString("count");
    }

    @Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(name);
        builder.field(Fields._TYPE, TermsFacet.TYPE);
        builder.field(Fields._FIELD, fieldName);
        builder.startArray(Fields.TERMS);
        for (DoubleEntry entry : entries) {
            builder.startObject();
            builder.field(Fields.TERM, entry.term);
            builder.field(Fields.COUNT, entry.count());
            builder.endObject();
        }
        builder.endArray();
        builder.endObject();
    }

    public static InternalDoubleTermsFacet readTermsFacet(StreamInput in) throws IOException {
        InternalDoubleTermsFacet facet = new InternalDoubleTermsFacet();
        facet.readFrom(in);
        return facet;
    }

    @Override public void readFrom(StreamInput in) throws IOException {
        name = in.readUTF();
        fieldName = in.readUTF();
        comparatorType = ComparatorType.fromId(in.readByte());
        requiredSize = in.readVInt();

        int size = in.readVInt();
        entries = new ArrayList<DoubleEntry>(size);
        for (int i = 0; i < size; i++) {
            entries.add(new DoubleEntry(in.readDouble(), in.readVInt()));
        }
    }

    @Override public void writeTo(StreamOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(fieldName);
        out.writeByte(comparatorType.id());

        out.writeVInt(requiredSize);

        out.writeVInt(entries.size());
        for (DoubleEntry entry : entries) {
            out.writeDouble(entry.term);
            out.writeVInt(entry.count());
        }
    }
}
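
reduce() above merges per-shard counts into one primitive map, then keeps only the top requiredSize entries in a bounded sorted set. A self-contained, JDK-only sketch of that merge-then-truncate step (the real code uses Trove's TDoubleIntHashMap and BoundedTreeSet; names here are illustrative):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: merge per-shard (term -> count) maps, then keep the top-N by count.
public class ReduceSketch {

    static List<Map.Entry<Double, Integer>> reduce(List<Map<Double, Integer>> shardResults, int requiredSize) {
        Map<Double, Integer> aggregated = new HashMap<>();
        for (Map<Double, Integer> shard : shardResults) {
            shard.forEach((term, count) -> aggregated.merge(term, count, Integer::sum));
        }
        List<Map.Entry<Double, Integer>> ordered = new ArrayList<>(aggregated.entrySet());
        ordered.sort(Map.Entry.comparingByValue(java.util.Comparator.reverseOrder()));
        return ordered.subList(0, Math.min(requiredSize, ordered.size()));
    }

    public static void main(String[] args) {
        Map<Double, Integer> shard1 = Map.of(1.0, 3, 2.0, 1);
        Map<Double, Integer> shard2 = Map.of(1.0, 2, 3.0, 4);
        System.out.println(reduce(List.of(shard1, shard2), 2)); // [1.0=5, 3.0=4]
    }
}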
TermsDoubleFacetCollector.java (new file)
@@ -0,0 +1,213 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.facet.terms.doubles;

import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TDoubleIntHashMap;
import org.elasticsearch.common.trove.TDoubleIntIterator;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.script.search.SearchScript;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

/**
 * @author kimchy (shay.banon)
 */
public class TermsDoubleFacetCollector extends AbstractFacetCollector {

    static ThreadLocal<ThreadLocals.CleanableValue<Deque<TDoubleIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TDoubleIntHashMap>>>() {
        @Override protected ThreadLocals.CleanableValue<Deque<TDoubleIntHashMap>> initialValue() {
            return new ThreadLocals.CleanableValue<Deque<TDoubleIntHashMap>>(new ArrayDeque<TDoubleIntHashMap>());
        }
    };

    private final FieldDataCache fieldDataCache;

    private final String fieldName;

    private final String indexFieldName;

    private final TermsFacet.ComparatorType comparatorType;

    private final int size;

    private final int numberOfShards;

    private final FieldDataType fieldDataType;

    private DoubleFieldData fieldData;

    private final StaticAggregatorValueProc aggregator;

    private final SearchScript script;

    public TermsDoubleFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context,
                                     String scriptLang, String script, Map<String, Object> params) {
        super(facetName);
        this.fieldDataCache = context.fieldDataCache();
        this.size = size;
        this.comparatorType = comparatorType;
        this.numberOfShards = context.numberOfShards();

        this.fieldName = fieldName;

        MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
        if (smartMappers == null || !smartMappers.hasMapper()) {
            throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms double facet collector on it");
        } else {
            // add type filter if there is exact doc mapper associated with it
            if (smartMappers.hasDocMapper()) {
                setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
            }

            if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.DOUBLE) {
                throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of double type, can't run terms double facet collector on it");
            }

            this.indexFieldName = smartMappers.mapper().names().indexName();
            this.fieldDataType = smartMappers.mapper().fieldDataType();
        }

        if (script != null) {
            this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService());
        } else {
            this.script = null;
        }

        if (this.script == null) {
            aggregator = new StaticAggregatorValueProc(popFacets());
        } else {
            aggregator = new AggregatorValueProc(popFacets(), this.script);
        }
    }

    @Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
        fieldData = (DoubleFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
        if (script != null) {
            script.setNextReader(reader);
        }
    }

    @Override protected void doCollect(int doc) throws IOException {
        fieldData.forEachValueInDoc(doc, aggregator);
    }

    @Override public Facet facet() {
        TDoubleIntHashMap facets = aggregator.facets();
        if (facets.isEmpty()) {
            pushFacets(facets);
            return new InternalDoubleTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.<InternalDoubleTermsFacet.DoubleEntry>of());
        } else {
            // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
            BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry> ordered = new BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry>(comparatorType.comparator(), size * numberOfShards);
            for (TDoubleIntIterator it = facets.iterator(); it.hasNext();) {
                it.advance();
                ordered.add(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value()));
            }
            pushFacets(facets);
            return new InternalDoubleTermsFacet(facetName, fieldName, comparatorType, size, ordered);
        }
    }

    static TDoubleIntHashMap popFacets() {
        Deque<TDoubleIntHashMap> deque = cache.get().get();
        if (deque.isEmpty()) {
            deque.add(new TDoubleIntHashMap());
        }
        TDoubleIntHashMap facets = deque.pollFirst();
        facets.clear();
        return facets;
    }

    static void pushFacets(TDoubleIntHashMap facets) {
        facets.clear();
        Deque<TDoubleIntHashMap> deque = cache.get().get();
        if (deque != null) {
            deque.add(facets);
        }
    }

    public static class AggregatorValueProc extends StaticAggregatorValueProc {

        private final SearchScript script;

        private final Map<String, Object> scriptParams;

        public AggregatorValueProc(TDoubleIntHashMap facets, SearchScript script) {
            super(facets);
            this.script = script;
            if (script != null) {
                scriptParams = Maps.newHashMapWithExpectedSize(4);
            } else {
                scriptParams = null;
            }
        }

        @Override public void onValue(int docId, double value) {
            if (script != null) {
                scriptParams.put("term", value);
                Object scriptValue = script.execute(docId, scriptParams);
                if (scriptValue == null) {
                    return;
                }
                if (scriptValue instanceof Boolean) {
                    if (!((Boolean) scriptValue)) {
                        return;
                    }
                } else {
                    value = ((Number) scriptValue).doubleValue();
                }
            }
            super.onValue(docId, value);
        }
    }

    public static class StaticAggregatorValueProc implements DoubleFieldData.ValueInDocProc {

        private final TDoubleIntHashMap facets;

        public StaticAggregatorValueProc(TDoubleIntHashMap facets) {
            this.facets = facets;
        }

        @Override public void onValue(int docId, double value) {
            facets.adjustOrPutValue(value, 1, 1);
        }

        public final TDoubleIntHashMap facets() {
            return facets;
        }
    }
}
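
popFacets()/pushFacets() above recycle the count maps through a per-thread deque, so repeated facet executions on the same thread reuse one map instead of allocating a new one each time. A generic, JDK-only sketch of that recycling pattern, with hypothetical names:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch: a per-thread pool that hands out cleared maps and takes them back after use.
public class RecyclingPoolSketch {

    static final ThreadLocal<Deque<Map<Double, Integer>>> CACHE =
            ThreadLocal.withInitial(ArrayDeque::new);

    static Map<Double, Integer> pop() {
        Deque<Map<Double, Integer>> deque = CACHE.get();
        Map<Double, Integer> map = deque.pollFirst();
        if (map == null) {
            map = new HashMap<>(); // pool empty: allocate once, reuse afterwards
        }
        map.clear();
        return map;
    }

    static void push(Map<Double, Integer> map) {
        map.clear();               // return it empty so the next user starts fresh
        CACHE.get().add(map);
    }

    public static void main(String[] args) {
        Map<Double, Integer> counts = pop();
        counts.merge(1.5, 1, Integer::sum);
        System.out.println(counts);
        push(counts);              // back into the pool for the next facet run
    }
}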
InternalFloatTermsFacet.java (new file)
@@ -0,0 +1,267 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.facet.terms.floats;

import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TFloatIntHashMap;
import org.elasticsearch.common.trove.TFloatIntIterator;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.InternalTermsFacet;
import org.elasticsearch.search.facet.terms.TermsFacet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * @author kimchy (shay.banon)
 */
public class InternalFloatTermsFacet extends InternalTermsFacet {

    private static final String STREAM_TYPE = "fTerms";

    public static void registerStream() {
        Streams.registerStream(STREAM, STREAM_TYPE);
    }

    static Stream STREAM = new Stream() {
        @Override public Facet readFacet(String type, StreamInput in) throws IOException {
            return readTermsFacet(in);
        }
    };

    @Override public String streamType() {
        return STREAM_TYPE;
    }

    public static class FloatEntry implements Entry {

        float term;
        int count;

        public FloatEntry(float term, int count) {
            this.term = term;
            this.count = count;
        }

        public String term() {
            return Float.toString(term);
        }

        public String getTerm() {
            return term();
        }

        @Override public Number termAsNumber() {
            return term;
        }

        @Override public Number getTermAsNumber() {
            return termAsNumber();
        }

        public int count() {
            return count;
        }

        public int getCount() {
            return count();
        }

        @Override public int compareTo(Entry o) {
            float anotherVal = ((FloatEntry) o).term;
            if (term < anotherVal) {
                return -1;
            }
            if (term == anotherVal) {
                int i = count - o.count();
                if (i == 0) {
                    i = System.identityHashCode(this) - System.identityHashCode(o);
                }
                return i;
            }
            return 1;
        }
    }

    private String name;

    private String fieldName;

    int requiredSize;

    Collection<FloatEntry> entries = ImmutableList.of();

    private ComparatorType comparatorType;

    InternalFloatTermsFacet() {
    }

    public InternalFloatTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection<FloatEntry> entries) {
        this.name = name;
        this.fieldName = fieldName;
        this.comparatorType = comparatorType;
        this.requiredSize = requiredSize;
        this.entries = entries;
    }

    @Override public String name() {
        return this.name;
    }

    @Override public String getName() {
        return this.name;
    }

    @Override public String fieldName() {
        return this.fieldName;
    }

    @Override public String getFieldName() {
        return fieldName();
    }

    @Override public String type() {
        return TYPE;
    }

    @Override public String getType() {
        return type();
    }

    @Override public ComparatorType comparatorType() {
        return comparatorType;
    }

    @Override public ComparatorType getComparatorType() {
        return comparatorType();
    }

    @Override public List<FloatEntry> entries() {
        if (!(entries instanceof List)) {
            entries = ImmutableList.copyOf(entries);
        }
        return (List<FloatEntry>) entries;
    }

    @Override public List<FloatEntry> getEntries() {
        return entries();
    }

    @SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
        return (Iterator) entries.iterator();
    }

    private static ThreadLocal<ThreadLocals.CleanableValue<TFloatIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TFloatIntHashMap>>() {
        @Override protected ThreadLocals.CleanableValue<TFloatIntHashMap> initialValue() {
            return new ThreadLocals.CleanableValue<TFloatIntHashMap>(new TFloatIntHashMap());
        }
    };

    @Override public Facet reduce(String name, List<Facet> facets) {
        if (facets.size() == 1) {
            return facets.get(0);
        }
        InternalFloatTermsFacet first = (InternalFloatTermsFacet) facets.get(0);
        TFloatIntHashMap aggregated = aggregateCache.get().get();
        aggregated.clear();

        for (Facet facet : facets) {
            InternalFloatTermsFacet mFacet = (InternalFloatTermsFacet) facet;
            for (FloatEntry entry : mFacet.entries) {
                aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
            }
        }

        BoundedTreeSet<FloatEntry> ordered = new BoundedTreeSet<FloatEntry>(first.comparatorType().comparator(), first.requiredSize);
        for (TFloatIntIterator it = aggregated.iterator(); it.hasNext();) {
            it.advance();
            ordered.add(new FloatEntry(it.key(), it.value()));
        }
        first.entries = ordered;
        return first;
    }

    static final class Fields {
        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
        static final XContentBuilderString _FIELD = new XContentBuilderString("_field");
        static final XContentBuilderString TERMS = new XContentBuilderString("terms");
        static final XContentBuilderString TERM = new XContentBuilderString("term");
        static final XContentBuilderString COUNT = new XContentBuilderString("count");
    }

    @Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(name);
        builder.field(Fields._TYPE, TermsFacet.TYPE);
        builder.field(Fields._FIELD, fieldName);
        builder.startArray(Fields.TERMS);
        for (FloatEntry entry : entries) {
            builder.startObject();
            builder.field(Fields.TERM, entry.term);
            builder.field(Fields.COUNT, entry.count());
            builder.endObject();
        }
        builder.endArray();
        builder.endObject();
    }

    public static InternalFloatTermsFacet readTermsFacet(StreamInput in) throws IOException {
        InternalFloatTermsFacet facet = new InternalFloatTermsFacet();
        facet.readFrom(in);
        return facet;
    }

    @Override public void readFrom(StreamInput in) throws IOException {
        name = in.readUTF();
        fieldName = in.readUTF();
        comparatorType = ComparatorType.fromId(in.readByte());
        requiredSize = in.readVInt();

        int size = in.readVInt();
        entries = new ArrayList<FloatEntry>(size);
        for (int i = 0; i < size; i++) {
            entries.add(new FloatEntry(in.readFloat(), in.readVInt()));
        }
    }

    @Override public void writeTo(StreamOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(fieldName);
        out.writeByte(comparatorType.id());

        out.writeVInt(requiredSize);

        out.writeVInt(entries.size());
        for (FloatEntry entry : entries) {
            out.writeFloat(entry.term);
            out.writeVInt(entry.count());
        }
    }
}
@ -0,0 +1,213 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.facet.terms.floats;

import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TFloatIntHashMap;
import org.elasticsearch.common.trove.TFloatIntIterator;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.floats.FloatFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.script.search.SearchScript;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

/**
 * @author kimchy (shay.banon)
 */
public class TermsFloatFacetCollector extends AbstractFacetCollector {

    static ThreadLocal<ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>>>() {
        @Override protected ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>> initialValue() {
            return new ThreadLocals.CleanableValue<Deque<TFloatIntHashMap>>(new ArrayDeque<TFloatIntHashMap>());
        }
    };

    private final FieldDataCache fieldDataCache;

    private final String fieldName;

    private final String indexFieldName;

    private final TermsFacet.ComparatorType comparatorType;

    private final int size;

    private final int numberOfShards;

    private final FieldDataType fieldDataType;

    private FloatFieldData fieldData;

    private final StaticAggregatorValueProc aggregator;

    private final SearchScript script;

    public TermsFloatFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context,
                                    String scriptLang, String script, Map<String, Object> params) {
        super(facetName);
        this.fieldDataCache = context.fieldDataCache();
        this.size = size;
        this.comparatorType = comparatorType;
        this.numberOfShards = context.numberOfShards();

        this.fieldName = fieldName;

        MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
        if (smartMappers == null || !smartMappers.hasMapper()) {
            throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms float facet collector on it");
        } else {
            // add type filter if there is exact doc mapper associated with it
            if (smartMappers.hasDocMapper()) {
                setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
            }

            if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.FLOAT) {
                throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of float type, can't run terms float facet collector on it");
            }

            this.indexFieldName = smartMappers.mapper().names().indexName();
            this.fieldDataType = smartMappers.mapper().fieldDataType();
        }

        if (script != null) {
            this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService());
        } else {
            this.script = null;
        }

        if (this.script == null) {
            aggregator = new StaticAggregatorValueProc(popFacets());
        } else {
            aggregator = new AggregatorValueProc(popFacets(), this.script);
        }
    }

    @Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
        fieldData = (FloatFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
        if (script != null) {
            script.setNextReader(reader);
        }
    }

    @Override protected void doCollect(int doc) throws IOException {
        fieldData.forEachValueInDoc(doc, aggregator);
    }

    @Override public Facet facet() {
        TFloatIntHashMap facets = aggregator.facets();
        if (facets.isEmpty()) {
            pushFacets(facets);
            return new InternalFloatTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.<InternalFloatTermsFacet.FloatEntry>of());
        } else {
            // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
            BoundedTreeSet<InternalFloatTermsFacet.FloatEntry> ordered = new BoundedTreeSet<InternalFloatTermsFacet.FloatEntry>(comparatorType.comparator(), size * numberOfShards);
            for (TFloatIntIterator it = facets.iterator(); it.hasNext();) {
                it.advance();
                ordered.add(new InternalFloatTermsFacet.FloatEntry(it.key(), it.value()));
            }
            pushFacets(facets);
            return new InternalFloatTermsFacet(facetName, fieldName, comparatorType, size, ordered);
        }
    }

    static TFloatIntHashMap popFacets() {
        Deque<TFloatIntHashMap> deque = cache.get().get();
        if (deque.isEmpty()) {
            deque.add(new TFloatIntHashMap());
        }
        TFloatIntHashMap facets = deque.pollFirst();
        facets.clear();
        return facets;
    }

    static void pushFacets(TFloatIntHashMap facets) {
        facets.clear();
        Deque<TFloatIntHashMap> deque = cache.get().get();
        if (deque != null) {
            deque.add(facets);
        }
    }

    public static class AggregatorValueProc extends StaticAggregatorValueProc {

        private final SearchScript script;

        private final Map<String, Object> scriptParams;

        public AggregatorValueProc(TFloatIntHashMap facets, SearchScript script) {
            super(facets);
            this.script = script;
            if (script != null) {
                scriptParams = Maps.newHashMapWithExpectedSize(4);
            } else {
                scriptParams = null;
            }
        }

        @Override public void onValue(int docId, float value) {
            if (script != null) {
                scriptParams.put("term", value);
                Object scriptValue = script.execute(docId, scriptParams);
                if (scriptValue == null) {
                    return;
                }
                if (scriptValue instanceof Boolean) {
                    if (!((Boolean) scriptValue)) {
                        return;
                    }
                } else {
                    value = ((Number) scriptValue).floatValue();
                }
            }
            super.onValue(docId, value);
        }
    }

    public static class StaticAggregatorValueProc implements FloatFieldData.ValueInDocProc {

        private final TFloatIntHashMap facets;

        public StaticAggregatorValueProc(TFloatIntHashMap facets) {
            this.facets = facets;
        }

        @Override public void onValue(int docId, float value) {
            facets.adjustOrPutValue(value, 1, 1);
        }

        public final TFloatIntHashMap facets() {
            return facets;
        }
    }
}
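popFacets()/pushFacets() above implement a small per-thread pool: each thread keeps a deque of reusable trove maps, so a collector borrows one per search and returns it cleared, rather than allocating a fresh large hash map on every request — the memory improvement this commit targets. A generic sketch of the same pattern in modern Java (class and method names here are illustrative, not from this codebase):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

final class PerThreadPool<T> {
    // One deque of reusable instances per thread; no locking needed.
    private final ThreadLocal<Deque<T>> pool = ThreadLocal.withInitial(ArrayDeque::new);
    private final Supplier<T> factory;

    PerThreadPool(Supplier<T> factory) {
        this.factory = factory;
    }

    T pop() {
        T value = pool.get().pollFirst();
        return value != null ? value : factory.get();
    }

    void push(T value) {
        // Caller is expected to clear the instance before returning it.
        pool.get().add(value);
    }
}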
@ -0,0 +1,264 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.facet.terms.ints;

import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TIntIntHashMap;
import org.elasticsearch.common.trove.TIntIntIterator;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.InternalTermsFacet;
import org.elasticsearch.search.facet.terms.TermsFacet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * @author kimchy (shay.banon)
 */
public class InternalIntTermsFacet extends InternalTermsFacet {

    private static final String STREAM_TYPE = "iTerms";

    public static void registerStream() {
        Streams.registerStream(STREAM, STREAM_TYPE);
    }

    static Stream STREAM = new Stream() {
        @Override public Facet readFacet(String type, StreamInput in) throws IOException {
            return readTermsFacet(in);
        }
    };

    @Override public String streamType() {
        return STREAM_TYPE;
    }

    public static class IntEntry implements Entry {

        int term;
        int count;

        public IntEntry(int term, int count) {
            this.term = term;
            this.count = count;
        }

        public String term() {
            return Integer.toString(term);
        }

        public String getTerm() {
            return term();
        }

        @Override public Number termAsNumber() {
            return term;
        }

        @Override public Number getTermAsNumber() {
            return termAsNumber();
        }

        public int count() {
            return count;
        }

        public int getCount() {
            return count();
        }

        @Override public int compareTo(Entry o) {
            int anotherVal = ((IntEntry) o).term;
            int i = term - anotherVal;
            if (i == 0) {
                i = count - o.count();
                if (i == 0) {
                    i = System.identityHashCode(this) - System.identityHashCode(o);
                }
            }
            return i;
        }
    }

    private String name;

    private String fieldName;

    int requiredSize;

    Collection<IntEntry> entries = ImmutableList.of();

    private ComparatorType comparatorType;

    InternalIntTermsFacet() {
    }

    public InternalIntTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection<IntEntry> entries) {
        this.name = name;
        this.fieldName = fieldName;
        this.comparatorType = comparatorType;
        this.requiredSize = requiredSize;
        this.entries = entries;
    }

    @Override public String name() {
        return this.name;
    }

    @Override public String getName() {
        return this.name;
    }

    @Override public String fieldName() {
        return this.fieldName;
    }

    @Override public String getFieldName() {
        return fieldName();
    }

    @Override public String type() {
        return TYPE;
    }

    @Override public String getType() {
        return type();
    }

    @Override public ComparatorType comparatorType() {
        return comparatorType;
    }

    @Override public ComparatorType getComparatorType() {
        return comparatorType();
    }

    @Override public List<IntEntry> entries() {
        if (!(entries instanceof List)) {
            entries = ImmutableList.copyOf(entries);
        }
        return (List<IntEntry>) entries;
    }

    @Override public List<IntEntry> getEntries() {
        return entries();
    }

    @SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
        return (Iterator) entries.iterator();
    }

    private static ThreadLocal<ThreadLocals.CleanableValue<TIntIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TIntIntHashMap>>() {
        @Override protected ThreadLocals.CleanableValue<TIntIntHashMap> initialValue() {
            return new ThreadLocals.CleanableValue<TIntIntHashMap>(new TIntIntHashMap());
        }
    };

    @Override public Facet reduce(String name, List<Facet> facets) {
        if (facets.size() == 1) {
            return facets.get(0);
        }
        InternalIntTermsFacet first = (InternalIntTermsFacet) facets.get(0);
        TIntIntHashMap aggregated = aggregateCache.get().get();
        aggregated.clear();

        for (Facet facet : facets) {
            InternalIntTermsFacet mFacet = (InternalIntTermsFacet) facet;
            for (IntEntry entry : mFacet.entries) {
                aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
            }
        }

        BoundedTreeSet<IntEntry> ordered = new BoundedTreeSet<IntEntry>(first.comparatorType().comparator(), first.requiredSize);
        for (TIntIntIterator it = aggregated.iterator(); it.hasNext();) {
            it.advance();
            ordered.add(new IntEntry(it.key(), it.value()));
        }
        first.entries = ordered;
        return first;
    }

    static final class Fields {
        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
        static final XContentBuilderString _FIELD = new XContentBuilderString("_field");
        static final XContentBuilderString TERMS = new XContentBuilderString("terms");
        static final XContentBuilderString TERM = new XContentBuilderString("term");
        static final XContentBuilderString COUNT = new XContentBuilderString("count");
    }

    @Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(name);
        builder.field(Fields._TYPE, TermsFacet.TYPE);
        builder.field(Fields._FIELD, fieldName);
        builder.startArray(Fields.TERMS);
        for (IntEntry entry : entries) {
            builder.startObject();
            builder.field(Fields.TERM, entry.term);
            builder.field(Fields.COUNT, entry.count());
            builder.endObject();
        }
        builder.endArray();
        builder.endObject();
    }

    public static InternalIntTermsFacet readTermsFacet(StreamInput in) throws IOException {
        InternalIntTermsFacet facet = new InternalIntTermsFacet();
        facet.readFrom(in);
        return facet;
    }

    @Override public void readFrom(StreamInput in) throws IOException {
        name = in.readUTF();
        fieldName = in.readUTF();
        comparatorType = ComparatorType.fromId(in.readByte());
        requiredSize = in.readVInt();

        int size = in.readVInt();
        entries = new ArrayList<IntEntry>(size);
        for (int i = 0; i < size; i++) {
            entries.add(new IntEntry(in.readInt(), in.readVInt()));
        }
    }

    @Override public void writeTo(StreamOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(fieldName);
        out.writeByte(comparatorType.id());

        out.writeVInt(requiredSize);

        out.writeVInt(entries.size());
        for (IntEntry entry : entries) {
            out.writeInt(entry.term);
            out.writeVInt(entry.count());
        }
    }
}
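reduce() re-aggregates the per-shard entries into one trove map and then re-ranks them through a BoundedTreeSet capped at requiredSize. A sketch of what such a size-capped ordered set plausibly does — the real org.elasticsearch.common.collect.BoundedTreeSet may differ in details, and the eviction here is deliberately simplified:

import java.util.Comparator;
import java.util.TreeSet;

final class BoundedSet<E> extends TreeSet<E> {
    private final int maxSize;

    BoundedSet(Comparator<? super E> comparator, int maxSize) {
        super(comparator);
        this.maxSize = maxSize;
    }

    @Override public boolean add(E e) {
        boolean added = super.add(e);
        while (size() > maxSize) {
            remove(last()); // evict the worst-ranked entry to stay bounded
        }
        return added;
    }
}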
@ -0,0 +1,213 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.facet.terms.ints;

import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TIntIntHashMap;
import org.elasticsearch.common.trove.TIntIntIterator;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.ints.IntFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.script.search.SearchScript;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

/**
 * @author kimchy (shay.banon)
 */
public class TermsIntFacetCollector extends AbstractFacetCollector {

    static ThreadLocal<ThreadLocals.CleanableValue<Deque<TIntIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TIntIntHashMap>>>() {
        @Override protected ThreadLocals.CleanableValue<Deque<TIntIntHashMap>> initialValue() {
            return new ThreadLocals.CleanableValue<Deque<TIntIntHashMap>>(new ArrayDeque<TIntIntHashMap>());
        }
    };

    private final FieldDataCache fieldDataCache;

    private final String fieldName;

    private final String indexFieldName;

    private final TermsFacet.ComparatorType comparatorType;

    private final int size;

    private final int numberOfShards;

    private final FieldDataType fieldDataType;

    private IntFieldData fieldData;

    private final StaticAggregatorValueProc aggregator;

    private final SearchScript script;

    public TermsIntFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context,
                                  String scriptLang, String script, Map<String, Object> params) {
        super(facetName);
        this.fieldDataCache = context.fieldDataCache();
        this.size = size;
        this.comparatorType = comparatorType;
        this.numberOfShards = context.numberOfShards();

        this.fieldName = fieldName;

        MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
        if (smartMappers == null || !smartMappers.hasMapper()) {
            throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms int facet collector on it");
        } else {
            // add type filter if there is exact doc mapper associated with it
            if (smartMappers.hasDocMapper()) {
                setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
            }

            if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.INT) {
                throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of int type, can't run terms int facet collector on it");
            }

            this.indexFieldName = smartMappers.mapper().names().indexName();
            this.fieldDataType = smartMappers.mapper().fieldDataType();
        }

        if (script != null) {
            this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService());
        } else {
            this.script = null;
        }

        if (this.script == null) {
            aggregator = new StaticAggregatorValueProc(popFacets());
        } else {
            aggregator = new AggregatorValueProc(popFacets(), this.script);
        }
    }

    @Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
        fieldData = (IntFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
        if (script != null) {
            script.setNextReader(reader);
        }
    }

    @Override protected void doCollect(int doc) throws IOException {
        fieldData.forEachValueInDoc(doc, aggregator);
    }

    @Override public Facet facet() {
        TIntIntHashMap facets = aggregator.facets();
        if (facets.isEmpty()) {
            pushFacets(facets);
            return new InternalIntTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.<InternalIntTermsFacet.IntEntry>of());
        } else {
            // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
            BoundedTreeSet<InternalIntTermsFacet.IntEntry> ordered = new BoundedTreeSet<InternalIntTermsFacet.IntEntry>(comparatorType.comparator(), size * numberOfShards);
            for (TIntIntIterator it = facets.iterator(); it.hasNext();) {
                it.advance();
                ordered.add(new InternalIntTermsFacet.IntEntry(it.key(), it.value()));
            }
            pushFacets(facets);
            return new InternalIntTermsFacet(facetName, fieldName, comparatorType, size, ordered);
        }
    }

    static TIntIntHashMap popFacets() {
        Deque<TIntIntHashMap> deque = cache.get().get();
        if (deque.isEmpty()) {
            deque.add(new TIntIntHashMap());
        }
        TIntIntHashMap facets = deque.pollFirst();
        facets.clear();
        return facets;
    }

    static void pushFacets(TIntIntHashMap facets) {
        facets.clear();
        Deque<TIntIntHashMap> deque = cache.get().get();
        if (deque != null) {
            deque.add(facets);
        }
    }

    public static class AggregatorValueProc extends StaticAggregatorValueProc {

        private final SearchScript script;

        private final Map<String, Object> scriptParams;

        public AggregatorValueProc(TIntIntHashMap facets, SearchScript script) {
            super(facets);
            this.script = script;
            if (script != null) {
                scriptParams = Maps.newHashMapWithExpectedSize(4);
            } else {
                scriptParams = null;
            }
        }

        @Override public void onValue(int docId, int value) {
            if (script != null) {
                scriptParams.put("term", value);
                Object scriptValue = script.execute(docId, scriptParams);
                if (scriptValue == null) {
                    return;
                }
                if (scriptValue instanceof Boolean) {
                    if (!((Boolean) scriptValue)) {
                        return;
                    }
                } else {
                    value = ((Number) scriptValue).intValue();
                }
            }
            super.onValue(docId, value);
        }
    }

    public static class StaticAggregatorValueProc implements IntFieldData.ValueInDocProc {

        private final TIntIntHashMap facets;

        public StaticAggregatorValueProc(TIntIntHashMap facets) {
            this.facets = facets;
        }

        @Override public void onValue(int docId, int value) {
            facets.adjustOrPutValue(value, 1, 1);
        }

        public final TIntIntHashMap facets() {
            return facets;
        }
    }
}
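A worked example of the size * numberOfShards over-fetch noted in facet() above: with a requested size of 10 across 5 shards, each shard ships its top 50 entries rather than its top 10. A term that ranks, say, 12th on every shard would be dropped entirely by a per-shard top-10 cut even though its summed count could place it in the global top 10; widening the per-shard cut makes that failure mode far less likely, though still not impossible.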
@ -0,0 +1,267 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.facet.terms.longs;

import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TLongIntHashMap;
import org.elasticsearch.common.trove.TLongIntIterator;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.InternalTermsFacet;
import org.elasticsearch.search.facet.terms.TermsFacet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * @author kimchy (shay.banon)
 */
public class InternalLongTermsFacet extends InternalTermsFacet {

    private static final String STREAM_TYPE = "lTerms";

    public static void registerStream() {
        Streams.registerStream(STREAM, STREAM_TYPE);
    }

    static Stream STREAM = new Stream() {
        @Override public Facet readFacet(String type, StreamInput in) throws IOException {
            return readTermsFacet(in);
        }
    };

    @Override public String streamType() {
        return STREAM_TYPE;
    }

    public static class LongEntry implements Entry {

        long term;
        int count;

        public LongEntry(long term, int count) {
            this.term = term;
            this.count = count;
        }

        public String term() {
            return Long.toString(term);
        }

        public String getTerm() {
            return term();
        }

        @Override public Number termAsNumber() {
            return term;
        }

        @Override public Number getTermAsNumber() {
            return termAsNumber();
        }

        public int count() {
            return count;
        }

        public int getCount() {
            return count();
        }

        @Override public int compareTo(Entry o) {
            long anotherVal = ((LongEntry) o).term;
            if (term < anotherVal) {
                return -1;
            }
            if (term == anotherVal) {
                int i = count - o.count();
                if (i == 0) {
                    i = System.identityHashCode(this) - System.identityHashCode(o);
                }
                return i;
            }
            return 1;
        }
    }

    private String name;

    private String fieldName;

    int requiredSize;

    Collection<LongEntry> entries = ImmutableList.of();

    private ComparatorType comparatorType;

    InternalLongTermsFacet() {
    }

    public InternalLongTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection<LongEntry> entries) {
        this.name = name;
        this.fieldName = fieldName;
        this.comparatorType = comparatorType;
        this.requiredSize = requiredSize;
        this.entries = entries;
    }

    @Override public String name() {
        return this.name;
    }

    @Override public String getName() {
        return this.name;
    }

    @Override public String fieldName() {
        return this.fieldName;
    }

    @Override public String getFieldName() {
        return fieldName();
    }

    @Override public String type() {
        return TYPE;
    }

    @Override public String getType() {
        return type();
    }

    @Override public ComparatorType comparatorType() {
        return comparatorType;
    }

    @Override public ComparatorType getComparatorType() {
        return comparatorType();
    }

    @Override public List<LongEntry> entries() {
        if (!(entries instanceof List)) {
            entries = ImmutableList.copyOf(entries);
        }
        return (List<LongEntry>) entries;
    }

    @Override public List<LongEntry> getEntries() {
        return entries();
    }

    @SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
        return (Iterator) entries.iterator();
    }

    private static ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>>() {
        @Override protected ThreadLocals.CleanableValue<TLongIntHashMap> initialValue() {
            return new ThreadLocals.CleanableValue<TLongIntHashMap>(new TLongIntHashMap());
        }
    };

    @Override public Facet reduce(String name, List<Facet> facets) {
        if (facets.size() == 1) {
            return facets.get(0);
        }
        InternalLongTermsFacet first = (InternalLongTermsFacet) facets.get(0);
        TLongIntHashMap aggregated = aggregateCache.get().get();
        aggregated.clear();

        for (Facet facet : facets) {
            InternalLongTermsFacet mFacet = (InternalLongTermsFacet) facet;
            for (LongEntry entry : mFacet.entries) {
                aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
            }
        }

        BoundedTreeSet<LongEntry> ordered = new BoundedTreeSet<LongEntry>(first.comparatorType().comparator(), first.requiredSize);
        for (TLongIntIterator it = aggregated.iterator(); it.hasNext();) {
            it.advance();
            ordered.add(new LongEntry(it.key(), it.value()));
        }
        first.entries = ordered;
        return first;
    }

    static final class Fields {
        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
        static final XContentBuilderString _FIELD = new XContentBuilderString("_field");
        static final XContentBuilderString TERMS = new XContentBuilderString("terms");
        static final XContentBuilderString TERM = new XContentBuilderString("term");
        static final XContentBuilderString COUNT = new XContentBuilderString("count");
    }

    @Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(name);
        builder.field(Fields._TYPE, TermsFacet.TYPE);
        builder.field(Fields._FIELD, fieldName);
        builder.startArray(Fields.TERMS);
        for (LongEntry entry : entries) {
            builder.startObject();
            builder.field(Fields.TERM, entry.term);
            builder.field(Fields.COUNT, entry.count());
            builder.endObject();
        }
        builder.endArray();
        builder.endObject();
    }

    public static InternalLongTermsFacet readTermsFacet(StreamInput in) throws IOException {
        InternalLongTermsFacet facet = new InternalLongTermsFacet();
        facet.readFrom(in);
        return facet;
    }

    @Override public void readFrom(StreamInput in) throws IOException {
        name = in.readUTF();
        fieldName = in.readUTF();
        comparatorType = ComparatorType.fromId(in.readByte());
        requiredSize = in.readVInt();

        int size = in.readVInt();
        entries = new ArrayList<LongEntry>(size);
        for (int i = 0; i < size; i++) {
            entries.add(new LongEntry(in.readLong(), in.readVInt()));
        }
    }

    @Override public void writeTo(StreamOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(fieldName);
        out.writeByte(comparatorType.id());

        out.writeVInt(requiredSize);

        out.writeVInt(entries.size());
        for (LongEntry entry : entries) {
            out.writeLong(entry.term);
            out.writeVInt(entry.count());
        }
    }
}
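Unlike the int and short entries, LongEntry.compareTo orders terms with explicit comparisons rather than subtraction, since the difference of two longs does not fit in the int return value. The same term-then-count ordering in a compact modern form (Long.compare and Integer.compare are plain JDK methods; the identity-hash tiebreaker of the original is omitted here):

// Equivalent overflow-free ordering for (term, count) pairs.
static int compareEntries(long term, int count, long otherTerm, int otherCount) {
    int byTerm = Long.compare(term, otherTerm);
    return byTerm != 0 ? byTerm : Integer.compare(count, otherCount);
}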
@ -0,0 +1,214 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.facet.terms.longs;

import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TLongIntHashMap;
import org.elasticsearch.common.trove.TLongIntIterator;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.longs.LongFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.script.search.SearchScript;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

/**
 * @author kimchy (shay.banon)
 */
public class TermsLongFacetCollector extends AbstractFacetCollector {

    static ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>>() {
        @Override protected ThreadLocals.CleanableValue<Deque<TLongIntHashMap>> initialValue() {
            return new ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>(new ArrayDeque<TLongIntHashMap>());
        }
    };

    private final FieldDataCache fieldDataCache;

    private final String fieldName;

    private final String indexFieldName;

    private final TermsFacet.ComparatorType comparatorType;

    private final int size;

    private final int numberOfShards;

    private final FieldDataType fieldDataType;

    private LongFieldData fieldData;

    private final StaticAggregatorValueProc aggregator;

    private final SearchScript script;

    public TermsLongFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context,
                                   String scriptLang, String script, Map<String, Object> params) {
        super(facetName);
        this.fieldDataCache = context.fieldDataCache();
        this.size = size;
        this.comparatorType = comparatorType;
        this.numberOfShards = context.numberOfShards();

        this.fieldName = fieldName;

        MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
        if (smartMappers == null || !smartMappers.hasMapper()) {
            throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms long facet collector on it");
        } else {
            // add type filter if there is exact doc mapper associated with it
            if (smartMappers.hasDocMapper()) {
                setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
            }

            if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.LONG) {
                throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of long type, can't run terms long facet collector on it");
            }

            this.indexFieldName = smartMappers.mapper().names().indexName();
            this.fieldDataType = smartMappers.mapper().fieldDataType();
        }

        if (script != null) {
            this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService());
        } else {
            this.script = null;
        }

        if (this.script == null) {
            aggregator = new StaticAggregatorValueProc(popFacets());
        } else {
            aggregator = new AggregatorValueProc(popFacets(), this.script);
        }
    }

    @Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
        fieldData = (LongFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
        if (script != null) {
            script.setNextReader(reader);
        }
    }

    @Override protected void doCollect(int doc) throws IOException {
        fieldData.forEachValueInDoc(doc, aggregator);
    }

    @Override public Facet facet() {
        TLongIntHashMap facets = aggregator.facets();
        if (facets.isEmpty()) {
            pushFacets(facets);
            return new InternalLongTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.<InternalLongTermsFacet.LongEntry>of());
        } else {
            // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
            BoundedTreeSet<InternalLongTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalLongTermsFacet.LongEntry>(comparatorType.comparator(), size * numberOfShards);
            for (TLongIntIterator it = facets.iterator(); it.hasNext();) {
                it.advance();
                ordered.add(new InternalLongTermsFacet.LongEntry(it.key(), it.value()));
            }
            pushFacets(facets);
            return new InternalLongTermsFacet(facetName, fieldName, comparatorType, size, ordered);
        }
    }

    static TLongIntHashMap popFacets() {
        Deque<TLongIntHashMap> deque = cache.get().get();
        if (deque.isEmpty()) {
            deque.add(new TLongIntHashMap());
        }
        TLongIntHashMap facets = deque.pollFirst();
        facets.clear();
        return facets;
    }

    static void pushFacets(TLongIntHashMap facets) {
        facets.clear();
        Deque<TLongIntHashMap> deque = cache.get().get();
        if (deque != null) {
            deque.add(facets);
        }
    }

    public static class AggregatorValueProc extends StaticAggregatorValueProc {

        private final SearchScript script;

        private final Map<String, Object> scriptParams;

        public AggregatorValueProc(TLongIntHashMap facets, SearchScript script) {
            super(facets);
            this.script = script;
            if (script != null) {
                scriptParams = Maps.newHashMapWithExpectedSize(4);
            } else {
                scriptParams = null;
            }
        }

        @Override public void onValue(int docId, long value) {
            if (script != null) {
                scriptParams.put("term", value);
                Object scriptValue = script.execute(docId, scriptParams);
                if (scriptValue == null) {
                    return;
                }
                if (scriptValue instanceof Boolean) {
                    if (!((Boolean) scriptValue)) {
                        return;
                    }
                } else {
                    value = ((Number) scriptValue).longValue();
                }
            }
            super.onValue(docId, value);
        }
    }

    public static class StaticAggregatorValueProc implements LongFieldData.ValueInDocProc {

        private final TLongIntHashMap facets;

        public StaticAggregatorValueProc(TLongIntHashMap facets) {
            this.facets = facets;
        }

        @Override public void onValue(int docId, long value) {
            facets.adjustOrPutValue(value, 1, 1);
        }

        public final TLongIntHashMap facets() {
            return facets;
        }
    }
}
@ -0,0 +1,264 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.facet.terms.shorts;

import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.TShortIntHashMap;
import org.elasticsearch.common.trove.TShortIntIterator;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.InternalTermsFacet;
import org.elasticsearch.search.facet.terms.TermsFacet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * @author kimchy (shay.banon)
 */
public class InternalShortTermsFacet extends InternalTermsFacet {

    private static final String STREAM_TYPE = "sTerms";

    public static void registerStream() {
        Streams.registerStream(STREAM, STREAM_TYPE);
    }

    static Stream STREAM = new Stream() {
        @Override public Facet readFacet(String type, StreamInput in) throws IOException {
            return readTermsFacet(in);
        }
    };

    @Override public String streamType() {
        return STREAM_TYPE;
    }

    public static class ShortEntry implements Entry {

        short term;
        int count;

        public ShortEntry(short term, int count) {
            this.term = term;
            this.count = count;
        }

        public String term() {
            return Short.toString(term);
        }

        public String getTerm() {
            return term();
        }

        @Override public Number termAsNumber() {
            return term;
        }

        @Override public Number getTermAsNumber() {
            return termAsNumber();
        }

        public int count() {
            return count;
        }

        public int getCount() {
            return count();
        }

        @Override public int compareTo(Entry o) {
            short anotherVal = ((ShortEntry) o).term;
            int i = term - anotherVal;
            if (i == 0) {
                i = count - o.count();
                if (i == 0) {
                    i = System.identityHashCode(this) - System.identityHashCode(o);
                }
            }
            return i;
        }
    }

    private String name;

    private String fieldName;

    int requiredSize;

    Collection<ShortEntry> entries = ImmutableList.of();

    private ComparatorType comparatorType;

    InternalShortTermsFacet() {
    }

    public InternalShortTermsFacet(String name, String fieldName, ComparatorType comparatorType, int requiredSize, Collection<ShortEntry> entries) {
        this.name = name;
        this.fieldName = fieldName;
        this.comparatorType = comparatorType;
        this.requiredSize = requiredSize;
        this.entries = entries;
    }

    @Override public String name() {
        return this.name;
    }

    @Override public String getName() {
        return this.name;
    }

    @Override public String fieldName() {
        return this.fieldName;
    }

    @Override public String getFieldName() {
        return fieldName();
    }

    @Override public String type() {
        return TYPE;
    }

    @Override public String getType() {
        return type();
    }

    @Override public ComparatorType comparatorType() {
        return comparatorType;
    }

    @Override public ComparatorType getComparatorType() {
        return comparatorType();
    }

    @Override public List<ShortEntry> entries() {
        if (!(entries instanceof List)) {
            entries = ImmutableList.copyOf(entries);
        }
        return (List<ShortEntry>) entries;
    }

    @Override public List<ShortEntry> getEntries() {
        return entries();
    }

    @SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
        return (Iterator) entries.iterator();
    }

    private static ThreadLocal<ThreadLocals.CleanableValue<TShortIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TShortIntHashMap>>() {
        @Override protected ThreadLocals.CleanableValue<TShortIntHashMap> initialValue() {
            return new ThreadLocals.CleanableValue<TShortIntHashMap>(new TShortIntHashMap());
        }
    };

    @Override public Facet reduce(String name, List<Facet> facets) {
        if (facets.size() == 1) {
            return facets.get(0);
        }
        InternalShortTermsFacet first = (InternalShortTermsFacet) facets.get(0);
        TShortIntHashMap aggregated = aggregateCache.get().get();
        aggregated.clear();

        for (Facet facet : facets) {
            InternalShortTermsFacet mFacet = (InternalShortTermsFacet) facet;
            for (ShortEntry entry : mFacet.entries) {
                aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
            }
        }

        BoundedTreeSet<ShortEntry> ordered = new BoundedTreeSet<ShortEntry>(first.comparatorType().comparator(), first.requiredSize);
        for (TShortIntIterator it = aggregated.iterator(); it.hasNext();) {
            it.advance();
            ordered.add(new ShortEntry(it.key(), it.value()));
        }
        first.entries = ordered;
        return first;
    }

    static final class Fields {
        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
        static final XContentBuilderString _FIELD = new XContentBuilderString("_field");
        static final XContentBuilderString TERMS = new XContentBuilderString("terms");
        static final XContentBuilderString TERM = new XContentBuilderString("term");
        static final XContentBuilderString COUNT = new XContentBuilderString("count");
    }

    @Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(name);
        builder.field(Fields._TYPE, TermsFacet.TYPE);
        builder.field(Fields._FIELD, fieldName);
        builder.startArray(Fields.TERMS);
        for (ShortEntry entry : entries) {
            builder.startObject();
            builder.field(Fields.TERM, entry.term);
            builder.field(Fields.COUNT, entry.count());
            builder.endObject();
        }
        builder.endArray();
        builder.endObject();
    }

    public static InternalShortTermsFacet readTermsFacet(StreamInput in) throws IOException {
        InternalShortTermsFacet facet = new InternalShortTermsFacet();
        facet.readFrom(in);
        return facet;
    }

    @Override public void readFrom(StreamInput in) throws IOException {
        name = in.readUTF();
        fieldName = in.readUTF();
        comparatorType = ComparatorType.fromId(in.readByte());
        requiredSize = in.readVInt();

        int size = in.readVInt();
        entries = new ArrayList<ShortEntry>(size);
        for (int i = 0; i < size; i++) {
            entries.add(new ShortEntry(in.readShort(), in.readVInt()));
        }
    }

    @Override public void writeTo(StreamOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(fieldName);
        out.writeByte(comparatorType.id());

        out.writeVInt(requiredSize);

        out.writeVInt(entries.size());
        for (ShortEntry entry : entries) {
            out.writeShort(entry.term);
            out.writeVInt(entry.count());
        }
    }
}
@ -0,0 +1,213 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.facet.terms.shorts;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
import org.elasticsearch.common.collect.BoundedTreeSet;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.thread.ThreadLocals;
|
||||
import org.elasticsearch.common.trove.TShortIntHashMap;
|
||||
import org.elasticsearch.common.trove.TShortIntIterator;
|
||||
import org.elasticsearch.index.cache.field.data.FieldDataCache;
|
||||
import org.elasticsearch.index.field.data.FieldDataType;
|
||||
import org.elasticsearch.index.field.data.shorts.ShortFieldData;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.script.search.SearchScript;
|
||||
import org.elasticsearch.search.facet.AbstractFacetCollector;
|
||||
import org.elasticsearch.search.facet.Facet;
|
||||
import org.elasticsearch.search.facet.terms.TermsFacet;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class TermsShortFacetCollector extends AbstractFacetCollector {
|
||||
|
||||
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TShortIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TShortIntHashMap>>>() {
|
||||
@Override protected ThreadLocals.CleanableValue<Deque<TShortIntHashMap>> initialValue() {
|
||||
return new ThreadLocals.CleanableValue<Deque<TShortIntHashMap>>(new ArrayDeque<TShortIntHashMap>());
|
||||
}
|
||||
};
|
||||
|
||||
private final FieldDataCache fieldDataCache;
|
||||
|
||||
private final String fieldName;
|
||||
|
||||
private final String indexFieldName;
|
||||
|
||||
private final TermsFacet.ComparatorType comparatorType;
|
||||
|
||||
private final int size;
|
||||
|
||||
private final int numberOfShards;
|
||||
|
||||
private final FieldDataType fieldDataType;
|
||||
|
||||
private ShortFieldData fieldData;
|
||||
|
||||
private final StaticAggregatorValueProc aggregator;
|
||||
|
||||
private final SearchScript script;
|
||||
|
||||
public TermsShortFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context,
|
||||
String scriptLang, String script, Map<String, Object> params) {
|
||||
super(facetName);
|
||||
this.fieldDataCache = context.fieldDataCache();
|
||||
this.size = size;
|
||||
this.comparatorType = comparatorType;
|
||||
this.numberOfShards = context.numberOfShards();
|
||||
|
||||
this.fieldName = fieldName;
|
||||
|
||||
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
|
||||
if (smartMappers == null || !smartMappers.hasMapper()) {
|
||||
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms short facet collector on it");
|
||||
} else {
|
||||
// add type filter if there is exact doc mapper associated with it
|
||||
if (smartMappers.hasDocMapper()) {
|
||||
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
|
||||
}
|
||||
|
||||
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.SHORT) {
|
||||
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of short type, can't run terms short facet collector on it");
|
||||
}
|
||||
|
||||
this.indexFieldName = smartMappers.mapper().names().indexName();
|
||||
this.fieldDataType = smartMappers.mapper().fieldDataType();
|
||||
}
|
||||
|
||||
if (script != null) {
|
||||
this.script = new SearchScript(context.lookup(), scriptLang, script, params, context.scriptService());
|
||||
} else {
|
||||
this.script = null;
|
||||
}
|
||||
|
||||
if (this.script == null) {
|
||||
aggregator = new StaticAggregatorValueProc(popFacets());
|
||||
} else {
|
||||
aggregator = new AggregatorValueProc(popFacets(), this.script);
|
||||
}
|
||||
}
|
||||
|
||||
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
|
||||
fieldData = (ShortFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
|
||||
if (script != null) {
|
||||
script.setNextReader(reader);
|
||||
}
|
||||
}
|
||||
|
||||
@Override protected void doCollect(int doc) throws IOException {
|
||||
fieldData.forEachValueInDoc(doc, aggregator);
|
||||
}
|
||||
|
||||
@Override public Facet facet() {
|
||||
TShortIntHashMap facets = aggregator.facets();
|
||||
if (facets.isEmpty()) {
|
||||
pushFacets(facets);
|
||||
return new InternalShortTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.<InternalShortTermsFacet.ShortEntry>of());
|
||||
} else {
|
||||
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
|
||||
BoundedTreeSet<InternalShortTermsFacet.ShortEntry> ordered = new BoundedTreeSet<InternalShortTermsFacet.ShortEntry>(comparatorType.comparator(), size * numberOfShards);
|
||||
for (TShortIntIterator it = facets.iterator(); it.hasNext();) {
|
||||
it.advance();
|
||||
ordered.add(new InternalShortTermsFacet.ShortEntry(it.key(), it.value()));
|
||||
}
|
||||
pushFacets(facets);
|
||||
return new InternalShortTermsFacet(facetName, fieldName, comparatorType, size, ordered);
|
||||
}
|
||||
}
|
||||
|
||||
static TShortIntHashMap popFacets() {
|
||||
Deque<TShortIntHashMap> deque = cache.get().get();
|
||||
if (deque.isEmpty()) {
|
||||
deque.add(new TShortIntHashMap());
|
||||
}
|
||||
TShortIntHashMap facets = deque.pollFirst();
|
||||
facets.clear();
|
||||
return facets;
|
||||
}
|
||||
|
||||
static void pushFacets(TShortIntHashMap facets) {
|
||||
facets.clear();
|
||||
Deque<TShortIntHashMap> deque = cache.get().get();
|
||||
if (deque != null) {
|
||||
deque.add(facets);
|
||||
}
|
||||
}
public static class AggregatorValueProc extends StaticAggregatorValueProc {

private final SearchScript script;

private final Map<String, Object> scriptParams;

public AggregatorValueProc(TShortIntHashMap facets, SearchScript script) {
super(facets);
this.script = script;
if (script != null) {
scriptParams = Maps.newHashMapWithExpectedSize(4);
} else {
scriptParams = null;
}
}

@Override public void onValue(int docId, short value) {
if (script != null) {
scriptParams.put("term", value);
Object scriptValue = script.execute(docId, scriptParams);
if (scriptValue == null) {
return;
}
if (scriptValue instanceof Boolean) {
if (!((Boolean) scriptValue)) {
return;
}
} else {
value = ((Number) scriptValue).shortValue();
}
}
super.onValue(docId, value);
}
}
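
The onValue override above gives the facet script three behaviors: returning null skips the value, returning a Boolean keeps or filters it, and returning any other Number rewrites the term before it is counted. A self-contained sketch of that contract (TermScript is an illustrative stand-in for SearchScript, not a real interface):

interface TermScript {
    Object execute(int docId, short term);
}

final class ScriptedValueDemo {
    static Short apply(TermScript script, int docId, short value) {
        Object result = script.execute(docId, value);
        if (result == null) {
            return null;                              // skip this value entirely
        }
        if (result instanceof Boolean) {
            return ((Boolean) result) ? value : null; // keep or filter the value
        }
        return ((Number) result).shortValue();        // rewrite the term
    }

    public static void main(String[] args) {
        TermScript dropOdd = (docId, term) -> term % 2 == 0; // Boolean: filter
        System.out.println(apply(dropOdd, 0, (short) 3));    // null (filtered out)
        TermScript doubled = (docId, term) -> term * 2;      // Number: rewrite
        System.out.println(apply(doubled, 0, (short) 3));    // 6
    }
}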
public static class StaticAggregatorValueProc implements ShortFieldData.ValueInDocProc {

private final TShortIntHashMap facets;

public StaticAggregatorValueProc(TShortIntHashMap facets) {
this.facets = facets;
}

@Override public void onValue(int docId, short value) {
facets.adjustOrPutValue(value, 1, 1);
}

public final TShortIntHashMap facets() {
return facets;
}
}
}
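
Trove's adjustOrPutValue(value, 1, 1) bumps an existing count by 1 or inserts 1, on primitive short keys, so counting a hit allocates nothing. The plain-JDK equivalent (boxed, and therefore heavier, which is exactly why the collector uses a primitive map) would be Map.merge:

import java.util.HashMap;
import java.util.Map;

public class CountDemo {
    public static void main(String[] args) {
        Map<Short, Integer> counts = new HashMap<>();
        for (short term : new short[]{7, 7, 3}) {
            counts.merge(term, 1, Integer::sum); // +1 if present, else put 1
        }
        System.out.println(counts); // {3=1, 7=2}
    }
}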

@ -128,7 +128,7 @@ public class FieldsTermsStringFacetCollector extends AbstractFacetCollector {
return new InternalStringTermsFacet(facetName, arrayToCommaDelimitedString(fieldsNames), comparatorType, size, ImmutableList.<InternalStringTermsFacet.StringEntry>of());
} else {
// over-fetch: each shard only sees its local counts, so an entry in the global top "size" may rank below a shard's local top "size"; collecting size * numberOfShards candidates per shard keeps the merged top "size" accurate
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(InternalStringTermsFacet.ComparatorType.COUNT.comparator(), size * numberOfShards);
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size * numberOfShards);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));
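
The one-line change in this hunk (and the matching changes in the collectors below) fixes a real bug: the BoundedTreeSet was hardwired to the COUNT comparator, so facets requested in term order still came back sorted by count. A simplified sketch of the comparator-per-ordering idea (Entry and the enum are stand-ins for the real TermsFacet types):

import java.util.Comparator;

final class ComparatorDemo {
    record Entry(String term, int count) {}

    enum ComparatorType {
        COUNT(Comparator.comparingInt(Entry::count).reversed()), // highest count first
        TERM(Comparator.comparing(Entry::term));                 // lexicographic term order

        private final Comparator<Entry> comparator;

        ComparatorType(Comparator<Entry> comparator) {
            this.comparator = comparator;
        }

        Comparator<Entry> comparator() {
            return comparator;
        }
    }
}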

@ -29,7 +29,6 @@ import org.elasticsearch.common.trove.TObjectIntIterator;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.InternalFacet;
import org.elasticsearch.search.facet.terms.InternalTermsFacet;
import org.elasticsearch.search.facet.terms.TermsFacet;

@ -42,14 +41,24 @@ import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class InternalStringTermsFacet implements InternalFacet, InternalTermsFacet {
public class InternalStringTermsFacet extends InternalTermsFacet {

public static Stream STREAM = new Stream() {
private static final String STREAM_TYPE = "tTerms";

public static void registerStream() {
Streams.registerStream(STREAM, STREAM_TYPE);
}

static Stream STREAM = new Stream() {
@Override public Facet readFacet(String type, StreamInput in) throws IOException {
return readTermsFacet(in);
}
};

@Override public String streamType() {
return STREAM_TYPE;
}

public static class StringEntry implements Entry {

private String term;
@ -68,6 +77,14 @@ public class InternalStringTermsFacet implements InternalFacet, InternalTermsFac
return term;
}

@Override public Number termAsNumber() {
return Double.parseDouble(term);
}

@Override public Number getTermAsNumber() {
return termAsNumber();
}

public int count() {
return count;
}
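
The new STREAM_TYPE / registerStream pair hooks InternalStringTermsFacet into a stream registry, so a facet read off the transport layer can be dispatched to the right reader by its short type id ("tTerms"). A minimal sketch of that registry pattern (Registry and Reader are illustrative names, not the actual Streams API):

import java.util.HashMap;
import java.util.Map;

final class Registry {
    interface Reader {
        Object read(byte[] in);          // stand-in for readFacet(StreamInput)
    }

    private static final Map<String, Reader> READERS = new HashMap<>();

    static void register(String type, Reader reader) {
        READERS.put(type, reader);       // e.g. "tTerms" -> terms facet reader
    }

    static Object read(String type, byte[] in) {
        return READERS.get(type).read(in); // dispatch by the serialized type id
    }
}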

@ -118,7 +118,7 @@ public class ScriptTermsStringFieldFacetCollector extends AbstractFacetCollector
return new InternalStringTermsFacet(facetName, sScript, comparatorType, size, ImmutableList.<InternalStringTermsFacet.StringEntry>of());
} else {
// over-fetch: each shard only sees its local counts, so an entry in the global top "size" may rank below a shard's local top "size"; collecting size * numberOfShards candidates per shard keeps the merged top "size" accurate
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(InternalStringTermsFacet.ComparatorType.COUNT.comparator(), size * numberOfShards);
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size * numberOfShards);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));

@ -34,6 +34,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.script.search.SearchScript;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;

@ -61,7 +62,7 @@ public class TermsStringFacetCollector extends AbstractFacetCollector {

private final String indexFieldName;

private final InternalStringTermsFacet.ComparatorType comparatorType;
private final TermsFacet.ComparatorType comparatorType;

private final int size;

@ -75,7 +76,7 @@ public class TermsStringFacetCollector extends AbstractFacetCollector {

private final SearchScript script;

public TermsStringFacetCollector(String facetName, String fieldName, int size, InternalStringTermsFacet.ComparatorType comparatorType, SearchContext context,
public TermsStringFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, SearchContext context,
ImmutableSet<String> excluded, Pattern pattern, String scriptLang, String script, Map<String, Object> params) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();

@ -130,7 +131,7 @@ public class TermsStringFacetCollector extends AbstractFacetCollector {
return new InternalStringTermsFacet(facetName, fieldName, comparatorType, size, ImmutableList.<InternalStringTermsFacet.StringEntry>of());
} else {
// over-fetch: each shard only sees its local counts, so an entry in the global top "size" may rank below a shard's local top "size"; collecting size * numberOfShards candidates per shard keeps the merged top "size" accurate
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(InternalStringTermsFacet.ComparatorType.COUNT.comparator(), size * numberOfShards);
BoundedTreeSet<InternalStringTermsFacet.StringEntry> ordered = new BoundedTreeSet<InternalStringTermsFacet.StringEntry>(comparatorType.comparator(), size * numberOfShards);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalStringTermsFacet.StringEntry(it.key(), it.value()));

@ -78,13 +78,17 @@ public class SimpleFacetsTests extends AbstractNodesTests {

client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject()
.field("stag", "111")
.field("lstag", 111)
.startArray("tag").value("xxx").value("yyy").endArray()
.startArray("ltag").value(1000l).value(2000l).endArray()
.endObject()).execute().actionGet();
client.admin().indices().prepareFlush().setRefresh(true).execute().actionGet();

client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject()
.field("stag", "111")
.field("lstag", 111)
.startArray("tag").value("zzz").value("yyy").endArray()
.startArray("ltag").value(3000l).value(2000l).endArray()
.endObject()).execute().actionGet();

client.admin().indices().prepareRefresh().execute().actionGet();

@ -207,13 +211,21 @@ public class SimpleFacetsTests extends AbstractNodesTests {

client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject()
.field("stag", "111")
.field("lstag", 111)
.field("dstag", 111.1)
.startArray("tag").value("xxx").value("yyy").endArray()
.startArray("ltag").value(1000l).value(2000l).endArray()
.startArray("dtag").value(1000.1).value(2000.1).endArray()
.endObject()).execute().actionGet();
client.admin().indices().prepareFlush().setRefresh(true).execute().actionGet();

client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject()
.field("stag", "111")
.field("lstag", 111)
.field("dstag", 111.1)
.startArray("tag").value("zzz").value("yyy").endArray()
.startArray("ltag").value(3000l).value(2000l).endArray()
.startArray("dtag").value(3000.1).value(2000.1).endArray()
.endObject()).execute().actionGet();

client.admin().indices().prepareRefresh().execute().actionGet();

@ -236,6 +248,52 @@ public class SimpleFacetsTests extends AbstractNodesTests {
assertThat(facet.entries().get(0).term(), equalTo("yyy"));
assertThat(facet.entries().get(0).count(), equalTo(2));

// Numeric

searchResponse = client.prepareSearch()
.setQuery(termQuery("stag", "111"))
.addFacet(termsFacet("facet1").field("lstag").size(10))
.addFacet(termsFacet("facet2").field("ltag").size(10))
.execute().actionGet();

facet = searchResponse.facets().facet("facet1");
assertThat(facet.name(), equalTo("facet1"));
assertThat(facet.entries().size(), equalTo(1));
assertThat(facet.entries().get(0).term(), equalTo("111"));
assertThat(facet.entries().get(0).count(), equalTo(2));

facet = searchResponse.facets().facet("facet2");
assertThat(facet.name(), equalTo("facet2"));
assertThat(facet.entries().size(), equalTo(3));
assertThat(facet.entries().get(0).term(), equalTo("2000"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(1).term(), anyOf(equalTo("1000"), equalTo("3000")));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(2).term(), anyOf(equalTo("1000"), equalTo("3000")));
assertThat(facet.entries().get(2).count(), equalTo(1));

searchResponse = client.prepareSearch()
.setQuery(termQuery("stag", "111"))
.addFacet(termsFacet("facet1").field("dstag").size(10))
.addFacet(termsFacet("facet2").field("dtag").size(10))
.execute().actionGet();

facet = searchResponse.facets().facet("facet1");
assertThat(facet.name(), equalTo("facet1"));
assertThat(facet.entries().size(), equalTo(1));
assertThat(facet.entries().get(0).term(), equalTo("111.1"));
assertThat(facet.entries().get(0).count(), equalTo(2));

facet = searchResponse.facets().facet("facet2");
assertThat(facet.name(), equalTo("facet2"));
assertThat(facet.entries().size(), equalTo(3));
assertThat(facet.entries().get(0).term(), equalTo("2000.1"));
assertThat(facet.entries().get(0).count(), equalTo(2));
assertThat(facet.entries().get(1).term(), anyOf(equalTo("1000.1"), equalTo("3000.1")));
assertThat(facet.entries().get(1).count(), equalTo(1));
assertThat(facet.entries().get(2).term(), anyOf(equalTo("1000.1"), equalTo("3000.1")));
assertThat(facet.entries().get(2).count(), equalTo(1));
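
The expected counts follow directly from the two indexed documents: ltag holds [1000, 2000] and [3000, 2000], so 2000 is seen twice and 1000 and 3000 once each (and likewise 2000.1 for dtag), while lstag is 111 in both documents, giving a single entry with count 2. A quick check of that arithmetic:

import java.util.HashMap;
import java.util.Map;

public class ExpectedCounts {
    public static void main(String[] args) {
        long[][] docs = {{1000, 2000}, {3000, 2000}}; // ltag values per document
        Map<Long, Integer> counts = new HashMap<>();
        for (long[] doc : docs) {
            for (long tag : doc) {
                counts.merge(tag, 1, Integer::sum);
            }
        }
        System.out.println(counts); // {2000=2, 1000=1, 3000=1}, as asserted above
    }
}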

// Test Facet Filter

searchResponse = client.prepareSearch()