Optimize composite aggregation based on index sorting (#48399) (#50272)

Co-authored-by: Daniel Huang <danielhuang@tencent.com>

This is a spinoff of #48130 that generalizes the proposal to allow early termination with the composite aggregation when the leading sources match a prefix of, or the entire, index sort specification.
In that case the composite aggregation can use the natural order of the index sort to terminate the collection early once it reaches a composite key that is greater than the bottom of the queue.
The optimization also applies when a query other than `match_all` is provided. However, it is deactivated for sources that match the index sort in the following cases:
  * the source is multi-valued, in which case early termination is not possible;
  * `missing_bucket` is set to `true`.
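
The core of the mechanism can be sketched as follows. This is a hypothetical, simplified illustration (reduced to the case where the sources fully match the index sort specification), not the actual implementation; the class and `compareWithQueueBottom` are stand-ins:

[source,java]
--------------------------------------------------
import org.apache.lucene.search.CollectionTerminatedException;

// Sketch: when documents are visited in index sort order and the queue of
// top-N composite buckets is full, a candidate key that sorts after the
// bottom of the queue can never become competitive again, so collection of
// the current segment can stop early.
final class EarlyTerminationSketch {
    private final int maxSize;   // requested number of composite buckets
    private int size;            // buckets currently held in the queue

    EarlyTerminationSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Stand-in: returns a value > 0 when the candidate key sorts after the
     *  least competitive (bottom) composite key currently in the queue. */
    private int compareWithQueueBottom(long[] candidateKey) {
        return 0; // placeholder for the real per-source comparison
    }

    void collect(long[] candidateKey) {
        if (size >= maxSize && compareWithQueueBottom(candidateKey) > 0) {
            // Every subsequent composite key in this segment also sorts
            // after the queue bottom, so none can be competitive.
            throw new CollectionTerminatedException();
        }
        size = Math.min(size + 1, maxSize);
        // ... insert or update the candidate in the queue
    }
}
--------------------------------------------------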
Jim Ferenczi 2019-12-20 12:32:37 +01:00 committed by GitHub
parent 1c7bfebe01
commit 2acafd4b15
18 changed files with 660 additions and 127 deletions

View File

@ -117,6 +117,7 @@ Example:
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -135,6 +136,7 @@ Like the `terms` aggregation it is also possible to use a script to create the v
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -170,6 +172,7 @@ Example:
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -188,6 +191,7 @@ The values are built from a numeric field or a script that return numerical valu
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -220,6 +224,7 @@ is specified by date/time expression:
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -249,6 +254,7 @@ the format specified with the format parameter:
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -291,6 +297,7 @@ For example:
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -313,6 +320,7 @@ in the composite buckets.
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -342,6 +350,7 @@ For example:
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -368,6 +377,7 @@ It is possible to include them in the response by setting `missing_bucket` to
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -393,7 +403,7 @@ first 10 composite buckets created from the values source.
The response contains the values for each composite bucket in an array containing the values extracted
from each value source.
==== After
==== Pagination
If the number of composite buckets is too high (or unknown) to be returned in a single response,
it is possible to split the retrieval into multiple requests.
@ -407,6 +417,7 @@ For example:
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -472,6 +483,7 @@ round of result can be retrieved with:
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
@ -489,6 +501,116 @@ GET /_search
<1> Should restrict the aggregation to buckets that sort **after** the provided values.
==== Early termination
For optimal performance the <<index-modules-index-sorting,index sort>> should be set on the index so that it
partially or fully matches the source order in the composite aggregation.
For instance, the following index sort:
[source,console]
--------------------------------------------------
PUT twitter
{
"settings" : {
"index" : {
"sort.field" : ["username", "timestamp"], <1>
"sort.order" : ["asc", "desc"] <2>
}
},
"mappings": {
"properties": {
"username": {
"type": "keyword",
"doc_values": true
},
"timestamp": {
"type": "date"
}
}
}
}
--------------------------------------------------
<1> This index is sorted by `username` first then by `timestamp`.
<2> ... in ascending order for the `username` field and in descending order for the `timestamp` field.
...could be used to optimize these composite aggregations:
[source,console]
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "user_name": { "terms" : { "field": "user_name" } } } <1>
]
}
}
}
}
--------------------------------------------------
<1> `username` is a prefix of the index sort and the order matches (`asc`).
[source,console]
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "user_name": { "terms" : { "field": "user_name" } } }, <1>
{ "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } } <2>
]
}
}
}
}
--------------------------------------------------
<1> `username` is a prefix of the index sort and the order matches (`asc`).
<2> `timestamp` also matches the index sort and the order matches (`desc`).
To get the most out of early termination it is advised to set `track_total_hits` to `false` in the request.
The total number of hits that match the request can be retrieved on the first request,
and it would be costly to compute this number on every page:
[source,console]
--------------------------------------------------
GET /_search
{
"size": 0,
"track_total_hits": false,
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "user_name": { "terms" : { "field": "user_name" } } },
{ "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } }
]
}
}
}
}
--------------------------------------------------
Note that the order of the sources is important; in the example above, switching `username` with `timestamp` would
deactivate the sort optimization since the configuration wouldn't match the index sort specification.
If the order of the sources does not matter for your use case you can follow these simple guidelines:
* Put the fields with the highest cardinality first.
* Make sure that the order of the fields matches the order of the index sort.
* Put multi-valued fields last since they cannot be used for early termination.
WARNING: <<index-modules-index-sorting,index sort>> can slow down indexing; it is very important to test index sorting
with your specific use case and dataset to ensure that it matches your requirements. If it doesn't, note that `composite`
aggregations will also try to early terminate on non-sorted indices if the query matches all documents (a `match_all` query).
==== Sub-aggregations
Like any `multi-bucket` aggregations the `composite` aggregation can hold sub-aggregations.
@ -501,6 +623,7 @@ per composite bucket:
--------------------------------------------------
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {

View File

@ -235,7 +235,8 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
} else {
afterKey = null;
}
return new CompositeAggregationFactory(name, queryShardContext, parent, subfactoriesBuilder, metaData, size, configs, afterKey);
return new CompositeAggregationFactory(name, queryShardContext, parent, subfactoriesBuilder, metaData, size,
configs, afterKey);
}

View File

@ -20,18 +20,28 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.queries.SearchAfterSortedDocQuery;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -46,6 +56,8 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.CellIdSource;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.SortAndFormats;
import java.io.IOException;
import java.util.ArrayList;
@ -60,11 +72,12 @@ import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.M
final class CompositeAggregator extends BucketsAggregator {
private final int size;
private final SortedDocsProducer sortedDocsProducer;
private final List<String> sourceNames;
private final int[] reverseMuls;
private final List<DocValueFormat> formats;
private final CompositeKey rawAfterKey;
private final CompositeValuesSourceConfig[] sourceConfigs;
private final SingleDimensionValuesSource<?>[] sources;
private final CompositeValuesCollectorQueue queue;
@ -73,6 +86,8 @@ final class CompositeAggregator extends BucketsAggregator {
private RoaringDocIdSet.Builder docIdSetBuilder;
private BucketCollector deferredCollectors;
private boolean earlyTerminated;
CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
int size, CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey) throws IOException {
@ -89,11 +104,12 @@ final class CompositeAggregator extends BucketsAggregator {
" to: [" + bucketLimit + "] but was [" + size + "]. This limit can be set by changing the [" + MAX_BUCKET_SETTING.getKey() +
"] cluster level setting.", bucketLimit);
}
this.sourceConfigs = sourceConfigs;
for (int i = 0; i < sourceConfigs.length; i++) {
this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), sourceConfigs[i], size);
}
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query());
this.rawAfterKey = rawAfterKey;
}
@Override
@ -121,7 +137,6 @@ final class CompositeAggregator extends BucketsAggregator {
public InternalAggregation buildAggregation(long zeroBucket) throws IOException {
assert zeroBucket == 0L;
consumeBucketsAndMaybeBreak(queue.size());
if (deferredCollectors != NO_OP_COLLECTOR) {
// Replay all documents that contain at least one top bucket (collected during the first pass).
runDeferredCollections();
@ -138,13 +153,13 @@ final class CompositeAggregator extends BucketsAggregator {
}
CompositeKey lastBucket = num > 0 ? buckets[num-1].getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), lastBucket, reverseMuls,
pipelineAggregators(), metaData());
earlyTerminated, pipelineAggregators(), metaData());
}
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls,
pipelineAggregators(), metaData());
false, pipelineAggregators(), metaData());
}
private void finishLeaf() {
@ -156,58 +171,179 @@ final class CompositeAggregator extends BucketsAggregator {
}
}
/** Returns true if the provided field may have multiple values per document in the leaf. */
private boolean isMaybeMultivalued(LeafReaderContext context, SortField sortField) throws IOException {
SortField.Type type = IndexSortConfig.getSortFieldType(sortField);
switch (type) {
case STRING:
final SortedSetDocValues v1 = context.reader().getSortedSetDocValues(sortField.getField());
return v1 != null && DocValues.unwrapSingleton(v1) == null;
case DOUBLE:
case FLOAT:
case LONG:
case INT:
final SortedNumericDocValues v2 = context.reader().getSortedNumericDocValues(sortField.getField());
return v2 != null && DocValues.unwrapSingleton(v2) == null;
default:
// we have no clue whether the field is multi-valued or not so we assume it is.
return true;
}
}
/**
* Returns the {@link Sort} prefix that is eligible for the index sort
* optimization, or null if the index sort is not applicable.
*/
private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException {
Sort indexSort = context.reader().getMetaData().getSort();
if (indexSort == null) {
return null;
}
List<SortField> sortFields = new ArrayList<>();
for (int i = 0; i < indexSort.getSort().length; i++) {
CompositeValuesSourceConfig sourceConfig = sourceConfigs[i];
SingleDimensionValuesSource<?> source = sources[i];
SortField indexSortField = indexSort.getSort()[i];
if (source.fieldType == null
// TODO: can we handle missing bucket when using index sort optimization ?
|| source.missingBucket
|| indexSortField.getField().equals(source.fieldType.name()) == false
|| isMaybeMultivalued(context, indexSortField)
|| sourceConfig.hasScript()) {
break;
}
if (indexSortField.getReverse() != (source.reverseMul == -1)) {
if (i == 0) {
// the leading index sort matches the leading source field but the order is reversed
// so we don't check the other sources.
return new Sort(indexSortField);
}
break;
}
sortFields.add(indexSortField);
}
return sortFields.isEmpty() ? null : new Sort(sortFields.toArray(new SortField[0]));
}
/**
* Return the number of leading sources that match the index sort.
*
* @param indexSortPrefix The index sort prefix that matches the sources
* @return The length of the index sort prefix if the sort order matches
* or -1 if the leading index sort is in the reverse order of the
* leading source. A value of 0 indicates that the index sort is
* not applicable.
*/
private int computeSortPrefixLen(Sort indexSortPrefix) {
if (indexSortPrefix == null) {
return 0;
}
if (indexSortPrefix.getSort()[0].getReverse() != (sources[0].reverseMul == -1)) {
assert indexSortPrefix.getSort().length == 1;
return -1;
} else {
return indexSortPrefix.getSort().length;
}
}
private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) throws IOException {
DocValueFormat[] formats = new DocValueFormat[indexSortPrefix.getSort().length];
for (int i = 0; i < formats.length; i++) {
formats[i] = sources[i].format;
}
FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(new SortAndFormats(indexSortPrefix, formats),
Arrays.copyOfRange(rawAfterKey.values(), 0, formats.length));
if (indexSortPrefix.getSort().length < sources.length) {
// include all docs that belong to the partial bucket
fieldDoc.doc = 0;
}
BooleanQuery newQuery = new BooleanQuery.Builder()
.add(context.query(), BooleanClause.Occur.MUST)
.add(new SearchAfterSortedDocQuery(indexSortPrefix, fieldDoc), BooleanClause.Occur.FILTER)
.build();
Weight weight = context.searcher().createWeight(context.searcher().rewrite(newQuery), ScoreMode.COMPLETE_NO_SCORES, 1f);
Scorer scorer = weight.scorer(ctx);
if (scorer != null) {
DocIdSetIterator docIt = scorer.iterator();
final LeafBucketCollector inner = queue.getLeafCollector(ctx,
getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length));
inner.setScorer(scorer);
while (docIt.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
inner.collect(docIt.docID());
}
}
}
@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
finishLeaf();
boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
Sort indexSortPrefix = buildIndexSortPrefix(ctx);
int sortPrefixLen = computeSortPrefixLen(indexSortPrefix);
SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0 ?
sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) : null;
if (sortedDocsProducer != null) {
/*
The producer will visit documents sorted by the leading source of the composite definition
and terminates when the leading source value is guaranteed to be greater than the lowest
composite bucket in the queue.
*/
// Visit documents sorted by the leading source of the composite definition and terminates
// when the leading source value is guaranteed to be greater than the lowest composite bucket
// in the queue.
DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet);
if (fillDocIdSet) {
entries.add(new Entry(ctx, docIdSet));
}
/*
We can bypass search entirely for this segment, all the processing has been done in the previous call.
Throwing this exception will terminate the execution of the search for this root aggregation,
see {@link org.apache.lucene.search.MultiCollector} for more details on how we handle early termination in aggregations.
*/
// We can bypass search entirely for this segment, the processing is done in the previous call.
// Throwing this exception will terminate the execution of the search for this root aggregation,
// see {@link MultiCollector} for more details on how we handle early termination in aggregations.
earlyTerminated = true;
throw new CollectionTerminatedException();
} else {
if (fillDocIdSet) {
currentLeaf = ctx;
docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
}
final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder));
return new LeafBucketCollector() {
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0L;
inner.collect(doc);
}
};
if (rawAfterKey != null && sortPrefixLen > 0) {
// We have an after key and the index sort is applicable, so we jump directly to the doc
// that is after the index sort prefix using the rawAfterKey and start collecting
// documents from there.
processLeafFromQuery(ctx, indexSortPrefix);
throw new CollectionTerminatedException();
} else {
final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen));
return new LeafBucketCollector() {
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0L;
inner.collect(doc);
}
};
}
}
}
/**
* The first pass selects the top composite buckets from all matching documents.
*/
private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder) {
private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder, int indexSortPrefix) {
return new LeafBucketCollector() {
int lastDoc = -1;
@Override
public void collect(int doc, long bucket) throws IOException {
int slot = queue.addIfCompetitive();
if (slot != -1) {
if (builder != null && lastDoc != doc) {
builder.add(doc);
lastDoc = doc;
try {
if (queue.addIfCompetitive(indexSortPrefix)) {
if (builder != null && lastDoc != doc) {
builder.add(doc);
lastDoc = doc;
}
}
} catch (CollectionTerminatedException exc) {
earlyTerminated = true;
throw exc;
}
}
};
@ -274,7 +410,6 @@ final class CompositeAggregator extends BucketsAggregator {
private SingleDimensionValuesSource<?> createValuesSource(BigArrays bigArrays, IndexReader reader,
CompositeValuesSourceConfig config, int size) {
final int reverseMul = config.reverseMul();
if (config.valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) {
ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) config.valuesSource();
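
To make the `buildIndexSortPrefix`/`computeSortPrefixLen` logic above concrete, here is a hedged, self-contained restatement of the same decision with plain arrays standing in for the Lucene `Sort`/`SortField` types; it omits the missing-bucket, script, and multi-valued checks performed by the real methods:

[source,java]
--------------------------------------------------
// Hypothetical sketch: walk the index sort and the composite sources in
// parallel and count how many leading sources the index sort can drive.
// Returns -1 when the leading source uses the leading index sort field in
// reverse order (still usable to skip past the `after` key), and 0 when
// the index sort is not applicable at all.
static int sortPrefixLen(String[] sortField, boolean[] sortAsc,
                         String[] sourceField, boolean[] sourceAsc) {
    int len = 0;
    int max = Math.min(sortField.length, sourceField.length);
    for (int i = 0; i < max; i++) {
        if (sortField[i].equals(sourceField[i]) == false) {
            break; // fields diverge, the usable prefix ends here
        }
        if (sortAsc[i] != sourceAsc[i]) {
            // a reversed order is only exploitable on the leading source
            return i == 0 ? -1 : len;
        }
        len++;
    }
    return len;
}
--------------------------------------------------

With the `twitter` mapping from the documentation above (index sort `["username", "timestamp"]` in `["asc", "desc"]` order), sources `[username asc, timestamp desc]` yield 2, `[username asc]` yields 1, `[username desc]` yields -1, and `[timestamp desc]` yields 0.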

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
@ -63,6 +64,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
private final int maxSize;
private final Map<Slot, Integer> map;
private final SingleDimensionValuesSource<?>[] arrays;
private IntArray docCounts;
private boolean afterKeyIsSet = false;
@ -153,7 +155,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
cmp = arrays[i].compare(slot1, slot2);
}
if (cmp != 0) {
return cmp;
return cmp > 0 ? i+1 : -(i+1);
}
}
return 0;
@ -244,27 +246,57 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
/**
* Check if the current candidate should be added in the queue.
* @return The target slot of the candidate or -1 is the candidate is not competitive.
* @return <code>true</code> if the candidate is competitive (added or already in the queue).
*/
int addIfCompetitive() {
boolean addIfCompetitive() {
return addIfCompetitive(0);
}
/**
* Add or update the current composite key in the queue if the values are competitive.
*
* @param indexSortSourcePrefix 0 if the index sort is null or doesn't match any of the source fields,
*                              a value greater than 0 indicates the prefix length of the sources that match the index sort
*                              and a negative value indicates that the index sort matches the source field but the order is reversed.
* @return <code>true</code> if the candidate is competitive (added or already in the queue).
*
* @throws CollectionTerminatedException if the current collection can be terminated early due to index sorting.
*/
boolean addIfCompetitive(int indexSortSourcePrefix) {
// checks if the candidate key is competitive
Integer topSlot = compareCurrent();
if (topSlot != null) {
// this key is already in the top N, skip it
docCounts.increment(topSlot, 1);
return topSlot;
return true;
}
if (afterKeyIsSet && compareCurrentWithAfter() <= 0) {
// this key is greater than the top value collected in the previous round, skip it
return -1;
if (afterKeyIsSet) {
int cmp = compareCurrentWithAfter();
if (cmp <= 0) {
if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) {
// the leading index sort is in the reverse order of the leading source
// so we can early terminate when we reach a document that is smaller
// than the after key (collected on a previous page).
throw new CollectionTerminatedException();
}
// key was collected on a previous page, skip it (>= afterKey).
return false;
}
}
if (size() >= maxSize
// the tree map is full, check if the candidate key should be kept
&& compare(CANDIDATE_SLOT, top()) > 0) {
// the candidate key is not competitive, skip it
return -1;
if (size() >= maxSize) {
// the tree map is full, check if the candidate key should be kept
int cmp = compare(CANDIDATE_SLOT, top());
if (cmp > 0) {
if (cmp <= indexSortSourcePrefix) {
// the index sort guarantees that all subsequent documents produce keys that are
// greater than or equal to the current one, so we can early terminate.
throw new CollectionTerminatedException();
}
// the candidate key is not competitive, skip it.
return false;
}
}
// the candidate key is competitive
final int newSlot;
if (size() >= maxSize) {
@ -280,7 +312,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
copyCurrent(newSlot);
map.put(new Slot(newSlot), newSlot);
add(newSlot);
return newSlot;
return true;
}
@Override
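
The modified `compare` above no longer returns a bare sign: it encodes which source produced the first difference. A hedged sketch of the encoding and of how `addIfCompetitive` exploits it; the method names here are illustrative:

[source,java]
--------------------------------------------------
// Hypothetical illustration of the (i + 1) / -(i + 1) encoding introduced
// above: the magnitude is the 1-based index of the first source on which
// the two keys differ and the sign is the usual comparison sign.
static int encodedCompare(long[] key1, long[] key2) {
    for (int i = 0; i < key1.length; i++) {
        int cmp = Long.compare(key1[i], key2[i]);
        if (cmp != 0) {
            return cmp > 0 ? i + 1 : -(i + 1);
        }
    }
    return 0;
}

// A non-competitive candidate is a dead end for the rest of the segment
// only when the first difference falls inside the index sort prefix;
// sources beyond the prefix are not ordered by the index sort and may
// still produce competitive keys later.
static boolean canTerminate(int encodedCmp, int indexSortSourcePrefix) {
    return encodedCmp > 0 && encodedCmp <= indexSortSourcePrefix;
}
--------------------------------------------------

The negative prefix case (leading index sort reversed relative to the leading source) is handled separately against the `after` key, as shown in the diff above.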

View File

@ -221,7 +221,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
}
/**
* If true an explicit `null bucket will represent documents with missing values.
* If <code>true</code> an explicit <code>null</code> bucket will represent documents with missing values.
*/
@SuppressWarnings("unchecked")
public AB missingBucket(boolean missingBucket) {

View File

@ -33,23 +33,28 @@ class CompositeValuesSourceConfig {
private final DocValueFormat format;
private final int reverseMul;
private final boolean missingBucket;
private final boolean hasScript;
/**
* Creates a new {@link CompositeValuesSourceConfig}.
*
* @param name The name of the source.
* @param fieldType The field type or null if the source is a script.
* @param vs The underlying {@link ValuesSource}.
* @param format The {@link DocValueFormat} of this source.
* @param order The sort order associated with this source.
* @param missingBucket If <code>true</code> an explicit <code>null</code> bucket will represent documents with missing values.
* @param hasScript <code>true</code> if the source contains a script that can change the value.
*/
CompositeValuesSourceConfig(String name, @Nullable MappedFieldType fieldType, ValuesSource vs, DocValueFormat format,
SortOrder order, boolean missingBucket) {
SortOrder order, boolean missingBucket, boolean hasScript) {
this.name = name;
this.fieldType = fieldType;
this.vs = vs;
this.format = format;
this.reverseMul = order == SortOrder.ASC ? 1 : -1;
this.missingBucket = missingBucket;
this.hasScript = hasScript;
}
/**
@ -88,6 +93,13 @@ class CompositeValuesSourceConfig {
return missingBucket;
}
/**
* Returns true if the source contains a script that can change the value.
*/
boolean hasScript() {
return hasScript;
}
/**
* The sort order for the values source (e.g. -1 for descending and 1 for ascending).
*/
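
The `reverseMul` value above (`1` for `asc`, `-1` for `desc`) lets the rest of the code compare values without branching on the requested order; a minimal hedged sketch:

[source,java]
--------------------------------------------------
// Hypothetical illustration: multiplying a natural comparison by reverseMul
// flips it for descending sources, so a smaller result always means "more
// competitive" regardless of the requested sort order.
static int orderedCompare(long v1, long v2, int reverseMul) {
    return Long.compare(v1, v2) * reverseMul;
}
--------------------------------------------------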

View File

@ -228,7 +228,8 @@ public class DateHistogramValuesSourceBuilder
// is specified in the builder.
final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format();
final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), missingBucket());
return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(),
missingBucket(), config.script() != null);
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}

View File

@ -113,7 +113,8 @@ public class GeoTileGridValuesSourceBuilder extends CompositeValuesSourceBuilder
// is specified in the builder.
final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
CellIdSource cellIdSource = new CellIdSource(geoPoint, precision, GeoTileUtils::longEncode);
return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(), missingBucket());
return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(),
missingBucket(), script() != null);
} else {
throw new IllegalArgumentException("invalid source, expected geo_point, got " + orig.getClass().getSimpleName());
}

View File

@ -119,7 +119,8 @@ public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<H
ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig;
final HistogramValuesSource vs = new HistogramValuesSource(numeric, interval);
final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(), missingBucket());
return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(),
missingBucket(), script() != null);
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}

View File

@ -54,8 +54,10 @@ public class InternalComposite
private final List<String> sourceNames;
private final List<DocValueFormat> formats;
private final boolean earlyTerminated;
InternalComposite(String name, int size, List<String> sourceNames, List<DocValueFormat> formats,
List<InternalBucket> buckets, CompositeKey afterKey, int[] reverseMuls,
List<InternalBucket> buckets, CompositeKey afterKey, int[] reverseMuls, boolean earlyTerminated,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.sourceNames = sourceNames;
@ -64,6 +66,7 @@ public class InternalComposite
this.afterKey = afterKey;
this.size = size;
this.reverseMuls = reverseMuls;
this.earlyTerminated = earlyTerminated;
}
public InternalComposite(StreamInput in) throws IOException {
@ -85,6 +88,7 @@ public class InternalComposite
} else {
this.afterKey = buckets.size() > 0 ? buckets.get(buckets.size()-1).key : null;
}
this.earlyTerminated = in.getVersion().onOrAfter(Version.V_7_6_0) ? in.readBoolean() : false;
}
@Override
@ -104,6 +108,9 @@ public class InternalComposite
afterKey.writeTo(out);
}
}
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeBoolean(earlyTerminated);
}
}
@Override
@ -124,7 +131,7 @@ public class InternalComposite
* to be able to retrieve the next page even if all buckets have been filtered.
*/
return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey,
reverseMuls, pipelineAggregators(), getMetaData());
reverseMuls, earlyTerminated, pipelineAggregators(), getMetaData());
}
@Override
@ -150,6 +157,11 @@ public class InternalComposite
return null;
}
// Visible for tests
boolean isTerminatedEarly() {
return earlyTerminated;
}
// Visible for tests
int[] getReverseMuls() {
return reverseMuls;
@ -158,8 +170,10 @@ public class InternalComposite
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
boolean earlyTerminated = false;
for (InternalAggregation agg : aggregations) {
InternalComposite sortedAgg = (InternalComposite) agg;
earlyTerminated |= sortedAgg.earlyTerminated;
BucketIterator it = new BucketIterator(sortedAgg.buckets);
if (it.next() != null) {
pq.add(it);
@ -191,7 +205,8 @@ public class InternalComposite
result.add(reduceBucket);
}
final CompositeKey lastKey = result.size() > 0 ? result.get(result.size()-1).getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, pipelineAggregators(), metaData);
return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls,
earlyTerminated, pipelineAggregators(), metaData);
}
@Override
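
The new `earlyTerminated` flag is added to the wire format behind a version check so that mixed-version clusters keep working. A minimal hedged sketch of the pattern, using the real `StreamInput`/`StreamOutput`/`Version` types but a hypothetical stand-in class:

[source,java]
--------------------------------------------------
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

// Stand-in class; only the version-gated field handling is the point.
final class EarlyTerminatedFlagSketch {
    private final boolean earlyTerminated;

    EarlyTerminatedFlagSketch(StreamInput in) throws IOException {
        // Older nodes never send the flag, so default it to false.
        this.earlyTerminated = in.getVersion().onOrAfter(Version.V_7_6_0)
            ? in.readBoolean() : false;
    }

    void writeTo(StreamOutput out) throws IOException {
        // Never write the flag to a node that cannot read it.
        if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
            out.writeBoolean(earlyTerminated);
        }
    }
}
--------------------------------------------------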

View File

@ -66,8 +66,7 @@ abstract class SortedDocsProducer {
@Override
public void collect(int doc, long bucket) throws IOException {
hasCollected[0] = true;
int slot = queue.addIfCompetitive();
if (slot != -1) {
if (queue.addIfCompetitive()) {
topCompositeCollected[0]++;
if (adder != null && doc != lastDoc) {
if (remainingBits == 0) {

View File

@ -85,6 +85,6 @@ public class TermsValuesSourceBuilder extends CompositeValuesSourceBuilder<Terms
} else {
format = config.format();
}
return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missingBucket());
return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missingBucket(), script() != null);
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
@ -385,4 +386,8 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
return NAME;
}
@Override
protected AggregationBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException {
return super.doRewrite(queryShardContext);
}
}

View File

@ -184,7 +184,8 @@ public class SearchAfterBuilder implements ToXContentObject, Writeable {
if (value instanceof Number) {
return ((Number) value).longValue();
}
return Long.parseLong(value.toString());
return format.parseLong(value.toString(), false,
() -> { throw new IllegalStateException("now() is not allowed in [search_after] key"); });
case FLOAT:
if (value instanceof Number) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field;
@ -31,17 +32,28 @@ import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.SortedSetSortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.GeoPointFieldMapper;
@ -63,6 +75,7 @@ import org.elasticsearch.search.aggregations.metrics.TopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.IndexSettingsModule;
import org.junit.After;
import org.junit.Before;
@ -82,12 +95,13 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class CompositeAggregatorTests extends AggregatorTestCase {
public class CompositeAggregatorTests extends AggregatorTestCase {
private static MappedFieldType[] FIELD_TYPES;
@Override
@ -109,6 +123,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
DateFieldMapper.Builder builder = new DateFieldMapper.Builder("date");
builder.docValues(true);
builder.format("yyyy-MM-dd||epoch_millis");
DateFieldMapper fieldMapper =
builder.build(new Mapper.BuilderContext(createIndexSettings().getSettings(), new ContentPath(0)));
FIELD_TYPES[3] = fieldMapper.fieldType();
@ -419,7 +434,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
);
}
public void testWithKeywordDesc() throws Exception {
public void testWithKeywordDesc() throws Exception {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(
Arrays.asList(
@ -485,19 +500,19 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
return new CompositeAggregationBuilder("name", Collections.singletonList(terms));
}, (result) -> {
assertEquals(5, result.getBuckets().size());
assertEquals("{keyword=z}", result.afterKey().toString());
assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
assertEquals("{keyword=b}", result.getBuckets().get(1).getKeyAsString());
assertEquals(2L, result.getBuckets().get(1).getDocCount());
assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString());
assertEquals(1L, result.getBuckets().get(2).getDocCount());
assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString());
assertEquals(1L, result.getBuckets().get(3).getDocCount());
assertEquals("{keyword=z}", result.getBuckets().get(4).getKeyAsString());
assertEquals(1L, result.getBuckets().get(4).getDocCount());
}
assertEquals(5, result.getBuckets().size());
assertEquals("{keyword=z}", result.afterKey().toString());
assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
assertEquals("{keyword=b}", result.getBuckets().get(1).getKeyAsString());
assertEquals(2L, result.getBuckets().get(1).getDocCount());
assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString());
assertEquals(1L, result.getBuckets().get(2).getDocCount());
assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString());
assertEquals(1L, result.getBuckets().get(3).getDocCount());
assertEquals("{keyword=z}", result.getBuckets().get(4).getKeyAsString());
assertEquals(1L, result.getBuckets().get(4).getDocCount());
}
);
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
@ -589,10 +604,10 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
);
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> new CompositeAggregationBuilder("name",
Arrays.asList(
new TermsValuesSourceBuilder("keyword").field("keyword"),
new TermsValuesSourceBuilder("long").field("long")
)
Arrays.asList(
new TermsValuesSourceBuilder("keyword").field("keyword"),
new TermsValuesSourceBuilder("long").field("long")
)
),
(result) -> {
assertEquals(4, result.getBuckets().size());
@ -610,11 +625,11 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> new CompositeAggregationBuilder("name",
Arrays.asList(
new TermsValuesSourceBuilder("keyword").field("keyword"),
new TermsValuesSourceBuilder("long").field("long")
)
).aggregateAfter(createAfterKey("keyword", "a", "long", 100L)
Arrays.asList(
new TermsValuesSourceBuilder("keyword").field("keyword"),
new TermsValuesSourceBuilder("long").field("long")
)
).aggregateAfter(createAfterKey("keyword", "a", "long", 100L)
),
(result) -> {
assertEquals(2, result.getBuckets().size());
@ -942,7 +957,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
new TermsValuesSourceBuilder("double").field("double")
)
).aggregateAfter(createAfterKey("keyword", "a", "long", 100L, "double", 0.4d))
,(result) -> {
, (result) -> {
assertEquals(10, result.getBuckets().size());
assertEquals("{keyword=z, long=0, double=0.09}", result.afterKey().toString());
assertEquals("{keyword=b, long=100, double=0.4}", result.getBuckets().get(0).getKeyAsString());
@ -1152,8 +1167,9 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
return new CompositeAggregationBuilder("name", Collections.singletonList(histo))
.aggregateAfter(createAfterKey("date", "now"));
},
(result) -> {}
));
(result) -> {
}
));
assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(exc.getCause().getMessage(), containsString("now() is not supported in [after] key"));
@ -1167,7 +1183,8 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
return new CompositeAggregationBuilder("name", Collections.singletonList(histo))
.aggregateAfter(createAfterKey("date", "1474329600000"));
},
(result) -> {}
(result) -> {
}
));
assertThat(exc.getMessage(), containsString("failed to parse date field [1474329600000]"));
assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
@ -1486,7 +1503,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
new DateHistogramValuesSourceBuilder("date_histo").field("date")
.dateHistogramInterval(DateHistogramInterval.days(1))
)
).aggregateAfter(createAfterKey("keyword","c", "date_histo", 1474329600000L))
).aggregateAfter(createAfterKey("keyword", "c", "date_histo", 1474329600000L))
, (result) -> {
assertEquals(4, result.getBuckets().size());
assertEquals("{keyword=z, date_histo=1474329600000}", result.afterKey().toString());
@ -1668,7 +1685,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
builders.add(new TermsValuesSourceBuilder("duplicate1").field("baz"));
builders.add(new TermsValuesSourceBuilder("duplicate2").field("bar"));
builders.add(new TermsValuesSourceBuilder("duplicate2").field("baz"));
new CompositeAggregationBuilder("foo", builders);
new CompositeAggregationBuilder("foo", builders);
});
assertThat(e.getMessage(), equalTo("Composite source names must be unique, found duplicates: [duplicate2, duplicate1]"));
}
@ -1705,7 +1722,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
List<Map<String, List<Object>>> dataset = new ArrayList<>();
Set<T> valuesSet = new HashSet<>();
Map<Comparable<?>, AtomicLong> expectedDocCounts = new HashMap<> ();
Map<Comparable<?>, AtomicLong> expectedDocCounts = new HashMap<>();
for (int i = 0; i < numDocs; i++) {
int numValues = randomIntBetween(1, 5);
Set<Object> values = new HashSet<>();
@ -1725,13 +1742,13 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
List<Comparable<T>> seen = new ArrayList<>();
AtomicBoolean finish = new AtomicBoolean(false);
int size = randomIntBetween(1, expected.size());
int size = randomIntBetween(1, expected.size());
while (finish.get() == false) {
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(field)), dataset,
() -> {
Map<String, Object> afterKey = null;
if (seen.size() > 0) {
afterKey = Collections.singletonMap(field, seen.get(seen.size()-1));
afterKey = Collections.singletonMap(field, seen.get(seen.size() - 1));
}
TermsValuesSourceBuilder source = new TermsValuesSourceBuilder(field).field(field);
return new CompositeAggregationBuilder("name", Collections.singletonList(source))
@ -1838,44 +1855,130 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
);
}
public void testEarlyTermination() throws Exception {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(
Arrays.asList(
createDocument("keyword", "a", "long", 100L, "foo", "bar"),
createDocument("keyword", "c", "long", 100L, "foo", "bar"),
createDocument("keyword", "a", "long", 0L, "foo", "bar"),
createDocument("keyword", "d", "long", 10L, "foo", "bar"),
createDocument("keyword", "b", "long", 10L, "foo", "bar"),
createDocument("keyword", "c", "long", 10L, "foo", "bar"),
createDocument("keyword", "e", "long", 100L, "foo", "bar"),
createDocument("keyword", "e", "long", 10L, "foo", "bar")
)
);
executeTestCase(true, false, new TermQuery(new Term("foo", "bar")),
dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
new TermsValuesSourceBuilder("keyword").field("keyword"),
new TermsValuesSourceBuilder("long").field("long")
)).aggregateAfter(createAfterKey("keyword", "b", "long", 10L)).size(2),
(result) -> {
assertEquals(2, result.getBuckets().size());
assertEquals("{keyword=c, long=100}", result.afterKey().toString());
assertEquals("{keyword=c, long=10}", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
assertEquals("{keyword=c, long=100}", result.getBuckets().get(1).getKeyAsString());
assertEquals(1L, result.getBuckets().get(1).getDocCount());
assertTrue(result.isTerminatedEarly());
}
);
// source field and index sorting config have different order
executeTestCase(true, false, new TermQuery(new Term("foo", "bar")),
dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
// reverse source order
new TermsValuesSourceBuilder("keyword").field("keyword").order(SortOrder.DESC),
new TermsValuesSourceBuilder("long").field("long").order(SortOrder.DESC)
)
).aggregateAfter(createAfterKey("keyword", "c", "long", 10L)).size(2),
(result) -> {
assertEquals(2, result.getBuckets().size());
assertEquals("{keyword=a, long=100}", result.afterKey().toString());
assertEquals("{keyword=b, long=10}", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
assertEquals("{keyword=a, long=100}", result.getBuckets().get(1).getKeyAsString());
assertEquals(1L, result.getBuckets().get(1).getDocCount());
assertTrue(result.isTerminatedEarly());
}
);
}
private void testSearchCase(List<Query> queries,
List<Map<String, List<Object>>> dataset,
Supplier<CompositeAggregationBuilder> create,
Consumer<InternalComposite> verify) throws IOException {
for (Query query : queries) {
executeTestCase(false, query, dataset, create, verify);
executeTestCase(true, query, dataset, create, verify);
executeTestCase(false, false, query, dataset, create, verify);
executeTestCase(false, true, query, dataset, create, verify);
executeTestCase(true, true, query, dataset, create, verify);
}
}
private void executeTestCase(boolean reduced,
private void executeTestCase(boolean useIndexSort,
boolean reduced,
Query query,
List<Map<String, List<Object>>> dataset,
Supplier<CompositeAggregationBuilder> create,
Consumer<InternalComposite> verify) throws IOException {
Map<String, MappedFieldType> types =
Arrays.stream(FIELD_TYPES).collect(Collectors.toMap(MappedFieldType::name, Function.identity()));
CompositeAggregationBuilder aggregationBuilder = create.get();
Sort indexSort = useIndexSort ? buildIndexSort(aggregationBuilder.sources(), types) : null;
IndexSettings indexSettings = createIndexSettings(indexSort);
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random()));
if (indexSort != null) {
config.setIndexSort(indexSort);
config.setCodec(TestUtil.getDefaultCodec());
}
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) {
Document document = new Document();
for (Map<String, List<Object>> fields : dataset) {
addToDocument(document, fields);
indexWriter.addDocument(document);
document.clear();
}
if (reduced == false && randomBoolean()) {
indexWriter.forceMerge(1);
}
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
CompositeAggregationBuilder aggregationBuilder = create.get();
final InternalComposite composite;
if (reduced) {
composite = searchAndReduce(indexSearcher, query, aggregationBuilder, FIELD_TYPES);
composite = searchAndReduce(indexSettings, indexSearcher, query, aggregationBuilder, FIELD_TYPES);
} else {
composite = search(indexSearcher, query, aggregationBuilder, FIELD_TYPES);
composite = search(indexSettings, indexSearcher, query, aggregationBuilder, FIELD_TYPES);
}
verify.accept(composite);
}
}
}
private static IndexSettings createIndexSettings(Sort sort) {
Settings.Builder builder = Settings.builder();
if (sort != null) {
String[] fields = Arrays.stream(sort.getSort())
.map(SortField::getField)
.toArray(String[]::new);
String[] orders = Arrays.stream(sort.getSort())
.map((o) -> o.getReverse() ? "desc" : "asc")
.toArray(String[]::new);
builder.putList("index.sort.field", fields);
builder.putList("index.sort.order", orders);
}
return IndexSettingsModule.newIndexSettings(new Index("_index", "0"), builder.build());
}
private void addToDocument(Document doc, Map<String, List<Object>> keys) {
for (Map.Entry<String, List<Object>> entry : keys.entrySet()) {
final String name = entry.getKey();
@ -1935,4 +2038,43 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
private static long asLong(String dateTime) {
return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli();
}
private static Sort buildIndexSort(List<CompositeValuesSourceBuilder<?>> sources, Map<String, MappedFieldType> fieldTypes) {
List<SortField> sortFields = new ArrayList<>();
for (CompositeValuesSourceBuilder<?> source : sources) {
MappedFieldType type = fieldTypes.get(source.field());
if (type instanceof KeywordFieldMapper.KeywordFieldType) {
sortFields.add(new SortedSetSortField(type.name(), false));
} else if (type instanceof DateFieldMapper.DateFieldType) {
sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.LONG, false));
} else if (type instanceof NumberFieldMapper.NumberFieldType) {
boolean comp = false;
switch (type.typeName()) {
case "byte":
case "short":
case "integer":
comp = true;
sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.INT, false));
break;
case "long":
sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.LONG, false));
break;
case "float":
case "double":
comp = true;
sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.DOUBLE, false));
break;
default:
break;
}
if (comp == false) {
break;
}
}
}
return sortFields.size() > 0 ? new Sort(sortFields.toArray(new SortField[0])) : null;
}
}

View File

@ -29,10 +29,16 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.SortedSetSortField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
@ -56,6 +62,7 @@ import java.util.Set;
import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE;
import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.LONG;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
static class ClassAndName {
@ -133,31 +140,47 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
}
private void testRandomCase(ClassAndName... types) throws IOException {
testRandomCase(true, true, types);
testRandomCase(true, false, types);
testRandomCase(false, true, types);
testRandomCase(false, false, types);
for (int i = 0; i < types.length; i++) {
testRandomCase(true, true, i, types);
testRandomCase(true, false, i, types);
testRandomCase(false, true, i, types);
testRandomCase(false, false, i, types);
}
}
private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndName... types) throws IOException {
private void testRandomCase(boolean forceMerge,
boolean missingBucket,
int indexSortSourcePrefix,
ClassAndName... types) throws IOException {
final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
int numDocs = randomIntBetween(50, 100);
List<Comparable<?>[]> possibleValues = new ArrayList<>();
for (ClassAndName type : types) {
SortField[] indexSortFields = indexSortSourcePrefix == 0 ? null : new SortField[indexSortSourcePrefix];
for (int i = 0; i < types.length; i++) {
ClassAndName type = types[i];
final Comparable<?>[] values;
int numValues = randomIntBetween(1, numDocs * 2);
values = new Comparable[numValues];
if (type.clazz == Long.class) {
for (int i = 0; i < numValues; i++) {
values[i] = randomLong();
if (i < indexSortSourcePrefix) {
indexSortFields[i] = new SortedNumericSortField(type.fieldType.name(), SortField.Type.LONG);
}
for (int j = 0; j < numValues; j++) {
values[j] = randomLong();
}
} else if (type.clazz == Double.class) {
for (int i = 0; i < numValues; i++) {
values[i] = randomDouble();
if (i < indexSortSourcePrefix) {
indexSortFields[i] = new SortedNumericSortField(type.fieldType.name(), SortField.Type.DOUBLE);
}
for (int j = 0; j < numValues; j++) {
values[j] = randomDouble();
}
} else if (type.clazz == BytesRef.class) {
for (int i = 0; i < numValues; i++) {
values[i] = new BytesRef(randomAlphaOfLengthBetween(5, 50));
if (i < indexSortSourcePrefix) {
indexSortFields[i] = new SortedSetSortField(type.fieldType.name(), false);
}
for (int j = 0; j < numValues; j++) {
values[j] = new BytesRef(randomAlphaOfLengthBetween(5, 50));
}
} else {
assert (false);
@ -167,13 +190,17 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
Set<CompositeKey> keys = new HashSet<>();
try (Directory directory = newDirectory()) {
final IndexWriterConfig writerConfig = newIndexWriterConfig(new KeywordAnalyzer());
if (indexSortFields != null) {
writerConfig.setIndexSort(new Sort(indexSortFields));
}
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, writerConfig)) {
for (int i = 0; i < numDocs; i++) {
Document document = new Document();
List<List<Comparable<?>>> docValues = new ArrayList<>();
boolean hasAllField = true;
for (int j = 0; j < types.length; j++) {
int numValues = randomIntBetween(0, 5);
int numValues = indexSortSourcePrefix-1 >= j ? 1 : randomIntBetween(0, 5);
List<Comparable<?>> values = new ArrayList<>();
if (numValues == 0) {
hasAllField = false;
@ -212,7 +239,7 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
}
}
IndexReader reader = DirectoryReader.open(directory);
int size = randomIntBetween(1, keys.size());
int size = keys.size() > 1 ? randomIntBetween(1, keys.size()) : 1;
SingleDimensionValuesSource<?>[] sources = new SingleDimensionValuesSource[types.length];
for (int i = 0; i < types.length; i++) {
final MappedFieldType fieldType = types[i].fieldType;
@ -276,21 +303,25 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
new CompositeValuesCollectorQueue(BigArrays.NON_RECYCLING_INSTANCE, sources, size, last);
final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery());
for (LeafReaderContext leafReaderContext : reader.leaves()) {
final LeafBucketCollector leafCollector = new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
queue.addIfCompetitive();
}
};
if (docsProducer != null && withProducer) {
assertEquals(DocIdSet.EMPTY,
docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false));
} else {
final LeafBucketCollector leafCollector = new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
queue.addIfCompetitive(indexSortSourcePrefix);
}
};
final LeafBucketCollector queueCollector = queue.getLeafCollector(leafReaderContext, leafCollector);
final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
for (int i = 0; i < leafReaderContext.reader().maxDoc(); i++) {
if (liveDocs == null || liveDocs.get(i)) {
queueCollector.collect(i);
try {
queueCollector.collect(i);
} catch (CollectionTerminatedException exc) {
assertThat(indexSortSourcePrefix, greaterThan(0));
}
}
}
}

View File

@ -170,7 +170,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
}
Collections.sort(buckets, (o1, o2) -> o1.compareKey(o2));
CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size()-1).getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls,
return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, randomBoolean(),
Collections.emptyList(), metaData);
}
@ -207,7 +207,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
}
CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size()-1).getRawKey() : null;
return new InternalComposite(instance.getName(), instance.getSize(), sourceNames, formats, buckets, lastBucket, reverseMuls,
instance.pipelineAggregators(), metaData);
randomBoolean(), instance.pipelineAggregators(), metaData);
}
@Override

View File

@ -304,7 +304,15 @@ public abstract class AggregatorTestCase extends ESTestCase {
Query query,
AggregationBuilder builder,
MappedFieldType... fieldTypes) throws IOException {
return search(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
return search(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
}
protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSettings indexSettings,
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
MappedFieldType... fieldTypes) throws IOException {
return search(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
}
protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSearcher searcher,
@ -312,8 +320,17 @@ public abstract class AggregatorTestCase extends ESTestCase {
AggregationBuilder builder,
int maxBucket,
MappedFieldType... fieldTypes) throws IOException {
return search(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes);
}
protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSettings indexSettings,
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
int maxBucket,
MappedFieldType... fieldTypes) throws IOException {
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket);
C a = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes);
C a = createAggregator(query, builder, searcher, indexSettings, bucketConsumer, fieldTypes);
a.preCollection();
searcher.search(query, a);
a.postCollection();
@ -327,7 +344,23 @@ public abstract class AggregatorTestCase extends ESTestCase {
Query query,
AggregationBuilder builder,
MappedFieldType... fieldTypes) throws IOException {
return searchAndReduce(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
}
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSettings indexSettings,
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
MappedFieldType... fieldTypes) throws IOException {
return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
}
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSearcher searcher,
Query query,
AggregationBuilder builder,
int maxBucket,
MappedFieldType... fieldTypes) throws IOException {
return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes);
}
/**
@ -335,7 +368,8 @@ public abstract class AggregatorTestCase extends ESTestCase {
* builds an aggregator for each sub-searcher filtered by the provided {@link Query} and
* returns the reduced {@link InternalAggregation}.
*/
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSearcher searcher,
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSettings indexSettings,
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
int maxBucket,
@ -364,7 +398,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
for (ShardSearcher subSearcher : subSearchers) {
MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket);
C a = createAggregator(query, builder, subSearcher, shardBucketConsumer, fieldTypes);
C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes);
a.preCollection();
subSearcher.search(weight, a);
a.postCollection();