Fix date_range aggregation to not cache if now is used

Before this change the processing of the ranges in the date range (and
other range type) aggregations was done when the Aggregator was created.
This meant that the SearchContext did not know that now had been used in
a range until after the decision to cache was made.

This change moves the processing of the ranges to the aggregation builders
so that the search context is made aware that now has been used before
it decides if the request should be cached
This commit is contained in:
Colin Goodheart-Smithe 2016-10-04 19:53:28 +01:00
parent 6a5630f901
commit e168b3b66b
10 changed files with 85 additions and 48 deletions

View File

@ -40,10 +40,10 @@ public class AbstractRangeAggregatorFactory<AF extends AbstractRangeAggregatorFa
extends ValuesSourceAggregatorFactory<ValuesSource.Numeric, AF> {
private final InternalRange.Factory<?, ?> rangeFactory;
private final List<R> ranges;
private final R[] ranges;
private final boolean keyed;
public AbstractRangeAggregatorFactory(String name, Type type, ValuesSourceConfig<Numeric> config, List<R> ranges, boolean keyed,
public AbstractRangeAggregatorFactory(String name, Type type, ValuesSourceConfig<Numeric> config, R[] ranges, boolean keyed,
InternalRange.Factory<?, ?> rangeFactory, AggregationContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
super(name, type, config, context, parent, subFactoriesBuilder, metaData);
@ -55,7 +55,7 @@ public class AbstractRangeAggregatorFactory<AF extends AbstractRangeAggregatorFa
@Override
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
return new Unmapped<R>(name, ranges, keyed, config.format(), context, parent, rangeFactory, pipelineAggregators, metaData);
return new Unmapped<>(name, ranges, keyed, config.format(), context, parent, rangeFactory, pipelineAggregators, metaData);
}
@Override

View File

@ -19,13 +19,17 @@
package org.elasticsearch.search.aggregations.bucket.range;
import org.apache.lucene.util.InPlaceMergeSorter;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator.Range;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import java.io.IOException;
import java.util.ArrayList;
@ -55,6 +59,40 @@ public abstract class AbstractRangeBuilder<AB extends AbstractRangeBuilder<AB, R
keyed = in.readBoolean();
}
/**
* Resolve any strings in the ranges so we have a number value for the from
* and to of each range. The ranges are also sorted before being returned.
*/
protected Range[] processRanges(AggregationContext context, ValuesSourceConfig<Numeric> config) {
Range[] ranges = new Range[this.ranges.size()];
for (int i = 0; i < ranges.length; i++) {
ranges[i] = this.ranges.get(i).process(config.format(), context.searchContext());
}
sortRanges(ranges);
return ranges;
}
private static void sortRanges(final Range[] ranges) {
new InPlaceMergeSorter() {
@Override
protected void swap(int i, int j) {
final Range tmp = ranges[i];
ranges[i] = ranges[j];
ranges[j] = tmp;
}
@Override
protected int compare(int i, int j) {
int cmp = Double.compare(ranges[i].from, ranges[j].from);
if (cmp == 0) {
cmp = Double.compare(ranges[i].to, ranges[j].to);
}
return cmp;
}
}.sort(0, ranges.length);
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeVInt(ranges.size());

View File

@ -114,6 +114,8 @@ public class RangeAggregationBuilder extends AbstractRangeBuilder<RangeAggregati
@Override
protected RangeAggregatorFactory innerBuild(AggregationContext context, ValuesSourceConfig<Numeric> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
// We need to call processRanges here so they are parsed before we make the decision of whether to cache the request
Range[] ranges = processRanges(context, config);
return new RangeAggregatorFactory(name, type, config, ranges, keyed, rangeFactory, context, parent, subFactoriesBuilder,
metaData);
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.search.aggregations.bucket.range;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.InPlaceMergeSorter;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.StreamInput;
@ -210,7 +209,7 @@ public class RangeAggregator extends BucketsAggregator {
final double[] maxTo;
public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
InternalRange.Factory rangeFactory, List<? extends Range> ranges, boolean keyed, AggregationContext aggregationContext,
InternalRange.Factory rangeFactory, Range[] ranges, boolean keyed, AggregationContext aggregationContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
@ -220,11 +219,7 @@ public class RangeAggregator extends BucketsAggregator {
this.keyed = keyed;
this.rangeFactory = rangeFactory;
this.ranges = new Range[ranges.size()];
for (int i = 0; i < this.ranges.length; i++) {
this.ranges[i] = ranges.get(i).process(format, context.searchContext());
}
sortRanges(this.ranges);
this.ranges = ranges;
maxTo = new double[this.ranges.length];
maxTo[0] = this.ranges[0].to;
@ -337,45 +332,21 @@ public class RangeAggregator extends BucketsAggregator {
return rangeFactory.create(name, buckets, format, keyed, pipelineAggregators(), metaData());
}
private static void sortRanges(final Range[] ranges) {
new InPlaceMergeSorter() {
@Override
protected void swap(int i, int j) {
final Range tmp = ranges[i];
ranges[i] = ranges[j];
ranges[j] = tmp;
}
@Override
protected int compare(int i, int j) {
int cmp = Double.compare(ranges[i].from, ranges[j].from);
if (cmp == 0) {
cmp = Double.compare(ranges[i].to, ranges[j].to);
}
return cmp;
}
}.sort(0, ranges.length);
}
public static class Unmapped<R extends RangeAggregator.Range> extends NonCollectingAggregator {
private final List<R> ranges;
private final R[] ranges;
private final boolean keyed;
private final InternalRange.Factory factory;
private final DocValueFormat format;
@SuppressWarnings("unchecked")
public Unmapped(String name, List<R> ranges, boolean keyed, DocValueFormat format,
public Unmapped(String name, R[] ranges, boolean keyed, DocValueFormat format,
AggregationContext context,
Aggregator parent, InternalRange.Factory factory, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.ranges = new ArrayList<>();
for (R range : ranges) {
this.ranges.add((R) range.process(format, context.searchContext()));
}
this.ranges = ranges;
this.keyed = keyed;
this.format = format;
this.factory = factory;
@ -384,7 +355,7 @@ public class RangeAggregator extends BucketsAggregator {
@Override
public InternalAggregation buildEmptyAggregation() {
InternalAggregations subAggs = buildEmptySubAggregations();
List<org.elasticsearch.search.aggregations.bucket.range.Range.Bucket> buckets = new ArrayList<>(ranges.size());
List<org.elasticsearch.search.aggregations.bucket.range.Range.Bucket> buckets = new ArrayList<>(ranges.length);
for (RangeAggregator.Range range : ranges) {
buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, keyed, format));
}

View File

@ -29,12 +29,11 @@ import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class RangeAggregatorFactory extends AbstractRangeAggregatorFactory<RangeAggregatorFactory, RangeAggregator.Range> {
public RangeAggregatorFactory(String name, Type type, ValuesSourceConfig<Numeric> config, List<Range> ranges, boolean keyed,
public RangeAggregatorFactory(String name, Type type, ValuesSourceConfig<Numeric> config, Range[] ranges, boolean keyed,
Factory<?, ?> rangeFactory, AggregationContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
super(name, type, config, ranges, keyed, rangeFactory, context, parent, subFactoriesBuilder, metaData);

View File

@ -259,6 +259,9 @@ public class DateRangeAggregationBuilder extends AbstractRangeBuilder<DateRangeA
@Override
protected DateRangeAggregatorFactory innerBuild(AggregationContext context, ValuesSourceConfig<Numeric> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
// We need to call processRanges here so they are parsed and we know whether `now` has been used before we make
// the decision of whether to cache the request
Range[] ranges = processRanges(context, config);
return new DateRangeAggregatorFactory(name, type, config, ranges, keyed, rangeFactory, context, parent, subFactoriesBuilder,
metaData);
}

View File

@ -30,12 +30,11 @@ import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class DateRangeAggregatorFactory extends AbstractRangeAggregatorFactory<DateRangeAggregatorFactory, Range> {
public DateRangeAggregatorFactory(String name, Type type, ValuesSourceConfig<Numeric> config, List<Range> ranges, boolean keyed,
public DateRangeAggregatorFactory(String name, Type type, ValuesSourceConfig<Numeric> config, Range[] ranges, boolean keyed,
Factory<?, ?> rangeFactory, AggregationContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
super(name, type, config, ranges, keyed, rangeFactory, context, parent, subFactoriesBuilder, metaData);

View File

@ -215,6 +215,7 @@ public class GeoDistanceAggregationBuilder extends ValuesSourceAggregationBuilde
protected ValuesSourceAggregatorFactory<ValuesSource.GeoPoint, ?> innerBuild(AggregationContext context,
ValuesSourceConfig<ValuesSource.GeoPoint> config, AggregatorFactory<?> parent, Builder subFactoriesBuilder)
throws IOException {
Range[] ranges = this.ranges.toArray(new Range[this.range().size()]);
return new GeoDistanceRangeAggregatorFactory(name, type, config, origin, ranges, unit, distanceType, keyed, context, parent,
subFactoriesBuilder, metaData);
}

View File

@ -51,13 +51,13 @@ public class GeoDistanceRangeAggregatorFactory
private final InternalRange.Factory<InternalGeoDistance.Bucket, InternalGeoDistance> rangeFactory = InternalGeoDistance.FACTORY;
private final GeoPoint origin;
private final List<Range> ranges;
private final Range[] ranges;
private final DistanceUnit unit;
private final GeoDistance distanceType;
private final boolean keyed;
public GeoDistanceRangeAggregatorFactory(String name, Type type, ValuesSourceConfig<ValuesSource.GeoPoint> config, GeoPoint origin,
List<Range> ranges, DistanceUnit unit, GeoDistance distanceType, boolean keyed, AggregationContext context,
Range[] ranges, DistanceUnit unit, GeoDistance distanceType, boolean keyed, AggregationContext context,
AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
super(name, type, config, context, parent, subFactoriesBuilder, metaData);
this.origin = origin;
@ -70,7 +70,7 @@ public class GeoDistanceRangeAggregatorFactory
@Override
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
return new Unmapped<Range>(name, ranges, keyed, config.format(), context, parent, rangeFactory, pipelineAggregators, metaData);
return new Unmapped<>(name, ranges, keyed, config.format(), context, parent, rangeFactory, pipelineAggregators, metaData);
}
@Override

View File

@ -34,6 +34,8 @@ import org.joda.time.chrono.ISOChronology;
import java.util.List;
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateRange;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
@ -406,11 +408,33 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
// If size > 1 and cache flag is set on the request we should cache
final SearchResponse r4 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(1)
.setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).get();
// If the request has an aggregation containng now we should not cache
final SearchResponse r4 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26"))
.addAggregation(filter("foo", QueryBuilders.rangeQuery("s").from("now-10y").to("now"))).get();
assertSearchResponse(r4);
assertThat(r4.getHits().getTotalHits(), equalTo(7L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
// If the request has an aggregation containng now we should not cache
final SearchResponse r5 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26"))
.addAggregation(dateRange("foo").field("s").addRange("now-10y", "now")).get();
assertSearchResponse(r5);
assertThat(r5.getHits().getTotalHits(), equalTo(7L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
// If size > 1 and cache flag is set on the request we should cache
final SearchResponse r6 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(1)
.setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).get();
assertSearchResponse(r6);
assertThat(r6.getHits().getTotalHits(), equalTo(7L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),