Drop rewriting in date_histogram (backport of #57836) (#58875)

The `date_histogram` aggregation had an optimization where it'd rewrite
`time_zones` whose offset from UTC is fixed across the entire index.
This rewrite is no longer needed after #56371 because we can tell that a
time zone is fixed lower down in the aggregation, so this commit removes it.
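
For background: the property the removed rewrite was probing is only partly observable from the zone rules alone, which is why the code had to sample the shard's data. A minimal, standalone java.time sketch (nothing Elasticsearch-specific is assumed):

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class FixedOffsetCheck {
    public static void main(String[] args) {
        // A true fixed-offset zone has no transitions at all.
        System.out.println(ZoneOffset.ofHours(1).getRules().isFixedOffset()); // true

        // Australia/Brisbane has observed no DST since the early 1990s, but its
        // rules still carry the historical transitions, so it is not "fixed"...
        ZoneId brisbane = ZoneId.of("Australia/Brisbane");
        System.out.println(brisbane.getRules().isFixedOffset()); // false

        // ...even though no transition lies ahead of any modern data:
        System.out.println(brisbane.getRules().nextTransition(Instant.now())); // null
    }
}

That gap (historical transitions far from the data) is what rewriteTimeZone bridged per shard, and what the rounding layer now detects on its own after #56371.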
Nik Everett, 2020-07-01 17:19:12 -04:00 (committed by GitHub)
commit 5e49ee800e, parent c64e283dbf
5 changed files with 27 additions and 318 deletions

DateFieldMapper.java

@@ -479,30 +479,6 @@ public final class DateFieldMapper extends FieldMapper {
}
}
return isFieldWithinRange(reader, fromInclusive, toInclusive);
}
/**
* Return whether all values of the given {@link IndexReader} are within the range,
* outside the range or cross the range. Unlike {@link #isFieldWithinQuery} this
* accepts values that are out of the range of the {@link #resolution} of this field.
* @param fromInclusive start date, inclusive
* @param toInclusive end date, inclusive
*/
public Relation isFieldWithinRange(IndexReader reader, Instant fromInclusive, Instant toInclusive)
throws IOException {
return isFieldWithinRange(reader,
resolution.convert(resolution.clampToValidRange(fromInclusive)),
resolution.convert(resolution.clampToValidRange(toInclusive)));
}
/**
* Return whether all values of the given {@link IndexReader} are within the range,
* outside the range or cross the range.
* @param fromInclusive start date, inclusive, {@link Resolution#convert(Instant) converted} to the appropriate scale
* @param toInclusive end date, inclusive, {@link Resolution#convert(Instant) converted} to the appropriate scale
*/
private Relation isFieldWithinRange(IndexReader reader, long fromInclusive, long toInclusive) throws IOException {
if (PointValues.size(reader, name()) == 0) {
// no points, so nothing matches
return Relation.DISJOINT;
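
The hunk above is truncated inside the private overload, which consults the point index for the segment-level minimum and maximum. A rough standalone sketch of that pattern against a raw Lucene reader (the nested Relation enum and the single-dimension handling are simplifications for illustration, not the mapper's actual code):

import java.io.IOException;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PointValues;

class RangeCheck {
    enum Relation { WITHIN, INTERSECTS, DISJOINT }

    // Bound-check an indexed long field using only the BKD index metadata.
    static Relation isFieldWithinRange(IndexReader reader, String field,
                                       long fromInclusive, long toInclusive) throws IOException {
        if (PointValues.size(reader, field) == 0) {
            return Relation.DISJOINT; // no points, so nothing matches
        }
        long min = LongPoint.decodeDimension(PointValues.getMinPackedValue(reader, field), 0);
        long max = LongPoint.decodeDimension(PointValues.getMaxPackedValue(reader, field), 0);
        if (min > toInclusive || max < fromInclusive) {
            return Relation.DISJOINT; // every value falls outside the range
        }
        if (min >= fromInclusive && max <= toInclusive) {
            return Relation.WITHIN; // every value falls inside the range
        }
        return Relation.INTERSECTS; // mixed, or not determinable from min/max alone
    }
}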

DateHistogramAggregationBuilder.java

@@ -19,10 +19,6 @@
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -30,11 +26,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.LeafNumericFieldData;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Relation;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -50,10 +41,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.zone.ZoneOffsetTransition;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -407,121 +395,6 @@ protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBu
return NAME;
}
/**
* Returns a {@linkplain ZoneId} that functions the same as
* {@link #timeZone()} on the data in the shard referred to by
* {@code context}. It <strong>attempts</strong> to convert zones that
* have non-fixed offsets into fixed offset zones that produce the
* same results on all data in the shard.
* <p>
We go about this in a few phases:
<ol>
<li>A bunch of preflight checks to see if we *can* optimize it
<li>Find any Instant in the shard
<li>Find the DST transitions before and after that Instant
<li>Round those into the interval
<li>Check if the rounded values include all values within the shard
* <li>If they do then return a fixed offset time zone because it
* will return the same values for all time in the shard as the
* original time zone, but faster
* <li>Otherwise return the original time zone. It'll be slower, but
* correct.
* </ol>
* <p>
* NOTE: this can't be done in rewrite() because the timezone is then also used on the
* coordinating node in order to generate missing buckets, which may cross a transition
* even though data on the shards doesn't.
*/
ZoneId rewriteTimeZone(QueryShardContext context) throws IOException {
final ZoneId tz = timeZone();
if (tz == null || tz.getRules().isFixedOffset()) {
// This time zone is already as fast as it is going to get.
return tz;
}
if (script() != null) {
// We can't be sure what dates the script will return so we don't attempt to optimize anything
return tz;
}
if (field() == null) {
// Without a field we're not going to be able to look anything up.
return tz;
}
MappedFieldType ft = context.fieldMapper(field());
if (ft == null || false == ft instanceof DateFieldMapper.DateFieldType) {
// If the field is unmapped or not a date then we can't get its range.
return tz;
}
DateFieldMapper.DateFieldType dft = (DateFieldMapper.DateFieldType) ft;
final IndexReader reader = context.getIndexReader();
if (reader == null) {
return tz;
}
Instant instant = null;
final IndexNumericFieldData fieldData = context.getForField(ft);
for (LeafReaderContext ctx : reader.leaves()) {
LeafNumericFieldData leafFD = fieldData.load(ctx);
SortedNumericDocValues values = leafFD.getLongValues();
if (values.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
instant = Instant.ofEpochMilli(values.nextValue());
break;
}
}
if (instant == null) {
return tz;
}
ZoneOffsetTransition prevOffsetTransition = tz.getRules().previousTransition(instant);
final long prevTransition;
if (prevOffsetTransition != null) {
prevTransition = prevOffsetTransition.getInstant().toEpochMilli();
} else {
prevTransition = instant.toEpochMilli();
}
ZoneOffsetTransition nextOffsetTransition = tz.getRules().nextTransition(instant);
final long nextTransition;
if (nextOffsetTransition != null) {
nextTransition = nextOffsetTransition.getInstant().toEpochMilli();
} else {
nextTransition = Long.MAX_VALUE; // fixed time-zone after prevTransition
}
// We need not only all values but also their rounded values to be within
// [prevTransition, nextTransition].
final long low;
DateIntervalWrapper.IntervalTypeEnum intervalType = dateHistogramInterval.getIntervalType();
if (intervalType.equals(DateIntervalWrapper.IntervalTypeEnum.FIXED)) {
low = Math.addExact(prevTransition, dateHistogramInterval.tryIntervalAsFixedUnit().millis());
} else if (intervalType.equals(DateIntervalWrapper.IntervalTypeEnum.CALENDAR)) {
final Rounding.DateTimeUnit intervalAsUnit = dateHistogramInterval.tryIntervalAsCalendarUnit();
final Rounding rounding = Rounding.builder(intervalAsUnit).timeZone(timeZone()).build();
low = rounding.nextRoundingValue(prevTransition);
} else {
// We're not sure what the interval was originally (legacy) so use old behavior of assuming
// calendar first, then fixed. Required because fixed/cal overlap in places ("1h")
Rounding.DateTimeUnit intervalAsUnit = dateHistogramInterval.tryIntervalAsCalendarUnit();
if (intervalAsUnit != null) {
final Rounding rounding = Rounding.builder(intervalAsUnit).timeZone(timeZone()).build();
low = rounding.nextRoundingValue(prevTransition);
} else {
final TimeValue intervalAsMillis = dateHistogramInterval.tryIntervalAsFixedUnit();
low = Math.addExact(prevTransition, intervalAsMillis.millis());
}
}
// rounding rounds down, so 'nextTransition' is a good upper bound
final long high = nextTransition;
if (dft.isFieldWithinRange(
reader, Instant.ofEpochMilli(low), Instant.ofEpochMilli(high - 1)) == Relation.WITHIN) {
// All values in this reader have the same offset despite daylight saving times.
// This is very common for location-based timezones such as Europe/Paris in
// combination with time-based indices.
return ZoneOffset.ofTotalSeconds(tz.getRules().getOffset(instant).getTotalSeconds());
}
return tz;
}
@Override
protected ValuesSourceAggregatorFactory innerBuild(QueryShardContext queryShardContext,
ValuesSourceConfig config,
@@ -529,22 +402,25 @@ protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBu
AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
final ZoneId tz = timeZone();
final Rounding rounding = dateHistogramInterval.createRounding(tz, offset);
// TODO once we optimize TimeIntervalRounding we won't need to rewrite the time zone
final ZoneId rewrittenTimeZone = rewriteTimeZone(queryShardContext);
final Rounding shardRounding;
if (tz == rewrittenTimeZone) {
shardRounding = rounding;
} else {
shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone, offset);
}
ExtendedBounds roundedBounds = null;
if (this.extendedBounds != null) {
// parse any string bounds to longs and round
roundedBounds = this.extendedBounds.parseAndValidate(name, queryShardContext, config.format()).round(rounding);
}
return new DateHistogramAggregatorFactory(name, config, order, keyed, minDocCount,
rounding, shardRounding, roundedBounds, queryShardContext, parent, subFactoriesBuilder, metadata);
return new DateHistogramAggregatorFactory(
name,
config,
order,
keyed,
minDocCount,
rounding,
roundedBounds,
queryShardContext,
parent,
subFactoriesBuilder,
metadata
);
}
@Override

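The core of the removed method is the transition window: sample one instant from the shard, find the DST transitions on either side of it, and if every value and every rounded bucket key stays inside that window, the zone is indistinguishable from a fixed offset for this shard's data. A self-contained java.time sketch of the window computation (the dates are chosen for illustration):

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.zone.ZoneOffsetTransition;

public class TransitionWindow {
    public static void main(String[] args) {
        ZoneId tz = ZoneId.of("Europe/Paris");
        Instant sample = Instant.parse("2018-01-15T12:00:00Z"); // any value from the shard

        ZoneOffsetTransition prev = tz.getRules().previousTransition(sample);
        ZoneOffsetTransition next = tz.getRules().nextTransition(sample);
        long prevTransition = prev != null ? prev.getInstant().toEpochMilli() : sample.toEpochMilli();
        long nextTransition = next != null ? next.getInstant().toEpochMilli() : Long.MAX_VALUE;
        System.out.println("window: [" + prevTransition + ", " + nextTransition + ")");

        // Inside the window, Europe/Paris behaves exactly like a fixed offset:
        ZoneOffset fixed = ZoneOffset.ofTotalSeconds(tz.getRules().getOffset(sample).getTotalSeconds());
        System.out.println(fixed); // +01:00 for a January sample
    }
}
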
DateHistogramAggregatorFactory.java

@@ -54,20 +54,26 @@ public final class DateHistogramAggregatorFactory extends ValuesSourceAggregator
private final long minDocCount;
private final ExtendedBounds extendedBounds;
private final Rounding rounding;
private final Rounding shardRounding;
public DateHistogramAggregatorFactory(String name, ValuesSourceConfig config,
BucketOrder order, boolean keyed, long minDocCount,
Rounding rounding, Rounding shardRounding, ExtendedBounds extendedBounds, QueryShardContext queryShardContext,
AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata) throws IOException {
public DateHistogramAggregatorFactory(
String name,
ValuesSourceConfig config,
BucketOrder order,
boolean keyed,
long minDocCount,
Rounding rounding,
ExtendedBounds extendedBounds,
QueryShardContext queryShardContext,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata
) throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata);
this.order = order;
this.keyed = keyed;
this.minDocCount = minDocCount;
this.extendedBounds = extendedBounds;
this.rounding = rounding;
this.shardRounding = shardRounding;
}
public long minDocCount() {
@@ -89,7 +95,7 @@ public final class DateHistogramAggregatorFactory extends ValuesSourceAggregator
// TODO: Is there a reason not to get the prepared rounding in the supplier itself?
Rounding.Prepared preparedRounding = config.getValuesSource()
.roundingPreparer(queryShardContext.getIndexReader())
.apply(shardRounding);
.apply(rounding);
return ((DateHistogramAggregationSupplier) aggregatorSupplier).build(
name,
factories,

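With shardRounding gone, the factory feeds its single rounding to the values source's rounding preparer, and the fixed-offset detection happens there (that is what #56371 added). A hedged sketch of the resulting call shape, assuming the org.elasticsearch.common.Rounding API of this era (prepare(min, max) and Rounding.Prepared.round); it needs the Elasticsearch server classes on the classpath and the bounds are hypothetical:

import java.time.ZoneId;
import org.elasticsearch.common.Rounding;

class PreparedRoundingSketch {
    static long dayBucket(long minUtcMillis, long maxUtcMillis, long valueUtcMillis) {
        Rounding rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH)
            .timeZone(ZoneId.of("Europe/Paris"))
            .build();
        // prepare() may select a cheap fixed-offset implementation when
        // [min, max] crosses no DST transition -- the check that used to
        // live in rewriteTimeZone() now happens down here.
        Rounding.Prepared prepared = rounding.prepare(minUtcMillis, maxUtcMillis);
        return prepared.round(valueUtcMillis);
    }
}
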
DateFieldTypeTests.java

@@ -75,7 +75,6 @@ public class DateFieldTypeTests extends FieldTypeTestCase<DateFieldType> {
DateFieldType ft = new DateFieldType("my_date");
assertEquals(Relation.DISJOINT, ft.isFieldWithinQuery(reader, "2015-10-12", "2016-04-03",
randomBoolean(), randomBoolean(), null, null, context));
assertEquals(Relation.DISJOINT, ft.isFieldWithinRange(reader, instant("2015-10-12"), instant("2016-04-03")));
}
public void testIsFieldWithinQueryDateMillis() throws IOException {
@@ -109,49 +108,11 @@ public class DateFieldTypeTests extends FieldTypeTestCase<DateFieldType> {
doTestIsFieldWithinQuery(ft, reader, DateTimeZone.UTC, alternateFormat);
QueryRewriteContext context = new QueryRewriteContext(xContentRegistry(), writableRegistry(), null, () -> nowInMillis);
assertEquals(Relation.INTERSECTS, ft.isFieldWithinRange(reader, instant("2015-10-09"), instant("2016-01-02")));
assertEquals(Relation.INTERSECTS, ft.isFieldWithinRange(reader, instant("2016-01-02"), instant("2016-06-20")));
assertEquals(Relation.INTERSECTS, ft.isFieldWithinRange(reader, instant("2016-01-02"), instant("2016-02-12")));
assertEquals(Relation.DISJOINT, ft.isFieldWithinRange(reader, instant("2014-01-02"), instant("2015-02-12")));
assertEquals(Relation.DISJOINT, ft.isFieldWithinRange(reader, instant("2016-05-11"), instant("2016-08-30")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, instant("2015-09-25"), instant("2016-05-29")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, instant("2015-10-12"), instant("2016-04-03")));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12").plusMillis(1), instant("2016-04-03").minusMillis(1)));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12").plusMillis(1), instant("2016-04-03")));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12"), instant("2016-04-03").minusMillis(1)));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12").plusNanos(1), instant("2016-04-03").minusNanos(1)));
assertEquals(ft.resolution() == Resolution.NANOSECONDS ? Relation.INTERSECTS : Relation.WITHIN, // Millis round down here.
ft.isFieldWithinRange(reader, instant("2015-10-12").plusNanos(1), instant("2016-04-03")));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12"), instant("2016-04-03").minusNanos(1)));
// Some edge cases
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, Instant.EPOCH, instant("2016-04-03")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, Instant.ofEpochMilli(-1000), instant("2016-04-03")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, Instant.ofEpochMilli(Long.MIN_VALUE), instant("2016-04-03")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, instant("2015-10-12"), Instant.ofEpochMilli(Long.MAX_VALUE)));
// Fields with no value indexed.
DateFieldType ft2 = new DateFieldType("my_date2");
assertEquals(Relation.DISJOINT, ft2.isFieldWithinQuery(reader, "2015-10-09", "2016-01-02", false, false, null, null, context));
assertEquals(Relation.DISJOINT, ft2.isFieldWithinRange(reader, instant("2015-10-09"), instant("2016-01-02")));
// Fire a bunch of random values into isFieldWithinRange to make sure it doesn't crash
for (int iter = 0; iter < 1000; iter++) {
long min = randomLong();
long max = randomLong();
if (min > max) {
long swap = max;
max = min;
min = swap;
}
ft.isFieldWithinRange(reader, Instant.ofEpochMilli(min), Instant.ofEpochMilli(max));
}
IOUtils.close(reader, w, dir);
}

DateHistogramTests.java

@@ -19,24 +19,9 @@
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import org.elasticsearch.search.aggregations.BucketOrder;
import java.io.IOException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
@@ -129,99 +114,4 @@ public class DateHistogramTests extends BaseAggregationTestCase<DateHistogramAgg
return orders;
}
private static Document documentForDate(String field, long millis) {
Document doc = new Document();
final long value;
switch (field) {
case DATE_FIELD_NAME:
value = millis;
break;
case DATE_NANOS_FIELD_NAME:
value = DateUtils.toNanoSeconds(millis);
break;
default:
throw new AssertionError();
}
doc.add(new LongPoint(field, value));
doc.add(new SortedNumericDocValuesField(field, value));
return doc;
}
public void testRewriteTimeZone() throws IOException {
DateFormatter format = DateFormatter.forPattern("strict_date_optional_time");
for (String fieldName : new String[]{DATE_FIELD_NAME, DATE_NANOS_FIELD_NAME}) {
try (Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
long millis1 = DateFormatters.from(format.parse("2018-03-11T11:55:00")).toInstant().toEpochMilli();
w.addDocument(documentForDate(fieldName, millis1));
long millis2 = DateFormatters.from(format.parse("2017-10-30T18:13:00")).toInstant().toEpochMilli();
w.addDocument(documentForDate(fieldName, millis2));
try (IndexReader readerThatDoesntCross = DirectoryReader.open(w)) {
long millis3 = DateFormatters.from(format.parse("2018-03-25T02:44:00")).toInstant().toEpochMilli();
w.addDocument(documentForDate(fieldName, millis3));
try (IndexReader readerThatCrosses = DirectoryReader.open(w)) {
QueryShardContext shardContextThatDoesntCross = createShardContext(new IndexSearcher(readerThatDoesntCross));
QueryShardContext shardContextThatCrosses = createShardContext(new IndexSearcher(readerThatCrosses));
DateHistogramAggregationBuilder builder = new DateHistogramAggregationBuilder("my_date_histo");
builder.field(fieldName);
builder.calendarInterval(DateHistogramInterval.DAY);
// no timeZone => no rewrite
assertNull(builder.rewriteTimeZone(shardContextThatDoesntCross));
assertNull(builder.rewriteTimeZone(shardContextThatCrosses));
// fixed timeZone => no rewrite
ZoneId tz = ZoneOffset.ofHours(1);
builder.timeZone(tz);
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));
// timeZone without DST => always rewrite
tz = ZoneId.of("Australia/Brisbane");
builder.timeZone(tz);
assertSame(ZoneOffset.ofHours(10), builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(ZoneOffset.ofHours(10), builder.rewriteTimeZone(shardContextThatCrosses));
// another timeZone without DST => always rewrite
tz = ZoneId.of("Asia/Katmandu");
builder.timeZone(tz);
assertSame(ZoneOffset.ofHoursMinutes(5, 45), builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(ZoneOffset.ofHoursMinutes(5, 45), builder.rewriteTimeZone(shardContextThatCrosses));
// daylight-saving-times => rewrite only if the reader doesn't cross a transition
tz = ZoneId.of("Europe/Paris");
builder.timeZone(tz);
assertEquals(ZoneOffset.ofHours(1), builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));
// Rounded values are no longer all within the same transitions => no rewrite
builder.calendarInterval(DateHistogramInterval.MONTH);
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));
builder = new DateHistogramAggregationBuilder("my_date_histo");
builder.field(fieldName);
builder.timeZone(tz);
builder.fixedInterval(new DateHistogramInterval(1000L * 60 * 60 * 24 + "ms")); // ~ 1 day
assertEquals(ZoneOffset.ofHours(1), builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));
// Because the interval is large, rounded values are not
// within the same transitions as the values => no rewrite
builder.fixedInterval(new DateHistogramInterval(1000L * 60 * 60 * 24 * 30 + "ms")); // ~ 1 month
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));
}
}
}
}
}
}