Aggregations Added Filters aggregation
A multi-bucket aggregation where multiple filters can be defined (each filter defines a bucket). The buckets will collect all the documents that match their associated filter. This aggregation can be very useful when one wants to compare analytics between different criterias. It can also be accomplished using multiple definitions of the single filter aggregation, but here, the user will only need to define the sub-aggregations only once. Closes #6118
This commit is contained in:
parent
d9d5b35be9
commit
3c9c9f33e2
|
@ -4,6 +4,8 @@ include::bucket/global-aggregation.asciidoc[]
|
|||
|
||||
include::bucket/filter-aggregation.asciidoc[]
|
||||
|
||||
include::bucket/filters-aggregation.asciidoc[]
|
||||
|
||||
include::bucket/missing-aggregation.asciidoc[]
|
||||
|
||||
include::bucket/nested-aggregation.asciidoc[]
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
[[search-aggregations-bucket-filters-aggregation]]
|
||||
=== Filters
|
||||
|
||||
Defines a multi bucket aggregations where each bucket is associated with a filter. Each bucket will collect all
|
||||
documents that match its associated filter.
|
||||
|
||||
Example:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"aggs" : {
|
||||
"messages" : {
|
||||
"filters" : {
|
||||
"filters" : {
|
||||
"errors" : { "query" : { "match" : { "body" : "error" } } },
|
||||
"warnings" : { "query" : { "match" : { "body" : "warning" } } }
|
||||
}
|
||||
},
|
||||
"aggs" : { "monthly" : { "histogram" : { "field" : "timestamp", "interval" : "1M" } } }
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
In the above example, we analyze log messages. The aggregation will build two collection (buckets) of log messages - one
|
||||
for all those containing an error, and another for all those containing a warning. And for each of these buckets it will
|
||||
break them down by month.
|
||||
|
||||
Response:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
...
|
||||
|
||||
"aggs" : {
|
||||
"messages" : {
|
||||
"buckets" : {
|
||||
"errors" : {
|
||||
"doc_count" : 34,
|
||||
"monthly" : {
|
||||
"buckets : [
|
||||
... // the histogram monthly breakdown
|
||||
]
|
||||
}
|
||||
},
|
||||
"warnings" : {
|
||||
"doc_count" : 439,
|
||||
"monthly" : {
|
||||
"buckets : [
|
||||
... // the histogram monthly breakdown
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
==== Anonymous filters
|
||||
|
||||
The filters field can also be provided as an array of filters, as in the following request:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"aggs" : {
|
||||
"messages" : {
|
||||
"filters" : {
|
||||
"filters" : [
|
||||
"query" : { "match" : { "body" : "error" } },
|
||||
"query" : { "match" : { "body" : "warning" } }
|
||||
]
|
||||
},
|
||||
"aggs" : { "monthly" : { "histogram" : { "field" : "timestamp", "interval" : "1M" } } }
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
The filtered buckets are returned in the same order as provided in the request. The response for this example would be:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
...
|
||||
|
||||
"aggs" : {
|
||||
"messages" : {
|
||||
"buckets" : [
|
||||
{
|
||||
"doc_count" : 34,
|
||||
"monthly" : {
|
||||
"buckets : [
|
||||
... // the histogram monthly breakdown
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"doc_count" : 439,
|
||||
"monthly" : {
|
||||
"buckets : [
|
||||
... // the histogram monthly breakdown
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.filters.FiltersAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.GlobalBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
|
||||
|
@ -85,6 +86,10 @@ public class AggregationBuilders {
|
|||
return new FilterAggregationBuilder(name);
|
||||
}
|
||||
|
||||
public static FiltersAggregationBuilder filters(String name) {
|
||||
return new FiltersAggregationBuilder(name);
|
||||
}
|
||||
|
||||
public static GlobalBuilder global(String name) {
|
||||
return new GlobalBuilder(name);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
|
|||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.filters.FiltersParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.GlobalParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramParser;
|
||||
|
@ -72,6 +73,7 @@ public class AggregationModule extends AbstractModule {
|
|||
parsers.add(GlobalParser.class);
|
||||
parsers.add(MissingParser.class);
|
||||
parsers.add(FilterParser.class);
|
||||
parsers.add(FiltersParser.class);
|
||||
parsers.add(TermsParser.class);
|
||||
parsers.add(SignificantTermsParser.class);
|
||||
parsers.add(RangeParser.class);
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations;
|
|||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
|
||||
import org.elasticsearch.search.aggregations.bucket.filters.InternalFilters;
|
||||
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
|
||||
|
@ -74,6 +75,7 @@ public class TransportAggregationModule extends AbstractModule {
|
|||
// buckets
|
||||
InternalGlobal.registerStreams();
|
||||
InternalFilter.registerStreams();
|
||||
InternalFilters.registerStream();
|
||||
InternalMissing.registerStreams();
|
||||
StringTerms.registerStreams();
|
||||
LongTerms.registerStreams();
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket.filters;
|
||||
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* A multi bucket aggregation where the buckets are defined by a set of filters (a bucket per filter). Each bucket
|
||||
* will collect all documents matching its filter.
|
||||
*/
|
||||
public interface Filters extends MultiBucketsAggregation {
|
||||
|
||||
/**
|
||||
* A bucket associated with a specific filter (identified by its key)
|
||||
*/
|
||||
public static interface Bucket extends MultiBucketsAggregation.Bucket {
|
||||
}
|
||||
|
||||
Collection<? extends Bucket> getBuckets();
|
||||
|
||||
@Override
|
||||
Bucket getBucketByKey(String key);
|
||||
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket.filters;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.query.FilterBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilderException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class FiltersAggregationBuilder extends AggregationBuilder<FiltersAggregationBuilder> {
|
||||
|
||||
private Map<String, FilterBuilder> keyedFilters = null;
|
||||
private List<FilterBuilder> nonKeyedFilters = null;
|
||||
|
||||
public FiltersAggregationBuilder(String name) {
|
||||
super(name, InternalFilters.TYPE.name());
|
||||
}
|
||||
|
||||
public FiltersAggregationBuilder filter(String key, FilterBuilder filter) {
|
||||
if (keyedFilters == null) {
|
||||
keyedFilters = new LinkedHashMap<>();
|
||||
}
|
||||
keyedFilters.put(key, filter);
|
||||
return this;
|
||||
}
|
||||
|
||||
public FiltersAggregationBuilder filter(FilterBuilder filter) {
|
||||
if (nonKeyedFilters == null) {
|
||||
nonKeyedFilters = new ArrayList<>();
|
||||
}
|
||||
nonKeyedFilters.add(filter);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (keyedFilters == null && nonKeyedFilters == null) {
|
||||
throw new SearchSourceBuilderException("At least one filter must be set on filter aggregation [" + name + "]");
|
||||
}
|
||||
if (keyedFilters != null && nonKeyedFilters != null) {
|
||||
throw new SearchSourceBuilderException("Cannot add both keyed and non-keyed filters to filters aggregation");
|
||||
}
|
||||
|
||||
if (keyedFilters != null) {
|
||||
builder.startObject("filters");
|
||||
for (Map.Entry<String, FilterBuilder> entry : keyedFilters.entrySet()) {
|
||||
builder.field(entry.getKey());
|
||||
entry.getValue().toXContent(builder, params);
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
if (nonKeyedFilters != null) {
|
||||
builder.startArray("filters");
|
||||
for (FilterBuilder filterBuilder : nonKeyedFilters) {
|
||||
filterBuilder.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
|
||||
}
|
||||
return builder.endObject();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket.filters;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.lucene.index.AtomicReaderContext;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.elasticsearch.common.lucene.docset.DocIdSets;
|
||||
import org.elasticsearch.search.aggregations.*;
|
||||
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class FiltersAggregator extends BucketsAggregator {
|
||||
|
||||
static class KeyedFilter {
|
||||
|
||||
final String key;
|
||||
final Filter filter;
|
||||
|
||||
KeyedFilter(String key, Filter filter) {
|
||||
this.key = key;
|
||||
this.filter = filter;
|
||||
}
|
||||
}
|
||||
|
||||
private final KeyedFilter[] filters;
|
||||
private final Bits[] bits;
|
||||
private boolean keyed;
|
||||
|
||||
public FiltersAggregator(String name, AggregatorFactories factories, List<KeyedFilter> filters, boolean keyed, AggregationContext aggregationContext,
|
||||
Aggregator parent) {
|
||||
super(name, BucketAggregationMode.MULTI_BUCKETS, factories, filters.size() * (parent == null ? 1 : parent.estimatedBucketCount()),
|
||||
aggregationContext, parent);
|
||||
this.keyed = keyed;
|
||||
this.filters = filters.toArray(new KeyedFilter[filters.size()]);
|
||||
this.bits = new Bits[this.filters.length];
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCollect() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNextReader(AtomicReaderContext reader) {
|
||||
try {
|
||||
for (int i = 0; i < filters.length; i++) {
|
||||
bits[i] = DocIdSets.toSafeBits(reader.reader(), filters[i].filter.getDocIdSet(reader, reader.reader().getLiveDocs()));
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
throw new AggregationExecutionException("Failed to aggregate filter aggregator [" + name + "]", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collect(int doc, long owningBucketOrdinal) throws IOException {
|
||||
for (int i = 0; i < bits.length; i++) {
|
||||
if (bits[i].get(doc)) {
|
||||
collectBucket(doc, bucketOrd(owningBucketOrdinal, i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
|
||||
List<InternalFilters.Bucket> buckets = Lists.newArrayListWithCapacity(filters.length);
|
||||
for (int i = 0; i < filters.length; i++) {
|
||||
KeyedFilter filter = filters[i];
|
||||
long bucketOrd = bucketOrd(owningBucketOrdinal, i);
|
||||
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filter.key, bucketDocCount(bucketOrd), bucketAggregations(bucketOrd));
|
||||
buckets.add(bucket);
|
||||
}
|
||||
return new InternalFilters(name, buckets, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation buildEmptyAggregation() {
|
||||
InternalAggregations subAggs = buildEmptySubAggregations();
|
||||
List<InternalFilters.Bucket> buckets = Lists.newArrayListWithCapacity(filters.length);
|
||||
for (int i = 0; i < filters.length; i++) {
|
||||
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filters[i].key, 0, subAggs);
|
||||
buckets.add(bucket);
|
||||
}
|
||||
return new InternalFilters(name, buckets, keyed);
|
||||
}
|
||||
|
||||
private final long bucketOrd(long owningBucketOrdinal, int filterOrd) {
|
||||
return owningBucketOrdinal * filters.length + filterOrd;
|
||||
}
|
||||
|
||||
public static class Factory extends AggregatorFactory {
|
||||
|
||||
private final List<KeyedFilter> filters;
|
||||
private boolean keyed;
|
||||
|
||||
public Factory(String name, List<KeyedFilter> filters, boolean keyed) {
|
||||
super(name, InternalFilters.TYPE.name());
|
||||
this.filters = filters;
|
||||
this.keyed = keyed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator create(AggregationContext context, Aggregator parent, long expectedBucketsCount) {
|
||||
return new FiltersAggregator(name, factories, filters, keyed, context, parent);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket.filters;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.query.ParsedFilter;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class FiltersParser implements Aggregator.Parser {
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return InternalFilters.TYPE.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
|
||||
|
||||
List<FiltersAggregator.KeyedFilter> filters = new ArrayList<>();
|
||||
|
||||
XContentParser.Token token = null;
|
||||
String currentFieldName = null;
|
||||
Boolean keyed = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if ("filters".equals(currentFieldName)) {
|
||||
keyed = true;
|
||||
String key = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
key = parser.currentName();
|
||||
} else {
|
||||
ParsedFilter filter = context.queryParserService().parseInnerFilter(parser);
|
||||
filters.add(new FiltersAggregator.KeyedFilter(key, filter.filter()));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_ARRAY) {
|
||||
if ("filters".equals(currentFieldName)) {
|
||||
keyed = false;
|
||||
int idx = 0;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
|
||||
ParsedFilter filter = context.queryParserService().parseInnerFilter(parser);
|
||||
filters.add(new FiltersAggregator.KeyedFilter(String.valueOf(idx), filter.filter()));
|
||||
idx++;
|
||||
}
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
|
||||
}
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
|
||||
}
|
||||
}
|
||||
|
||||
return new FiltersAggregator.Factory(aggregationName, filters, keyed);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,220 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket.filters;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.text.StringText;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class InternalFilters extends InternalAggregation implements Filters {
|
||||
|
||||
public final static Type TYPE = new Type("filters");
|
||||
|
||||
private final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
||||
@Override
|
||||
public InternalFilters readResult(StreamInput in) throws IOException {
|
||||
InternalFilters filters = new InternalFilters();
|
||||
filters.readFrom(in);
|
||||
return filters;
|
||||
}
|
||||
};
|
||||
|
||||
public static void registerStream() {
|
||||
AggregationStreams.registerStream(STREAM, TYPE.stream());
|
||||
}
|
||||
|
||||
public static class Bucket implements Filters.Bucket {
|
||||
|
||||
private String key;
|
||||
private long docCount;
|
||||
InternalAggregations aggregations;
|
||||
|
||||
public Bucket(String key, long docCount, InternalAggregations aggregations) {
|
||||
this.key = key;
|
||||
this.docCount = docCount;
|
||||
this.aggregations = aggregations;
|
||||
}
|
||||
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Text getKeyAsText() {
|
||||
return new StringText(getKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDocCount() {
|
||||
return docCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregations getAggregations() {
|
||||
return aggregations;
|
||||
}
|
||||
|
||||
Bucket reduce(List<Bucket> buckets, BigArrays bigArrays) {
|
||||
Bucket reduced = null;
|
||||
List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(buckets.size());
|
||||
for (Bucket bucket : buckets) {
|
||||
if (reduced == null) {
|
||||
reduced = new Bucket(bucket.key, bucket.docCount, bucket.aggregations);
|
||||
} else {
|
||||
reduced.docCount += bucket.docCount;
|
||||
}
|
||||
aggregationsList.add(bucket.aggregations);
|
||||
}
|
||||
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||
return reduced;
|
||||
}
|
||||
|
||||
void toXContent(XContentBuilder builder, Params params, boolean keyed) throws IOException {
|
||||
if (keyed) {
|
||||
builder.startObject(key);
|
||||
} else {
|
||||
builder.startObject();
|
||||
}
|
||||
builder.field(CommonFields.DOC_COUNT, docCount);
|
||||
aggregations.toXContentInternal(builder, params);
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
|
||||
private List<Bucket> buckets;
|
||||
private Map<String, Bucket> bucketMap;
|
||||
private boolean keyed;
|
||||
|
||||
public InternalFilters() {} // for serialization
|
||||
|
||||
public InternalFilters(String name, List<Bucket> buckets, boolean keyed) {
|
||||
super(name);
|
||||
this.buckets = buckets;
|
||||
this.keyed = keyed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type type() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Bucket> getBuckets() {
|
||||
return buckets;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bucket getBucketByKey(String key) {
|
||||
if (bucketMap == null) {
|
||||
bucketMap = new HashMap<>(buckets.size());
|
||||
for (Bucket bucket : buckets) {
|
||||
bucketMap.put(bucket.getKey(), bucket);
|
||||
}
|
||||
}
|
||||
return bucketMap.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
List<List<Bucket>> bucketsList = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
InternalFilters filters = (InternalFilters) aggregation;
|
||||
if (bucketsList == null) {
|
||||
bucketsList = new ArrayList<>(filters.buckets.size());
|
||||
for (Bucket bucket : filters.buckets) {
|
||||
List<Bucket> sameRangeList = new ArrayList<>(aggregations.size());
|
||||
sameRangeList.add(bucket);
|
||||
bucketsList.add(sameRangeList);
|
||||
}
|
||||
} else {
|
||||
int i = 0;
|
||||
for (Bucket bucket : filters.buckets) {
|
||||
bucketsList.get(i++).add(bucket);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
InternalFilters reduced = new InternalFilters(name, new ArrayList<Bucket>(bucketsList.size()), keyed);
|
||||
for (List<Bucket> sameRangeList : bucketsList) {
|
||||
reduced.buckets.add((sameRangeList.get(0)).reduce(sameRangeList, reduceContext.bigArrays()));
|
||||
}
|
||||
return reduced;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
name = in.readString();
|
||||
keyed = in.readBoolean();
|
||||
int size = in.readVInt();
|
||||
List<Bucket> buckets = Lists.newArrayListWithCapacity(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
String key = in.readOptionalString();
|
||||
buckets.add(new Bucket(key, in.readVLong(), InternalAggregations.readAggregations(in)));
|
||||
}
|
||||
this.buckets = buckets;
|
||||
this.bucketMap = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
out.writeBoolean(keyed);
|
||||
out.writeVInt(buckets.size());
|
||||
for (Bucket bucket : buckets) {
|
||||
out.writeOptionalString(bucket.key);
|
||||
out.writeVLong(bucket.docCount);
|
||||
bucket.aggregations.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||
if (keyed) {
|
||||
builder.startObject(CommonFields.BUCKETS);
|
||||
} else {
|
||||
builder.startArray(CommonFields.BUCKETS);
|
||||
}
|
||||
for (Bucket bucket : buckets) {
|
||||
bucket.toXContent(builder, params, keyed);
|
||||
}
|
||||
if (keyed) {
|
||||
builder.endObject();
|
||||
} else {
|
||||
builder.endArray();
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,231 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.search.aggregations.bucket.filters.Filters;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.FilterBuilders.matchAllFilter;
|
||||
import static org.elasticsearch.index.query.FilterBuilders.termFilter;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.core.IsNull.notNullValue;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@ElasticsearchIntegrationTest.SuiteScopeTest
|
||||
public class FiltersTests extends ElasticsearchIntegrationTest {
|
||||
|
||||
static int numDocs, numTag1Docs, numTag2Docs;
|
||||
|
||||
@Override
|
||||
public void setupSuiteScopeCluster() throws Exception {
|
||||
createIndex("idx");
|
||||
createIndex("idx2");
|
||||
numDocs = randomIntBetween(5, 20);
|
||||
numTag1Docs = randomIntBetween(1, numDocs - 1);
|
||||
List<IndexRequestBuilder> builders = new ArrayList<>();
|
||||
for (int i = 0; i < numTag1Docs; i++) {
|
||||
builders.add(client().prepareIndex("idx", "type", ""+i).setSource(jsonBuilder()
|
||||
.startObject()
|
||||
.field("value", i + 1)
|
||||
.field("tag", "tag1")
|
||||
.endObject()));
|
||||
}
|
||||
for (int i = numTag1Docs; i < numDocs; i++) {
|
||||
numTag2Docs++;
|
||||
builders.add(client().prepareIndex("idx", "type", ""+i).setSource(jsonBuilder()
|
||||
.startObject()
|
||||
.field("value", i)
|
||||
.field("tag", "tag2")
|
||||
.field("name", "name" + i)
|
||||
.endObject()));
|
||||
}
|
||||
prepareCreate("empty_bucket_idx").addMapping("type", "value", "type=integer").execute().actionGet();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
builders.add(client().prepareIndex("empty_bucket_idx", "type", ""+i).setSource(jsonBuilder()
|
||||
.startObject()
|
||||
.field("value", i*2)
|
||||
.endObject()));
|
||||
}
|
||||
indexRandom(true, builders);
|
||||
ensureSearchable();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void simple() throws Exception {
|
||||
SearchResponse response = client().prepareSearch("idx")
|
||||
.addAggregation(
|
||||
filters("tags")
|
||||
.filter("tag1", termFilter("tag", "tag1"))
|
||||
.filter("tag2", termFilter("tag", "tag2")))
|
||||
.execute().actionGet();
|
||||
|
||||
assertSearchResponse(response);
|
||||
|
||||
Filters filters = response.getAggregations().get("tags");
|
||||
assertThat(filters, notNullValue());
|
||||
assertThat(filters.getName(), equalTo("tags"));
|
||||
|
||||
assertThat(filters.getBuckets().size(), equalTo(2));
|
||||
|
||||
Filters.Bucket bucket = filters.getBucketByKey("tag1");
|
||||
assertThat(bucket, Matchers.notNullValue());
|
||||
assertThat(bucket.getDocCount(), equalTo((long) numTag1Docs));
|
||||
|
||||
bucket = filters.getBucketByKey("tag2");
|
||||
assertThat(bucket, Matchers.notNullValue());
|
||||
assertThat(bucket.getDocCount(), equalTo((long) numTag2Docs));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void withSubAggregation() throws Exception {
|
||||
SearchResponse response = client().prepareSearch("idx")
|
||||
.addAggregation(
|
||||
filters("tags")
|
||||
.filter("tag1", termFilter("tag", "tag1"))
|
||||
.filter("tag2", termFilter("tag", "tag2"))
|
||||
.subAggregation(avg("avg_value").field("value")))
|
||||
.execute().actionGet();
|
||||
|
||||
assertSearchResponse(response);
|
||||
|
||||
Filters filters = response.getAggregations().get("tags");
|
||||
assertThat(filters, notNullValue());
|
||||
assertThat(filters.getName(), equalTo("tags"));
|
||||
|
||||
assertThat(filters.getBuckets().size(), equalTo(2));
|
||||
|
||||
Filters.Bucket bucket = filters.getBucketByKey("tag1");
|
||||
assertThat(bucket, Matchers.notNullValue());
|
||||
assertThat(bucket.getDocCount(), equalTo((long) numTag1Docs));
|
||||
long sum = 0;
|
||||
for (int i = 0; i < numTag1Docs; ++i) {
|
||||
sum += i + 1;
|
||||
}
|
||||
assertThat(bucket.getAggregations().asList().isEmpty(), is(false));
|
||||
Avg avgValue = bucket.getAggregations().get("avg_value");
|
||||
assertThat(avgValue, notNullValue());
|
||||
assertThat(avgValue.getName(), equalTo("avg_value"));
|
||||
assertThat(avgValue.getValue(), equalTo((double) sum / numTag1Docs));
|
||||
|
||||
bucket = filters.getBucketByKey("tag2");
|
||||
assertThat(bucket, Matchers.notNullValue());
|
||||
assertThat(bucket.getDocCount(), equalTo((long) numTag2Docs));
|
||||
sum = 0;
|
||||
for (int i = numTag1Docs; i < numDocs; ++i) {
|
||||
sum += i;
|
||||
}
|
||||
assertThat(bucket.getAggregations().asList().isEmpty(), is(false));
|
||||
avgValue = bucket.getAggregations().get("avg_value");
|
||||
assertThat(avgValue, notNullValue());
|
||||
assertThat(avgValue.getName(), equalTo("avg_value"));
|
||||
assertThat(avgValue.getValue(), equalTo((double) sum / numTag2Docs));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void withContextBasedSubAggregation() throws Exception {
|
||||
|
||||
try {
|
||||
client().prepareSearch("idx")
|
||||
.addAggregation(
|
||||
filters("tags")
|
||||
.filter("tag1", termFilter("tag", "tag1"))
|
||||
.filter("tag2", termFilter("tag", "tag2"))
|
||||
.subAggregation(avg("avg_value"))
|
||||
)
|
||||
.execute().actionGet();
|
||||
|
||||
fail("expected execution to fail - an attempt to have a context based numeric sub-aggregation, but there is not value source" +
|
||||
"context which the sub-aggregation can inherit");
|
||||
|
||||
} catch (ElasticsearchException ese) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyAggregation() throws Exception {
|
||||
SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx")
|
||||
.setQuery(matchAllQuery())
|
||||
.addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0)
|
||||
.subAggregation(filters("filters").filter("all", matchAllFilter())))
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l));
|
||||
Histogram histo = searchResponse.getAggregations().get("histo");
|
||||
assertThat(histo, Matchers.notNullValue());
|
||||
Histogram.Bucket bucket = histo.getBucketByKey(1l);
|
||||
assertThat(bucket, Matchers.notNullValue());
|
||||
|
||||
Filters filters = bucket.getAggregations().get("filters");
|
||||
assertThat(filters, notNullValue());
|
||||
Filters.Bucket all = filters.getBucketByKey("all");
|
||||
assertThat(all, Matchers.notNullValue());
|
||||
assertThat(all.getKey(), equalTo("all"));
|
||||
assertThat(all.getDocCount(), is(0l));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void simple_nonKeyed() throws Exception {
|
||||
SearchResponse response = client().prepareSearch("idx")
|
||||
.addAggregation(
|
||||
filters("tags")
|
||||
.filter(termFilter("tag", "tag1"))
|
||||
.filter(termFilter("tag", "tag2")))
|
||||
.execute().actionGet();
|
||||
|
||||
assertSearchResponse(response);
|
||||
|
||||
Filters filters = response.getAggregations().get("tags");
|
||||
assertThat(filters, notNullValue());
|
||||
assertThat(filters.getName(), equalTo("tags"));
|
||||
|
||||
assertThat(filters.getBuckets().size(), equalTo(2));
|
||||
|
||||
Collection<? extends Filters.Bucket> buckets = filters.getBuckets();
|
||||
Iterator<? extends Filters.Bucket> itr = buckets.iterator();
|
||||
|
||||
Filters.Bucket bucket = itr.next();
|
||||
assertThat(bucket, Matchers.notNullValue());
|
||||
assertThat(bucket.getDocCount(), equalTo((long) numTag1Docs));
|
||||
|
||||
bucket = itr.next();
|
||||
assertThat(bucket, Matchers.notNullValue());
|
||||
assertThat(bucket.getDocCount(), equalTo((long) numTag2Docs));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue