Optimize the composite aggregation for match_all and range queries (#28745)

This change refactors the composite aggregation to add an execution mode that visits documents in the order of the values
present in the leading source of the composite definition. This mode does not need to visit all documents since it can terminate
the collection early, as soon as the leading source value is greater than the largest value in the queue.
Instead of collecting the documents in the order of their doc_id, this mode uses the inverted lists (or the BKD tree for numerics) to collect documents
in the order of the values present in the leading source.
For instance, the following aggregation:

```
"composite" : {
  "sources" : [
    { "value1": { "terms" : { "field": "timestamp", "order": "asc" } } }
  ],
  "size": 10
}
```
... can use the field `timestamp` to collect the documents with the 10 lowest values for the field instead of visiting all documents.
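
To make the mechanism concrete, here is a minimal sketch of how documents can be visited in the order of the values of a keyword-like field using the inverted index, which is the essence of what the `TermsSortedDocsProducer` introduced by this change does (the class and interface names in the sketch are illustrative, not part of this change):

```java
import java.io.IOException;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BytesRef;

final class TermOrderedVisitor {
    interface DocVisitor {
        void visit(BytesRef term, int doc) throws IOException;
    }

    /**
     * Visits the documents of a segment in the order of the values of
     * {@code field} instead of doc_id order. A caller that keeps the N
     * lowest composite buckets can stop as soon as the current term can
     * no longer produce a competitive bucket.
     */
    static void visit(LeafReader reader, String field, DocVisitor visitor) throws IOException {
        Terms terms = reader.terms(field);
        if (terms == null) {
            return; // the field is not indexed in this segment
        }
        TermsEnum termsEnum = terms.iterator();
        PostingsEnum postings = null;
        BytesRef term;
        while ((term = termsEnum.next()) != null) { // terms are enumerated in sorted order
            postings = termsEnum.postings(postings, PostingsEnum.NONE);
            for (int doc = postings.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = postings.nextDoc()) {
                visitor.visit(term, doc);
            }
        }
    }
}
```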
For composite aggregations with more than one source, the execution can terminate early as soon as the lowest values of the leading source produce enough
composite buckets. For instance, if visiting the documents with the two lowest timestamps creates 10 composite buckets, we can stop the collection: it
is guaranteed that the third lowest timestamp cannot create a composite key that compares lower than the ones already collected.

This mode can execute only if:
 * The leading source in the composite definition uses an indexed field of type `date` (this also works with a `date_histogram` source), `integer`, `long` or `keyword`.
 * The query is a `match_all` query or a `range` query over the field that is used as the leading source in the composite definition.
 * The sort order of the leading source is the natural order (ascending, since postings and numerics are indexed in ascending order only).

If these conditions are not met, the aggregation visits every matching document like any other aggregation; the sketch below shows the kind of check involved.
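
A minimal, illustrative sketch of such a check for a numeric leading source (it mirrors the shape of `createSortedDocsProducerOrNull` in the diffs below; for keyword sources only `match_all` qualifies, and the class name here is made up):

```java
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.Query;

final class SortedDocsApplicability {
    /**
     * Whether the leading source can drive the collection in value order.
     *
     * @param query        the top-level query of the search
     * @param leadingField the indexed field used by the leading source
     * @param asc          whether the leading source is sorted in ascending order
     */
    static boolean canUseSortedDocsProducer(Query query, String leadingField, boolean asc) {
        if (asc == false) {
            return false; // postings and points are sorted in ascending order only
        }
        if (query == null || query instanceof MatchAllDocsQuery) {
            return true;
        }
        // a range query is acceptable as long as it targets the leading source's field
        return query instanceof PointRangeQuery
            && ((PointRangeQuery) query).getField().equals(leadingField);
    }
}
```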
Author: Jim Ferenczi (committed via GitHub, 2018-03-26 09:51:37 +02:00)
Parent: afe95a7738
Commit: 5288235ca3
26 changed files with 2325 additions and 1042 deletions


@ -545,88 +545,3 @@ GET /_search
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\.//]
==== Index sorting
By default this aggregation runs on every document that matches the query.
However, if the index sort matches the composite sort, this aggregation can optimize
the execution and skip documents that contain composite buckets that would not
be part of the response.
For instance, the following aggregation:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d", "order": "asc" } } },
{ "product": { "terms": { "field": "product", "order": "asc" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
\... is much faster on an index that uses the following sort:
[source,js]
--------------------------------------------------
PUT twitter
{
"settings" : {
"index" : {
"sort.field" : ["timestamp", "product"],
"sort.order" : ["asc", "asc"]
}
},
"mappings": {
"sales": {
"properties": {
"timestamp": {
"type": "date"
},
"product": {
"type": "keyword"
}
}
}
}
}
--------------------------------------------------
// CONSOLE
WARNING: The optimization takes effect only if the fields used for sorting are single-valued and follow
the same order as the aggregation (`desc` or `asc`).
If only the aggregation results are needed, it is also better to set the size of the query to `0`
and `track_total_hits` to `false` in order to remove other slowing factors:
[source,js]
--------------------------------------------------
GET /_search
{
"size": 0,
"track_total_hits": false,
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d" } } },
{ "product": { "terms": { "field": "product" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
See <<index-modules-index-sorting, index sorting>> for more details.


@ -99,6 +99,7 @@ setup:
- do:
search:
index: test
allow_partial_search_results: false
body:
aggregations:
test:


@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
/**
* A {@link SingleDimensionValuesSource} for binary source ({@link BytesRef}).
*/
class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
private final CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc;
private final BytesRef[] values;
private BytesRef currentValue;
BinaryValuesSource(MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
int size, int reverseMul) {
super(fieldType, size, reverseMul);
this.docValuesFunc = docValuesFunc;
this.values = new BytesRef[size];
}
@Override
public void copyCurrent(int slot) {
values[slot] = BytesRef.deepCopyOf(currentValue);
}
@Override
public int compare(int from, int to) {
return compareValues(values[from], values[to]);
}
@Override
int compareCurrent(int slot) {
return compareValues(currentValue, values[slot]);
}
@Override
int compareCurrentWithAfter() {
return compareValues(currentValue, afterValue);
}
int compareValues(BytesRef v1, BytesRef v2) {
return v1.compareTo(v2) * reverseMul;
}
@Override
void setAfter(Comparable<?> value) {
if (value.getClass() == BytesRef.class) {
afterValue = (BytesRef) value;
} else if (value.getClass() == String.class) {
afterValue = new BytesRef((String) value);
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
}
}
@Override
BytesRef toComparable(int slot) {
return values[slot];
}
@Override
LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException {
final SortedBinaryDocValues dvs = docValuesFunc.apply(context);
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
currentValue = dvs.nextValue();
next.collect(doc, bucket);
}
}
}
};
}
@Override
LeafBucketCollector getLeafCollector(Comparable<?> value, LeafReaderContext context, LeafBucketCollector next) {
if (value.getClass() != BytesRef.class) {
throw new IllegalArgumentException("Expected BytesRef, got " + value.getClass());
}
currentValue = (BytesRef) value;
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
next.collect(doc, bucket);
}
};
}
@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
(query != null && query.getClass() != MatchAllDocsQuery.class)) {
return null;
}
return new TermsSortedDocsProducer(fieldType.name());
}
@Override
public void close() {}
}


@ -19,16 +19,12 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -154,16 +150,9 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
if (parent != null) {
throw new IllegalArgumentException("[composite] aggregation cannot be used with a parent aggregation");
}
final QueryShardContext shardContext = context.getQueryShardContext();
CompositeValuesSourceConfig[] configs = new CompositeValuesSourceConfig[sources.size()];
SortField[] sortFields = new SortField[configs.length];
IndexSortConfig indexSortConfig = shardContext.getIndexSettings().getIndexSortConfig();
if (indexSortConfig.hasIndexSort()) {
Sort sort = indexSortConfig.buildIndexSort(shardContext::fieldMapper, shardContext::getForField);
System.arraycopy(sort.getSort(), 0, sortFields, 0, sortFields.length);
}
for (int i = 0; i < configs.length; i++) {
configs[i] = sources.get(i).build(context, i, configs.length, sortFields[i]);
configs[i] = sources.get(i).build(context);
if (configs[i].valuesSource().needsScores()) {
throw new IllegalArgumentException("[sources] cannot access _score");
}


@ -1,36 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import java.util.Arrays;
import java.util.List;
public class CompositeAggregationPlugin extends Plugin implements SearchPlugin {
@Override
public List<AggregationSpec> getAggregations() {
return Arrays.asList(
new AggregationSpec(CompositeAggregationBuilder.NAME, CompositeAggregationBuilder::new, CompositeAggregationBuilder::parse)
.addResultReader(InternalComposite::new)
);
}
}


@ -19,22 +19,29 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
@ -43,97 +50,74 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
final class CompositeAggregator extends BucketsAggregator {
private final int size;
private final CompositeValuesSourceConfig[] sources;
private final SortedDocsProducer sortedDocsProducer;
private final List<String> sourceNames;
private final int[] reverseMuls;
private final List<DocValueFormat> formats;
private final boolean canEarlyTerminate;
private final TreeMap<Integer, Integer> keys;
private final CompositeValuesComparator array;
private final CompositeValuesCollectorQueue queue;
private final List<LeafContext> contexts = new ArrayList<>();
private LeafContext leaf;
private RoaringDocIdSet.Builder builder;
private final List<Entry> entries;
private LeafReaderContext currentLeaf;
private RoaringDocIdSet.Builder docIdSetBuilder;
private BucketCollector deferredCollectors;
CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
int size, CompositeValuesSourceConfig[] sources, CompositeKey rawAfterKey) throws IOException {
int size, CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey) throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
this.size = size;
this.sources = sources;
this.sourceNames = Arrays.stream(sources).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
this.formats = Arrays.stream(sources).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
// we use slot 0 to fill the current document (size+1).
this.array = new CompositeValuesComparator(context.searcher().getIndexReader(), sources, size+1);
this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
final SingleDimensionValuesSource<?>[] sources =
createValuesSources(context.bigArrays(), context.searcher().getIndexReader(), context.query(), sourceConfigs, size);
this.queue = new CompositeValuesCollectorQueue(sources, size);
this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query());
if (rawAfterKey != null) {
array.setTop(rawAfterKey.values());
queue.setAfter(rawAfterKey.values());
}
this.keys = new TreeMap<>(array::compare);
this.canEarlyTerminate = Arrays.stream(sources)
.allMatch(CompositeValuesSourceConfig::canEarlyTerminate);
this.entries = new ArrayList<>();
}
boolean canEarlyTerminate() {
return canEarlyTerminate;
@Override
protected void doClose() {
Releasables.close(queue);
}
private int[] getReverseMuls() {
return Arrays.stream(sources).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
@Override
protected void doPreCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
deferredCollectors = BucketCollector.wrap(collectors);
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
}
@Override
protected void doPostCollection() throws IOException {
finishLeaf();
}
@Override
public InternalAggregation buildAggregation(long zeroBucket) throws IOException {
assert zeroBucket == 0L;
consumeBucketsAndMaybeBreak(keys.size());
consumeBucketsAndMaybeBreak(queue.size());
if (deferredCollectors != NO_OP_COLLECTOR) {
// Replay all documents that contain at least one top bucket (collected during the first pass).
grow(keys.size()+1);
final boolean needsScores = needsScores();
Weight weight = null;
if (needsScores) {
Query query = context.query();
weight = context.searcher().createNormalizedWeight(query, true);
}
for (LeafContext context : contexts) {
DocIdSetIterator docIdSetIterator = context.docIdSet.iterator();
if (docIdSetIterator == null) {
continue;
}
final CompositeValuesSource.Collector collector =
array.getLeafCollector(context.ctx, getSecondPassCollector(context.subCollector));
int docID;
DocIdSetIterator scorerIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(context.ctx);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay (docIdSetIterator is not empty).
scorerIt = scorer.iterator();
context.subCollector.setScorer(scorer);
}
while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (needsScores) {
assert scorerIt.docID() < docID;
scorerIt.advance(docID);
// aggregations should only be replayed on matching documents
assert scorerIt.docID() == docID;
}
collector.collect(docID);
}
runDeferredCollections();
}
int num = Math.min(size, keys.size());
int num = Math.min(size, queue.size());
final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
final int[] reverseMuls = getReverseMuls();
int pos = 0;
for (int slot : keys.keySet()) {
CompositeKey key = array.toCompositeKey(slot);
for (int slot : queue.getSortedSlot()) {
CompositeKey key = queue.toCompositeKey(slot);
InternalAggregations aggs = bucketAggregations(slot);
int docCount = bucketDocCount(slot);
int docCount = queue.getDocCount(slot);
buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs);
}
CompositeKey lastBucket = num > 0 ? buckets[num-1].getRawKey() : null;
@ -143,20 +127,46 @@ final class CompositeAggregator extends BucketsAggregator {
@Override
public InternalAggregation buildEmptyAggregation() {
final int[] reverseMuls = getReverseMuls();
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls,
pipelineAggregators(), metaData());
}
private void finishLeaf() {
if (currentLeaf != null) {
DocIdSet docIdSet = docIdSetBuilder.build();
entries.add(new Entry(currentLeaf, docIdSet));
currentLeaf = null;
docIdSetBuilder = null;
}
}
@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
if (leaf != null) {
leaf.docIdSet = builder.build();
contexts.add(leaf);
finishLeaf();
boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
if (sortedDocsProducer != null) {
/**
* The producer visits documents sorted by the leading source of the composite definition
* and terminates when the leading source value is guaranteed to be greater than the largest
* composite bucket in the queue.
*/
DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet);
if (fillDocIdSet) {
entries.add(new Entry(ctx, docIdSet));
}
leaf = new LeafContext(ctx, sub);
builder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
final CompositeValuesSource.Collector inner = array.getLeafCollector(ctx, getFirstPassCollector());
/**
* We can bypass search entirely for this segment, all the processing has been done in the previous call.
* Throwing this exception will terminate the execution of the search for this root aggregation,
* see {@link MultiCollector} for more details on how we handle early termination in aggregations.
*/
throw new CollectionTerminatedException();
} else {
if (fillDocIdSet) {
currentLeaf = ctx;
docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
}
final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder));
return new LeafBucketCollector() {
@Override
public void collect(int doc, long zeroBucket) throws IOException {
@ -165,103 +175,131 @@ final class CompositeAggregator extends BucketsAggregator {
}
};
}
@Override
protected void doPostCollection() throws IOException {
if (leaf != null) {
leaf.docIdSet = builder.build();
contexts.add(leaf);
}
}
/**
* The first pass selects the top N composite buckets from all matching documents.
* It also records all doc ids that contain a top N composite bucket in a {@link RoaringDocIdSet} in order to be
* able to replay the collection filtered on the best buckets only.
* The first pass selects the top composite buckets from all matching documents.
*/
private CompositeValuesSource.Collector getFirstPassCollector() {
return new CompositeValuesSource.Collector() {
private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder) {
return new LeafBucketCollector() {
int lastDoc = -1;
@Override
public void collect(int doc) throws IOException {
// Checks if the candidate key in slot 0 is competitive.
if (keys.containsKey(0)) {
// This key is already in the top N, skip it for now.
if (doc != lastDoc) {
public void collect(int doc, long bucket) throws IOException {
int slot = queue.addIfCompetitive();
if (slot != -1) {
if (builder != null && lastDoc != doc) {
builder.add(doc);
lastDoc = doc;
}
return;
}
if (array.hasTop() && array.compareTop(0) <= 0) {
// This key is greater than the top value collected in the previous round.
if (canEarlyTerminate) {
// The index sort matches the composite sort, we can early terminate this segment.
throw new CollectionTerminatedException();
}
// just skip this key for now
return;
}
if (keys.size() >= size) {
// The tree map is full, check if the candidate key should be kept.
if (array.compare(0, keys.lastKey()) > 0) {
// The candidate key is not competitive
if (canEarlyTerminate) {
// The index sort matches the composite sort, we can early terminate this segment.
throw new CollectionTerminatedException();
}
// just skip this key
return;
}
}
// The candidate key is competitive
final int newSlot;
if (keys.size() >= size) {
// the tree map is full, we replace the last key with this candidate.
int slot = keys.pollLastEntry().getKey();
// and we recycle the deleted slot
newSlot = slot;
} else {
newSlot = keys.size() + 1;
}
// move the candidate key to its new slot.
array.move(0, newSlot);
keys.put(newSlot, newSlot);
if (doc != lastDoc) {
builder.add(doc);
lastDoc = doc;
}
}
};
}
/**
* The second pass delegates the collection to sub-aggregations but only if the collected composite bucket is a top bucket (selected
* in the first pass).
* Replay the documents that might contain a top bucket and pass top buckets to
* the {@link this#deferredCollectors}.
*/
private CompositeValuesSource.Collector getSecondPassCollector(LeafBucketCollector subCollector) throws IOException {
return doc -> {
Integer bucket = keys.get(0);
if (bucket != null) {
// The candidate key in slot 0 is a top bucket.
private void runDeferredCollections() throws IOException {
final boolean needsScores = needsScores();
Weight weight = null;
if (needsScores) {
Query query = context.query();
weight = context.searcher().createNormalizedWeight(query, true);
}
deferredCollectors.preCollection();
for (Entry entry : entries) {
DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator();
if (docIdSetIterator == null) {
continue;
}
final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context);
final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector));
DocIdSetIterator scorerIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
if (scorer != null) {
scorerIt = scorer.iterator();
subCollector.setScorer(scorer);
}
}
int docID;
while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (needsScores) {
assert scorerIt != null && scorerIt.docID() < docID;
scorerIt.advance(docID);
// aggregations should only be replayed on matching documents
assert scorerIt.docID() == docID;
}
collector.collect(docID);
}
}
deferredCollectors.postCollection();
}
/**
* Replay the top buckets from the matching documents.
*/
private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollector) {
return new LeafBucketCollector() {
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0;
Integer slot = queue.compareCurrent();
if (slot != null) {
// The candidate key is a top bucket.
// We can defer the collection of this document/bucket to the sub collector
collectExistingBucket(subCollector, doc, bucket);
subCollector.collect(doc, slot);
}
}
};
}
static class LeafContext {
final LeafReaderContext ctx;
final LeafBucketCollector subCollector;
DocIdSet docIdSet;
private static SingleDimensionValuesSource<?>[] createValuesSources(BigArrays bigArrays, IndexReader reader, Query query,
CompositeValuesSourceConfig[] configs, int size) {
final SingleDimensionValuesSource<?>[] sources = new SingleDimensionValuesSource[configs.length];
for (int i = 0; i < sources.length; i++) {
final int reverseMul = configs[i].reverseMul();
if (configs[i].valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) {
ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) configs[i].valuesSource();
sources[i] = new GlobalOrdinalValuesSource(bigArrays, configs[i].fieldType(), vs::globalOrdinalsValues, size, reverseMul);
if (i == 0 && sources[i].createSortedDocsProducerOrNull(reader, query) != null) {
// this is the leading source and we can optimize it with the sorted docs producer but
// we don't want to use global ordinals because the number of visited documents
// should be low and global ordinals need one lookup per visited term.
Releasables.close(sources[i]);
sources[i] = new BinaryValuesSource(configs[i].fieldType(), vs::bytesValues, size, reverseMul);
}
} else if (configs[i].valuesSource() instanceof ValuesSource.Bytes) {
ValuesSource.Bytes vs = (ValuesSource.Bytes) configs[i].valuesSource();
sources[i] = new BinaryValuesSource(configs[i].fieldType(), vs::bytesValues, size, reverseMul);
} else if (configs[i].valuesSource() instanceof ValuesSource.Numeric) {
final ValuesSource.Numeric vs = (ValuesSource.Numeric) configs[i].valuesSource();
if (vs.isFloatingPoint()) {
sources[i] = new DoubleValuesSource(bigArrays, configs[i].fieldType(), vs::doubleValues, size, reverseMul);
} else {
if (vs instanceof RoundingValuesSource) {
sources[i] = new LongValuesSource(bigArrays, configs[i].fieldType(), vs::longValues,
((RoundingValuesSource) vs)::round, configs[i].format(), size, reverseMul);
} else {
sources[i] = new LongValuesSource(bigArrays, configs[i].fieldType(), vs::longValues,
(value) -> value, configs[i].format(), size, reverseMul);
}
}
}
}
return sources;
}
LeafContext(LeafReaderContext ctx, LeafBucketCollector subCollector) {
this.ctx = ctx;
this.subCollector = subCollector;
private static class Entry {
final LeafReaderContext context;
final DocIdSet docIdSet;
Entry(LeafReaderContext context, DocIdSet docIdSet) {
this.context = context;
this.docIdSet = docIdSet;
}
}
}


@ -0,0 +1,247 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeMap;
/**
* A specialized queue implementation for composite buckets
*/
final class CompositeValuesCollectorQueue implements Releasable {
// the slot for the current candidate
private static final int CANDIDATE_SLOT = Integer.MAX_VALUE;
private final int maxSize;
private final TreeMap<Integer, Integer> keys;
private final SingleDimensionValuesSource<?>[] arrays;
private final int[] docCounts;
private boolean afterValueSet = false;
/**
* Constructs a composite queue with the specified size and sources.
*
* @param sources The list of {@link SingleDimensionValuesSource} to build the composite buckets.
* @param size The number of composite buckets to keep.
*/
CompositeValuesCollectorQueue(SingleDimensionValuesSource<?>[] sources, int size) {
this.maxSize = size;
this.arrays = sources;
this.docCounts = new int[size];
this.keys = new TreeMap<>(this::compare);
}
void clear() {
keys.clear();
Arrays.fill(docCounts, 0);
afterValueSet = false;
}
/**
* The current size of the queue.
*/
int size() {
return keys.size();
}
/**
* Whether the queue is full or not.
*/
boolean isFull() {
return keys.size() == maxSize;
}
/**
* Returns a sorted {@link Set} view of the slots contained in this queue.
*/
Set<Integer> getSortedSlot() {
return keys.keySet();
}
/**
* Compares the current candidate with the values in the queue and returns
* the slot if the candidate is already in the queue or null if the candidate is not present.
*/
Integer compareCurrent() {
return keys.get(CANDIDATE_SLOT);
}
/**
* Returns the lowest value (exclusive) of the leading source.
*/
Comparable<?> getLowerValueLeadSource() {
return afterValueSet ? arrays[0].getAfter() : null;
}
/**
* Returns the upper value (inclusive) of the leading source.
*/
Comparable<?> getUpperValueLeadSource() throws IOException {
return size() >= maxSize ? arrays[0].toComparable(keys.lastKey()) : null;
}
/**
* Returns the document count in <code>slot</code>.
*/
int getDocCount(int slot) {
return docCounts[slot];
}
/**
* Copies the current value in <code>slot</code>.
*/
private void copyCurrent(int slot) {
for (int i = 0; i < arrays.length; i++) {
arrays[i].copyCurrent(slot);
}
docCounts[slot] = 1;
}
/**
* Compares the values in <code>slot1</code> with <code>slot2</code>.
*/
int compare(int slot1, int slot2) {
for (int i = 0; i < arrays.length; i++) {
int cmp = (slot1 == CANDIDATE_SLOT) ? arrays[i].compareCurrent(slot2) :
arrays[i].compare(slot1, slot2);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
/**
* Sets the after values for this comparator.
*/
void setAfter(Comparable<?>[] values) {
assert values.length == arrays.length;
afterValueSet = true;
for (int i = 0; i < arrays.length; i++) {
arrays[i].setAfter(values[i]);
}
}
/**
* Compares the after values with the values in <code>slot</code>.
*/
private int compareCurrentWithAfter() {
for (int i = 0; i < arrays.length; i++) {
int cmp = arrays[i].compareCurrentWithAfter();
if (cmp != 0) {
return cmp;
}
}
return 0;
}
/**
* Builds the {@link CompositeKey} for <code>slot</code>.
*/
CompositeKey toCompositeKey(int slot) throws IOException {
assert slot < maxSize;
Comparable<?>[] values = new Comparable<?>[arrays.length];
for (int i = 0; i < values.length; i++) {
values[i] = arrays[i].toComparable(slot);
}
return new CompositeKey(values);
}
/**
* Creates the collector that will visit the composite buckets of the matching documents.
* The provided collector <code>in</code> is called on each composite bucket.
*/
LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector in) throws IOException {
return getLeafCollector(null, context, in);
}
/**
* Creates the collector that will visit the composite buckets of the matching documents.
* If <code>forceLeadSourceValue</code> is not null, the leading source will use this value
* for each document.
* The provided collector <code>in</code> is called on each composite bucket.
*/
LeafBucketCollector getLeafCollector(Comparable<?> forceLeadSourceValue,
LeafReaderContext context, LeafBucketCollector in) throws IOException {
int last = arrays.length - 1;
LeafBucketCollector collector = in;
while (last > 0) {
collector = arrays[last--].getLeafCollector(context, collector);
}
if (forceLeadSourceValue != null) {
collector = arrays[last].getLeafCollector(forceLeadSourceValue, context, collector);
} else {
collector = arrays[last].getLeafCollector(context, collector);
}
return collector;
}
/**
* Check if the current candidate should be added in the queue.
* @return The target slot of the candidate or -1 if the candidate is not competitive.
*/
int addIfCompetitive() {
// checks if the candidate key is competitive
Integer topSlot = compareCurrent();
if (topSlot != null) {
// this key is already in the top N, skip it
docCounts[topSlot] += 1;
return topSlot;
}
if (afterValueSet && compareCurrentWithAfter() <= 0) {
// this key is greater than the top value collected in the previous round, skip it
return -1;
}
if (keys.size() >= maxSize) {
// the tree map is full, check if the candidate key should be kept
if (compare(CANDIDATE_SLOT, keys.lastKey()) > 0) {
// the candidate key is not competitive, skip it
return -1;
}
}
// the candidate key is competitive
final int newSlot;
if (keys.size() >= maxSize) {
// the tree map is full, we replace the last key with this candidate
int slot = keys.pollLastEntry().getKey();
// and we recycle the deleted slot
newSlot = slot;
} else {
newSlot = keys.size();
assert newSlot < maxSize;
}
// move the candidate key to its new slot
copyCurrent(newSlot);
keys.put(newSlot, newSlot);
return newSlot;
}
@Override
public void close() {
Releasables.close(arrays);
}
}
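
The slot bookkeeping above is compact: the `TreeMap` keys are slot numbers, but its comparator orders them by the values stored in the backing arrays, and `CANDIDATE_SLOT` stands for the value that is currently being collected. Below is a self-contained, simplified sketch of the same idea for a single `long` source (doc counts omitted); `TopNSlots` is a made-up name for illustration:

```java
import java.util.TreeMap;

final class TopNSlots {
    private static final int CANDIDATE_SLOT = Integer.MAX_VALUE;

    private final long[] values;                  // backing storage, one value per slot
    private final TreeMap<Integer, Integer> keys; // slots, ordered by the value they hold
    private final int maxSize;
    private long current;                         // the candidate value, not yet in a slot

    TopNSlots(int size) {
        this.maxSize = size;
        this.values = new long[size];
        this.keys = new TreeMap<>((a, b) -> Long.compare(valueOf(a), valueOf(b)));
    }

    private long valueOf(int slot) {
        return slot == CANDIDATE_SLOT ? current : values[slot];
    }

    /** Returns the slot of the candidate, or -1 if the candidate is not competitive. */
    int addIfCompetitive(long candidate) {
        current = candidate;
        Integer existing = keys.get(CANDIDATE_SLOT); // looks up by value, not by slot number
        if (existing != null) {
            return existing; // the candidate is already a top bucket
        }
        final int newSlot;
        if (keys.size() >= maxSize) {
            if (Long.compare(current, valueOf(keys.lastKey())) > 0) {
                return -1; // greater than the largest kept value: not competitive
            }
            newSlot = keys.pollLastEntry().getKey(); // evict the largest and recycle its slot
        } else {
            newSlot = keys.size();
        }
        values[newSlot] = current;
        keys.put(newSlot, newSlot);
        return newSlot;
    }
}
```

A full queue therefore never allocates beyond `size` slots, which is what lets the aggregator compare each candidate against the least competitive bucket in constant extra memory.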


@ -1,144 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
import static org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import static org.elasticsearch.search.aggregations.support.ValuesSource.Bytes;
import static org.elasticsearch.search.aggregations.support.ValuesSource.Bytes.WithOrdinals;
final class CompositeValuesComparator {
private final int size;
private final CompositeValuesSource<?, ?>[] arrays;
private boolean topValueSet = false;
/**
*
* @param sources The list of {@link CompositeValuesSourceConfig} to build the composite buckets.
* @param size The number of composite buckets to keep.
*/
CompositeValuesComparator(IndexReader reader, CompositeValuesSourceConfig[] sources, int size) {
this.size = size;
this.arrays = new CompositeValuesSource<?, ?>[sources.length];
for (int i = 0; i < sources.length; i++) {
final int reverseMul = sources[i].reverseMul();
if (sources[i].valuesSource() instanceof WithOrdinals && reader instanceof DirectoryReader) {
WithOrdinals vs = (WithOrdinals) sources[i].valuesSource();
arrays[i] = CompositeValuesSource.wrapGlobalOrdinals(vs, size, reverseMul);
} else if (sources[i].valuesSource() instanceof Bytes) {
Bytes vs = (Bytes) sources[i].valuesSource();
arrays[i] = CompositeValuesSource.wrapBinary(vs, size, reverseMul);
} else if (sources[i].valuesSource() instanceof Numeric) {
final Numeric vs = (Numeric) sources[i].valuesSource();
if (vs.isFloatingPoint()) {
arrays[i] = CompositeValuesSource.wrapDouble(vs, size, reverseMul);
} else {
arrays[i] = CompositeValuesSource.wrapLong(vs, sources[i].format(), size, reverseMul);
}
}
}
}
/**
* Moves the values in <code>slot1</code> to <code>slot2</code>.
*/
void move(int slot1, int slot2) {
assert slot1 < size && slot2 < size;
for (int i = 0; i < arrays.length; i++) {
arrays[i].move(slot1, slot2);
}
}
/**
* Compares the values in <code>slot1</code> with <code>slot2</code>.
*/
int compare(int slot1, int slot2) {
assert slot1 < size && slot2 < size;
for (int i = 0; i < arrays.length; i++) {
int cmp = arrays[i].compare(slot1, slot2);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
/**
* Returns true if a top value has been set for this comparator.
*/
boolean hasTop() {
return topValueSet;
}
/**
* Sets the top values for this comparator.
*/
void setTop(Comparable<?>[] values) {
assert values.length == arrays.length;
topValueSet = true;
for (int i = 0; i < arrays.length; i++) {
arrays[i].setTop(values[i]);
}
}
/**
* Compares the top values with the values in <code>slot</code>.
*/
int compareTop(int slot) {
assert slot < size;
for (int i = 0; i < arrays.length; i++) {
int cmp = arrays[i].compareTop(slot);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
/**
* Builds the {@link CompositeKey} for <code>slot</code>.
*/
CompositeKey toCompositeKey(int slot) throws IOException {
assert slot < size;
Comparable<?>[] values = new Comparable<?>[arrays.length];
for (int i = 0; i < values.length; i++) {
values[i] = arrays[i].toComparable(slot);
}
return new CompositeKey(values);
}
/**
* Gets the {@link LeafBucketCollector} that will record the composite buckets of the visited documents.
*/
CompositeValuesSource.Collector getLeafCollector(LeafReaderContext context, CompositeValuesSource.Collector in) throws IOException {
int last = arrays.length - 1;
CompositeValuesSource.Collector next = arrays[last].getLeafCollector(context, in);
for (int i = last - 1; i >= 0; i--) {
next = arrays[i].getLeafCollector(context, next);
}
return next;
}
}


@ -1,400 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
/**
* A wrapper for {@link ValuesSource} that can record and compare values produced during a collection.
*/
abstract class CompositeValuesSource<VS extends ValuesSource, T extends Comparable<T>> {
interface Collector {
void collect(int doc) throws IOException;
}
protected final VS vs;
protected final int size;
protected final int reverseMul;
protected T topValue;
/**
*
* @param vs The original {@link ValuesSource}.
* @param size The number of values to record.
* @param reverseMul -1 if the natural order ({@link SortOrder#ASC} should be reversed.
*/
CompositeValuesSource(VS vs, int size, int reverseMul) {
this.vs = vs;
this.size = size;
this.reverseMul = reverseMul;
}
/**
* The type of this source.
*/
abstract String type();
/**
* Moves the value in <code>from</code> in <code>to</code>.
* The value present in <code>to</code> is overridden.
*/
abstract void move(int from, int to);
/**
* Compares the value in <code>from</code> with the value in <code>to</code>.
*/
abstract int compare(int from, int to);
/**
* Compares the value in <code>slot</code> with the top value in this source.
*/
abstract int compareTop(int slot);
/**
* Sets the top value for this source. Values that compares smaller should not be recorded.
*/
abstract void setTop(Comparable<?> value);
/**
* Transforms the value in <code>slot</code> to a {@link Comparable} object.
*/
abstract Comparable<T> toComparable(int slot) throws IOException;
/**
* Gets the {@link LeafCollector} that will record the values of the visited documents.
*/
abstract Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException;
/**
* Creates a {@link CompositeValuesSource} that generates long values.
*/
static CompositeValuesSource<ValuesSource.Numeric, Long> wrapLong(ValuesSource.Numeric vs, DocValueFormat format,
int size, int reverseMul) {
return new LongValuesSource(vs, format, size, reverseMul);
}
/**
* Creates a {@link CompositeValuesSource} that generates double values.
*/
static CompositeValuesSource<ValuesSource.Numeric, Double> wrapDouble(ValuesSource.Numeric vs, int size, int reverseMul) {
return new DoubleValuesSource(vs, size, reverseMul);
}
/**
* Creates a {@link CompositeValuesSource} that generates binary values.
*/
static CompositeValuesSource<ValuesSource.Bytes, BytesRef> wrapBinary(ValuesSource.Bytes vs, int size, int reverseMul) {
return new BinaryValuesSource(vs, size, reverseMul);
}
/**
* Creates a {@link CompositeValuesSource} that generates global ordinal values.
*/
static CompositeValuesSource<ValuesSource.Bytes.WithOrdinals, BytesRef> wrapGlobalOrdinals(ValuesSource.Bytes.WithOrdinals vs,
int size,
int reverseMul) {
return new GlobalOrdinalValuesSource(vs, size, reverseMul);
}
/**
* A {@link CompositeValuesSource} for global ordinals
*/
private static class GlobalOrdinalValuesSource extends CompositeValuesSource<ValuesSource.Bytes.WithOrdinals, BytesRef> {
private final long[] values;
private SortedSetDocValues lookup;
private Long topValueGlobalOrd;
private boolean isTopValueInsertionPoint;
GlobalOrdinalValuesSource(ValuesSource.Bytes.WithOrdinals vs, int size, int reverseMul) {
super(vs, size, reverseMul);
this.values = new long[size];
}
@Override
String type() {
return "global_ordinals";
}
@Override
void move(int from, int to) {
values[to] = values[from];
}
@Override
int compare(int from, int to) {
return Long.compare(values[from], values[to]) * reverseMul;
}
@Override
int compareTop(int slot) {
int cmp = Long.compare(values[slot], topValueGlobalOrd);
if (cmp == 0 && isTopValueInsertionPoint) {
// the top value is missing in this shard, the comparison is against
// the insertion point of the top value so equality means that the value
// is "after" the insertion point.
return reverseMul;
}
return cmp * reverseMul;
}
@Override
void setTop(Comparable<?> value) {
if (value instanceof BytesRef) {
topValue = (BytesRef) value;
} else if (value instanceof String) {
topValue = new BytesRef(value.toString());
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
}
}
@Override
Comparable<BytesRef> toComparable(int slot) throws IOException {
return BytesRef.deepCopyOf(lookup.lookupOrd(values[slot]));
}
@Override
Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException {
final SortedSetDocValues dvs = vs.globalOrdinalsValues(context);
if (lookup == null) {
lookup = dvs;
if (topValue != null && topValueGlobalOrd == null) {
topValueGlobalOrd = lookup.lookupTerm(topValue);
if (topValueGlobalOrd < 0) {
// convert negative insert position
topValueGlobalOrd = -topValueGlobalOrd - 1;
isTopValueInsertionPoint = true;
}
}
}
return doc -> {
if (dvs.advanceExact(doc)) {
long ord;
while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) {
values[0] = ord;
next.collect(doc);
}
}
};
}
}
/**
* A {@link CompositeValuesSource} for binary source ({@link BytesRef})
*/
private static class BinaryValuesSource extends CompositeValuesSource<ValuesSource.Bytes, BytesRef> {
private final BytesRef[] values;
BinaryValuesSource(ValuesSource.Bytes vs, int size, int reverseMul) {
super(vs, size, reverseMul);
this.values = new BytesRef[size];
}
@Override
String type() {
return "binary";
}
@Override
public void move(int from, int to) {
values[to] = BytesRef.deepCopyOf(values[from]);
}
@Override
public int compare(int from, int to) {
return values[from].compareTo(values[to]) * reverseMul;
}
@Override
int compareTop(int slot) {
return values[slot].compareTo(topValue) * reverseMul;
}
@Override
void setTop(Comparable<?> value) {
if (value.getClass() == BytesRef.class) {
topValue = (BytesRef) value;
} else if (value.getClass() == String.class) {
topValue = new BytesRef((String) value);
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
}
}
@Override
Comparable<BytesRef> toComparable(int slot) {
return values[slot];
}
@Override
Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException {
final SortedBinaryDocValues dvs = vs.bytesValues(context);
return doc -> {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
values[0] = dvs.nextValue();
next.collect(doc);
}
}
};
}
}
/**
* A {@link CompositeValuesSource} for longs.
*/
private static class LongValuesSource extends CompositeValuesSource<ValuesSource.Numeric, Long> {
private final long[] values;
// handles "format" for date histogram source
private final DocValueFormat format;
LongValuesSource(ValuesSource.Numeric vs, DocValueFormat format, int size, int reverseMul) {
super(vs, size, reverseMul);
this.format = format;
this.values = new long[size];
}
@Override
String type() {
return "long";
}
@Override
void move(int from, int to) {
values[to] = values[from];
}
@Override
int compare(int from, int to) {
return Long.compare(values[from], values[to]) * reverseMul;
}
@Override
int compareTop(int slot) {
return Long.compare(values[slot], topValue) * reverseMul;
}
@Override
void setTop(Comparable<?> value) {
if (value instanceof Number) {
topValue = ((Number) value).longValue();
} else {
// for date histogram source with "format", the after value is formatted
// as a string so we need to retrieve the original value in milliseconds.
topValue = format.parseLong(value.toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
}
@Override
Comparable<Long> toComparable(int slot) {
return values[slot];
}
@Override
Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException {
final SortedNumericDocValues dvs = vs.longValues(context);
return doc -> {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
values[0] = dvs.nextValue();
next.collect(doc);
}
}
};
}
}
/**
* A {@link CompositeValuesSource} for doubles.
*/
private static class DoubleValuesSource extends CompositeValuesSource<ValuesSource.Numeric, Double> {
private final double[] values;
DoubleValuesSource(ValuesSource.Numeric vs, int size, int reverseMul) {
super(vs, size, reverseMul);
this.values = new double[size];
}
@Override
String type() {
return "long";
}
@Override
void move(int from, int to) {
values[to] = values[from];
}
@Override
int compare(int from, int to) {
return Double.compare(values[from], values[to]) * reverseMul;
}
@Override
int compareTop(int slot) {
return Double.compare(values[slot], topValue) * reverseMul;
}
@Override
void setTop(Comparable<?> value) {
if (value instanceof Number) {
topValue = ((Number) value).doubleValue();
} else {
topValue = Double.parseDouble(value.toString());
}
}
@Override
Comparable<Double> toComparable(int slot) {
return values[slot];
}
@Override
Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException {
final SortedNumericDoubleValues dvs = vs.doubleValues(context);
return doc -> {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
values[0] = dvs.nextValue();
next.collect(doc);
}
}
};
}
}
}


@ -19,19 +19,13 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.SortField;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
@ -291,46 +285,18 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
*
* @param context The search context for this source.
* @param config The {@link ValuesSourceConfig} for this source.
* @param pos The position of this source in the composite key.
* @param numPos The total number of positions in the composite key.
* @param sortField The {@link SortField} of the index sort at this position or null if not present.
*/
protected abstract CompositeValuesSourceConfig innerBuild(SearchContext context,
ValuesSourceConfig<?> config,
int pos,
int numPos,
SortField sortField) throws IOException;
protected abstract CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSourceConfig<?> config) throws IOException;
public final CompositeValuesSourceConfig build(SearchContext context, int pos, int numPos, SortField sortField) throws IOException {
public final CompositeValuesSourceConfig build(SearchContext context) throws IOException {
ValuesSourceConfig<?> config = ValuesSourceConfig.resolve(context.getQueryShardContext(),
valueType, field, script, missing, null, format);
return innerBuild(context, config, pos, numPos, sortField);
if (config.unmapped() && field != null && config.missing() == null) {
// this source cannot produce any values so we refuse to build
// since composite buckets are not created on null values
throw new QueryShardException(context.getQueryShardContext(),
"failed to find field [" + field + "] and [missing] is not provided");
}
protected boolean checkCanEarlyTerminate(IndexReader reader,
String fieldName,
boolean reverse,
SortField sortField) throws IOException {
return sortField.getField().equals(fieldName) &&
sortField.getReverse() == reverse &&
isSingleValued(reader, sortField);
}
private static boolean isSingleValued(IndexReader reader, SortField field) throws IOException {
SortField.Type type = IndexSortConfig.getSortFieldType(field);
for (LeafReaderContext context : reader.leaves()) {
if (type == SortField.Type.STRING) {
final SortedSetDocValues values = DocValues.getSortedSet(context.reader(), field.getField());
if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) {
return false;
}
} else {
final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), field.getField());
if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) {
return false;
}
}
}
return true;
return innerBuild(context, config);
}
}


@ -19,22 +19,25 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.sort.SortOrder;
class CompositeValuesSourceConfig {
private final String name;
@Nullable
private final MappedFieldType fieldType;
private final ValuesSource vs;
private final DocValueFormat format;
private final int reverseMul;
private final boolean canEarlyTerminate;
CompositeValuesSourceConfig(String name, ValuesSource vs, DocValueFormat format, SortOrder order, boolean canEarlyTerminate) {
CompositeValuesSourceConfig(String name, @Nullable MappedFieldType fieldType, ValuesSource vs, DocValueFormat format, SortOrder order) {
this.name = name;
this.fieldType = fieldType;
this.vs = vs;
this.format = format;
this.canEarlyTerminate = canEarlyTerminate;
this.reverseMul = order == SortOrder.ASC ? 1 : -1;
}
@ -45,6 +48,13 @@ class CompositeValuesSourceConfig {
return name;
}
/**
* Returns the {@link MappedFieldType} for this config.
*/
MappedFieldType fieldType() {
return fieldType;
}
/**
* Returns the {@link ValuesSource} for this configuration.
*/
@ -67,11 +77,4 @@ class CompositeValuesSourceConfig {
assert reverseMul == -1 || reverseMul == 1;
return reverseMul;
}
/**
* Returns whether this {@link ValuesSource} is used to sort the index.
*/
boolean canEarlyTerminate() {
return canEarlyTerminate;
}
}


@ -19,7 +19,6 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -29,9 +28,9 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.FieldContext;
@ -39,7 +38,6 @@ import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTimeZone;
import java.io.IOException;
@ -217,11 +215,7 @@ public class DateHistogramValuesSourceBuilder extends CompositeValuesSourceBuild
}
@Override
protected CompositeValuesSourceConfig innerBuild(SearchContext context,
ValuesSourceConfig<?> config,
int pos,
int numPos,
SortField sortField) throws IOException {
protected CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSourceConfig<?> config) throws IOException {
Rounding rounding = createRounding();
ValuesSource orig = config.toValuesSource(context.getQueryShardContext());
if (orig == null) {
@ -230,19 +224,10 @@ public class DateHistogramValuesSourceBuilder extends CompositeValuesSourceBuild
if (orig instanceof ValuesSource.Numeric) {
ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig;
RoundingValuesSource vs = new RoundingValuesSource(numeric, rounding);
boolean canEarlyTerminate = false;
final FieldContext fieldContext = config.fieldContext();
if (sortField != null &&
pos == numPos-1 &&
fieldContext != null) {
canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(),
fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField);
}
// dates are returned as timestamp in milliseconds-since-the-epoch unless a specific date format
// is specified in the builder.
final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format();
return new CompositeValuesSourceConfig(name, vs, docValueFormat,
order(), canEarlyTerminate);
final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order());
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
/**
* A {@link SingleDimensionValuesSource} for doubles.
*/
class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
private final CheckedFunction<LeafReaderContext, SortedNumericDoubleValues, IOException> docValuesFunc;
private final DoubleArray values;
private double currentValue;
DoubleValuesSource(BigArrays bigArrays, MappedFieldType fieldType,
CheckedFunction<LeafReaderContext, SortedNumericDoubleValues, IOException> docValuesFunc,
int size, int reverseMul) {
super(fieldType, size, reverseMul);
this.docValuesFunc = docValuesFunc;
this.values = bigArrays.newDoubleArray(size, false);
}
@Override
void copyCurrent(int slot) {
values.set(slot, currentValue);
}
@Override
int compare(int from, int to) {
return compareValues(values.get(from), values.get(to));
}
@Override
int compareCurrent(int slot) {
return compareValues(currentValue, values.get(slot));
}
@Override
int compareCurrentWithAfter() {
return compareValues(currentValue, afterValue);
}
private int compareValues(double v1, double v2) {
return Double.compare(v1, v2) * reverseMul;
}
@Override
void setAfter(Comparable<?> value) {
if (value instanceof Number) {
afterValue = ((Number) value).doubleValue();
} else {
afterValue = Double.parseDouble(value.toString());
}
}
@Override
Double toComparable(int slot) {
return values.get(slot);
}
@Override
LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException {
final SortedNumericDoubleValues dvs = docValuesFunc.apply(context);
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
currentValue = dvs.nextValue();
next.collect(doc, bucket);
}
}
}
};
}
@Override
LeafBucketCollector getLeafCollector(Comparable<?> value, LeafReaderContext context, LeafBucketCollector next) {
if (value.getClass() != Double.class) {
throw new IllegalArgumentException("Expected Double, got " + value.getClass());
}
currentValue = (Double) value;
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
next.collect(doc, bucket);
}
};
}
@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
return null;
}
@Override
public void close() {
Releasables.close(values);
}
}
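
A note on the design choice: `compareValues` goes through `Double.compare` rather than raw comparison operators, which yields a total order over `NaN` and signed zeros, so queue slots always compare deterministically. A small sketch of the difference (class name hypothetical):

[source,java]
--------------------------------------------------
public class DoubleCompareSketch {
    public static void main(String[] args) {
        // Plain comparison operators are not a total order for doubles:
        System.out.println(Double.NaN > 1.0);                 // false
        System.out.println(Double.NaN < 1.0);                 // false
        // Double.compare is: NaN sorts after every other value and
        // -0.0 sorts before 0.0.
        System.out.println(Double.compare(Double.NaN, 1.0));  // 1
        System.out.println(Double.compare(-0.0, 0.0));        // -1
    }
}
--------------------------------------------------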

View File

@ -0,0 +1,189 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
/**
* A {@link SingleDimensionValuesSource} for global ordinals.
*/
class GlobalOrdinalValuesSource extends SingleDimensionValuesSource<BytesRef> {
private final CheckedFunction<LeafReaderContext, SortedSetDocValues, IOException> docValuesFunc;
private final LongArray values;
private SortedSetDocValues lookup;
private long currentValue;
private Long afterValueGlobalOrd;
private boolean isTopValueInsertionPoint;
private long lastLookupOrd = -1;
private BytesRef lastLookupValue;
GlobalOrdinalValuesSource(BigArrays bigArrays,
MappedFieldType type, CheckedFunction<LeafReaderContext, SortedSetDocValues, IOException> docValuesFunc,
int size, int reverseMul) {
super(type, size, reverseMul);
this.docValuesFunc = docValuesFunc;
this.values = bigArrays.newLongArray(size, false);
}
@Override
void copyCurrent(int slot) {
values.set(slot, currentValue);
}
@Override
int compare(int from, int to) {
return Long.compare(values.get(from), values.get(to)) * reverseMul;
}
@Override
int compareCurrent(int slot) {
return Long.compare(currentValue, values.get(slot)) * reverseMul;
}
@Override
int compareCurrentWithAfter() {
int cmp = Long.compare(currentValue, afterValueGlobalOrd);
if (cmp == 0 && isTopValueInsertionPoint) {
// the top value is missing in this shard; the comparison is against
// the insertion point of the top value, so equality means that the
// current value is "after" the insertion point.
return reverseMul;
}
return cmp * reverseMul;
}
@Override
void setAfter(Comparable<?> value) {
if (value instanceof BytesRef) {
afterValue = (BytesRef) value;
} else if (value instanceof String) {
afterValue = new BytesRef(value.toString());
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
}
}
@Override
BytesRef toComparable(int slot) throws IOException {
long globalOrd = values.get(slot);
if (globalOrd == lastLookupOrd) {
return lastLookupValue;
} else {
lastLookupOrd = globalOrd;
lastLookupValue = BytesRef.deepCopyOf(lookup.lookupOrd(globalOrd));
return lastLookupValue;
}
}
@Override
LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException {
final SortedSetDocValues dvs = docValuesFunc.apply(context);
if (lookup == null) {
initLookup(dvs);
}
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
if (dvs.advanceExact(doc)) {
long ord;
while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) {
currentValue = ord;
next.collect(doc, bucket);
}
}
}
};
}
@Override
LeafBucketCollector getLeafCollector(Comparable<?> value, LeafReaderContext context, LeafBucketCollector next) throws IOException {
if (value.getClass() != BytesRef.class) {
throw new IllegalArgumentException("Expected BytesRef, got " + value.getClass());
}
BytesRef term = (BytesRef) value;
final SortedSetDocValues dvs = docValuesFunc.apply(context);
if (lookup == null) {
initLookup(dvs);
}
return new LeafBucketCollector() {
boolean currentValueIsSet = false;
@Override
public void collect(int doc, long bucket) throws IOException {
if (!currentValueIsSet) {
if (dvs.advanceExact(doc)) {
long ord;
while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) {
if (term.equals(lookup.lookupOrd(ord))) {
currentValueIsSet = true;
currentValue = ord;
break;
}
}
}
}
assert currentValueIsSet;
next.collect(doc, bucket);
}
};
}
@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
(query != null && query.getClass() != MatchAllDocsQuery.class)) {
return null;
}
return new TermsSortedDocsProducer(fieldType.name());
}
@Override
public void close() {
Releasables.close(values);
}
private void initLookup(SortedSetDocValues dvs) throws IOException {
lookup = dvs;
if (afterValue != null && afterValueGlobalOrd == null) {
afterValueGlobalOrd = lookup.lookupTerm(afterValue);
if (afterValueGlobalOrd < 0) {
// lookupTerm returned a negative insertion point; convert it back
afterValueGlobalOrd = -afterValueGlobalOrd - 1;
isTopValueInsertionPoint = true;
}
}
}
}
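
`initLookup` relies on `SortedSetDocValues#lookupTerm` following the `Arrays.binarySearch` contract: a missing term yields `-(insertionPoint) - 1`, which the code converts back before flagging `isTopValueInsertionPoint`. A standalone sketch of that conversion (class name hypothetical):

[source,java]
--------------------------------------------------
import java.util.Arrays;

public class InsertionPointSketch {
    public static void main(String[] args) {
        // A missing key encodes its insertion point as -(insertionPoint) - 1,
        // mirroring the conversion in initLookup.
        long[] ords = {3L, 7L, 9L};
        int result = Arrays.binarySearch(ords, 5L); // key absent -> -2
        if (result < 0) {
            int insertionPoint = -result - 1; // 1: would sit before ord 7
            System.out.println(insertionPoint);
        }
    }
}
--------------------------------------------------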

View File

@ -19,19 +19,17 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Objects;
@ -108,27 +106,16 @@ public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<H
}
@Override
protected CompositeValuesSourceConfig innerBuild(SearchContext context,
ValuesSourceConfig<?> config,
int pos,
int numPos,
SortField sortField) throws IOException {
protected CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSourceConfig<?> config) throws IOException {
ValuesSource orig = config.toValuesSource(context.getQueryShardContext());
if (orig == null) {
orig = ValuesSource.Numeric.EMPTY;
}
if (orig instanceof ValuesSource.Numeric) {
ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig;
HistogramValuesSource vs = new HistogramValuesSource(numeric, interval);
boolean canEarlyTerminate = false;
final FieldContext fieldContext = config.fieldContext();
if (sortField != null &&
pos == numPos-1 &&
fieldContext != null) {
canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(),
fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField);
}
return new CompositeValuesSourceConfig(name, vs, config.format(), order(), canEarlyTerminate);
final HistogramValuesSource vs = new HistogramValuesSource(numeric, interval);
final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order());
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}

View File

@ -0,0 +1,190 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
import java.util.function.LongUnaryOperator;
import java.util.function.ToLongFunction;
/**
* A {@link SingleDimensionValuesSource} for longs.
*/
class LongValuesSource extends SingleDimensionValuesSource<Long> {
private final CheckedFunction<LeafReaderContext, SortedNumericDocValues, IOException> docValuesFunc;
private final LongUnaryOperator rounding;
// handles "format" for date histogram source
private final DocValueFormat format;
private final LongArray values;
private long currentValue;
LongValuesSource(BigArrays bigArrays, MappedFieldType fieldType,
CheckedFunction<LeafReaderContext, SortedNumericDocValues, IOException> docValuesFunc,
LongUnaryOperator rounding, DocValueFormat format, int size, int reverseMul) {
super(fieldType, size, reverseMul);
this.docValuesFunc = docValuesFunc;
this.rounding = rounding;
this.format = format;
this.values = bigArrays.newLongArray(size, false);
}
@Override
void copyCurrent(int slot) {
values.set(slot, currentValue);
}
@Override
int compare(int from, int to) {
return compareValues(values.get(from), values.get(to));
}
@Override
int compareCurrent(int slot) {
return compareValues(currentValue, values.get(slot));
}
@Override
int compareCurrentWithAfter() {
return compareValues(currentValue, afterValue);
}
private int compareValues(long v1, long v2) {
return Long.compare(v1, v2) * reverseMul;
}
@Override
void setAfter(Comparable<?> value) {
if (value instanceof Number) {
afterValue = ((Number) value).longValue();
} else {
// for a date histogram source with a "format", the after value arrives
// as a formatted string, so we parse it back to milliseconds.
afterValue = format.parseLong(value.toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
}
@Override
Long toComparable(int slot) {
return values.get(slot);
}
@Override
LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException {
final SortedNumericDocValues dvs = docValuesFunc.apply(context);
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
currentValue = dvs.nextValue();
next.collect(doc, bucket);
}
}
}
};
}
@Override
LeafBucketCollector getLeafCollector(Comparable<?> value, LeafReaderContext context, LeafBucketCollector next) {
if (value.getClass() != Long.class) {
throw new IllegalArgumentException("Expected Long, got " + value.getClass());
}
currentValue = (Long) value;
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
next.collect(doc, bucket);
}
};
}
@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
(query != null &&
query.getClass() != MatchAllDocsQuery.class &&
// a range query is allowed only if it targets the same field as the leading source
(query instanceof PointRangeQuery && fieldType.name().equals(((PointRangeQuery) query).getField())) == false)) {
return null;
}
final byte[] lowerPoint;
final byte[] upperPoint;
if (query instanceof PointRangeQuery) {
final PointRangeQuery rangeQuery = (PointRangeQuery) query;
lowerPoint = rangeQuery.getLowerPoint();
upperPoint = rangeQuery.getUpperPoint();
} else {
lowerPoint = null;
upperPoint = null;
}
if (fieldType instanceof NumberFieldMapper.NumberFieldType) {
NumberFieldMapper.NumberFieldType ft = (NumberFieldMapper.NumberFieldType) fieldType;
final ToLongFunction<byte[]> toBucketFunction;
switch (ft.typeName()) {
case "long":
toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0));
break;
case "int":
case "short":
case "byte":
toBucketFunction = (value) -> rounding.applyAsLong(IntPoint.decodeDimension(value, 0));
break;
default:
return null;
}
return new PointsSortedDocsProducer(fieldType.name(), toBucketFunction, lowerPoint, upperPoint);
} else if (fieldType instanceof DateFieldMapper.DateFieldType) {
final ToLongFunction<byte[]> toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0));
return new PointsSortedDocsProducer(fieldType.name(), toBucketFunction, lowerPoint, upperPoint);
} else {
return null;
}
}
@Override
public void close() {
Releasables.close(values);
}
}
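
The `toBucketFunction` built above maps the packed BKD bytes of each point straight to a composite bucket: decode the dimension, then apply the source's rounding. A minimal sketch with an identity rounding (class name hypothetical):

[source,java]
--------------------------------------------------
import org.apache.lucene.document.LongPoint;

import java.util.function.LongUnaryOperator;

public class ToBucketSketch {
    public static void main(String[] args) {
        // Encode a long the way the BKD tree stores it, then decode and
        // round it into a bucket, as createSortedDocsProducerOrNull does.
        byte[] packed = new byte[Long.BYTES];
        LongPoint.encodeDimension(1_500_000_000_000L, packed, 0);
        LongUnaryOperator rounding = LongUnaryOperator.identity();
        long bucket = rounding.applyAsLong(LongPoint.decodeDimension(packed, 0));
        System.out.println(bucket); // 1500000000000
    }
}
--------------------------------------------------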

View File

@ -0,0 +1,181 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.DocIdSetBuilder;
import org.apache.lucene.util.StringHelper;
import java.io.IOException;
import java.util.function.ToLongFunction;
/**
* A {@link SortedDocsProducer} that can sort documents based on numerics indexed in the provided field.
*/
class PointsSortedDocsProducer extends SortedDocsProducer {
private final ToLongFunction<byte[]> bucketFunction;
private final byte[] lowerPointQuery;
private final byte[] upperPointQuery;
PointsSortedDocsProducer(String field, ToLongFunction<byte[]> bucketFunction, byte[] lowerPointQuery, byte[] upperPointQuery) {
super(field);
this.bucketFunction = bucketFunction;
this.lowerPointQuery = lowerPointQuery;
this.upperPointQuery = upperPointQuery;
}
@Override
DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue,
LeafReaderContext context, boolean fillDocIdSet) throws IOException {
final PointValues values = context.reader().getPointValues(field);
if (values == null) {
// no value for the field
return DocIdSet.EMPTY;
}
long lowerBucket = Long.MIN_VALUE;
Comparable<?> lowerValue = queue.getLowerValueLeadSource();
if (lowerValue != null) {
if (lowerValue.getClass() != Long.class) {
throw new IllegalStateException("expected Long, got " + lowerValue.getClass());
}
lowerBucket = (Long) lowerValue;
}
long upperBucket = Long.MAX_VALUE;
Comparable<?> upperValue = queue.getUpperValueLeadSource();
if (upperValue != null) {
if (upperValue.getClass() != Long.class) {
throw new IllegalStateException("expected Long, got " + upperValue.getClass());
}
upperBucket = (Long) upperValue;
}
DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null;
Visitor visitor = new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket);
try {
values.intersect(visitor);
visitor.flush();
} catch (CollectionTerminatedException exc) {
// ignored: the visitor terminated the collection early on purpose
}
return fillDocIdSet ? builder.build() : DocIdSet.EMPTY;
}
private class Visitor implements PointValues.IntersectVisitor {
final LeafReaderContext context;
final CompositeValuesCollectorQueue queue;
final DocIdSetBuilder builder;
final int maxDoc;
final int bytesPerDim;
final long lowerBucket;
final long upperBucket;
DocIdSetBuilder bucketDocsBuilder;
DocIdSetBuilder.BulkAdder adder;
int remaining;
long lastBucket;
boolean first = true;
Visitor(LeafReaderContext context, CompositeValuesCollectorQueue queue, DocIdSetBuilder builder,
int bytesPerDim, long lowerBucket, long upperBucket) {
this.context = context;
this.maxDoc = context.reader().maxDoc();
this.queue = queue;
this.builder = builder;
this.lowerBucket = lowerBucket;
this.upperBucket = upperBucket;
this.bucketDocsBuilder = new DocIdSetBuilder(maxDoc);
this.bytesPerDim = bytesPerDim;
}
@Override
public void grow(int count) {
remaining = count;
adder = bucketDocsBuilder.grow(count);
}
@Override
public void visit(int docID) throws IOException {
throw new IllegalStateException("should never be called");
}
@Override
public void visit(int docID, byte[] packedValue) throws IOException {
if (compare(packedValue, packedValue) != PointValues.Relation.CELL_CROSSES_QUERY) {
remaining--;
return;
}
long bucket = bucketFunction.applyAsLong(packedValue);
if (first == false && bucket != lastBucket) {
final DocIdSet docIdSet = bucketDocsBuilder.build();
if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) &&
// lower bucket is inclusive
lowerBucket != lastBucket) {
// this bucket does not have any competitive composite buckets,
// we can early terminate the collection because the remaining buckets are guaranteed
// to be greater than this bucket.
throw new CollectionTerminatedException();
}
bucketDocsBuilder = new DocIdSetBuilder(maxDoc);
assert remaining > 0;
adder = bucketDocsBuilder.grow(remaining);
}
lastBucket = bucket;
first = false;
adder.add(docID);
remaining--;
}
@Override
public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
if ((upperPointQuery != null && StringHelper.compare(bytesPerDim, minPackedValue, 0, upperPointQuery, 0) > 0) ||
(lowerPointQuery != null && StringHelper.compare(bytesPerDim, maxPackedValue, 0, lowerPointQuery, 0) < 0)) {
// does not match the query
return PointValues.Relation.CELL_OUTSIDE_QUERY;
}
// check the current bounds
if (lowerBucket != Long.MIN_VALUE) {
long maxBucket = bucketFunction.applyAsLong(maxPackedValue);
if (maxBucket < lowerBucket) {
return PointValues.Relation.CELL_OUTSIDE_QUERY;
}
}
if (upperBucket != Long.MAX_VALUE) {
long minBucket = bucketFunction.applyAsLong(minPackedValue);
if (minBucket > upperBucket) {
return PointValues.Relation.CELL_OUTSIDE_QUERY;
}
}
return PointValues.Relation.CELL_CROSSES_QUERY;
}
public void flush() throws IOException {
if (first == false) {
final DocIdSet docIdSet = bucketDocsBuilder.build();
processBucket(queue, context, docIdSet.iterator(), lastBucket, builder);
bucketDocsBuilder = null;
}
}
}
}
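
Early termination works by exception: the visitor throws `CollectionTerminatedException` from inside `values.intersect` once a completed bucket stops producing competitive composite keys, and `processLeaf` catches it, keeping whatever was collected so far. A toy illustration of the control flow (class name hypothetical):

[source,java]
--------------------------------------------------
import org.apache.lucene.search.CollectionTerminatedException;

public class EarlyTerminateSketch {
    public static void main(String[] args) {
        // Abort a traversal from deep inside the loop, as the Visitor does
        // inside values.intersect(), and keep the partial result.
        int collected = 0;
        try {
            for (int bucket : new int[]{1, 2, 3, 4, 5}) {
                if (bucket > 3) {
                    throw new CollectionTerminatedException();
                }
                collected++;
            }
        } catch (CollectionTerminatedException expected) {
            // collection ended early, by design
        }
        System.out.println(collected); // 3
    }
}
--------------------------------------------------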

View File

@ -51,13 +51,17 @@ class RoundingValuesSource extends ValuesSource.Numeric {
return false;
}
public long round(long value) {
return rounding.round(value);
}
@Override
public SortedNumericDocValues longValues(LeafReaderContext context) throws IOException {
SortedNumericDocValues values = vs.longValues(context);
return new SortedNumericDocValues() {
@Override
public long nextValue() throws IOException {
return rounding.round(values.nextValue());
return round(values.nextValue());
}
@Override

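Exposing `round(long)` as a public method is what lets `LongValuesSource` bucket raw point values without reading doc values. A hypothetical stand-in for such a rounding, written as a plain `LongUnaryOperator` rather than the Elasticsearch `Rounding` API:

[source,java]
--------------------------------------------------
import java.util.function.LongUnaryOperator;

public class RoundingSketch {
    public static void main(String[] args) {
        // Hypothetical day rounding: snap a millisecond timestamp down to
        // the start of its UTC day, standing in for Rounding#round.
        LongUnaryOperator dayRounding =
            millis -> millis - Math.floorMod(millis, 86_400_000L);
        System.out.println(dayRounding.applyAsLong(1_522_051_897_000L));
    }
}
--------------------------------------------------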
View File

@ -0,0 +1,143 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
/**
* A source that can record and compare values of similar type.
*/
abstract class SingleDimensionValuesSource<T extends Comparable<T>> implements Releasable {
protected final int size;
protected final int reverseMul;
protected T afterValue;
@Nullable
protected MappedFieldType fieldType;
/**
* Constructor
*
* @param fieldType The field type associated with the source.
* @param size The number of values to record.
* @param reverseMul -1 if the natural order ({@link SortOrder#ASC}) should be reversed, 1 otherwise.
*/
SingleDimensionValuesSource(@Nullable MappedFieldType fieldType, int size, int reverseMul) {
this.fieldType = fieldType;
this.size = size;
this.reverseMul = reverseMul;
this.afterValue = null;
}
/**
* The current value is filled by a {@link LeafBucketCollector} that visits all the
* values of each document. This method saves the current value in a slot and should only be used
* in the context of a collection.
* See {@link this#getLeafCollector}.
*/
abstract void copyCurrent(int slot);
/**
* Compares the value in <code>from</code> with the value in <code>to</code>.
*/
abstract int compare(int from, int to);
/**
* The current value is filled by a {@link LeafBucketCollector} that visits all the
* values of each document. This method compares the current value with the value present in
* the provided slot and should only be used in the context of a collection.
* See {@link this#getLeafCollector}.
*/
abstract int compareCurrent(int slot);
/**
* The current value is filled by a {@link LeafBucketCollector} that visits all the
* values of each document. This method compares the current value with the after value
* set on this source and should only be used in the context of a collection.
* See {@link this#getLeafCollector}.
*/
abstract int compareCurrentWithAfter();
/**
* Sets the after value for this source. Values that compare smaller are filtered.
*/
abstract void setAfter(Comparable<?> value);
/**
* Returns the after value set for this source.
*/
T getAfter() {
return afterValue;
}
/**
* Transforms the value in <code>slot</code> to a {@link Comparable} object.
*/
abstract T toComparable(int slot) throws IOException;
/**
* Creates a {@link LeafBucketCollector} that extracts all values from a document and invokes
* {@link LeafBucketCollector#collect} on the provided <code>next</code> collector for each of them.
* The current value of this source is set on each call and can be accessed by <code>next</code> via
* the {@link this#copyCurrent(int)} and {@link this#compareCurrent(int)} methods. Note that these methods
* are only valid when invoked from the {@link LeafBucketCollector} created in this source.
*/
abstract LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException;
/**
* Creates a {@link LeafBucketCollector} that sets the current value for each document to the provided
* <code>value</code> and invokes {@link LeafBucketCollector#collect} on the provided <code>next</code> collector.
*/
abstract LeafBucketCollector getLeafCollector(Comparable<?> value,
LeafReaderContext context, LeafBucketCollector next) throws IOException;
/**
* Returns a {@link SortedDocsProducer} or null if this source cannot produce sorted docs.
*/
abstract SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query);
/**
* Returns true if a {@link SortedDocsProducer} should be used to optimize the execution.
*/
protected boolean checkIfSortedDocsIsApplicable(IndexReader reader, MappedFieldType fieldType) {
if (fieldType == null ||
fieldType.indexOptions() == IndexOptions.NONE ||
// inverse of the natural order
reverseMul == -1) {
return false;
}
if (reader.hasDeletions() &&
(reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) {
// do not use the index if more than half of its documents are deleted
return false;
}
return true;
}
}
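
The deletion heuristic at the end deserves a closer look: `numDocs` counts live documents only, so the optimization is skipped when half or more of the reader is deleted. A self-contained sketch of the same check (class and method names hypothetical):

[source,java]
--------------------------------------------------
public class DeletedDocsHeuristicSketch {
    // Mirrors the check in checkIfSortedDocsIsApplicable: skip the
    // optimization when live docs make up less than half of maxDoc.
    static boolean skipOptimization(int numDocs, int maxDoc) {
        return numDocs == 0 || (double) numDocs / (double) maxDoc < 0.5;
    }

    public static void main(String[] args) {
        System.out.println(skipOptimization(40, 100)); // true: 60% deleted
        System.out.println(skipOptimization(80, 100)); // false
    }
}
--------------------------------------------------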

View File

@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.DocIdSetBuilder;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
/**
* A producer that visits composite buckets in the order of the values indexed in the leading source of the composite
* definition. It can be used to control which documents should be collected to produce the top composite buckets
* without visiting all documents in an index.
*/
abstract class SortedDocsProducer {
protected final String field;
SortedDocsProducer(String field) {
this.field = field;
}
/**
* Visits all non-deleted documents in <code>iterator</code> and fills the provided <code>queue</code>
* with the top composite buckets extracted from the collection.
* Documents that contain a top composite bucket are added to the provided <code>builder</code> if it is not null.
*
* Returns true if the queue is full and the current <code>leadSourceBucket</code> did not produce any competitive
* composite buckets.
*/
protected boolean processBucket(CompositeValuesCollectorQueue queue, LeafReaderContext context, DocIdSetIterator iterator,
Comparable<?> leadSourceBucket, @Nullable DocIdSetBuilder builder) throws IOException {
final int[] topCompositeCollected = new int[1];
final boolean[] hasCollected = new boolean[1];
final LeafBucketCollector queueCollector = new LeafBucketCollector() {
int lastDoc = -1;
// we need to add the matching documents to the builder,
// so we create a bulk adder from the approximate cost of the iterator
// and rebuild the adder during the collection if needed
int remainingBits = (int) Math.min(iterator.cost(), Integer.MAX_VALUE);
DocIdSetBuilder.BulkAdder adder = builder == null ? null : builder.grow(remainingBits);
@Override
public void collect(int doc, long bucket) throws IOException {
hasCollected[0] = true;
int slot = queue.addIfCompetitive();
if (slot != -1) {
topCompositeCollected[0]++;
if (adder != null && doc != lastDoc) {
if (remainingBits == 0) {
// the cost approximation was lower than the real size; grow the adder
// in fixed batches of 128 to ensure the extra documents can be added
adder = builder.grow(128);
remainingBits = 128;
}
adder.add(doc);
remainingBits --;
lastDoc = doc;
}
}
}
};
final Bits liveDocs = context.reader().getLiveDocs();
final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector);
while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
if (liveDocs == null || liveDocs.get(iterator.docID())) {
collector.collect(iterator.docID());
}
}
return queue.isFull() &&
hasCollected[0] &&
topCompositeCollected[0] == 0;
}
/**
* Populates the queue with the composite buckets present in the <code>context</code>.
* Returns the {@link DocIdSet} of the documents that contain a top composite bucket in this leaf or
* {@link DocIdSet#EMPTY} if <code>fillDocIdSet</code> is false.
*/
abstract DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue,
LeafReaderContext context, boolean fillDocIdSet) throws IOException;
}
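
The single-element arrays `topCompositeCollected` and `hasCollected` are a standard idiom: anonymous classes and lambdas can only capture effectively final locals, so the (final) array reference carries a mutable slot. A minimal sketch (names hypothetical):

[source,java]
--------------------------------------------------
public class CaptureSketch {
    interface DocCollector { void collect(int doc); }

    public static void main(String[] args) {
        // The array reference is effectively final, so the lambda may
        // capture it, while its single slot stays mutable across calls.
        final int[] collected = new int[1];
        DocCollector collector = doc -> collected[0]++;
        collector.collect(1);
        collector.collect(2);
        System.out.println(collected[0]); // 2
    }
}
--------------------------------------------------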

View File

@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.DocIdSetBuilder;
import java.io.IOException;
/**
* A {@link SortedDocsProducer} that can sort documents based on terms indexed in the provided field.
*/
class TermsSortedDocsProducer extends SortedDocsProducer {
TermsSortedDocsProducer(String field) {
super(field);
}
@Override
DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue,
LeafReaderContext context, boolean fillDocIdSet) throws IOException {
final Terms terms = context.reader().terms(field);
if (terms == null) {
// no value for the field
return DocIdSet.EMPTY;
}
BytesRef lowerValue = (BytesRef) queue.getLowerValueLeadSource();
BytesRef upperValue = (BytesRef) queue.getUpperValueLeadSource();
final TermsEnum te = terms.iterator();
if (lowerValue != null) {
if (te.seekCeil(lowerValue) == TermsEnum.SeekStatus.END) {
return DocIdSet.EMPTY;
}
} else {
if (te.next() == null) {
return DocIdSet.EMPTY;
}
}
DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), terms) : null;
PostingsEnum reuse = null;
boolean first = true;
do {
if (upperValue != null && upperValue.compareTo(te.term()) < 0) {
break;
}
reuse = te.postings(reuse, PostingsEnum.NONE);
if (processBucket(queue, context, reuse, te.term(), builder) && !first) {
// this bucket does not have any competitive composite buckets,
// we can early terminate the collection because the remaining buckets are guaranteed
// to be greater than this bucket.
break;
}
first = false;
} while (te.next() != null);
return fillDocIdSet ? builder.build() : DocIdSet.EMPTY;
}
}
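
The loop in `processLeaf` visits terms in sorted order, stops once past the upper bound of the leading source, and early terminates when any term after the first yields no competitive bucket. A toy walk-through of that control flow (all names and the cut-off are hypothetical):

[source,java]
--------------------------------------------------
import java.util.Arrays;
import java.util.List;

public class TermsWalkSketch {
    public static void main(String[] args) {
        List<String> terms = Arrays.asList("a", "b", "c", "d");
        String upper = "c";
        String cutOff = "b"; // hypothetical first non-competitive term
        boolean first = true;
        for (String term : terms) {
            if (upper.compareTo(term) < 0) {
                break; // past the upper bound of the leading source
            }
            boolean noCompetitiveBuckets = term.compareTo(cutOff) >= 0;
            if (noCompetitiveBuckets && first == false) {
                break; // remaining terms can only sort higher
            }
            first = false;
            System.out.println("processed " + term);
        }
    }
}
--------------------------------------------------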

View File

@ -19,18 +19,16 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
@ -80,21 +78,12 @@ public class TermsValuesSourceBuilder extends CompositeValuesSourceBuilder<Terms
}
@Override
protected CompositeValuesSourceConfig innerBuild(SearchContext context,
ValuesSourceConfig<?> config,
int pos,
int numPos,
SortField sortField) throws IOException {
protected CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSourceConfig<?> config) throws IOException {
ValuesSource vs = config.toValuesSource(context.getQueryShardContext());
if (vs == null) {
vs = ValuesSource.Numeric.EMPTY;
}
boolean canEarlyTerminate = false;
final FieldContext fieldContext = config.fieldContext();
if (sortField != null && config.fieldContext() != null) {
canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(),
fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField);
}
return new CompositeValuesSourceConfig(name, vs, config.format(), order(), canEarlyTerminate);
final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order());
}
}

View File

@ -19,42 +19,44 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.SortedSetSortField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.IndexSettingsModule;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Before;
@ -64,12 +66,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class CompositeAggregatorTests extends AggregatorTestCase {
@ -79,7 +87,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
FIELD_TYPES = new MappedFieldType[5];
FIELD_TYPES = new MappedFieldType[6];
FIELD_TYPES[0] = new KeywordFieldMapper.KeywordFieldType();
FIELD_TYPES[0].setName("keyword");
FIELD_TYPES[0].setHasDocValues(true);
@ -101,6 +109,10 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
FIELD_TYPES[4] = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.INTEGER);
FIELD_TYPES[4].setName("price");
FIELD_TYPES[4].setHasDocValues(true);
FIELD_TYPES[5] = new KeywordFieldMapper.KeywordFieldType();
FIELD_TYPES[5].setName("terms");
FIELD_TYPES[5].setHasDocValues(true);
}
@Override
@ -110,6 +122,19 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
FIELD_TYPES = null;
}
public void testUnmappedField() throws Exception {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder(randomAlphaOfLengthBetween(5, 10))
.field("unknown");
CompositeAggregationBuilder builder = new CompositeAggregationBuilder("test", Collections.singletonList(terms));
IndexSearcher searcher = new IndexSearcher(new MultiReader());
QueryShardException exc =
expectThrows(QueryShardException.class, () -> createAggregatorFactory(builder, searcher));
assertThat(exc.getMessage(), containsString("failed to find field [unknown] and [missing] is not provided"));
// should work when missing is provided
terms.missing("missing");
createAggregatorFactory(builder, searcher);
}
public void testWithKeyword() throws Exception {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(
@ -121,8 +146,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("keyword", "c")
)
);
final Sort sort = new Sort(new SortedSetSortField("keyword", false));
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
@ -139,7 +163,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
@ -168,8 +192,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("keyword", "delta")
)
);
final Sort sort = new Sort(new SortedSetSortField("keyword", false));
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
@ -188,7 +211,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
@ -206,7 +229,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword").order(SortOrder.DESC);
@ -236,8 +259,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("keyword", "c")
)
);
final Sort sort = new Sort(new SortedSetSortField("keyword", true));
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword")
@ -255,7 +277,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword")
@ -285,7 +307,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
)
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
@ -307,7 +329,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
@ -339,7 +361,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
)
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword")
@ -362,7 +384,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword")
@ -394,11 +416,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("long", 100L)
)
);
final Sort sort = new Sort(
new SortedSetSortField("keyword", false),
new SortedNumericSortField("long", SortField.Type.LONG)
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> new CompositeAggregationBuilder("name",
Arrays.asList(
new TermsValuesSourceBuilder("keyword").field("keyword"),
@ -419,7 +437,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> new CompositeAggregationBuilder("name",
Arrays.asList(
new TermsValuesSourceBuilder("keyword").field("keyword"),
@ -451,11 +469,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("long", 100L)
)
);
final Sort sort = new Sort(
new SortedSetSortField("keyword", true),
new SortedNumericSortField("long", SortField.Type.LONG, true)
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -477,7 +491,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -510,7 +524,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
)
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -543,7 +557,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -580,11 +594,10 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("keyword", Arrays.asList("d", "d"), "long", Arrays.asList(10L, 100L, 1000L)),
createDocument("keyword", "c"),
createDocument("long", 100L)
)
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -619,7 +632,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -653,7 +666,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
)
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -688,7 +701,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -723,7 +736,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), null, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -751,8 +764,12 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("long", 4L)
)
);
final Sort sort = new Sort(new SortedNumericSortField("date", SortField.Type.LONG));
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
LongPoint.newRangeQuery(
"date",
asLong("2016-09-20T09:00:34"),
asLong("2017-10-20T06:09:24")
)), dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
@ -771,7 +788,12 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
LongPoint.newRangeQuery(
"date",
asLong("2016-09-20T11:34:00"),
asLong("2017-10-20T06:09:24")
)), dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
@ -802,8 +824,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("long", 4L)
)
);
final Sort sort = new Sort(new SortedNumericSortField("date", SortField.Type.LONG));
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
@ -823,7 +844,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
@ -845,7 +866,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
public void testThatDateHistogramFailsFormatAfter() throws IOException {
ElasticsearchParseException exc = expectThrows(ElasticsearchParseException.class,
() -> testSearchCase(new MatchAllDocsQuery(), null, Collections.emptyList(),
() -> testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), Collections.emptyList(),
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
@ -860,7 +881,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
assertThat(exc.getCause().getMessage(), containsString("now() is not supported in [after] key"));
exc = expectThrows(ElasticsearchParseException.class,
() -> testSearchCase(new MatchAllDocsQuery(), null, Collections.emptyList(),
() -> testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), Collections.emptyList(),
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
@ -887,8 +908,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("long", 4L)
)
);
final Sort sort = new Sort(new SortedNumericSortField("date", SortField.Type.LONG));
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
@ -908,7 +928,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
@ -940,7 +960,12 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("long", 4L)
)
);
- testSearchCase(new MatchAllDocsQuery(), null, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
+ LongPoint.newRangeQuery(
+ "date",
+ asLong("2016-09-20T09:00:34"),
+ asLong("2017-10-20T06:09:24")
+ )), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -971,7 +996,12 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
- testSearchCase(new MatchAllDocsQuery(), null, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
+ LongPoint.newRangeQuery(
+ "date",
+ asLong("2016-09-20T11:34:00"),
+ asLong("2017-10-20T06:09:24")
+ )), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -1007,7 +1037,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("long", 4L)
)
);
- testSearchCase(new MatchAllDocsQuery(), null, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("price")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -1035,7 +1065,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
- testSearchCase(new MatchAllDocsQuery(), null, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("price")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -1075,7 +1105,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("long", 4L)
)
);
- testSearchCase(new MatchAllDocsQuery(), null, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("double")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -1105,7 +1135,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
- testSearchCase(new MatchAllDocsQuery(), null, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("double")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -1138,7 +1168,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("long", 4L)
)
);
- testSearchCase(new MatchAllDocsQuery(), null, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -1167,7 +1197,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
- testSearchCase(new MatchAllDocsQuery(), null, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
@ -1202,8 +1232,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
createDocument("keyword", "c")
)
);
- final Sort sort = new Sort(new SortedSetSortField("keyword", false));
- testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
@ -1232,7 +1261,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
- testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+ testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
@ -1257,36 +1286,174 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
);
}
- private void testSearchCase(Query query, Sort sort,
public void testWithTermsSubAggExecutionMode() throws Exception {
// test with no bucket
for (Aggregator.SubAggCollectionMode mode : Aggregator.SubAggCollectionMode.values()) {
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")),
Collections.singletonList(createDocument()),
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
return new CompositeAggregationBuilder("name", Collections.singletonList(terms))
.subAggregation(
new TermsAggregationBuilder("terms", ValueType.STRING)
.field("terms")
.collectMode(mode)
.subAggregation(new MaxAggregationBuilder("max").field("long"))
);
}, (result) -> {
assertEquals(0, result.getBuckets().size());
}
);
}
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(
Arrays.asList(
createDocument("keyword", "a", "terms", "a", "long", 50L),
createDocument("keyword", "c", "terms", "d", "long", 78L),
createDocument("keyword", "a", "terms", "w", "long", 78L),
createDocument("keyword", "d", "terms", "y", "long", 76L),
createDocument("keyword", "c", "terms", "y", "long", 70L)
)
);
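// the buckets must be identical whichever SubAggCollectionMode (DEPTH_FIRST or BREADTH_FIRST) the terms sub-aggregation uses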
for (Aggregator.SubAggCollectionMode mode : Aggregator.SubAggCollectionMode.values()) {
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
() -> {
TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
.field("keyword");
return new CompositeAggregationBuilder("name", Collections.singletonList(terms))
.subAggregation(
new TermsAggregationBuilder("terms", ValueType.STRING)
.field("terms")
.collectMode(mode)
.subAggregation(new MaxAggregationBuilder("max").field("long"))
);
}, (result) -> {
assertEquals(3, result.getBuckets().size());
assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
StringTerms subTerms = result.getBuckets().get(0).getAggregations().get("terms");
assertEquals(2, subTerms.getBuckets().size());
assertEquals("a", subTerms.getBuckets().get(0).getKeyAsString());
assertEquals("w", subTerms.getBuckets().get(1).getKeyAsString());
InternalMax max = subTerms.getBuckets().get(0).getAggregations().get("max");
assertEquals(50L, (long) max.getValue());
max = subTerms.getBuckets().get(1).getAggregations().get("max");
assertEquals(78L, (long) max.getValue());
assertEquals("{keyword=c}", result.getBuckets().get(1).getKeyAsString());
assertEquals(2L, result.getBuckets().get(1).getDocCount());
subTerms = result.getBuckets().get(1).getAggregations().get("terms");
assertEquals(2, subTerms.getBuckets().size());
assertEquals("d", subTerms.getBuckets().get(0).getKeyAsString());
assertEquals("y", subTerms.getBuckets().get(1).getKeyAsString());
max = subTerms.getBuckets().get(0).getAggregations().get("max");
assertEquals(78L, (long) max.getValue());
max = subTerms.getBuckets().get(1).getAggregations().get("max");
assertEquals(70L, (long) max.getValue());
assertEquals("{keyword=d}", result.getBuckets().get(2).getKeyAsString());
assertEquals(1L, result.getBuckets().get(2).getDocCount());
subTerms = result.getBuckets().get(2).getAggregations().get("terms");
assertEquals(1, subTerms.getBuckets().size());
assertEquals("y", subTerms.getBuckets().get(0).getKeyAsString());
max = subTerms.getBuckets().get(0).getAggregations().get("max");
assertEquals(76L, (long) max.getValue());
}
);
}
}
public void testRandomStrings() throws IOException {
testRandomTerms("keyword", () -> randomAlphaOfLengthBetween(5, 50), (v) -> (String) v);
}
public void testRandomLongs() throws IOException {
testRandomTerms("long", () -> randomLong(), (v) -> (long) v);
}
public void testRandomInts() throws IOException {
testRandomTerms("price", () -> randomInt(), (v) -> ((Number) v).intValue());
}
private <T extends Comparable<T>, V extends Comparable<T>> void testRandomTerms(String field,
Supplier<T> randomSupplier,
Function<Object, V> transformKey) throws IOException {
int numTerms = randomIntBetween(10, 500);
List<T> terms = new ArrayList<>();
for (int i = 0; i < numTerms; i++) {
terms.add(randomSupplier.get());
}
int numDocs = randomIntBetween(100, 200);
List<Map<String, List<Object>>> dataset = new ArrayList<>();
Set<T> valuesSet = new HashSet<>();
Map<Comparable<?>, AtomicLong> expectedDocCounts = new HashMap<>();
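// values are deduplicated per document, so each term is counted at most once per document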
for (int i = 0; i < numDocs; i++) {
int numValues = randomIntBetween(1, 5);
Set<Object> values = new HashSet<>();
for (int j = 0; j < numValues; j++) {
int rand = randomIntBetween(0, terms.size() - 1);
if (values.add(terms.get(rand))) {
AtomicLong count = expectedDocCounts.computeIfAbsent(terms.get(rand),
(k) -> new AtomicLong(0));
count.incrementAndGet();
valuesSet.add(terms.get(rand));
}
}
dataset.add(Collections.singletonMap(field, new ArrayList<>(values)));
}
List<T> expected = new ArrayList<>(valuesSet);
Collections.sort(expected);
List<Comparable<T>> seen = new ArrayList<>();
AtomicBoolean finish = new AtomicBoolean(false);
int size = randomIntBetween(1, expected.size());
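// page through all composite buckets with a random page size, feeding the last seen key back as the 'after' key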
while (finish.get() == false) {
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(field)), dataset,
() -> {
Map<String, Object> afterKey = null;
if (seen.size() > 0) {
afterKey = Collections.singletonMap(field, seen.get(seen.size()-1));
}
TermsValuesSourceBuilder source = new TermsValuesSourceBuilder(field).field(field);
return new CompositeAggregationBuilder("name", Collections.singletonList(source))
.subAggregation(new TopHitsAggregationBuilder("top_hits").storedField("_none_"))
.aggregateAfter(afterKey)
.size(size);
}, (result) -> {
if (result.getBuckets().size() == 0) {
finish.set(true);
}
for (InternalComposite.InternalBucket bucket : result.getBuckets()) {
V term = transformKey.apply(bucket.getKey().get(field));
seen.add(term);
assertThat(bucket.getDocCount(), equalTo(expectedDocCounts.get(term).get()));
}
});
}
assertEquals(expected, seen);
}
private void testSearchCase(List<Query> queries,
List<Map<String, List<Object>>> dataset,
Supplier<CompositeAggregationBuilder> create,
Consumer<InternalComposite> verify) throws IOException {
- executeTestCase(false, null, query, dataset, create, verify);
- executeTestCase(true, null, query, dataset, create, verify);
- if (sort != null) {
- executeTestCase(false, sort, query, dataset, create, verify);
- executeTestCase(true, sort, query, dataset, create, verify);
+ for (Query query : queries) {
+ executeTestCase(false, query, dataset, create, verify);
+ executeTestCase(true, query, dataset, create, verify);
}
}
private void executeTestCase(boolean reduced,
- Sort sort,
Query query,
List<Map<String, List<Object>>> dataset,
Supplier<CompositeAggregationBuilder> create,
Consumer<InternalComposite> verify) throws IOException {
- IndexSettings indexSettings = createIndexSettings(sort);
try (Directory directory = newDirectory()) {
- IndexWriterConfig config = LuceneTestCase.newIndexWriterConfig(random(), new MockAnalyzer(random()));
- if (sort != null) {
- config.setIndexSort(sort);
- /**
- * Forces the default codec because {@link CompositeValuesSourceBuilder#checkCanEarlyTerminate}
- * cannot detect single-valued field with the asserting-codec.
- **/
- config.setCodec(TestUtil.getDefaultCodec());
- }
- try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) {
+ try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
for (Map<String, List<Object>> fields : dataset) {
addToDocument(document, fields);
@ -1295,12 +1462,8 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
- IndexSearcher indexSearcher = newSearcher(indexReader, sort == null, sort == null);
+ IndexSearcher indexSearcher = new IndexSearcher(indexReader);
CompositeAggregationBuilder aggregationBuilder = create.get();
- if (sort != null) {
- CompositeAggregator aggregator = createAggregator(query, aggregationBuilder, indexSearcher, indexSettings, FIELD_TYPES);
- assertTrue(aggregator.canEarlyTerminate());
- }
final InternalComposite composite;
if (reduced) {
composite = searchAndReduce(indexSearcher, query, aggregationBuilder, FIELD_TYPES);
@ -1312,31 +1475,22 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
}
- private static IndexSettings createIndexSettings(Sort sort) {
- Settings.Builder builder = Settings.builder();
- if (sort != null) {
- String[] fields = Arrays.stream(sort.getSort())
- .map(SortField::getField)
- .toArray(String[]::new);
- String[] orders = Arrays.stream(sort.getSort())
- .map((o) -> o.getReverse() ? "desc" : "asc")
- .toArray(String[]::new);
- builder.putList("index.sort.field", fields);
- builder.putList("index.sort.order", orders);
- }
- return IndexSettingsModule.newIndexSettings(new Index("_index", "0"), builder.build());
- }
private void addToDocument(Document doc, Map<String, List<Object>> keys) {
for (Map.Entry<String, List<Object>> entry : keys.entrySet()) {
final String name = entry.getKey();
for (Object value : entry.getValue()) {
- if (value instanceof Long) {
+ if (value instanceof Integer) {
+ doc.add(new SortedNumericDocValuesField(name, (int) value));
+ doc.add(new IntPoint(name, (int) value));
+ } else if (value instanceof Long) {
doc.add(new SortedNumericDocValuesField(name, (long) value));
+ doc.add(new LongPoint(name, (long) value));
} else if (value instanceof Double) {
doc.add(new SortedNumericDocValuesField(name, NumericUtils.doubleToSortableLong((double) value)));
+ doc.add(new DoublePoint(name, (double) value));
} else if (value instanceof String) {
doc.add(new SortedSetDocValuesField(name, new BytesRef((String) value)));
+ doc.add(new StringField(name, new BytesRef((String) value), Field.Store.NO));
} else {
throw new AssertionError("invalid object: " + value.getClass().getSimpleName());
}


@ -0,0 +1,330 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE;
import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.LONG;
import static org.hamcrest.Matchers.equalTo;
public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
static class ClassAndName {
final MappedFieldType fieldType;
final Class<? extends Comparable<?>> clazz;
ClassAndName(MappedFieldType fieldType, Class<? extends Comparable<?>> clazz) {
this.fieldType = fieldType;
this.clazz = clazz;
}
}
public void testRandomLong() throws IOException {
testRandomCase(new ClassAndName(createNumber("long", LONG) , Long.class));
}
public void testRandomDouble() throws IOException {
testRandomCase(new ClassAndName(createNumber("double", DOUBLE) , Double.class));
}
public void testRandomDoubleAndLong() throws IOException {
testRandomCase(new ClassAndName(createNumber("double", DOUBLE), Double.class),
new ClassAndName(createNumber("long", LONG), Long.class));
}
public void testRandomDoubleAndKeyword() throws IOException {
testRandomCase(new ClassAndName(createNumber("double", DOUBLE), Double.class),
new ClassAndName(createKeyword("keyword"), BytesRef.class));
}
public void testRandomKeyword() throws IOException {
testRandomCase(new ClassAndName(createKeyword("keyword"), BytesRef.class));
}
public void testRandomLongAndKeyword() throws IOException {
testRandomCase(new ClassAndName(createNumber("long", LONG), Long.class),
new ClassAndName(createKeyword("keyword"), BytesRef.class));
}
public void testRandomLongAndDouble() throws IOException {
testRandomCase(new ClassAndName(createNumber("long", LONG), Long.class),
new ClassAndName(createNumber("double", DOUBLE) , Double.class));
}
public void testRandomKeywordAndLong() throws IOException {
testRandomCase(new ClassAndName(createKeyword("keyword"), BytesRef.class),
new ClassAndName(createNumber("long", LONG), Long.class));
}
public void testRandomKeywordAndDouble() throws IOException {
testRandomCase(new ClassAndName(createKeyword("keyword"), BytesRef.class),
new ClassAndName(createNumber("double", DOUBLE), Double.class));
}
public void testRandom() throws IOException {
int numTypes = randomIntBetween(3, 8);
ClassAndName[] types = new ClassAndName[numTypes];
for (int i = 0; i < numTypes; i++) {
int rand = randomIntBetween(0, 2);
switch (rand) {
case 0:
types[i] = new ClassAndName(createNumber(Integer.toString(i), LONG), Long.class);
break;
case 1:
types[i] = new ClassAndName(createNumber(Integer.toString(i), DOUBLE), Double.class);
break;
case 2:
types[i] = new ClassAndName(createKeyword(Integer.toString(i)), BytesRef.class);
break;
default:
assert(false);
}
}
testRandomCase(true, types);
}
private void testRandomCase(ClassAndName... types) throws IOException {
testRandomCase(true, types);
testRandomCase(false, types);
}
private void testRandomCase(boolean forceMerge, ClassAndName... types) throws IOException {
final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
int numDocs = randomIntBetween(50, 100);
List<Comparable<?>[]> possibleValues = new ArrayList<>();
for (ClassAndName type : types) {
int numValues = randomIntBetween(1, numDocs*2);
Comparable<?>[] values = new Comparable[numValues];
if (type.clazz == Long.class) {
for (int i = 0; i < numValues; i++) {
values[i] = randomLong();
}
} else if (type.clazz == Double.class) {
for (int i = 0; i < numValues; i++) {
values[i] = randomDouble();
}
} else if (type.clazz == BytesRef.class) {
for (int i = 0; i < numValues; i++) {
values[i] = new BytesRef(randomAlphaOfLengthBetween(5, 50));
}
} else {
assert(false);
}
possibleValues.add(values);
}
Set<CompositeKey> keys = new HashSet<>();
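// expected keys: every combination of indexed values, but only from documents that have at least one value for each source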
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, new KeywordAnalyzer())) {
for (int i = 0; i < numDocs; i++) {
Document document = new Document();
List<List<Comparable<?>>> docValues = new ArrayList<>();
boolean hasAllField = true;
for (int j = 0; j < types.length; j++) {
int numValues = randomIntBetween(0, 5);
if (numValues == 0) {
hasAllField = false;
}
List<Comparable<?>> values = new ArrayList<>();
for (int k = 0; k < numValues; k++) {
values.add(possibleValues.get(j)[randomIntBetween(0, possibleValues.get(j).length-1)]);
if (types[j].clazz == Long.class) {
long value = (Long) values.get(k);
document.add(new SortedNumericDocValuesField(types[j].fieldType.name(), value));
document.add(new LongPoint(types[j].fieldType.name(), value));
} else if (types[j].clazz == Double.class) {
document.add(new SortedNumericDocValuesField(types[j].fieldType.name(),
NumericUtils.doubleToSortableLong((Double) values.get(k))));
} else if (types[j].clazz == BytesRef.class) {
BytesRef value = (BytesRef) values.get(k);
document.add(new SortedSetDocValuesField(types[j].fieldType.name(), (BytesRef) values.get(k)));
document.add(new TextField(types[j].fieldType.name(), value.utf8ToString(), Field.Store.NO));
} else {
assert(false);
}
}
docValues.add(values);
}
if (hasAllField) {
List<CompositeKey> comb = createListCombinations(docValues);
keys.addAll(comb);
}
indexWriter.addDocument(document);
}
if (forceMerge) {
indexWriter.forceMerge(1);
}
}
IndexReader reader = DirectoryReader.open(directory);
int size = randomIntBetween(1, keys.size());
SingleDimensionValuesSource<?>[] sources = new SingleDimensionValuesSource[types.length];
for (int i = 0; i < types.length; i++) {
final MappedFieldType fieldType = types[i].fieldType;
if (types[i].clazz == Long.class) {
sources[i] = new LongValuesSource(bigArrays, fieldType,
context -> context.reader().getSortedNumericDocValues(fieldType.name()), value -> value,
DocValueFormat.RAW, size, 1);
} else if (types[i].clazz == Double.class) {
sources[i] = new DoubleValuesSource(bigArrays, fieldType,
context -> FieldData.sortableLongBitsToDoubles(context.reader().getSortedNumericDocValues(fieldType.name())),
size, 1);
} else if (types[i].clazz == BytesRef.class) {
if (forceMerge) {
// we don't create global ordinals but we test this mode when the reader has a single segment
// since ordinals are global in this case.
sources[i] = new GlobalOrdinalValuesSource(bigArrays, fieldType,
context -> context.reader().getSortedSetDocValues(fieldType.name()), size, 1);
} else {
sources[i] = new BinaryValuesSource(fieldType,
context -> FieldData.toString(context.reader().getSortedSetDocValues(fieldType.name())), size, 1);
}
} else {
assert(false);
}
}
CompositeKey[] expected = keys.toArray(new CompositeKey[0]);
Arrays.sort(expected, (a, b) -> compareKey(a, b));
CompositeValuesCollectorQueue queue = new CompositeValuesCollectorQueue(sources, size);
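// the queue keeps the 'size' lowest composite keys seen during collection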
final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery());
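// a null producer means the leading source cannot replay documents in value order, so the producer mode is skipped below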
for (boolean withProducer : new boolean[] {true, false}) {
if (withProducer && docsProducer == null) {
continue;
}
int pos = 0;
CompositeKey last = null;
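// drain the expected keys page by page, restarting each page after the last key of the previous one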
while (pos < size) {
queue.clear();
if (last != null) {
queue.setAfter(last.values());
}
for (LeafReaderContext leafReaderContext : reader.leaves()) {
final LeafBucketCollector leafCollector = new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
queue.addIfCompetitive();
}
};
if (withProducer) {
assertEquals(DocIdSet.EMPTY,
docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false));
} else {
final LeafBucketCollector queueCollector = queue.getLeafCollector(leafReaderContext, leafCollector);
final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
for (int i = 0; i < leafReaderContext.reader().maxDoc(); i++) {
if (liveDocs == null || liveDocs.get(i)) {
queueCollector.collect(i);
}
}
}
}
assertEquals(size, Math.min(queue.size(), expected.length - pos));
int ptr = 0;
for (int slot : queue.getSortedSlot()) {
CompositeKey key = queue.toCompositeKey(slot);
assertThat(key, equalTo(expected[ptr++]));
last = key;
}
pos += queue.size();
}
}
reader.close();
}
}
private static MappedFieldType createNumber(String name, NumberFieldMapper.NumberType type) {
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(type);
fieldType.setIndexOptions(IndexOptions.DOCS);
fieldType.setName(name);
fieldType.setHasDocValues(true);
fieldType.freeze();
return fieldType;
}
private static MappedFieldType createKeyword(String name) {
MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType();
fieldType.setIndexOptions(IndexOptions.DOCS);
fieldType.setName(name);
fieldType.setHasDocValues(true);
fieldType.freeze();
return fieldType;
}
private static int compareKey(CompositeKey key1, CompositeKey key2) {
assert key1.size() == key2.size();
for (int i = 0; i < key1.size(); i++) {
Comparable<Object> cmp1 = (Comparable<Object>) key1.get(i);
int cmp = cmp1.compareTo(key2.get(i));
if (cmp != 0) {
return cmp;
}
}
return 0;
}
private static List<CompositeKey> createListCombinations(List<List<Comparable<?>>> values) {
List<CompositeKey> keys = new ArrayList<>();
createListCombinations(new Comparable[values.size()], values, 0, values.size(), keys);
return keys;
}
private static void createListCombinations(Comparable<?>[] key, List<List<Comparable<?>>> values,
int pos, int maxPos, List<CompositeKey> keys) {
if (pos == maxPos) {
keys.add(new CompositeKey(key.clone()));
} else {
for (Comparable<?> val : values.get(pos)) {
key[pos] = val;
createListCombinations(key, values, pos + 1, maxPos, keys);
}
}
}
}


@ -0,0 +1,106 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.test.ESTestCase;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SingleDimensionValuesSourceTests extends ESTestCase {
public void testBinarySorted() {
MappedFieldType keyword = new KeywordFieldMapper.KeywordFieldType();
keyword.setName("keyword");
BinaryValuesSource source = new BinaryValuesSource(keyword, context -> null, 1, 1);
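// the last two arguments are the composite size and the sort direction; a reversed (-1) source never creates a producer since the optimization is ascending-only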
assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null));
IndexReader reader = mockIndexReader(1, 1);
assertNotNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()));
assertNotNull(source.createSortedDocsProducerOrNull(reader, null));
assertNull(source.createSortedDocsProducerOrNull(reader,
new TermQuery(new Term("keyword", "toto)"))));
source = new BinaryValuesSource(keyword, context -> null, 0, -1);
assertNull(source.createSortedDocsProducerOrNull(reader, null));
}
public void testGlobalOrdinalsSorted() {
MappedFieldType keyword = new KeywordFieldMapper.KeywordFieldType();
keyword.setName("keyword");
GlobalOrdinalValuesSource source = new GlobalOrdinalValuesSource(BigArrays.NON_RECYCLING_INSTANCE, keyword, context -> null, 1, 1);
assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null));
IndexReader reader = mockIndexReader(1, 1);
assertNotNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()));
assertNotNull(source.createSortedDocsProducerOrNull(reader, null));
assertNull(source.createSortedDocsProducerOrNull(reader,
new TermQuery(new Term("keyword", "toto"))));
source = new GlobalOrdinalValuesSource(BigArrays.NON_RECYCLING_INSTANCE, keyword, context -> null, 1, -1);
assertNull(source.createSortedDocsProducerOrNull(reader, null));
}
public void testNumericSorted() {
for (NumberFieldMapper.NumberType numberType : NumberFieldMapper.NumberType.values()) {
MappedFieldType number = new NumberFieldMapper.NumberFieldType(numberType);
number.setName("number");
final SingleDimensionValuesSource<?> source;
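// whole-number types are backed by LongValuesSource and can create a sorted docs producer; floating point types use DoubleValuesSource, which cannot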
if (numberType == NumberFieldMapper.NumberType.BYTE ||
numberType == NumberFieldMapper.NumberType.SHORT ||
numberType == NumberFieldMapper.NumberType.INTEGER ||
numberType == NumberFieldMapper.NumberType.LONG) {
source = new LongValuesSource(BigArrays.NON_RECYCLING_INSTANCE,
number, context -> null, value -> value, DocValueFormat.RAW, 1, 1);
assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null));
IndexReader reader = mockIndexReader(1, 1);
assertNotNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()));
assertNotNull(source.createSortedDocsProducerOrNull(reader, null));
assertNotNull(source.createSortedDocsProducerOrNull(reader, LongPoint.newRangeQuery("number", 0, 1)));
assertNull(source.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto"))));
LongValuesSource sourceRev =
new LongValuesSource(BigArrays.NON_RECYCLING_INSTANCE,
number, context -> null, value -> value, DocValueFormat.RAW, 1, -1);
assertNull(sourceRev.createSortedDocsProducerOrNull(reader, null));
} else if (numberType == NumberFieldMapper.NumberType.HALF_FLOAT ||
numberType == NumberFieldMapper.NumberType.FLOAT ||
numberType == NumberFieldMapper.NumberType.DOUBLE) {
source = new DoubleValuesSource(BigArrays.NON_RECYCLING_INSTANCE,
number, context -> null, 1, 1);
} else {
throw new AssertionError("missing type: " + numberType.typeName());
}
assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null));
}
}
private static IndexReader mockIndexReader(int maxDoc, int numDocs) {
IndexReader reader = mock(IndexReader.class);
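// report deletions whenever numDocs < maxDoc; a reader with deletions disables the sorted docs producer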
when(reader.hasDeletions()).thenReturn(maxDoc - numDocs > 0);
when(reader.maxDoc()).thenReturn(maxDoc);
when(reader.numDocs()).thenReturn(numDocs);
return reader;
}
}


@ -291,7 +291,6 @@ public abstract class AggregatorTestCase extends ESTestCase {
A internalAgg = (A) a.buildAggregation(0L);
InternalAggregationTestCase.assertMultiBucketConsumer(internalAgg, bucketConsumer);
return internalAgg;
}
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSearcher searcher,