Add composite aggregator (#26800)

* This change adds a module called `aggs-composite` that defines a new aggregation named `composite`.
The `composite` aggregation is a multi-bucket aggregation that creates composite buckets made of multiple sources.
The sources for each bucket can be defined as:
  * A `terms` source, values are extracted from a field or a script.
  * A `date_histogram` source, values are extracted from a date field and rounded to the provided interval.
This aggregation can be used to retrieve all buckets of a deeply nested aggregation by flattening the nested aggregations into composite buckets.
A composite bucket is composed of one value per source and is built for each document from the combinations of values in the provided sources.
For instance the following aggregation:

````
"test_agg": {
  "terms": {
    "field": "field1"
  },
  "aggs": {
    "nested_test_agg":
      "terms": {
        "field": "field2"
      }
  }
}
````
... which retrieves the top N terms for `field1` and for each top term in `field1` the top N terms for `field2`, can be replaced by a `composite` aggregation in order to retrieve **all** the combinations of `field1`, `field2` in the matching documents:

````
"composite_agg": {
  "composite": {
    "sources": [
      {
	"field1": {
          "terms": {
              "field": "field1"
            }
        }
      },
      {
	"field2": {
          "terms": {
            "field": "field2"
          }
        }
      },
    }
  }
````

The response of the aggregation looks like this:

````
"aggregations": {
  "composite_agg": {
    "buckets": [
      {
        "key": {
          "field1": "alabama",
          "field2": "almanach"
        },
        "doc_count": 100
      },
      {
        "key": {
          "field1": "alabama",
          "field2": "calendar"
        },
        "doc_count": 1
      },
      {
        "key": {
          "field1": "arizona",
          "field2": "calendar"
        },
        "doc_count": 1
      }
    ]
  }
}
````

By default this aggregation returns 10 buckets sorted in ascending order of the composite key.
Pagination can be achieved by providing `after` values, the values of the composite key to aggregate after.
For instance the following aggregation will aggregate all composite keys that sort after `alabama, calendar`:

````
"composite_agg": {
  "composite": {
    "after": {"field1": "alabama", "field2": "calendar"},
    "size": 100,
    "sources": [
      {
	"field1": {
          "terms": {
            "field": "field1"
          }
        }
      },
      {
	"field2": {
          "terms": {
            "field": "field2"
          }
	}
      }
    }
  }
````

This aggregation is optimized for indices that set an index sort that matches the composite source definition.
For instance the aggregation above could run faster on indices that define an index sort like this:

````
"settings": {
  "index.sort.field": ["field1", "field2"]
}
````

In this case the `composite` aggregation can early terminate on each segment.
This aggregation also accepts multi-valued fields, but disables early termination for them even if the index sort matches the sources definition.
This is necessary because index sorting picks only one value per document to perform the sort.
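
To make the early-termination point concrete, here is a simplified, hypothetical sketch of the first-pass collection logic (the real code lives in `CompositeAggregator` below; `isCompetitive` and `insertIntoTopN` are placeholders, not part of this change):

````java
import org.apache.lucene.search.CollectionTerminatedException;

// Simplified sketch: when the index sort matches the composite sort, a segment
// can be abandoned as soon as a document's composite key stops being
// competitive, because every following document in that segment sorts after it.
abstract class FirstPassSketch {
    final boolean canEarlyTerminate; // true only if the index sort matches the sources

    FirstPassSketch(boolean canEarlyTerminate) {
        this.canEarlyTerminate = canEarlyTerminate;
    }

    abstract boolean isCompetitive(int doc); // placeholder: compares against the current top N
    abstract void insertIntoTopN(int doc);   // placeholder: records the doc's composite key

    void collect(int doc) {
        if (isCompetitive(doc) == false) {
            if (canEarlyTerminate) {
                // Lucene's way for a collector to skip the rest of the segment.
                throw new CollectionTerminatedException();
            }
            return; // no matching index sort: just skip this document
        }
        insertIntoTopN(doc);
    }
}
````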
Jim Ferenczi 2017-11-16 15:13:36 +01:00 committed by GitHub
parent 303e0c0e86
commit 623367d793
51 changed files with 5418 additions and 29 deletions


@ -242,6 +242,7 @@ subprojects {
"org.elasticsearch.plugin:parent-join-client:${version}": ':modules:parent-join',
"org.elasticsearch.plugin:aggs-matrix-stats-client:${version}": ':modules:aggs-matrix-stats',
"org.elasticsearch.plugin:percolator-client:${version}": ':modules:percolator',
"org.elasticsearch.plugin:aggs-composite-client:${version}": ':modules:aggs-composite',
]
if (indexCompatVersions[-1].snapshot) {
/* The last and second to last versions can be snapshots. Rather than use


@ -39,7 +39,8 @@ dependencies {
compile "org.elasticsearch.client:elasticsearch-rest-client:${version}"
compile "org.elasticsearch.plugin:parent-join-client:${version}"
compile "org.elasticsearch.plugin:aggs-matrix-stats-client:${version}"
compile "org.elasticsearch.plugin:aggs-composite-client:${version}"
testCompile "org.elasticsearch.client:test:${version}"
testCompile "org.elasticsearch.test:framework:${version}"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"


@ -62,6 +62,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.ESTestCase;
@ -647,7 +648,7 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testProvidedNamedXContents() {
List<NamedXContentRegistry.Entry> namedXContents = RestHighLevelClient.getProvidedNamedXContents();
assertEquals(2, namedXContents.size());
assertEquals(3, namedXContents.size());
Map<Class<?>, Integer> categories = new HashMap<>();
List<String> names = new ArrayList<>();
for (NamedXContentRegistry.Entry namedXContent : namedXContents) {
@ -658,9 +659,10 @@ public class RestHighLevelClientTests extends ESTestCase {
}
}
assertEquals(1, categories.size());
assertEquals(Integer.valueOf(2), categories.get(Aggregation.class));
assertEquals(Integer.valueOf(3), categories.get(Aggregation.class));
assertTrue(names.contains(ChildrenAggregationBuilder.NAME));
assertTrue(names.contains(MatrixStatsAggregationBuilder.NAME));
assertTrue(names.contains(CompositeAggregationBuilder.NAME));
}
private static class TrackingActionListener implements ActionListener<Integer> {


@ -32,6 +32,7 @@ dependencies {
compile "org.elasticsearch.plugin:lang-mustache-client:${version}"
compile "org.elasticsearch.plugin:percolator-client:${version}"
compile "org.elasticsearch.plugin:parent-join-client:${version}"
compile "org.elasticsearch.plugin:aggs-composite-client:${version}"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testCompile "junit:junit:${versions.junit}"
testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}"


@ -30,6 +30,7 @@ import org.elasticsearch.join.ParentJoinPlugin;
import org.elasticsearch.percolator.PercolatorPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.search.aggregations.composite.CompositeAggregationPlugin;
import org.elasticsearch.transport.Netty4Plugin;
import java.util.Arrays;
@ -44,6 +45,7 @@ import java.util.concurrent.TimeUnit;
* {@link PercolatorPlugin},
* {@link MustachePlugin},
* {@link ParentJoinPlugin}
* {@link CompositeAggregationPlugin}
* plugins for the client. These plugins are all the required modules for Elasticsearch.
*/
@SuppressWarnings({"unchecked","varargs"})
@ -88,7 +90,8 @@ public class PreBuiltTransportClient extends TransportClient {
ReindexPlugin.class,
PercolatorPlugin.class,
MustachePlugin.class,
ParentJoinPlugin.class));
ParentJoinPlugin.class,
CompositeAggregationPlugin.class));
/**
* Creates a new transport client with pre-installed plugins.


@ -30,6 +30,7 @@ import org.elasticsearch.percolator.PercolatorPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.search.aggregations.composite.CompositeAggregationPlugin;
import org.junit.Test;
import java.util.Arrays;
@ -52,7 +53,8 @@ public class PreBuiltTransportClientTests extends RandomizedTest {
@Test
public void testInstallPluginTwice() {
for (Class<? extends Plugin> plugin :
Arrays.asList(ParentJoinPlugin.class, ReindexPlugin.class, PercolatorPlugin.class, MustachePlugin.class)) {
Arrays.asList(ParentJoinPlugin.class, ReindexPlugin.class, PercolatorPlugin.class,
MustachePlugin.class, CompositeAggregationPlugin.class)) {
try {
new PreBuiltTransportClient(Settings.EMPTY, plugin);
fail("exception expected");


@ -235,7 +235,7 @@ public final class IndexSortConfig {
SortField.Type.FLOAT
);
static SortField.Type getSortFieldType(SortField sortField) {
public static SortField.Type getSortFieldType(SortField sortField) {
if (sortField instanceof SortedSetSortField) {
return SortField.Type.STRING;
} else if (sortField instanceof SortedNumericSortField) {


@ -171,8 +171,12 @@ public abstract class ParsedMultiBucketAggregation<B extends ParsedMultiBucketAg
bucket.setDocCount(parser.longValue());
}
} else if (token == XContentParser.Token.START_OBJECT) {
XContentParserUtils.parseTypedKeysObject(parser, Aggregation.TYPED_KEYS_DELIMITER, Aggregation.class,
if (CommonFields.KEY.getPreferredName().equals(currentFieldName)) {
keyConsumer.accept(parser, bucket);
} else {
XContentParserUtils.parseTypedKeysObject(parser, Aggregation.TYPED_KEYS_DELIMITER, Aggregation.class,
aggregations::add);
}
}
}
bucket.setAggregations(new Aggregations(aggregations));


@ -51,7 +51,7 @@ public enum SortOrder implements Writeable {
}
};
static SortOrder readFromStream(StreamInput in) throws IOException {
public static SortOrder readFromStream(StreamInput in) throws IOException {
return in.readEnum(SortOrder.class);
}


@ -75,6 +75,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extend
import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivativeTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.junit.After;
import org.junit.Before;
@ -157,7 +158,7 @@ public class AggregationsTests extends ESTestCase {
if (aggsTest instanceof InternalMultiBucketAggregationTestCase) {
// Lower down the number of buckets generated by multi bucket aggregation tests in
// order to avoid too many aggregations to be created.
((InternalMultiBucketAggregationTestCase) aggsTest).maxNumberOfBuckets = 3;
((InternalMultiBucketAggregationTestCase) aggsTest).setMaxNumberOfBuckets(3);
}
aggsTest.setUp();
}
@ -266,9 +267,13 @@ public class AggregationsTests extends ESTestCase {
if (testCase instanceof InternalMultiBucketAggregationTestCase) {
InternalMultiBucketAggregationTestCase multiBucketAggTestCase = (InternalMultiBucketAggregationTestCase) testCase;
if (currentDepth < maxDepth) {
multiBucketAggTestCase.subAggregationsSupplier = () -> createTestInstance(0, currentDepth + 1, maxDepth);
multiBucketAggTestCase.setSubAggregationsSupplier(
() -> createTestInstance(0, currentDepth + 1, maxDepth)
);
} else {
multiBucketAggTestCase.subAggregationsSupplier = () -> InternalAggregations.EMPTY;
multiBucketAggTestCase.setSubAggregationsSupplier(
() -> InternalAggregations.EMPTY
);
}
} else if (testCase instanceof InternalSingleBucketAggregationTestCase) {
InternalSingleBucketAggregationTestCase singleBucketAggTestCase = (InternalSingleBucketAggregationTestCase) testCase;


@ -21,7 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.adjacency;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;


@ -21,7 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.filter;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilters.InternalBucket;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;


@ -22,7 +22,7 @@ import org.apache.lucene.index.IndexWriter;
import org.elasticsearch.common.geo.GeoHashUtils;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid.Bucket;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -109,7 +109,7 @@ public class InternalGeoHashGridTests extends InternalMultiBucketAggregationTest
protected Class<? extends ParsedMultiBucketAggregation> implementationClass() {
return ParsedGeoHashGrid.class;
}
@Override
protected InternalGeoHashGrid mutateInstance(InternalGeoHashGrid instance) {
String name = instance.getName();


@ -23,7 +23,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.joda.time.DateTime;


@ -26,7 +26,7 @@ import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;


@ -22,7 +22,7 @@ package org.elasticsearch.search.aggregations.bucket.range;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;


@ -20,7 +20,7 @@
package org.elasticsearch.search.aggregations.bucket.significant;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.ChiSquare;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.GND;


@ -20,10 +20,9 @@
package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.junit.Before;
import org.elasticsearch.test.InternalAggregationTestCase;
import java.util.HashMap;
import java.util.List;


@ -53,3 +53,5 @@ include::bucket/significanttext-aggregation.asciidoc[]
include::bucket/terms-aggregation.asciidoc[]
include::bucket/composite-aggregation.asciidoc[]


@ -0,0 +1,581 @@
[[search-aggregations-bucket-composite-aggregation]]
=== Composite Aggregation
experimental[]
A multi-bucket aggregation that creates composite buckets from different sources.
Unlike the other `multi-bucket` aggregations, the `composite` aggregation can be used
to paginate **all** buckets from a multi-level aggregation efficiently. This aggregation
provides a way to stream **all** buckets of a specific aggregation similarly to what
<<search-request-scroll, scroll>> does for documents.
The composite buckets are built from the combinations of the values
extracted/created for each document, and each combination is considered a
composite bucket.
//////////////////////////
[source,js]
--------------------------------------------------
PUT /sales
{
"mappings": {
"docs": {
"properties": {
"product": {
"type": "keyword"
},
"timestamp": {
"type": "date"
},
"price": {
"type": "long"
},
"shop": {
"type": "keyword"
}
}
}
}
}
POST /sales/docs/_bulk?refresh
{"index":{"_id":0}}
{"product": "mad max", "price": "20", "timestamp": "2017-05-09T14:35"}
{"index":{"_id":1}}
{"product": "mad max", "price": "25", "timestamp": "2017-05-09T12:35"}
{"index":{"_id":2}}
{"product": "rocky", "price": "10", "timestamp": "2017-05-08T09:10"}
{"index":{"_id":3}}
{"product": "mad max", "price": "27", "timestamp": "2017-05-10T07:07"}
{"index":{"_id":4}}
{"product": "apocalypse now", "price": "10", "timestamp": "2017-05-11T08:35"}
--------------------------------------------------
// NOTCONSOLE
// TESTSETUP
//////////////////////////
For instance the following document:
```
{
"keyword": ["foo", "bar"],
"number": [23, 65, 76]
}
```
\... creates the following composite buckets when `keyword` and `number` are used as value sources
for the aggregation:
```
{ "keyword": "foo", "number": 23 }
{ "keyword": "foo", "number": 65 }
{ "keyword": "foo", "number": 76 }
{ "keyword": "bar", "number": 23 }
{ "keyword": "bar", "number": 65 }
{ "keyword": "bar", "number": 76 }
```
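
The combination logic itself is a plain cartesian product over the per-source values. A minimal sketch in Java (hypothetical names, not part of this change) that reproduces the buckets above:

[source,java]
--------------------------------------------------
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompositeCombinations {
    // Builds one composite bucket per combination of the per-source values,
    // preserving the order of the sources and of their values.
    static List<List<Object>> combinations(List<List<Object>> perSourceValues) {
        List<List<Object>> result = new ArrayList<>();
        result.add(new ArrayList<>());
        for (List<Object> values : perSourceValues) {
            List<List<Object>> next = new ArrayList<>();
            for (List<Object> prefix : result) {
                for (Object value : values) {
                    List<Object> bucket = new ArrayList<>(prefix);
                    bucket.add(value);
                    next.add(bucket);
                }
            }
            result = next;
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> perSource = Arrays.asList(
            Arrays.asList("foo", "bar"),   // values of "keyword"
            Arrays.asList(23, 65, 76));    // values of "number"
        // Prints the six buckets listed above, in the same order.
        combinations(perSource).forEach(System.out::println);
    }
}
--------------------------------------------------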
==== Values source
The `sources` parameter controls the sources that should be used to build the composite buckets.
There are three different types of values source:
===== Terms
The `terms` value source is equivalent to a simple `terms` aggregation.
The values are extracted from a field or a script exactly like the `terms` aggregation.
Example:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "product": { "terms" : { "field": "product" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
Like the `terms` aggregation, it is also possible to use a script to create the values for the composite buckets:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{
"product": {
"terms" : {
"script" : {
"source": "doc['product'].value",
"lang": "painless"
}
}
}
}
]
}
}
}
}
--------------------------------------------------
// CONSOLE
===== Histogram
The `histogram` value source can be applied on numeric values to build fixed-size
intervals over the values. The `interval` parameter defines how the numeric values should be
transformed. For instance an `interval` set to 5 translates any numeric value to its closest interval,
so a value of `101` is translated to `100`, which is the key for the interval between 100 and 105.
Example:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "histo": { "histogram" : { "field": "price", "interval": 5 } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
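
The translation described above is simple floor arithmetic. A one-method sketch (hypothetical helper, assuming long-valued fields):

[source,java]
--------------------------------------------------
public class HistogramRounding {
    // bucketKey(101, 5) == 100: the key of the interval [100, 105).
    static long bucketKey(long value, long interval) {
        return Math.floorDiv(value, interval) * interval;
    }

    public static void main(String[] args) {
        System.out.println(bucketKey(101, 5)); // prints 100
    }
}
--------------------------------------------------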
The values are built from a numeric field or a script that returns numerical values:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{
"histo": {
"histogram" : {
"interval": 5,
"script" : {
"source": "doc['price'].value",
"lang": "painless"
}
}
}
}
]
}
}
}
}
--------------------------------------------------
// CONSOLE
===== Date Histogram
The `date_histogram` is similar to the `histogram` value source except that the interval
is specified by a date/time expression:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "date": { "date_histogram" : { "field": "timestamp", "interval": "1d" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
The example above creates an interval per day and translates all `timestamp` values to the start of their closest interval.
Available expressions for interval: `year`, `quarter`, `month`, `week`, `day`, `hour`, `minute`, `second`
Time values can also be specified via abbreviations supported by <<time-units,time units>> parsing.
Note that fractional time values are not supported, but you can address this by shifting to another
time unit (e.g., `1.5h` could instead be specified as `90m`).
====== Time Zone
Date-times are stored in Elasticsearch in UTC. By default, all bucketing and
rounding is also done in UTC. The `time_zone` parameter can be used to indicate
that bucketing should use a different time zone.
Time zones may either be specified as an ISO 8601 UTC offset (e.g. `+01:00` or
`-08:00`) or as a timezone id, an identifier used in the TZ database like
`America/Los_Angeles`.
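
As an illustration only, the same day rounding expressed with `java.time` (a sketch, not the module's implementation):

[source,java]
--------------------------------------------------
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

public class TimeZoneRounding {
    public static void main(String[] args) {
        Instant ts = Instant.parse("2017-05-09T14:35:00Z");
        // Rounded to the start of its day in UTC: 2017-05-09T00:00Z
        ZonedDateTime utcDay = ts.atZone(ZoneId.of("UTC")).truncatedTo(ChronoUnit.DAYS);
        // In another time zone the bucket start (and possibly the day) differs:
        // 2017-05-09T00:00-07:00 for America/Los_Angeles
        ZonedDateTime laDay = ts.atZone(ZoneId.of("America/Los_Angeles")).truncatedTo(ChronoUnit.DAYS);
        System.out.println(utcDay + " vs " + laDay);
    }
}
--------------------------------------------------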
===== Mixing different value sources
The `sources` parameter accepts an array of value sources.
It is possible to mix different value sources to create composite buckets.
For example:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d" } } },
{ "product": { "terms": {"field": "product" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
This creates composite buckets from the values produced by two value sources, a `date_histogram` and a `terms`.
Each bucket is composed of two values, one for each value source defined in the aggregation.
Any type of combination is allowed and the order in the array is preserved
in the composite buckets.
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "shop": { "terms": {"field": "shop" } } },
{ "product": { "terms": { "field": "product" } } },
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
==== Order
By default the composite buckets are sorted by their natural ordering: values are sorted
in ascending order. When multiple value sources are requested, the ordering is done per value
source: the first value of a composite bucket is compared to the first value of the other composite bucket, and if they are equal the
next values in the composite buckets are used for tie-breaking. This means that the composite bucket
`[foo, 100]` is considered smaller than `[foobar, 0]` because `foo` is considered smaller than `foobar`.
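
A sketch of this natural ordering (`compareKeys` is a hypothetical helper; the module's `CompositeValuesComparator` shown later in this commit implements the same idea slot-wise):

[source,java]
--------------------------------------------------
public class CompositeKeyOrder {
    // Composite keys compare source by source; the first non-equal pair decides.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compareKeys(Comparable[] left, Comparable[] right) {
        for (int i = 0; i < left.length; i++) {
            int cmp = left[i].compareTo(right[i]);
            if (cmp != 0) {
                return cmp; // e.g. ["foo", 100] sorts before ["foobar", 0]
            }
        }
        return 0; // all values equal: same composite bucket
    }
}
--------------------------------------------------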
It is possible to define the direction of the sort for each value source by setting `order` to `asc` (default value)
or `desc` (descending order) directly in the value source definition.
For example:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d", "order": "desc" } } },
{ "product": { "terms": {"field": "product", "order": "asc" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
\... will sort the composite buckets in descending order when comparing values from the `date_histogram` source
and in ascending order when comparing values from the `terms` source.
==== Size
The `size` parameter can be set to define how many composite buckets should be returned.
Each composite bucket is considered a single bucket, so setting a size of 10 will return the
first 10 composite buckets created from the values source.
The response contains the values for each composite bucket in an array containing the values extracted
from each value source.
==== After
If the number of composite buckets is too high (or unknown) to be returned in a single response
it is possible to split the retrieval into multiple requests.
Since the composite buckets are flat by nature, the requested `size` is exactly the number of composite buckets
that will be returned in the response (assuming that there are at least `size` composite buckets to return).
If all composite buckets should be retrieved it is preferable to use a small size (`100` or `1000` for instance)
and then use the `after` parameter to retrieve the next results.
For example:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d" } } },
{ "product": { "terms": {"field": "product" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[s/_search/_search\?filter_path=aggregations/]
\... returns:
[source,js]
--------------------------------------------------
{
...
"aggregations": {
"my_buckets": {
"buckets": [
{
"key": {
"date": 1494201600000,
"product": "rocky"
},
"doc_count": 1
},
{
"key": { <1>
"date": 1494288000000,
"product": "mad max"
},
"doc_count": 2
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\.//]
<1> The last composite bucket returned by the query.
The `after` parameter can be used to retrieve the composite buckets that are **after**
the last composite bucket returned in a previous round.
In the example above the last bucket is `"key": {"date": 1494288000000, "product": "mad max"}`, so the next
round of results can be retrieved with:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d", "order": "desc" } } },
{ "product": { "terms": {"field": "product", "order": "asc" } } }
],
"after": { "date": 1494288000000, "product": "mad max" } <1>
}
}
}
}
--------------------------------------------------
// CONSOLE
<1> Should restrict the aggregation to buckets that sort **after** the provided values.
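
Putting `size` and `after` together, retrieving every composite bucket is a simple loop. A client-side sketch (the `SearchFn` interface is a hypothetical stand-in for whatever client executes the request and parses the bucket keys):

[source,java]
--------------------------------------------------
import java.util.List;
import java.util.Map;

interface SearchFn {
    // Runs the composite aggregation with the given size and "after" key
    // (null for the first round) and returns the bucket keys of the response.
    List<Map<String, Object>> search(int size, Map<String, Object> after);
}

class PaginateAllBuckets {
    static void consumeAll(SearchFn client, int pageSize) {
        Map<String, Object> after = null;
        while (true) {
            List<Map<String, Object>> keys = client.search(pageSize, after);
            if (keys.isEmpty()) {
                break; // no more composite buckets
            }
            keys.forEach(System.out::println); // process this page
            after = keys.get(keys.size() - 1); // last key becomes the next "after"
        }
    }
}
--------------------------------------------------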
==== Sub-aggregations
Like any `multi-bucket` aggregation, the `composite` aggregation can hold sub-aggregations.
These sub-aggregations can be used to compute other buckets or statistics on each composite bucket created by this
parent aggregation.
For instance the following example computes the average value of a field
per composite bucket:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d", "order": "desc" } } },
{ "product": { "terms": {"field": "product" } } }
]
},
"aggregations": {
"the_avg": {
"avg": { "field": "price" }
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[s/_search/_search\?filter_path=aggregations/]
\... returns:
[source,js]
--------------------------------------------------
{
...
"aggregations": {
"my_buckets": {
"buckets": [
{
"key": {
"date": 1494460800000,
"product": "apocalypse now"
},
"doc_count": 1,
"the_avg": {
"value": 10.0
}
},
{
"key": {
"date": 1494374400000,
"product": "mad max"
},
"doc_count": 1,
"the_avg": {
"value": 27.0
}
},
{
"key": {
"date": 1494288000000,
"product" : "mad max"
},
"doc_count": 2,
"the_avg": {
"value": 22.5
}
},
{
"key": {
"date": 1494201600000,
"product": "rocky"
},
"doc_count": 1,
"the_avg": {
"value": 10.0
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\.//]
==== Index sorting
By default this aggregation runs on every document that matches the query.
However, if the index sort matches the composite sort, this aggregation can optimize
the execution and skip documents that contain composite buckets that would not
be part of the response.
For instance the following aggregation:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d", "order": "asc" } } },
{ "product": { "terms": { "field": "product", "order": "asc" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
\... is much faster on an index that uses the following sort:
[source,js]
--------------------------------------------------
PUT twitter
{
"settings" : {
"index" : {
"sort.field" : ["timestamp", "product"],
"sort.order" : ["asc", "asc"]
}
},
"mappings": {
"sales": {
"properties": {
"timestamp": {
"type": "date"
},
"product": {
"type": "keyword"
}
}
}
}
}
--------------------------------------------------
// CONSOLE
WARNING: The optimization takes effect only if the fields used for sorting are single-valued and follow
the same order as the aggregation (`desc` or `asc`).
If only the aggregation results are needed it is also better to set the query `size` to 0
and `track_total_hits` to false in order to remove other slowing factors:
[source,js]
--------------------------------------------------
GET /_search
{
"size": 0,
"track_total_hits": false,
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d" } } },
{ "product": { "terms": { "field": "product" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
See <<index-modules-index-sorting, index sorting>> for more details.


@ -0,0 +1,27 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
esplugin {
description 'A multi-bucket aggregation that can paginate buckets from different sources efficiently.'
classname 'org.elasticsearch.search.aggregations.composite.CompositeAggregationPlugin'
hasClientJar = true
}
compileJava.options.compilerArgs << "-Xlint:-deprecation"
compileTestJava.options.compilerArgs << "-Xlint:-deprecation"


@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public interface CompositeAggregation extends MultiBucketsAggregation {
interface Bucket extends MultiBucketsAggregation.Bucket {
Map<String, Object> getKey();
}
@Override
List<? extends CompositeAggregation.Bucket> getBuckets();
/**
* Returns the last key in this aggregation. It can be used to retrieve the buckets that are after these values.
* See {@link CompositeAggregationBuilder#aggregateAfter}.
*/
Map<String, Object> afterKey();
static XContentBuilder bucketToXContent(CompositeAggregation.Bucket bucket,
XContentBuilder builder, Params params) throws IOException {
builder.startObject();
buildCompositeMap(CommonFields.KEY.getPreferredName(), bucket.getKey(), builder);
builder.field(CommonFields.DOC_COUNT.getPreferredName(), bucket.getDocCount());
bucket.getAggregations().toXContentInternal(builder, params);
builder.endObject();
return builder;
}
static XContentBuilder toXContentFragment(CompositeAggregation aggregation, XContentBuilder builder, Params params) throws IOException {
builder.startArray(CommonFields.BUCKETS.getPreferredName());
for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) {
bucketToXContent(bucket, builder, params);
}
builder.endArray();
return builder;
}
static void buildCompositeMap(String fieldName, Map<String, Object> composite, XContentBuilder builder) throws IOException {
builder.startObject(fieldName);
for (Map.Entry<String, Object> entry : composite.entrySet()) {
if (entry.getValue().getClass() == BytesRef.class) {
builder.field(entry.getKey(), ((BytesRef) entry.getValue()).utf8ToString());
} else {
builder.field(entry.getKey(), entry.getValue());
}
}
builder.endObject();
}
}


@ -0,0 +1,218 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class CompositeAggregationBuilder extends AbstractAggregationBuilder<CompositeAggregationBuilder> {
public static final String NAME = "composite";
public static final ParseField AFTER_FIELD_NAME = new ParseField("after");
public static final ParseField SIZE_FIELD_NAME = new ParseField("size");
public static final ParseField SOURCES_FIELD_NAME = new ParseField("sources");
private static final ObjectParser<CompositeAggregationBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(NAME);
PARSER.declareInt(CompositeAggregationBuilder::size, SIZE_FIELD_NAME);
PARSER.declareObject(CompositeAggregationBuilder::aggregateAfter, (parser, context) -> parser.map(), AFTER_FIELD_NAME);
PARSER.declareObjectArray(CompositeAggregationBuilder::setSources,
(p, c) -> CompositeValuesSourceParserHelper.fromXContent(p), SOURCES_FIELD_NAME);
}
public static CompositeAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException {
return PARSER.parse(parser, new CompositeAggregationBuilder(aggregationName), null);
}
private List<CompositeValuesSourceBuilder<?>> sources;
private Map<String, Object> after;
private int size = 10;
private CompositeAggregationBuilder(String name) {
this(name, null);
}
public CompositeAggregationBuilder(String name, List<CompositeValuesSourceBuilder<?>> sources) {
super(name);
this.sources = sources;
}
public CompositeAggregationBuilder(StreamInput in) throws IOException {
super(in);
int num = in.readVInt();
this.sources = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
CompositeValuesSourceBuilder<?> builder = CompositeValuesSourceParserHelper.readFrom(in);
sources.add(builder);
}
this.size = in.readVInt();
if (in.readBoolean()) {
this.after = in.readMap();
}
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVInt(sources.size());
for (CompositeValuesSourceBuilder<?> builder : sources) {
CompositeValuesSourceParserHelper.writeTo(builder, out);
}
out.writeVInt(size);
out.writeBoolean(after != null);
if (after != null) {
out.writeMap(after);
}
}
@Override
public String getType() {
return NAME;
}
private CompositeAggregationBuilder setSources(List<CompositeValuesSourceBuilder<?>> sources) {
this.sources = sources;
return this;
}
/**
* Gets the list of {@link CompositeValuesSourceBuilder} for this aggregation.
*/
public List<CompositeValuesSourceBuilder<?>> sources() {
return sources;
}
/**
* Sets the values that indicate which composite bucket this request should "aggregate after".
* Defaults to <tt>null</tt>.
*/
public CompositeAggregationBuilder aggregateAfter(Map<String, Object> afterKey) {
this.after = afterKey;
return this;
}
/**
* The number of composite buckets to return. Defaults to <tt>10</tt>.
*/
public CompositeAggregationBuilder size(int size) {
this.size = size;
return this;
}
@Override
protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subfactoriesBuilder) throws IOException {
if (parent != null) {
throw new IllegalArgumentException("[composite] aggregation cannot be used with a parent aggregation");
}
final QueryShardContext shardContext = context.getQueryShardContext();
CompositeValuesSourceConfig[] configs = new CompositeValuesSourceConfig[sources.size()];
SortField[] sortFields = new SortField[configs.length];
IndexSortConfig indexSortConfig = shardContext.getIndexSettings().getIndexSortConfig();
if (indexSortConfig.hasIndexSort()) {
Sort sort = indexSortConfig.buildIndexSort(shardContext::fieldMapper, shardContext::getForField);
System.arraycopy(sort.getSort(), 0, sortFields, 0, sortFields.length);
}
List<String> sourceNames = new ArrayList<>();
for (int i = 0; i < configs.length; i++) {
configs[i] = sources.get(i).build(context, i, configs.length, sortFields[i]);
sourceNames.add(sources.get(i).name());
if (configs[i].valuesSource().needsScores()) {
throw new IllegalArgumentException("[sources] cannot access _score");
}
}
final CompositeKey afterKey;
if (after != null) {
if (after.size() != sources.size()) {
throw new IllegalArgumentException("[after] has " + after.size() +
" value(s) but [sources] has " + sources.size());
}
Comparable<?>[] values = new Comparable<?>[sources.size()];
for (int i = 0; i < sources.size(); i++) {
String sourceName = sources.get(i).name();
if (after.containsKey(sourceName) == false) {
throw new IllegalArgumentException("Missing value for [after." + sources.get(i).name() + "]");
}
Object obj = after.get(sourceName);
if (obj instanceof Comparable) {
values[i] = (Comparable<?>) obj;
} else {
throw new IllegalArgumentException("Invalid value for [after." + sources.get(i).name() +
"], expected comparable, got [" + (obj == null ? "null" : obj.getClass().getSimpleName()) + "]");
}
}
afterKey = new CompositeKey(values);
} else {
afterKey = null;
}
return new CompositeAggregationFactory(name, context, parent, subfactoriesBuilder, metaData, size, configs, sourceNames, afterKey);
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SIZE_FIELD_NAME.getPreferredName(), size);
builder.startArray(SOURCES_FIELD_NAME.getPreferredName());
for (CompositeValuesSourceBuilder<?> source: sources) {
builder.startObject();
builder.startObject(source.name());
source.toXContent(builder, params);
builder.endObject();
builder.endObject();
}
builder.endArray();
if (after != null) {
CompositeAggregation.buildCompositeMap(AFTER_FIELD_NAME.getPreferredName(), after, builder);
}
builder.endObject();
return builder;
}
@Override
protected int doHashCode() {
return Objects.hash(sources, size, after);
}
@Override
protected boolean doEquals(Object obj) {
CompositeAggregationBuilder other = (CompositeAggregationBuilder) obj;
return size == other.size &&
Objects.equals(sources, other.sources) &&
Objects.equals(after, other.after);
}
}


@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
class CompositeAggregationFactory extends AggregatorFactory<CompositeAggregationFactory> {
private final int size;
private final CompositeValuesSourceConfig[] sources;
private final List<String> sourceNames;
private final CompositeKey afterKey;
CompositeAggregationFactory(String name, SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData,
int size, CompositeValuesSourceConfig[] sources,
List<String> sourceNames, CompositeKey afterKey) throws IOException {
super(name, context, parent, subFactoriesBuilder, metaData);
this.size = size;
this.sources = sources;
this.sourceNames = sourceNames;
this.afterKey = afterKey;
}
@Override
protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new CompositeAggregator(name, factories, context, parent, pipelineAggregators, metaData,
size, sources, sourceNames, afterKey);
}
}


@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import java.util.Arrays;
import java.util.List;
public class CompositeAggregationPlugin extends Plugin implements SearchPlugin {
@Override
public List<AggregationSpec> getAggregations() {
return Arrays.asList(
new AggregationSpec(CompositeAggregationBuilder.NAME, CompositeAggregationBuilder::new, CompositeAggregationBuilder::parse)
.addResultReader(InternalComposite::new)
);
}
}


@ -0,0 +1,237 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
final class CompositeAggregator extends BucketsAggregator {
private final int size;
private final CompositeValuesSourceConfig[] sources;
private final List<String> sourceNames;
private final boolean canEarlyTerminate;
private final TreeMap<Integer, Integer> keys;
private final CompositeValuesComparator array;
private final List<LeafContext> contexts = new ArrayList<>();
private LeafContext leaf;
private RoaringDocIdSet.Builder builder;
CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
int size, CompositeValuesSourceConfig[] sources, List<String> sourceNames,
CompositeKey rawAfterKey) throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
this.size = size;
this.sources = sources;
this.sourceNames = sourceNames;
// we use slot 0 to fill the current document (size+1).
this.array = new CompositeValuesComparator(context.searcher().getIndexReader(), sources, size+1);
if (rawAfterKey != null) {
array.setTop(rawAfterKey.values());
}
this.keys = new TreeMap<>(array::compare);
this.canEarlyTerminate = Arrays.stream(sources)
.allMatch(CompositeValuesSourceConfig::canEarlyTerminate);
}
boolean canEarlyTerminate() {
return canEarlyTerminate;
}
private int[] getReverseMuls() {
return Arrays.stream(sources).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
}
@Override
public InternalAggregation buildAggregation(long zeroBucket) throws IOException {
assert zeroBucket == 0L;
// Replay all documents that contain at least one top bucket (collected during the first pass).
grow(keys.size()+1);
for (LeafContext context : contexts) {
DocIdSetIterator docIdSetIterator = context.docIdSet.iterator();
if (docIdSetIterator == null) {
continue;
}
final CompositeValuesSource.Collector collector =
array.getLeafCollector(context.ctx, getSecondPassCollector(context.subCollector));
int docID;
while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
collector.collect(docID);
}
}
int num = Math.min(size, keys.size());
final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
final int[] reverseMuls = getReverseMuls();
int pos = 0;
for (int slot : keys.keySet()) {
CompositeKey key = array.toCompositeKey(slot);
InternalAggregations aggs = bucketAggregations(slot);
int docCount = bucketDocCount(slot);
buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, key, reverseMuls, docCount, aggs);
}
return new InternalComposite(name, size, sourceNames, Arrays.asList(buckets), reverseMuls, pipelineAggregators(), metaData());
}
@Override
public InternalAggregation buildEmptyAggregation() {
final int[] reverseMuls = getReverseMuls();
return new InternalComposite(name, size, sourceNames, Collections.emptyList(), reverseMuls, pipelineAggregators(), metaData());
}
@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
if (leaf != null) {
leaf.docIdSet = builder.build();
contexts.add(leaf);
}
leaf = new LeafContext(ctx, sub);
builder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
final CompositeValuesSource.Collector inner = array.getLeafCollector(ctx, getFirstPassCollector());
return new LeafBucketCollector() {
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0L;
inner.collect(doc);
}
};
}
@Override
protected void doPostCollection() throws IOException {
if (leaf != null) {
leaf.docIdSet = builder.build();
contexts.add(leaf);
}
}
/**
* The first pass selects the top N composite buckets from all matching documents.
* It also records all doc ids that contain a top N composite bucket in a {@link RoaringDocIdSet} in order to be
* able to replay the collection filtered on the best buckets only.
*/
private CompositeValuesSource.Collector getFirstPassCollector() {
return new CompositeValuesSource.Collector() {
int lastDoc = -1;
@Override
public void collect(int doc) throws IOException {
// Checks if the candidate key in slot 0 is competitive.
if (keys.containsKey(0)) {
// This key is already in the top N, skip it for now.
if (doc != lastDoc) {
builder.add(doc);
lastDoc = doc;
}
return;
}
if (array.hasTop() && array.compareTop(0) <= 0) {
// This key is greater than the top value collected in the previous round.
if (canEarlyTerminate) {
// The index sort matches the composite sort, we can early terminate this segment.
throw new CollectionTerminatedException();
}
// just skip this key for now
return;
}
if (keys.size() >= size) {
// The tree map is full, check if the candidate key should be kept.
if (array.compare(0, keys.lastKey()) > 0) {
// The candidate key is not competitive
if (canEarlyTerminate) {
// The index sort matches the composite sort, we can early terminate this segment.
throw new CollectionTerminatedException();
}
// just skip this key
return;
}
}
// The candidate key is competitive
final int newSlot;
if (keys.size() >= size) {
// the tree map is full, we replace the last key with this candidate.
int slot = keys.pollLastEntry().getKey();
// and we recycle the deleted slot
newSlot = slot;
} else {
newSlot = keys.size() + 1;
}
// move the candidate key to its new slot.
array.move(0, newSlot);
keys.put(newSlot, newSlot);
if (doc != lastDoc) {
builder.add(doc);
lastDoc = doc;
}
}
};
}
/**
* The second pass delegates the collection to sub-aggregations but only if the collected composite bucket is a top bucket (selected
* in the first pass).
*/
private CompositeValuesSource.Collector getSecondPassCollector(LeafBucketCollector subCollector) throws IOException {
return doc -> {
Integer bucket = keys.get(0);
if (bucket != null) {
// The candidate key in slot 0 is a top bucket.
// We can defer the collection of this document/bucket to the sub collector
collectExistingBucket(subCollector, doc, bucket);
}
};
}
static class LeafContext {
final LeafReaderContext ctx;
final LeafBucketCollector subCollector;
DocIdSet docIdSet;
LeafContext(LeafReaderContext ctx, LeafBucketCollector subCollector) {
this.ctx = ctx;
this.subCollector = subCollector;
}
}
}


@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.search.DocValueFormat;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* A key that is composed of multiple {@link Comparable} values.
*/
class CompositeKey {
private final Comparable<?>[] values;
CompositeKey(Comparable<?>... values) {
this.values = values;
}
Comparable<?>[] values() {
return values;
}
int size() {
return values.length;
}
Comparable<?> get(int pos) {
assert pos < values.length;
return values[pos];
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CompositeKey that = (CompositeKey) o;
return Arrays.equals(values, that.values);
}
@Override
public int hashCode() {
return Arrays.hashCode(values);
}
}


@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
import static org.elasticsearch.search.aggregations.composite.CompositeValuesSource.wrapBinary;
import static org.elasticsearch.search.aggregations.composite.CompositeValuesSource.wrapDouble;
import static org.elasticsearch.search.aggregations.composite.CompositeValuesSource.wrapGlobalOrdinals;
import static org.elasticsearch.search.aggregations.composite.CompositeValuesSource.wrapLong;
import static org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import static org.elasticsearch.search.aggregations.support.ValuesSource.Bytes;
import static org.elasticsearch.search.aggregations.support.ValuesSource.Bytes.WithOrdinals;
final class CompositeValuesComparator {
private final int size;
private final CompositeValuesSource<?, ?>[] arrays;
private boolean topValueSet = false;
/**
* @param reader The index reader, used to check whether global ordinals can be used for the sources.
* @param sources The list of {@link CompositeValuesSourceConfig} to build the composite buckets.
* @param size The number of composite buckets to keep.
*/
CompositeValuesComparator(IndexReader reader, CompositeValuesSourceConfig[] sources, int size) {
this.size = size;
this.arrays = new CompositeValuesSource<?, ?>[sources.length];
for (int i = 0; i < sources.length; i++) {
final int reverseMul = sources[i].reverseMul();
if (sources[i].valuesSource() instanceof WithOrdinals && reader instanceof DirectoryReader) {
WithOrdinals vs = (WithOrdinals) sources[i].valuesSource();
arrays[i] = wrapGlobalOrdinals(vs, size, reverseMul);
} else if (sources[i].valuesSource() instanceof Bytes) {
Bytes vs = (Bytes) sources[i].valuesSource();
arrays[i] = wrapBinary(vs, size, reverseMul);
} else if (sources[i].valuesSource() instanceof Numeric) {
final Numeric vs = (Numeric) sources[i].valuesSource();
if (vs.isFloatingPoint()) {
arrays[i] = wrapDouble(vs, size, reverseMul);
} else {
arrays[i] = wrapLong(vs, size, reverseMul);
}
}
}
}
/**
* Moves the values in <code>slot1</code> to <code>slot2</code>.
*/
void move(int slot1, int slot2) {
assert slot1 < size && slot2 < size;
for (int i = 0; i < arrays.length; i++) {
arrays[i].move(slot1, slot2);
}
}
/**
* Compares the values in <code>slot1</code> with <code>slot2</code>.
*/
int compare(int slot1, int slot2) {
assert slot1 < size && slot2 < size;
for (int i = 0; i < arrays.length; i++) {
int cmp = arrays[i].compare(slot1, slot2);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
/**
* Returns true if a top value has been set for this comparator.
*/
boolean hasTop() {
return topValueSet;
}
/**
* Sets the top values for this comparator.
*/
void setTop(Comparable<?>[] values) {
assert values.length == arrays.length;
topValueSet = true;
for (int i = 0; i < arrays.length; i++) {
arrays[i].setTop(values[i]);
}
}
/**
* Compares the top values with the values in <code>slot</code>.
*/
int compareTop(int slot) {
assert slot < size;
for (int i = 0; i < arrays.length; i++) {
int cmp = arrays[i].compareTop(slot);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
/**
* Builds the {@link CompositeKey} for <code>slot</code>.
*/
CompositeKey toCompositeKey(int slot) throws IOException {
assert slot < size;
Comparable<?>[] values = new Comparable<?>[arrays.length];
for (int i = 0; i < values.length; i++) {
values[i] = arrays[i].toComparable(slot);
}
return new CompositeKey(values);
}
/**
* Gets the {@link CompositeValuesSource.Collector} that will record the composite buckets of the visited documents.
*/
CompositeValuesSource.Collector getLeafCollector(LeafReaderContext context, CompositeValuesSource.Collector in) throws IOException {
int last = arrays.length - 1;
CompositeValuesSource.Collector next = arrays[last].getLeafCollector(context, in);
for (int i = last - 1; i >= 0; i--) {
next = arrays[i].getLeafCollector(context, next);
}
return next;
}
}
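
A hypothetical driver loop showing how the aggregator is expected to use this comparator per segment (`comparator`, `leafCtx` and `freeSlot` are assumed names, not the actual aggregator code): the chained collector fills slot 0 with the current document's values, `compareTop` filters out keys that sort on or before the `after` key, and `move` copies a competitive key into a storage slot.

````
CompositeValuesSource.Collector collector =
    comparator.getLeafCollector(leafCtx, doc -> {
        // Slot 0 holds the values of the current document for every source.
        if (comparator.hasTop() && comparator.compareTop(0) <= 0) {
            return; // sorts on or before the "after" key, skip it
        }
        comparator.move(0, freeSlot); // keep this composite key
    });
````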


@@ -0,0 +1,410 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalMapping;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
/**
* A wrapper for {@link ValuesSource} that can record and compare the values produced during collection.
*/
abstract class CompositeValuesSource<VS extends ValuesSource, T extends Comparable<T>> {
interface Collector {
void collect(int doc) throws IOException;
}
protected final VS vs;
protected final int size;
protected final int reverseMul;
protected T topValue;
/**
*
* @param vs The original {@link ValuesSource}.
* @param size The number of values to record.
* @param reverseMul -1 if the natural order ({@link SortOrder#ASC}) should be reversed.
*/
CompositeValuesSource(VS vs, int size, int reverseMul) {
this.vs = vs;
this.size = size;
this.reverseMul = reverseMul;
}
/**
* The type of this source.
*/
abstract String type();
/**
* Moves the value in <code>from</code> to <code>to</code>.
* The value present in <code>to</code> is overwritten.
*/
abstract void move(int from, int to);
/**
* Compares the value in <code>from</code> with the value in <code>to</code>.
*/
abstract int compare(int from, int to);
/**
* Compares the value in <code>slot</code> with the top value in this source.
*/
abstract int compareTop(int slot);
/**
* Sets the top value for this source. Values that compare smaller should not be recorded.
*/
abstract void setTop(Comparable<?> value);
/**
* Transforms the value in <code>slot</code> to a {@link Comparable} object.
*/
abstract Comparable<T> toComparable(int slot) throws IOException;
/**
* Gets the {@link LeafCollector} that will record the values of the visited documents.
*/
abstract Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException;
/**
* Creates a {@link CompositeValuesSource} that generates long values.
*/
static CompositeValuesSource<ValuesSource.Numeric, Long> wrapLong(ValuesSource.Numeric vs, int size, int reverseMul) {
return new LongValuesSource(vs, size, reverseMul);
}
/**
* Creates a {@link CompositeValuesSource} that generates double values.
*/
static CompositeValuesSource<ValuesSource.Numeric, Double> wrapDouble(ValuesSource.Numeric vs, int size, int reverseMul) {
return new DoubleValuesSource(vs, size, reverseMul);
}
/**
* Creates a {@link CompositeValuesSource} that generates binary values.
*/
static CompositeValuesSource<ValuesSource.Bytes, BytesRef> wrapBinary(ValuesSource.Bytes vs, int size, int reverseMul) {
return new BinaryValuesSource(vs, size, reverseMul);
}
/**
* Creates a {@link CompositeValuesSource} that generates global ordinal values.
*/
static CompositeValuesSource<ValuesSource.Bytes.WithOrdinals, BytesRef> wrapGlobalOrdinals(ValuesSource.Bytes.WithOrdinals vs,
int size,
int reverseMul) {
return new GlobalOrdinalValuesSource(vs, size, reverseMul);
}
/**
* A {@link CompositeValuesSource} for global ordinals
*/
private static class GlobalOrdinalValuesSource extends CompositeValuesSource<ValuesSource.Bytes.WithOrdinals, BytesRef> {
private final long[] values;
private SortedSetDocValues lookup;
private Long topValueLong;
GlobalOrdinalValuesSource(ValuesSource.Bytes.WithOrdinals vs, int size, int reverseMul) {
super(vs, size, reverseMul);
this.values = new long[size];
}
@Override
String type() {
return "global_ordinals";
}
@Override
void move(int from, int to) {
values[to] = values[from];
}
@Override
int compare(int from, int to) {
return Long.compare(values[from], values[to]) * reverseMul;
}
@Override
int compareTop(int slot) {
return Long.compare(values[slot], topValueLong) * reverseMul;
}
@Override
void setTop(Comparable<?> value) {
if (value instanceof BytesRef) {
topValue = (BytesRef) value;
} else if (value instanceof String) {
topValue = new BytesRef(value.toString());
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
}
}
@Override
Comparable<BytesRef> toComparable(int slot) throws IOException {
return BytesRef.deepCopyOf(lookup.lookupOrd(values[slot]));
}
@Override
Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException {
final SortedSetDocValues dvs = vs.globalOrdinalsValues(context);
if (lookup == null) {
lookup = dvs;
if (topValue != null && topValueLong == null) {
if (lookup instanceof GlobalOrdinalMapping) {
// Find the global ordinal (or the insertion point) for the provided top value.
topValueLong = lookupGlobalOrdinals((GlobalOrdinalMapping) lookup, topValue);
} else {
// Global ordinals are not needed, switch back to ordinals (single segment case).
topValueLong = lookup.lookupTerm(topValue);
if (topValueLong < 0) {
// convert negative insert position
topValueLong = -topValueLong - 2;
}
}
}
}
return doc -> {
if (dvs.advanceExact(doc)) {
long ord;
while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) {
values[0] = ord;
next.collect(doc);
}
}
};
}
private static long lookupGlobalOrdinals(GlobalOrdinalMapping mapping, BytesRef key) throws IOException {
long low = 0;
long high = mapping.getValueCount() - 1; // highest valid global ordinal, avoids lookupOrd past the end
while (low <= high) {
long mid = (low + high) >>> 1;
BytesRef midVal = mapping.lookupOrd(mid);
int cmp = midVal.compareTo(key);
if (cmp < 0) {
low = mid + 1;
} else if (cmp > 0) {
high = mid - 1;
} else {
return mid;
}
}
return low - 1; // not found: return the ordinal just before the insertion point
}
}
/**
* A {@link CompositeValuesSource} for binary source ({@link BytesRef})
*/
private static class BinaryValuesSource extends CompositeValuesSource<ValuesSource.Bytes, BytesRef> {
private final BytesRef[] values;
private BytesRef topValue;
BinaryValuesSource(ValuesSource.Bytes vs, int size, int reverseMul) {
super(vs, size, reverseMul);
this.values = new BytesRef[size];
}
@Override
String type() {
return "binary";
}
@Override
public void move(int from, int to) {
values[to] = BytesRef.deepCopyOf(values[from]);
}
@Override
public int compare(int from, int to) {
return values[from].compareTo(values[to]) * reverseMul;
}
@Override
int compareTop(int slot) {
return values[slot].compareTo(topValue) * reverseMul;
}
@Override
void setTop(Comparable<?> value) {
if (value.getClass() == BytesRef.class) {
topValue = (BytesRef) value;
} else if (value.getClass() == String.class) {
topValue = new BytesRef((String) value);
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
}
}
@Override
Comparable<BytesRef> toComparable(int slot) {
return values[slot];
}
@Override
Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException {
final SortedBinaryDocValues dvs = vs.bytesValues(context);
return doc -> {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
values[0] = dvs.nextValue();
next.collect(doc);
}
}
};
}
}
/**
* A {@link CompositeValuesSource} for longs.
*/
private static class LongValuesSource extends CompositeValuesSource<ValuesSource.Numeric, Long> {
private final long[] values;
private long topValue;
LongValuesSource(ValuesSource.Numeric vs, int size, int reverseMul) {
super(vs, size, reverseMul);
this.values = new long[size];
}
@Override
String type() {
return "long";
}
@Override
void move(int from, int to) {
values[to] = values[from];
}
@Override
int compare(int from, int to) {
return Long.compare(values[from], values[to]) * reverseMul;
}
@Override
int compareTop(int slot) {
return Long.compare(values[slot], topValue) * reverseMul;
}
@Override
void setTop(Comparable<?> value) {
if (value instanceof Number) {
topValue = ((Number) value).longValue();
} else {
topValue = Long.parseLong(value.toString());
}
}
@Override
Comparable<Long> toComparable(int slot) {
return values[slot];
}
@Override
Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException {
final SortedNumericDocValues dvs = vs.longValues(context);
return doc -> {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
values[0] = dvs.nextValue();
next.collect(doc);
}
}
};
}
}
/**
* A {@link CompositeValuesSource} for doubles.
*/
private static class DoubleValuesSource extends CompositeValuesSource<ValuesSource.Numeric, Double> {
private final double[] values;
private double topValue;
DoubleValuesSource(ValuesSource.Numeric vs, int size, int reverseMul) {
super(vs, size, reverseMul);
this.values = new double[size];
}
@Override
String type() {
return "long";
}
@Override
void move(int from, int to) {
values[to] = values[from];
}
@Override
int compare(int from, int to) {
return Double.compare(values[from], values[to]) * reverseMul;
}
@Override
int compareTop(int slot) {
return Double.compare(values[slot], topValue) * reverseMul;
}
@Override
void setTop(Comparable<?> value) {
if (value instanceof Number) {
topValue = ((Number) value).doubleValue();
} else {
topValue = Double.parseDouble(value.toString());
}
}
@Override
Comparable<Double> toComparable(int slot) {
return values[slot];
}
@Override
Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException {
final SortedNumericDoubleValues dvs = vs.doubleValues(context);
return doc -> {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
values[0] = dvs.nextValue();
next.collect(doc);
}
}
};
}
}
}
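
Every implementation above shares the same `reverseMul` contract: comparisons are computed in natural order and multiplied by ±1. A tiny self-contained illustration:

````
int reverseMul = -1;                           // SortOrder.DESC
int cmp = Long.compare(10L, 20L) * reverseMul; // natural order would return -1
assert cmp > 0;                                // under DESC, 10 ranks after 20
````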


@@ -0,0 +1,304 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Objects;
/**
* A {@link ValuesSource} builder for {@link CompositeAggregationBuilder}
*/
public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSourceBuilder<AB>> implements Writeable, ToXContentFragment {
protected final String name;
private String field = null;
private Script script = null;
private ValueType valueType = null;
private Object missing = null;
private SortOrder order = SortOrder.ASC;
CompositeValuesSourceBuilder(String name) {
this(name, null);
}
CompositeValuesSourceBuilder(String name, ValueType valueType) {
this.name = name;
this.valueType = valueType;
}
CompositeValuesSourceBuilder(StreamInput in) throws IOException {
this.name = in.readString();
this.field = in.readOptionalString();
if (in.readBoolean()) {
this.script = new Script(in);
}
if (in.readBoolean()) {
this.valueType = ValueType.readFromStream(in);
}
this.missing = in.readGenericValue();
this.order = SortOrder.readFromStream(in);
}
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeOptionalString(field);
boolean hasScript = script != null;
out.writeBoolean(hasScript);
if (hasScript) {
script.writeTo(out);
}
boolean hasValueType = valueType != null;
out.writeBoolean(hasValueType);
if (hasValueType) {
valueType.writeTo(out);
}
out.writeGenericValue(missing);
order.writeTo(out);
innerWriteTo(out);
}
protected abstract void innerWriteTo(StreamOutput out) throws IOException;
protected abstract void doXContentBody(XContentBuilder builder, Params params) throws IOException;
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(type());
if (field != null) {
builder.field("field", field);
}
if (script != null) {
builder.field("script", script);
}
if (missing != null) {
builder.field("missing", missing);
}
if (valueType != null) {
builder.field("value_type", valueType.getPreferredName());
}
builder.field("order", order);
doXContentBody(builder, params);
builder.endObject();
return builder;
}
@Override
public final int hashCode() {
return Objects.hash(field, missing, script, valueType, order, innerHashCode());
}
protected abstract int innerHashCode();
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
@SuppressWarnings("unchecked")
AB that = (AB) o;
return Objects.equals(field, that.field()) &&
Objects.equals(script, that.script()) &&
Objects.equals(valueType, that.valueType()) &&
Objects.equals(missing, that.missing()) &&
Objects.equals(order, that.order()) &&
innerEquals(that);
}
protected abstract boolean innerEquals(AB builder);
public String name() {
return name;
}
abstract String type();
/**
* Sets the field to use for this source
*/
@SuppressWarnings("unchecked")
public AB field(String field) {
if (field == null) {
throw new IllegalArgumentException("[field] must not be null");
}
this.field = field;
return (AB) this;
}
/**
* Gets the field to use for this source
*/
public String field() {
return field;
}
/**
* Sets the script to use for this source
*/
@SuppressWarnings("unchecked")
public AB script(Script script) {
if (script == null) {
throw new IllegalArgumentException("[script] must not be null");
}
this.script = script;
return (AB) this;
}
/**
* Gets the script to use for this source
*/
public Script script() {
return script;
}
/**
* Sets the {@link ValueType} for the value produced by this source
*/
@SuppressWarnings("unchecked")
public AB valueType(ValueType valueType) {
if (valueType == null) {
throw new IllegalArgumentException("[valueType] must not be null");
}
this.valueType = valueType;
return (AB) this;
}
/**
* Gets the {@link ValueType} for the value produced by this source
*/
public ValueType valueType() {
return valueType;
}
/**
* Sets the value to use when the source finds a missing value in a
* document
*/
@SuppressWarnings("unchecked")
public AB missing(Object missing) {
if (missing == null) {
throw new IllegalArgumentException("[missing] must not be null");
}
this.missing = missing;
return (AB) this;
}
public Object missing() {
return missing;
}
/**
* Sets the {@link SortOrder} to use to sort values produced by this source
*/
@SuppressWarnings("unchecked")
public AB order(String order) {
if (order == null) {
throw new IllegalArgumentException("[order] must not be null");
}
this.order = SortOrder.fromString(order);
return (AB) this;
}
/**
* Sets the {@link SortOrder} to use to sort values produced by this source
*/
@SuppressWarnings("unchecked")
public AB order(SortOrder order) {
if (order == null) {
throw new IllegalArgumentException("[order] must not be null");
}
this.order = order;
return (AB) this;
}
/**
* Gets the {@link SortOrder} to use to sort values produced by this source
*/
public SortOrder order() {
return order;
}
/**
* Creates a {@link CompositeValuesSourceConfig} for this source.
*
* @param context The search context for this source.
* @param config The {@link ValuesSourceConfig} for this source.
* @param pos The position of this source in the composite key.
* @param numPos The total number of positions in the composite key.
* @param sortField The {@link SortField} of the index sort at this position or null if not present.
*/
protected abstract CompositeValuesSourceConfig innerBuild(SearchContext context,
ValuesSourceConfig<?> config,
int pos,
int numPos,
SortField sortField) throws IOException;
public final CompositeValuesSourceConfig build(SearchContext context, int pos, int numPos, SortField sortField) throws IOException {
ValuesSourceConfig<?> config = ValuesSourceConfig.resolve(context.getQueryShardContext(),
valueType, field, script, missing, null, null);
return innerBuild(context, config, pos, numPos, sortField);
}
protected boolean checkCanEarlyTerminate(IndexReader reader,
String fieldName,
boolean reverse,
SortField sortField) throws IOException {
return sortField.getField().equals(fieldName) &&
sortField.getReverse() == reverse &&
isSingleValued(reader, sortField);
}
private static boolean isSingleValued(IndexReader reader, SortField field) throws IOException {
SortField.Type type = IndexSortConfig.getSortFieldType(field);
for (LeafReaderContext context : reader.leaves()) {
if (type == SortField.Type.STRING) {
final SortedSetDocValues values = DocValues.getSortedSet(context.reader(), field.getField());
if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) {
return false;
}
} else {
final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), field.getField());
if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) {
return false;
}
}
}
return true;
}
}
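
A minimal sketch of the fluent API defined here, using `TermsValuesSourceBuilder` (one of the concrete subclasses; the field name and missing value are assumptions):

````
CompositeValuesSourceBuilder<?> source = new TermsValuesSourceBuilder("field1")
    .field("field1")
    .missing("N/A")         // bucket documents without the field under "N/A"
    .order(SortOrder.DESC); // produce values in descending order
````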


@@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.sort.SortOrder;
class CompositeValuesSourceConfig {
private final String name;
private final ValuesSource vs;
private final int reverseMul;
private final boolean canEarlyTerminate;
CompositeValuesSourceConfig(String name, ValuesSource vs, SortOrder order, boolean canEarlyTerminate) {
this.name = name;
this.vs = vs;
this.canEarlyTerminate = canEarlyTerminate;
this.reverseMul = order == SortOrder.ASC ? 1 : -1;
}
String name() {
return name;
}
ValuesSource valuesSource() {
return vs;
}
/**
* The multiplier applied to the natural ordering of the values: -1 for descending, 1 for ascending.
*/
int reverseMul() {
assert reverseMul == -1 || reverseMul == 1;
return reverseMul;
}
boolean canEarlyTerminate() {
return canEarlyTerminate;
}
}


@@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.AbstractObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.support.ValueType;
import java.io.IOException;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
class CompositeValuesSourceParserHelper {
static <VB extends CompositeValuesSourceBuilder<VB>, T> void declareValuesSourceFields(AbstractObjectParser<VB, T> objectParser,
ValueType targetValueType) {
objectParser.declareField(VB::field, XContentParser::text,
new ParseField("field"), ObjectParser.ValueType.STRING);
objectParser.declareField(VB::missing, XContentParser::objectText,
new ParseField("missing"), ObjectParser.ValueType.VALUE);
objectParser.declareField(VB::valueType, p -> {
ValueType valueType = ValueType.resolveForScript(p.text());
if (targetValueType != null && valueType.isNotA(targetValueType)) {
throw new ParsingException(p.getTokenLocation(),
"Aggregation [" + objectParser.getName() + "] was configured with an incompatible value type ["
+ valueType + "]. It can only work on value of type ["
+ targetValueType + "]");
}
return valueType;
}, new ParseField("value_type"), ObjectParser.ValueType.STRING);
objectParser.declareField(VB::script,
(parser, context) -> Script.parse(parser), Script.SCRIPT_PARSE_FIELD, ObjectParser.ValueType.OBJECT_OR_STRING);
objectParser.declareField(VB::order, XContentParser::text, new ParseField("order"), ObjectParser.ValueType.STRING);
}
static void writeTo(CompositeValuesSourceBuilder<?> builder, StreamOutput out) throws IOException {
final byte code;
if (builder.getClass() == TermsValuesSourceBuilder.class) {
code = 0;
} else if (builder.getClass() == DateHistogramValuesSourceBuilder.class) {
code = 1;
} else if (builder.getClass() == HistogramValuesSourceBuilder.class) {
code = 2;
} else {
throw new IOException("invalid builder type: " + builder.getClass().getSimpleName());
}
out.writeByte(code);
builder.writeTo(out);
}
static CompositeValuesSourceBuilder<?> readFrom(StreamInput in) throws IOException {
int code = in.readByte();
switch(code) {
case 0:
return new TermsValuesSourceBuilder(in);
case 1:
return new DateHistogramValuesSourceBuilder(in);
case 2:
return new HistogramValuesSourceBuilder(in);
default:
throw new IOException("Invalid code " + code);
}
}
static CompositeValuesSourceBuilder<?> fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation);
String name = parser.currentName();
token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation);
String type = parser.currentName();
token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
final CompositeValuesSourceBuilder<?> builder;
switch(type) {
case TermsValuesSourceBuilder.TYPE:
builder = TermsValuesSourceBuilder.parse(name, parser);
break;
case DateHistogramValuesSourceBuilder.TYPE:
builder = DateHistogramValuesSourceBuilder.parse(name, parser);
break;
case HistogramValuesSourceBuilder.TYPE:
builder = HistogramValuesSourceBuilder.parse(name, parser);
break;
default:
throw new ParsingException(parser.getTokenLocation(), "invalid source type: " + type);
}
parser.nextToken(); // move to the END_OBJECT closing the named source
parser.nextToken(); // step out of the named source object
return builder;
}
}
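
A hedged round-trip sketch: the byte written by `writeTo` selects the concrete class that `readFrom` re-instantiates. Both methods are package-private, so this only compiles from the same package; `BytesStreamOutput` is the usual in-memory stream used in tests.

````
BytesStreamOutput out = new BytesStreamOutput();
CompositeValuesSourceParserHelper.writeTo(
    new TermsValuesSourceBuilder("field1").field("field1"), out);
CompositeValuesSourceBuilder<?> copy =
    CompositeValuesSourceParserHelper.readFrom(out.bytes().streamInput());
assert copy instanceof TermsValuesSourceBuilder; // code 0 round-trips
````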


@@ -0,0 +1,243 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.rounding.DateTimeUnit;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder.DATE_FIELD_UNITS;
/**
* A {@link CompositeValuesSourceBuilder} that builds a {@link RoundingValuesSource} from a {@link Script} or
* a field name.
*/
public class DateHistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<DateHistogramValuesSourceBuilder> {
static final String TYPE = "date_histogram";
private static final ObjectParser<DateHistogramValuesSourceBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(DateHistogramValuesSourceBuilder.TYPE);
PARSER.declareField((histogram, interval) -> {
if (interval instanceof Long) {
histogram.interval((long) interval);
} else {
histogram.dateHistogramInterval((DateHistogramInterval) interval);
}
}, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return p.longValue();
} else {
return new DateHistogramInterval(p.text());
}
}, Histogram.INTERVAL_FIELD, ObjectParser.ValueType.LONG);
PARSER.declareField(DateHistogramValuesSourceBuilder::timeZone, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return DateTimeZone.forID(p.text());
} else {
return DateTimeZone.forOffsetHours(p.intValue());
}
}, new ParseField("time_zone"), ObjectParser.ValueType.LONG);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER, ValueType.NUMERIC);
}
static DateHistogramValuesSourceBuilder parse(String name, XContentParser parser) throws IOException {
return PARSER.parse(parser, new DateHistogramValuesSourceBuilder(name), null);
}
private long interval = 0;
private DateTimeZone timeZone = null;
private DateHistogramInterval dateHistogramInterval;
public DateHistogramValuesSourceBuilder(String name) {
super(name, ValueType.DATE);
}
protected DateHistogramValuesSourceBuilder(StreamInput in) throws IOException {
super(in);
this.interval = in.readLong();
this.dateHistogramInterval = in.readOptionalWriteable(DateHistogramInterval::new);
if (in.readBoolean()) {
timeZone = DateTimeZone.forID(in.readString());
}
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeLong(interval);
out.writeOptionalWriteable(dateHistogramInterval);
boolean hasTimeZone = timeZone != null;
out.writeBoolean(hasTimeZone);
if (hasTimeZone) {
out.writeString(timeZone.getID());
}
}
@Override
protected void doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (dateHistogramInterval == null) {
builder.field(Histogram.INTERVAL_FIELD.getPreferredName(), interval);
} else {
builder.field(Histogram.INTERVAL_FIELD.getPreferredName(), dateHistogramInterval.toString());
}
if (timeZone != null) {
builder.field("time_zone", timeZone);
}
}
@Override
protected int innerHashCode() {
return Objects.hash(interval, dateHistogramInterval, timeZone);
}
@Override
protected boolean innerEquals(DateHistogramValuesSourceBuilder other) {
return Objects.equals(interval, other.interval)
&& Objects.equals(dateHistogramInterval, other.dateHistogramInterval)
&& Objects.equals(timeZone, other.timeZone);
}
@Override
public String type() {
return TYPE;
}
/**
* Returns the interval in milliseconds that is set on this source
**/
public long interval() {
return interval;
}
/**
* Sets the interval on this source.
* If both {@link #interval()} and {@link #dateHistogramInterval()} are set,
* then the {@link #dateHistogramInterval()} wins.
**/
public DateHistogramValuesSourceBuilder interval(long interval) {
if (interval < 1) {
throw new IllegalArgumentException("[interval] must be 1 or greater for [date_histogram] source");
}
this.interval = interval;
return this;
}
/**
* Returns the date interval that is set on this source
**/
public DateHistogramInterval dateHistogramInterval() {
return dateHistogramInterval;
}
public DateHistogramValuesSourceBuilder dateHistogramInterval(DateHistogramInterval dateHistogramInterval) {
if (dateHistogramInterval == null) {
throw new IllegalArgumentException("[dateHistogramInterval] must not be null");
}
this.dateHistogramInterval = dateHistogramInterval;
return this;
}
/**
* Sets the time zone to use for this aggregation
*/
public DateHistogramValuesSourceBuilder timeZone(DateTimeZone timeZone) {
if (timeZone == null) {
throw new IllegalArgumentException("[timeZone] must not be null: [" + name + "]");
}
this.timeZone = timeZone;
return this;
}
/**
* Gets the time zone to use for this aggregation
*/
public DateTimeZone timeZone() {
return timeZone;
}
private Rounding createRounding() {
Rounding.Builder tzRoundingBuilder;
if (dateHistogramInterval != null) {
DateTimeUnit dateTimeUnit = DATE_FIELD_UNITS.get(dateHistogramInterval.toString());
if (dateTimeUnit != null) {
tzRoundingBuilder = Rounding.builder(dateTimeUnit);
} else {
// the interval is not a calendar unit, try to parse it as a fixed time value
tzRoundingBuilder = Rounding.builder(
TimeValue.parseTimeValue(dateHistogramInterval.toString(), null, getClass().getSimpleName() + ".interval"));
}
} else {
// no calendar interval was provided, treat the numeric interval as a duration in milliseconds
tzRoundingBuilder = Rounding.builder(TimeValue.timeValueMillis(interval));
}
if (timeZone() != null) {
tzRoundingBuilder.timeZone(timeZone());
}
return tzRoundingBuilder.build();
}
@Override
protected CompositeValuesSourceConfig innerBuild(SearchContext context,
ValuesSourceConfig<?> config,
int pos,
int numPos,
SortField sortField) throws IOException {
Rounding rounding = createRounding();
ValuesSource orig = config.toValuesSource(context.getQueryShardContext());
if (orig == null) {
orig = ValuesSource.Numeric.EMPTY;
}
if (orig instanceof ValuesSource.Numeric) {
ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig;
RoundingValuesSource vs = new RoundingValuesSource(numeric, rounding);
boolean canEarlyTerminate = false;
final FieldContext fieldContext = config.fieldContext();
if (sortField != null &&
pos == numPos-1 &&
fieldContext != null) {
canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(),
fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField);
}
return new CompositeValuesSourceConfig(name, vs, order(), canEarlyTerminate);
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}
}
}
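
For example, a source that rounds a date field to daily buckets in a fixed time zone could be built like this (field name is an assumption):

````
DateHistogramValuesSourceBuilder date = new DateHistogramValuesSourceBuilder("date")
    .field("timestamp")
    .dateHistogramInterval(new DateHistogramInterval("1d"))
    .timeZone(DateTimeZone.forID("Europe/Paris"));
````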


@@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import java.io.IOException;
class HistogramValuesSource extends ValuesSource.Numeric {
private final Numeric vs;
private final double interval;
/**
* @param vs The original values source
* @param interval The interval used to round the values
*/
HistogramValuesSource(Numeric vs, double interval) {
this.vs = vs;
this.interval = interval;
}
@Override
public boolean isFloatingPoint() {
return true;
}
@Override
public SortedNumericDoubleValues doubleValues(LeafReaderContext context) throws IOException {
SortedNumericDoubleValues values = vs.doubleValues(context);
return new SortedNumericDoubleValues() {
@Override
public double nextValue() throws IOException {
return Math.floor(values.nextValue() / interval) * interval;
}
@Override
public int docValueCount() {
return values.docValueCount();
}
@Override
public boolean advanceExact(int target) throws IOException {
return values.advanceExact(target);
}
};
}
@Override
public SortedBinaryDocValues bytesValues(LeafReaderContext context) throws IOException {
throw new UnsupportedOperationException("not applicable");
}
@Override
public SortedNumericDocValues longValues(LeafReaderContext context) throws IOException {
throw new UnsupportedOperationException("not applicable");
}
}
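
The rounding applied in `doubleValues` is plain floor bucketing; a worked example:

````
double interval = 5.0;
assert Math.floor(12.3 / interval) * interval == 10.0; // 12.3 falls in bucket [10, 15)
assert Math.floor(-0.1 / interval) * interval == -5.0; // negative values floor down
````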


@@ -0,0 +1,136 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Objects;
/**
* A {@link CompositeValuesSourceBuilder} that builds a {@link HistogramValuesSource} from another numeric values source
* using the provided interval.
*/
public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<HistogramValuesSourceBuilder> {
static final String TYPE = "histogram";
private static final ObjectParser<HistogramValuesSourceBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(HistogramValuesSourceBuilder.TYPE);
PARSER.declareDouble(HistogramValuesSourceBuilder::interval, Histogram.INTERVAL_FIELD);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER, ValueType.NUMERIC);
}
static HistogramValuesSourceBuilder parse(String name, XContentParser parser) throws IOException {
return PARSER.parse(parser, new HistogramValuesSourceBuilder(name), null);
}
private double interval = 0;
public HistogramValuesSourceBuilder(String name) {
super(name, ValueType.DOUBLE);
}
protected HistogramValuesSourceBuilder(StreamInput in) throws IOException {
super(in);
this.interval = in.readDouble();
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeDouble(interval);
}
@Override
protected void doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Histogram.INTERVAL_FIELD.getPreferredName(), interval);
}
@Override
protected int innerHashCode() {
return Objects.hash(interval);
}
@Override
protected boolean innerEquals(HistogramValuesSourceBuilder other) {
return Objects.equals(interval, other.interval);
}
@Override
public String type() {
return TYPE;
}
/**
* Returns the interval that is set on this source
**/
public double interval() {
return interval;
}
/**
* Sets the interval on this source.
**/
public HistogramValuesSourceBuilder interval(double interval) {
if (interval <= 0) {
throw new IllegalArgumentException("[interval] must be greater than 0 for [histogram] source");
}
this.interval = interval;
return this;
}
@Override
protected CompositeValuesSourceConfig innerBuild(SearchContext context,
ValuesSourceConfig<?> config,
int pos,
int numPos,
SortField sortField) throws IOException {
ValuesSource orig = config.toValuesSource(context.getQueryShardContext());
if (orig == null) {
orig = ValuesSource.Numeric.EMPTY;
}
if (orig instanceof ValuesSource.Numeric) {
ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig;
HistogramValuesSource vs = new HistogramValuesSource(numeric, interval);
boolean canEarlyTerminate = false;
final FieldContext fieldContext = config.fieldContext();
if (sortField != null &&
pos == numPos-1 &&
fieldContext != null) {
canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(),
fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField);
}
return new CompositeValuesSourceConfig(name, vs, order(), canEarlyTerminate);
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}
}
}
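
And the matching builder usage, mirroring the `date_histogram` example above (field name assumed):

````
HistogramValuesSourceBuilder histo = new HistogramValuesSourceBuilder("price")
    .field("price")
    .interval(5); // bucket prices into intervals of width 5
````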


@@ -0,0 +1,371 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
public class InternalComposite
extends InternalMultiBucketAggregation<InternalComposite, InternalComposite.InternalBucket> implements CompositeAggregation {
private final int size;
private final List<InternalBucket> buckets;
private final int[] reverseMuls;
private final List<String> sourceNames;
InternalComposite(String name, int size, List<String> sourceNames, List<InternalBucket> buckets, int[] reverseMuls,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.sourceNames = sourceNames;
this.buckets = buckets;
this.size = size;
this.reverseMuls = reverseMuls;
}
InternalComposite(StreamInput in) throws IOException {
super(in);
this.size = in.readVInt();
this.sourceNames = in.readList(StreamInput::readString);
this.reverseMuls = in.readIntArray();
this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, reverseMuls));
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVInt(size);
out.writeStringList(sourceNames);
out.writeIntArray(reverseMuls);
out.writeList(buckets);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
return CompositeAggregation.toXContentFragment(this, builder, params);
}
@Override
public String getWriteableName() {
return CompositeAggregationBuilder.NAME;
}
@Override
public InternalComposite create(List<InternalBucket> buckets) {
return new InternalComposite(name, size, sourceNames, buckets, reverseMuls, pipelineAggregators(), getMetaData());
}
@Override
public InternalBucket createBucket(InternalAggregations aggregations, InternalBucket prototype) {
return new InternalBucket(prototype.sourceNames, prototype.key, prototype.reverseMuls, prototype.docCount, aggregations);
}
public int getSize() {
return size;
}
@Override
public List<InternalBucket> getBuckets() {
return buckets;
}
@Override
public Map<String, Object> afterKey() {
return buckets.size() > 0 ? buckets.get(buckets.size()-1).getKey() : null;
}
// Visible for tests
int[] getReverseMuls() {
return reverseMuls;
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
for (InternalAggregation agg : aggregations) {
InternalComposite sortedAgg = (InternalComposite) agg;
BucketIterator it = new BucketIterator(sortedAgg.buckets);
if (it.next() != null) {
pq.add(it);
}
}
InternalBucket lastBucket = null;
List<InternalBucket> buckets = new ArrayList<>();
List<InternalBucket> result = new ArrayList<>();
while (pq.size() > 0) {
BucketIterator bucketIt = pq.poll();
if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
InternalBucket reduceBucket = buckets.get(0).reduce(buckets, reduceContext);
buckets.clear();
result.add(reduceBucket);
if (result.size() >= size) {
break;
}
}
lastBucket = bucketIt.current;
buckets.add(bucketIt.current);
if (bucketIt.next() != null) {
pq.add(bucketIt);
}
}
if (buckets.size() > 0) {
InternalBucket reduceBucket = buckets.get(0).reduce(buckets, reduceContext);
result.add(reduceBucket);
}
return new InternalComposite(name, size, sourceNames, result, reverseMuls, pipelineAggregators(), metaData);
}
@Override
protected boolean doEquals(Object obj) {
InternalComposite that = (InternalComposite) obj;
return Objects.equals(size, that.size) &&
Objects.equals(buckets, that.buckets) &&
Arrays.equals(reverseMuls, that.reverseMuls);
}
@Override
protected int doHashCode() {
return Objects.hash(size, buckets, Arrays.hashCode(reverseMuls));
}
private static class BucketIterator implements Comparable<BucketIterator> {
final Iterator<InternalBucket> it;
InternalBucket current;
private BucketIterator(List<InternalBucket> buckets) {
this.it = buckets.iterator();
}
@Override
public int compareTo(BucketIterator other) {
return current.compareKey(other.current);
}
InternalBucket next() {
return current = it.hasNext() ? it.next() : null;
}
}
static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket
implements CompositeAggregation.Bucket, KeyComparable<InternalBucket> {
private final CompositeKey key;
private final long docCount;
private final InternalAggregations aggregations;
private final transient int[] reverseMuls;
private final transient List<String> sourceNames;
InternalBucket(List<String> sourceNames, CompositeKey key, int[] reverseMuls, long docCount, InternalAggregations aggregations) {
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
this.reverseMuls = reverseMuls;
this.sourceNames = sourceNames;
}
@SuppressWarnings("unchecked")
InternalBucket(StreamInput in, List<String> sourceNames, int[] reverseMuls) throws IOException {
final Comparable<?>[] values = new Comparable<?>[in.readVInt()];
for (int i = 0; i < values.length; i++) {
values[i] = (Comparable<?>) in.readGenericValue();
}
this.key = new CompositeKey(values);
this.docCount = in.readVLong();
this.aggregations = InternalAggregations.readAggregations(in);
this.reverseMuls = reverseMuls;
this.sourceNames = sourceNames;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(key.size());
for (int i = 0; i < key.size(); i++) {
out.writeGenericValue(key.get(i));
}
out.writeVLong(docCount);
aggregations.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), docCount, key, aggregations);
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
InternalBucket that = (InternalBucket) obj;
return Objects.equals(docCount, that.docCount)
&& Objects.equals(key, that.key)
&& Objects.equals(aggregations, that.aggregations);
}
@Override
public Map<String, Object> getKey() {
return new ArrayMap(sourceNames, key.values());
}
// visible for testing
CompositeKey getRawKey() {
return key;
}
@Override
public String getKeyAsString() {
StringBuilder builder = new StringBuilder();
builder.append('{');
for (int i = 0; i < key.size(); i++) {
if (i > 0) {
builder.append(", ");
}
builder.append(sourceNames.get(i));
builder.append('=');
builder.append(formatObject(key.get(i)));
}
builder.append('}');
return builder.toString();
}
@Override
public long getDocCount() {
return docCount;
}
@Override
public Aggregations getAggregations() {
return aggregations;
}
InternalBucket reduce(List<InternalBucket> buckets, ReduceContext reduceContext) {
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
long docCount = 0;
for (InternalBucket bucket : buckets) {
docCount += bucket.docCount;
aggregations.add(bucket.aggregations);
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, reduceContext);
return new InternalBucket(sourceNames, key, reverseMuls, docCount, aggs);
}
@Override
public int compareKey(InternalBucket other) {
for (int i = 0; i < key.size(); i++) {
assert key.get(i).getClass() == other.key.get(i).getClass();
@SuppressWarnings("unchecked")
int cmp = ((Comparable) key.get(i)).compareTo(other.key.get(i)) * reverseMuls[i];
if (cmp != 0) {
return cmp;
}
}
return 0;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
/**
* See {@link CompositeAggregation#bucketToXContentFragment}
*/
throw new UnsupportedOperationException("not implemented");
}
}
static Object formatObject(Object obj) {
if (obj instanceof BytesRef) {
return ((BytesRef) obj).utf8ToString();
}
return obj;
}
private static class ArrayMap extends AbstractMap<String, Object> {
final List<String> keys;
final Object[] values;
ArrayMap(List<String> keys, Object[] values) {
assert keys.size() == values.length;
this.keys = keys;
this.values = values;
}
@Override
public int size() {
return values.length;
}
@Override
public Object get(Object key) {
for (int i = 0; i < keys.size(); i++) {
if (key.equals(keys.get(i))) {
return formatObject(values[i]);
}
}
return null;
}
@Override
public Set<Entry<String, Object>> entrySet() {
return new AbstractSet<Entry<String, Object>>() {
@Override
public Iterator<Entry<String, Object>> iterator() {
return new Iterator<Entry<String, Object>>() {
int pos = 0;
@Override
public boolean hasNext() {
return pos < values.length;
}
@Override
public Entry<String, Object> next() {
SimpleEntry<String, Object> entry =
new SimpleEntry<>(keys.get(pos), formatObject(values[pos]));
++ pos;
return entry;
}
};
}
@Override
public int size() {
return keys.size();
}
};
}
}
}
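
`doReduce` above performs a k-way merge: each shard response arrives sorted by composite key, the priority queue repeatedly yields the globally smallest key, runs of equal keys collapse into one bucket whose doc counts are summed, and the merge stops once `size` buckets are produced. A self-contained sketch of the same semantics (eager rather than streaming, with string keys standing in for composite keys; imports elided):

````
Map<String, Long> shard1 = new TreeMap<>(); // each shard is already sorted by key
shard1.put("a|x", 60L);
shard1.put("b|x", 1L);
Map<String, Long> shard2 = new TreeMap<>();
shard2.put("a|x", 40L);
shard2.put("a|y", 1L);

int size = 2;
TreeMap<String, Long> merged = new TreeMap<>();
for (Map<String, Long> shard : Arrays.asList(shard1, shard2)) {
    shard.forEach((k, v) -> merged.merge(k, v, Long::sum));
}
List<Map.Entry<String, Long>> reduced =
    merged.entrySet().stream().limit(size).collect(Collectors.toList());
// reduced = [a|x=100, a|y=1]; "b|x" is truncated by `size`
````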


@@ -0,0 +1,98 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class ParsedComposite extends ParsedMultiBucketAggregation<ParsedComposite.ParsedBucket> implements CompositeAggregation {
private static ObjectParser<ParsedComposite, Void> PARSER =
new ObjectParser<>(ParsedComposite.class.getSimpleName(), true, ParsedComposite::new);
static {
declareMultiBucketAggregationFields(PARSER,
parser -> ParsedComposite.ParsedBucket.fromXContent(parser),
parser -> null
);
}
public static ParsedComposite fromXContent(XContentParser parser, String name) throws IOException {
ParsedComposite aggregation = PARSER.parse(parser, null);
aggregation.setName(name);
return aggregation;
}
@Override
public String getType() {
return CompositeAggregationBuilder.NAME;
}
@Override
public List<ParsedBucket> getBuckets() {
return buckets;
}
@Override
public Map<String, Object> afterKey() {
return buckets.size() > 0 ? buckets.get(buckets.size()-1).getKey() : null;
}
@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
return CompositeAggregation.toXContentFragment(this, builder, params);
}
public static class ParsedBucket extends ParsedMultiBucketAggregation.ParsedBucket implements CompositeAggregation.Bucket {
private Map<String, Object> key;
@Override
public String getKeyAsString() {
return key.toString();
}
@Override
public Map<String, Object> getKey() {
return key;
}
void setKey(Map<String, Object> key) {
this.key = key;
}
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// rendering is delegated to CompositeAggregation#bucketToXContent
throw new UnsupportedOperationException("not implemented");
}
static ParsedComposite.ParsedBucket fromXContent(XContentParser parser) throws IOException {
return parseXContent(parser, false, ParsedBucket::new,
(p, bucket) -> bucket.setKey(p.mapOrdered()));
}
}
}
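
Note that `afterKey()` above just echoes the key of the last parsed bucket, which is what drives pagination on the client side. A minimal sketch of that loop, assuming the `ParsedComposite` type defined in this file; `runSearch` and `process` are hypothetical stand-ins, not part of this commit:

````
// Hypothetical pagination loop: pass the afterKey of one page as the
// "after" of the next request, until an empty page yields a null afterKey.
Map<String, Object> after = null;
do {
    ParsedComposite page = runSearch(after);  // runSearch: stand-in for executing the request
    for (ParsedComposite.ParsedBucket bucket : page.getBuckets()) {
        process(bucket.getKey(), bucket.getDocCount());
    }
    after = page.afterKey();                  // null once the page has no buckets
} while (after != null);
````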

View File

@ -0,0 +1,104 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import java.io.IOException;
/**
* A wrapper for {@link ValuesSource.Numeric} that uses {@link Rounding} to transform the long values
* produced by the underlying source.
*/
class RoundingValuesSource extends ValuesSource.Numeric {
private final ValuesSource.Numeric vs;
private final Rounding rounding;
/**
*
* @param vs The original values source
* @param rounding How to round the values
*/
RoundingValuesSource(Numeric vs, Rounding rounding) {
this.vs = vs;
this.rounding = rounding;
}
@Override
public boolean isFloatingPoint() {
return false;
}
@Override
public SortedNumericDocValues longValues(LeafReaderContext context) throws IOException {
SortedNumericDocValues values = vs.longValues(context);
return new SortedNumericDocValues() {
@Override
public long nextValue() throws IOException {
return rounding.round(values.nextValue());
}
@Override
public int docValueCount() {
return values.docValueCount();
}
@Override
public boolean advanceExact(int target) throws IOException {
return values.advanceExact(target);
}
@Override
public int docID() {
return values.docID();
}
@Override
public int nextDoc() throws IOException {
return values.nextDoc();
}
@Override
public int advance(int target) throws IOException {
return values.advance(target);
}
@Override
public long cost() {
return values.cost();
}
};
}
@Override
public SortedBinaryDocValues bytesValues(LeafReaderContext context) throws IOException {
throw new UnsupportedOperationException("not applicable");
}
@Override
public SortedNumericDoubleValues doubleValues(LeafReaderContext context) throws IOException {
throw new UnsupportedOperationException("not applicable");
}
}
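
The wrapper above only rewrites `nextValue()`; the iteration methods (`advanceExact`, `nextDoc`, `advance`, `cost`) are delegated untouched. For illustration, a self-contained sketch of the same floor-to-interval transformation; the `round` helper and sample values are hypothetical, not the `Rounding` implementation itself:

````
import java.util.Arrays;

public class RoundingSketch {
    // Floor a value to the start of its interval, the same shape of
    // transformation the wrapped longValues() applies per document value.
    static long round(long value, long interval) {
        return Math.floorDiv(value, interval) * interval;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L; // one hour in milliseconds
        long[] millis = {7_200_123L, 7_199_999L, 10_800_000L};
        System.out.println(Arrays.toString(
            Arrays.stream(millis).map(t -> round(t, hour)).toArray()));
        // prints [7200000, 3600000, 10800000]
    }
}
````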

View File

@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
/**
* A {@link CompositeValuesSourceBuilder} that builds a {@link ValuesSource} from a {@link Script} or
* a field name.
*/
public class TermsValuesSourceBuilder extends CompositeValuesSourceBuilder<TermsValuesSourceBuilder> {
static final String TYPE = "terms";
private static final ObjectParser<TermsValuesSourceBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(TermsValuesSourceBuilder.TYPE);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER, null);
}
static TermsValuesSourceBuilder parse(String name, XContentParser parser) throws IOException {
return PARSER.parse(parser, new TermsValuesSourceBuilder(name), null);
}
public TermsValuesSourceBuilder(String name) {
super(name);
}
protected TermsValuesSourceBuilder(StreamInput in) throws IOException {
super(in);
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {}
@Override
protected void doXContentBody(XContentBuilder builder, Params params) throws IOException {}
@Override
protected int innerHashCode() {
return 0;
}
@Override
protected boolean innerEquals(TermsValuesSourceBuilder builder) {
return true;
}
@Override
public String type() {
return TYPE;
}
@Override
protected CompositeValuesSourceConfig innerBuild(SearchContext context,
ValuesSourceConfig<?> config,
int pos,
int numPos,
SortField sortField) throws IOException {
ValuesSource vs = config.toValuesSource(context.getQueryShardContext());
if (vs == null) {
vs = ValuesSource.Numeric.EMPTY;
}
boolean canEarlyTerminate = false;
final FieldContext fieldContext = config.fieldContext();
if (sortField != null && fieldContext != null) {
canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(),
fieldContext.field(), order() == SortOrder.DESC, sortField);
}
return new CompositeValuesSourceConfig(name, vs, order(), canEarlyTerminate);
}
}
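
A minimal usage sketch, assuming the chained setters (`field`, `order`) return the builder as elsewhere in this commit and using the `CompositeAggregationBuilder` constructor exercised by the tests below; the field name `product` is illustrative:

````
// assumes the imports of this file plus java.util.Collections
TermsValuesSourceBuilder product = new TermsValuesSourceBuilder("product")
    .field("product")        // extract values from a field rather than a script
    .order(SortOrder.DESC);  // this part of the composite key sorts descending
CompositeAggregationBuilder composite =
    new CompositeAggregationBuilder("by_product", Collections.singletonList(product));
````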

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite.spi;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.plugins.spi.NamedXContentProvider;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.composite.ParsedComposite;
import java.util.List;
import static java.util.Collections.singletonList;
public class CompositeNamedXContentProvider implements NamedXContentProvider {
@Override
public List<NamedXContentRegistry.Entry> getNamedXContentParsers() {
ParseField parseField = new ParseField(CompositeAggregationBuilder.NAME);
ContextParser<Object, Aggregation> contextParser = (p, name) -> ParsedComposite.fromXContent(p, (String) name);
return singletonList(new NamedXContentRegistry.Entry(Aggregation.class, parseField, contextParser));
}
}

View File

@ -0,0 +1 @@
org.elasticsearch.search.aggregations.composite.spi.CompositeNamedXContentProvider
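
This one-line services file is what lets `java.util.ServiceLoader` discover the provider above at runtime. A hedged sketch of how a consumer would pick it up (the `register` call is a hypothetical placeholder):

````
ServiceLoader<NamedXContentProvider> loader = ServiceLoader.load(NamedXContentProvider.class);
for (NamedXContentProvider provider : loader) {
    // each provider contributes parsers such as the composite entry above
    provider.getNamedXContentParsers().forEach(entry -> register(entry));
}
````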

View File

@ -0,0 +1,196 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
import static org.hamcrest.Matchers.hasSize;
public class CompositeAggregationBuilderTests extends ESTestCase {
static final CompositeAggregationPlugin PLUGIN = new CompositeAggregationPlugin();
@Override
protected NamedXContentRegistry xContentRegistry() {
return new NamedXContentRegistry(
new SearchModule(Settings.EMPTY, false, Collections.singletonList(PLUGIN)).getNamedXContents()
);
}
@Override
protected NamedWriteableRegistry writableRegistry() {
return new NamedWriteableRegistry(
new SearchModule(Settings.EMPTY, false, Collections.singletonList(PLUGIN)).getNamedWriteables()
);
}
private DateHistogramValuesSourceBuilder randomDateHistogramSourceBuilder() {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder(randomAlphaOfLengthBetween(5, 10));
if (randomBoolean()) {
histo.field(randomAlphaOfLengthBetween(1, 20));
} else {
histo.script(new Script(randomAlphaOfLengthBetween(10, 20)));
}
if (randomBoolean()) {
histo.dateHistogramInterval(randomFrom(DateHistogramInterval.days(10),
DateHistogramInterval.minutes(1), DateHistogramInterval.weeks(1)));
} else {
histo.interval(randomNonNegativeLong());
}
if (randomBoolean()) {
histo.timeZone(randomDateTimeZone());
}
return histo;
}
private TermsValuesSourceBuilder randomTermsSourceBuilder() {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder(randomAlphaOfLengthBetween(5, 10));
if (randomBoolean()) {
terms.field(randomAlphaOfLengthBetween(1, 20));
} else {
terms.script(new Script(randomAlphaOfLengthBetween(10, 20)));
}
terms.order(randomFrom(SortOrder.values()));
return terms;
}
private HistogramValuesSourceBuilder randomHistogramSourceBuilder() {
HistogramValuesSourceBuilder histo = new HistogramValuesSourceBuilder(randomAlphaOfLengthBetween(5, 10));
if (randomBoolean()) {
histo.field(randomAlphaOfLengthBetween(1, 20));
} else {
histo.script(new Script(randomAlphaOfLengthBetween(10, 20)));
}
histo.interval(randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false));
return histo;
}
private CompositeAggregationBuilder randomBuilder() {
int numSources = randomIntBetween(1, 10);
List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>();
for (int i = 0; i < numSources; i++) {
int type = randomIntBetween(0, 2);
switch (type) {
case 0:
sources.add(randomTermsSourceBuilder());
break;
case 1:
sources.add(randomDateHistogramSourceBuilder());
break;
case 2:
sources.add(randomHistogramSourceBuilder());
break;
default:
throw new AssertionError("wrong branch");
}
}
return new CompositeAggregationBuilder(randomAlphaOfLength(10), sources);
}
public void testFromXContent() throws IOException {
CompositeAggregationBuilder testAgg = randomBuilder();
AggregatorFactories.Builder factoriesBuilder = AggregatorFactories.builder().addAggregator(testAgg);
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
if (randomBoolean()) {
builder.prettyPrint();
}
factoriesBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentBuilder shuffled = shuffleXContent(builder);
XContentParser parser = createParser(shuffled);
AggregationBuilder newAgg = assertParse(parser);
assertNotSame(newAgg, testAgg);
assertEquals(testAgg, newAgg);
assertEquals(testAgg.hashCode(), newAgg.hashCode());
}
public void testToString() throws IOException {
CompositeAggregationBuilder testAgg = randomBuilder();
String toString = randomBoolean() ? Strings.toString(testAgg) : testAgg.toString();
XContentParser parser = createParser(XContentType.JSON.xContent(), toString);
AggregationBuilder newAgg = assertParse(parser);
assertNotSame(newAgg, testAgg);
assertEquals(testAgg, newAgg);
assertEquals(testAgg.hashCode(), newAgg.hashCode());
}
private AggregationBuilder assertParse(XContentParser parser) throws IOException {
assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
AggregatorFactories.Builder parsed = AggregatorFactories.parseAggregators(parser);
assertThat(parsed.getAggregatorFactories(), hasSize(1));
assertThat(parsed.getPipelineAggregatorFactories(), hasSize(0));
AggregationBuilder newAgg = parsed.getAggregatorFactories().get(0);
assertNull(parser.nextToken());
assertNotNull(newAgg);
return newAgg;
}
/**
* Test serialization and deserialization of the test AggregatorFactory.
*/
public void testSerialization() throws IOException {
CompositeAggregationBuilder testAgg = randomBuilder();
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.writeNamedWriteable(testAgg);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), writableRegistry())) {
AggregationBuilder deserialized = in.readNamedWriteable(AggregationBuilder.class);
assertEquals(testAgg, deserialized);
assertEquals(testAgg.hashCode(), deserialized.hashCode());
assertNotSame(testAgg, deserialized);
}
}
}
public void testEqualsAndHashcode() throws IOException {
checkEqualsAndHashCode(randomBuilder(), this::copyAggregation);
}
private CompositeAggregationBuilder copyAggregation(CompositeAggregationBuilder agg) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
agg.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), writableRegistry())) {
return (CompositeAggregationBuilder) writableRegistry().getReader(AggregationBuilder.class,
agg.getWriteableName()).read(in);
}
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
public class CompositeAggregationsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public CompositeAggregationsClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return ESClientYamlSuiteTestCase.createParameters();
}
}

View File

@ -0,0 +1,256 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.composite;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.junit.After;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.stream.Collectors;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLengthBetween;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class InternalCompositeTests extends InternalMultiBucketAggregationTestCase<InternalComposite> {
private List<String> sourceNames;
private int[] reverseMuls;
private int[] formats;
private int size;
@Override
public void setUp() throws Exception {
super.setUp();
int numFields = randomIntBetween(1, 10);
size = randomNumberOfBuckets();
sourceNames = new ArrayList<>();
reverseMuls = new int[numFields];
formats = new int[numFields];
for (int i = 0; i < numFields; i++) {
sourceNames.add("field_" + i);
reverseMuls[i] = randomBoolean() ? 1 : -1;
formats[i] = randomIntBetween(0, 2);
}
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
sourceNames = null;
reverseMuls = null;
formats = null;
}
@Override
protected List<NamedXContentRegistry.Entry> getNamedXContents() {
List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>(getDefaultNamedXContents());
ContextParser<Object, Aggregation> parser = (p, c) -> ParsedComposite.fromXContent(p, (String) c);
namedXContents.add(new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(CompositeAggregationBuilder.NAME), parser));
return namedXContents;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(
new SearchModule(
Settings.EMPTY, false, Collections.singletonList(new CompositeAggregationPlugin())
).getNamedWriteables()
);
}
@Override
protected Writeable.Reader<InternalComposite> instanceReader() {
return InternalComposite::new;
}
@Override
protected Class<ParsedComposite> implementationClass() {
return ParsedComposite.class;
}
@Override
protected <P extends ParsedAggregation> P parseAndAssert(final InternalAggregation aggregation,
final boolean shuffled, final boolean addRandomFields) throws IOException {
return super.parseAndAssert(aggregation, false, false);
}
private CompositeKey createCompositeKey() {
Comparable<?>[] keys = new Comparable<?>[sourceNames.size()];
for (int j = 0; j < keys.length; j++) {
switch (formats[j]) {
case 0:
keys[j] = randomLong();
break;
case 1:
keys[j] = randomDouble();
break;
case 2:
keys[j] = new BytesRef(randomAsciiLettersOfLengthBetween(1, 20));
break;
default:
throw new AssertionError("illegal branch");
}
}
return new CompositeKey(keys);
}
@SuppressWarnings("unchecked")
private Comparator<CompositeKey> getKeyComparator() {
return (o1, o2) -> {
for (int i = 0; i < o1.size(); i++) {
int cmp = ((Comparable) o1.get(i)).compareTo(o2.get(i)) * reverseMuls[i];
if (cmp != 0) {
return cmp;
}
}
return 0;
};
}
@SuppressWarnings("unchecked")
private Comparator<InternalComposite.InternalBucket> getBucketComparator() {
return (o1, o2) -> {
for (int i = 0; i < o1.getRawKey().size(); i++) {
int cmp = ((Comparable) o1.getRawKey().get(i)).compareTo(o2.getRawKey().get(i)) * reverseMuls[i];
if (cmp != 0) {
return cmp;
}
}
return 0;
};
}
@Override
protected InternalComposite createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData, InternalAggregations aggregations) {
int numBuckets = randomIntBetween(0, size);
List<InternalComposite.InternalBucket> buckets = new ArrayList<>();
TreeSet<CompositeKey> keys = new TreeSet<>(getKeyComparator());
for (int i = 0; i < numBuckets; i++) {
final CompositeKey key = createCompositeKey();
if (keys.contains(key)) {
continue;
}
keys.add(key);
InternalComposite.InternalBucket bucket =
new InternalComposite.InternalBucket(sourceNames, key, reverseMuls, 1L, aggregations);
buckets.add(bucket);
}
buckets.sort(InternalComposite.InternalBucket::compareKey);
return new InternalComposite(name, size, sourceNames, buckets, reverseMuls, Collections.emptyList(), metaData);
}
@Override
protected InternalComposite mutateInstance(InternalComposite instance) throws IOException {
List<InternalComposite.InternalBucket> buckets = instance.getBuckets();
Map<String, Object> metaData = instance.getMetaData();
int code = randomIntBetween(0, 2);
int[] reverseMuls = instance.getReverseMuls();
switch(code) {
case 0:
int[] newReverseMuls = new int[reverseMuls.length];
for (int i = 0; i < reverseMuls.length; i++) {
newReverseMuls[i] = reverseMuls[i] == 1 ? -1 : 1;
}
reverseMuls = newReverseMuls;
break;
case 1:
buckets = new ArrayList<>(buckets);
buckets.add(new InternalComposite.InternalBucket(sourceNames, createCompositeKey(), reverseMuls,
randomLongBetween(1, 100), InternalAggregations.EMPTY)
);
break;
case 2:
if (metaData == null) {
metaData = new HashMap<>(1);
} else {
metaData = new HashMap<>(instance.getMetaData());
}
metaData.put(randomAlphaOfLength(15), randomInt());
break;
default:
throw new AssertionError("illegal branch");
}
return new InternalComposite(instance.getName(), instance.getSize(), sourceNames, buckets, reverseMuls,
instance.pipelineAggregators(), metaData);
}
@Override
protected void assertReduced(InternalComposite reduced, List<InternalComposite> inputs) {
List<CompositeKey> expectedKeys = inputs.stream()
.flatMap((s) -> s.getBuckets().stream())
.map(InternalComposite.InternalBucket::getRawKey)
.sorted(getKeyComparator())
.distinct()
.limit(reduced.getSize())
.collect(Collectors.toList());
assertThat(reduced.getBuckets().size(), lessThanOrEqualTo(size));
assertThat(reduced.getBuckets().size(), equalTo(expectedKeys.size()));
Iterator<CompositeKey> expectedIt = expectedKeys.iterator();
for (InternalComposite.InternalBucket bucket : reduced.getBuckets()) {
assertTrue(expectedIt.hasNext());
assertThat(bucket.getRawKey(), equalTo(expectedIt.next()));
}
assertFalse(expectedIt.hasNext());
}
public void testReduceSame() throws IOException {
InternalComposite result = createTestInstance(randomAlphaOfLength(10), Collections.emptyList(), Collections.emptyMap(),
InternalAggregations.EMPTY);
List<InternalAggregation> toReduce = new ArrayList<>();
int numSame = randomIntBetween(1, 10);
for (int i = 0; i < numSame; i++) {
toReduce.add(result);
}
InternalComposite finalReduce = (InternalComposite) result.reduce(toReduce,
new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, true));
assertThat(finalReduce.getBuckets().size(), equalTo(result.getBuckets().size()));
Iterator<InternalComposite.InternalBucket> expectedIt = result.getBuckets().iterator();
for (InternalComposite.InternalBucket bucket : finalReduce.getBuckets()) {
InternalComposite.InternalBucket expectedBucket = expectedIt.next();
assertThat(bucket.getRawKey(), equalTo(expectedBucket.getRawKey()));
assertThat(bucket.getDocCount(), equalTo(expectedBucket.getDocCount() * numSame));
}
}
}

View File

@ -0,0 +1,13 @@
# Integration tests for Composite aggs plugin
#
"Composite aggs loaded":
- do:
cluster.state: {}
# Get master node id
- set: { master_node: master }
- do:
nodes.info: {}
- match: { nodes.$master.modules.0.name: aggs-composite }

View File

@ -0,0 +1,196 @@
---
setup:
- do:
indices.create:
index: test
body:
mappings:
doc:
properties:
keyword:
type: keyword
long:
type: long
- do:
index:
index: test
type: doc
id: 1
body: { "keyword": "foo", "long": [10, 20] }
- do:
index:
index: test
type: doc
id: 2
body: { "keyword": ["foo", "bar"] }
- do:
index:
index: test
type: doc
id: 3
body: { "keyword": "bar", "long": [100, 0] }
- do:
index:
index: test
type: doc
id: 4
body: { "keyword": "bar", "long": [1000, 0] }
- do:
indices.refresh:
index: [test]
---
"Simple Composite aggregation":
- skip:
version: " - 6.99.99"
reason: this uses a new API that has been added in 7.0
- do:
search:
index: test
body:
aggregations:
test:
composite:
sources: [
"kw": {
"terms": {
"field": "keyword"
}
}
]
- match: {hits.total: 4}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.kw: "bar" }
- match: { aggregations.test.buckets.0.doc_count: 3 }
- match: { aggregations.test.buckets.1.key.kw: "foo" }
- match: { aggregations.test.buckets.1.doc_count: 2 }
---
"Nested Composite aggregation":
- skip:
version: " - 6.99.99"
reason: this uses a new API that has been added in 7.0
- do:
search:
index: test
body:
aggregations:
test:
composite:
sources: [
{
"long": {
"terms": {
"field": "long"
}
}
},
{
"kw": {
"terms": {
"field": "keyword"
}
}
}
]
- match: {hits.total: 4}
- length: { aggregations.test.buckets: 5 }
- match: { aggregations.test.buckets.0.key.long: 0}
- match: { aggregations.test.buckets.0.key.kw: "bar" }
- match: { aggregations.test.buckets.0.doc_count: 2 }
- match: { aggregations.test.buckets.1.key.long: 10 }
- match: { aggregations.test.buckets.1.key.kw: "foo"}
- match: { aggregations.test.buckets.1.doc_count: 1 }
- match: { aggregations.test.buckets.2.key.long: 20 }
- match: { aggregations.test.buckets.2.key.kw: "foo" }
- match: { aggregations.test.buckets.2.doc_count: 1 }
- match: { aggregations.test.buckets.3.key.long: 100}
- match: { aggregations.test.buckets.3.key.kw: "bar" }
- match: { aggregations.test.buckets.3.doc_count: 1 }
- match: { aggregations.test.buckets.4.key.long: 1000 }
- match: { aggregations.test.buckets.4.key.kw: "bar" }
- match: { aggregations.test.buckets.4.doc_count: 1 }
---
"Aggregate After":
- skip:
version: " - 6.99.99"
reason: this uses a new API that has been added in 7.0
- do:
search:
index: test
body:
aggregations:
test:
composite:
sources: [
{
"long": {
"terms": {
"field": "long"
}
}
},
{
"kw": {
"terms": {
"field": "keyword"
}
}
}
]
after: { "long": 20, "kw": "foo" }
- match: {hits.total: 4}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.long: 100 }
- match: { aggregations.test.buckets.0.key.kw: "bar" }
- match: { aggregations.test.buckets.0.doc_count: 1 }
- match: { aggregations.test.buckets.1.key.long: 1000 }
- match: { aggregations.test.buckets.1.key.kw: "bar" }
- match: { aggregations.test.buckets.1.doc_count: 1 }
---
"Invalid Composite aggregation":
- skip:
version: " - 6.99.99"
reason: this uses a new API that has been added in 7.0
- do:
catch: /\[composite\] aggregation cannot be used with a parent aggregation/
search:
index: test
body:
aggregations:
test:
terms:
field: long
aggs:
nested:
composite:
sources: [
{
"kw": {
"terms": {
"field": "keyword"
}
}
}
]

View File

@ -30,6 +30,7 @@ List projects = [
'test:fixtures:old-elasticsearch',
'test:logger-usage',
'modules:aggs-matrix-stats',
'modules:aggs-composite',
'modules:analysis-common',
'modules:ingest-common',
'modules:lang-expression',

View File

@ -93,11 +93,17 @@ public abstract class AggregatorTestCase extends ESTestCase {
private List<Releasable> releasables = new ArrayList<>();
private static final String TYPE_NAME = "type";
protected AggregatorFactory<?> createAggregatorFactory(AggregationBuilder aggregationBuilder,
IndexSearcher indexSearcher,
MappedFieldType... fieldTypes) throws IOException {
return createAggregatorFactory(aggregationBuilder, indexSearcher, createIndexSettings(), fieldTypes);
}
/** Create a factory for the given aggregation builder. */
protected AggregatorFactory<?> createAggregatorFactory(AggregationBuilder aggregationBuilder,
IndexSearcher indexSearcher,
IndexSettings indexSettings,
MappedFieldType... fieldTypes) throws IOException {
SearchContext searchContext = createSearchContext(indexSearcher, indexSettings);
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
when(searchContext.bigArrays()).thenReturn(new MockBigArrays(Settings.EMPTY, circuitBreakerService));
@ -121,6 +127,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
when(searchContext.lookup()).thenReturn(searchLookup);
QueryShardContext queryShardContext = queryShardContextMock(mapperService, fieldTypes, circuitBreakerService);
when(queryShardContext.getIndexSettings()).thenReturn(indexSettings);
when(searchContext.getQueryShardContext()).thenReturn(queryShardContext);
for (MappedFieldType fieldType : fieldTypes) {
when(searchContext.smartNameFieldType(fieldType.name())).thenReturn(fieldType);
@ -132,8 +139,16 @@ public abstract class AggregatorTestCase extends ESTestCase {
protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
IndexSearcher indexSearcher,
MappedFieldType... fieldTypes) throws IOException {
return createAggregator(aggregationBuilder, indexSearcher, createIndexSettings(), fieldTypes);
}
protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
IndexSearcher indexSearcher,
IndexSettings indexSettings,
MappedFieldType... fieldTypes) throws IOException {
@SuppressWarnings("unchecked")
A aggregator = (A) createAggregatorFactory(aggregationBuilder, indexSearcher, indexSettings, fieldTypes)
.create(null, true);
return aggregator;
}
@ -218,6 +233,14 @@ public abstract class AggregatorTestCase extends ESTestCase {
Query query,
AggregationBuilder builder,
MappedFieldType... fieldTypes) throws IOException {
return search(searcher, query, builder, createIndexSettings(), fieldTypes);
}
protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSearcher searcher,
Query query,
AggregationBuilder builder,
IndexSettings indexSettings,
MappedFieldType... fieldTypes) throws IOException {
C a = createAggregator(builder, searcher, indexSettings, fieldTypes);
a.preCollection();
searcher.search(query, a);

View File

@ -136,6 +136,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.elasticsearch.test.XContentTestUtils.insertRandomFields;

View File

@ -17,11 +17,17 @@
* under the License.
*/
package org.elasticsearch.test;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.ArrayList;
@ -38,8 +44,8 @@ public abstract class InternalMultiBucketAggregationTestCase<T extends InternalA
private static final int DEFAULT_MAX_NUMBER_OF_BUCKETS = 10;
private Supplier<InternalAggregations> subAggregationsSupplier;
private int maxNumberOfBuckets = DEFAULT_MAX_NUMBER_OF_BUCKETS;
protected int randomNumberOfBuckets() {
return randomIntBetween(minNumberOfBuckets(), maxNumberOfBuckets());
@ -53,6 +59,14 @@ public abstract class InternalMultiBucketAggregationTestCase<T extends InternalA
return maxNumberOfBuckets;
}
public void setMaxNumberOfBuckets(int maxNumberOfBuckets) {
this.maxNumberOfBuckets = maxNumberOfBuckets;
}
public void setSubAggregationsSupplier(Supplier<InternalAggregations> subAggregationsSupplier) {
this.subAggregationsSupplier = subAggregationsSupplier;
}
@Override
public void setUp() throws Exception {
super.setUp();