Support offset in composite aggs (#50609) (#50808)

Adds support for the `offset` parameter to the `date_histogram` source
of composite aggs. The `offset` parameter is supported by the normal
`date_histogram` aggregation and is useful for folks that need to
measure things from, say, 6am one day to 6am the next day.

This is implemented by creating a new `Rounding` that knows how to
handle offsets and delegates to other rounding implementations. That
implementation doesn't fully implement the `Rounding` contract, namely
`nextRoundingValue`. That method isn't used by composite aggs so I can't
be sure that any implementation that I add will be correct. I propose to
leave it throwing `UnsupportedOperationException` until I need it.

Closes #48757
This commit is contained in:
Nik Everett 2020-01-09 14:11:24 -05:00 committed by GitHub
parent c51303d051
commit 1d8e51f89d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 362 additions and 53 deletions

View File

@ -287,6 +287,72 @@ Time zones may either be specified as an ISO 8601 UTC offset (e.g. `+01:00` or
`-08:00`) or as a timezone id, an identifier used in the TZ database like
`America/Los_Angeles`.
*Offset*
include::datehistogram-aggregation.asciidoc[tag=offset-explanation]
[source,console,id=composite-aggregation-datehistogram-offset-example]
----
PUT my_index/_doc/1?refresh
{
"date": "2015-10-01T05:30:00Z"
}
PUT my_index/_doc/2?refresh
{
"date": "2015-10-01T06:30:00Z"
}
GET my_index/_search?size=0
{
"aggs": {
"my_buckets": {
"composite" : {
"sources" : [
{
"date": {
"date_histogram" : {
"field": "date",
"calendar_interval": "day",
"offset": "+6h",
"format": "iso8601"
}
}
}
]
}
}
}
}
----
include::datehistogram-aggregation.asciidoc[tag=offset-result-intro]
[source,console-result]
----
{
...
"aggregations": {
"my_buckets": {
"after_key": { "date": "2015-10-01T06:00:00.000Z" },
"buckets": [
{
"key": { "date": "2015-09-30T06:00:00.000Z" },
"doc_count": 1
},
{
"key": { "date": "2015-10-01T06:00:00.000Z" },
"doc_count": 1
}
]
}
}
}
----
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
include::datehistogram-aggregation.asciidoc[tag=offset-note]
===== Mixing different values source
The `sources` parameter accepts an array of values source.

View File

@ -461,8 +461,10 @@ the bucket covering that day will only hold data for 23 hours instead of the usu
where you'll have only a 11h bucket on the morning of 27 March when the DST shift
happens.
[[search-aggregations-bucket-datehistogram-offset]]
===== Offset
// tag::offset-explanation[]
Use the `offset` parameter to change the start value of each bucket by the
specified positive (`+`) or negative offset (`-`) duration, such as `1h` for
an hour, or `1d` for a day. See <<time-units>> for more possible time
@ -471,6 +473,7 @@ duration options.
For example, when using an interval of `day`, each bucket runs from midnight
to midnight. Setting the `offset` parameter to `+6h` changes each bucket
to run from 6am to 6am:
// end::offset-explanation[]
[source,console]
-----------------------------
@ -498,8 +501,10 @@ GET my_index/_search?size=0
}
-----------------------------
// tag::offset-result-intro[]
Instead of a single bucket starting at midnight, the above request groups the
documents into buckets starting at 6am:
// end::offset-result-intro[]
[source,console-result]
-----------------------------
@ -525,8 +530,10 @@ documents into buckets starting at 6am:
-----------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
// tag::offset-note[]
NOTE: The start `offset` of each bucket is calculated after `time_zone`
adjustments have been made.
// end::offset-note[]
===== Keyed Response

View File

@ -390,6 +390,40 @@ setup:
- match: { aggregations.test.buckets.0.key.date: "2017-10-21" }
- match: { aggregations.test.buckets.0.doc_count: 1 }
---
"Composite aggregation with date_histogram offset":
- skip:
version: " - 7.5.99"
reason: offset introduced in 7.6.0
- do:
search:
rest_total_hits_as_int: true
index: test
body:
aggregations:
test:
composite:
sources: [
{
"date": {
"date_histogram": {
"field": "date",
"calendar_interval": "1d",
"offset": "4h",
"format": "iso8601" # Format makes the comparisons a little more obvious
}
}
}
]
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.date: "2017-10-19T04:00:00.000Z" }
- match: { aggregations.test.buckets.0.doc_count: 1 }
- match: { aggregations.test.buckets.1.key.date: "2017-10-21T04:00:00.000Z" }
- match: { aggregations.test.buckets.1.doc_count: 1 }
---
"Composite aggregation with after_key in the response":
- skip:
@ -702,7 +736,6 @@ setup:
reason: geotile_grid is not supported until 7.5.0
- do:
search:
rest_total_hits_as_int: true
index: test
body:
aggregations:
@ -725,7 +758,8 @@ setup:
]
after: { "geo": "12/730/1590", "kw": "foo" }
- match: {hits.total: 6}
- match: { hits.total.value: 6 }
- match: { hits.total.relation: "eq" }
- length: { aggregations.test.buckets: 3 }
- match: { aggregations.test.buckets.0.key.geo: "12/1236/1533" }
- match: { aggregations.test.buckets.0.key.kw: "bar" }

View File

@ -184,6 +184,7 @@ public abstract class Rounding implements Writeable {
private final long interval;
private ZoneId timeZone = ZoneOffset.UTC;
private long offset = 0;
public Builder(DateTimeUnit unit) {
this.unit = unit;
@ -205,14 +206,28 @@ public abstract class Rounding implements Writeable {
return this;
}
public Rounding build() {
Rounding timeZoneRounding;
if (unit != null) {
timeZoneRounding = new TimeUnitRounding(unit, timeZone);
} else {
timeZoneRounding = new TimeIntervalRounding(interval, timeZone);
/**
* Sets the offset of this rounding from the normal beginning of the interval. Use this
* to start days at 6am or months on the 15th.
* @param offset the offset, in milliseconds
*/
public Builder offset(long offset) {
this.offset = offset;
return this;
}
return timeZoneRounding;
public Rounding build() {
Rounding rounding;
if (unit != null) {
rounding = new TimeUnitRounding(unit, timeZone);
} else {
rounding = new TimeIntervalRounding(interval, timeZone);
}
if (offset != 0) {
rounding = new OffsetRounding(rounding, offset);
}
return rounding;
}
}
@ -237,7 +252,17 @@ public abstract class Rounding implements Writeable {
}
TimeUnitRounding(StreamInput in) throws IOException {
this(DateTimeUnit.resolve(in.readByte()), DateUtils.of(in.readString()));
this(DateTimeUnit.resolve(in.readByte()), in.readZoneId());
}
@Override
public void innerWriteTo(StreamOutput out) throws IOException {
out.writeByte(unit.getId());
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZoneId(timeZone);
} else {
out.writeString(DateUtils.zoneIdToDateTimeZone(timeZone).getID());
}
}
@Override
@ -399,16 +424,6 @@ public abstract class Rounding implements Writeable {
}
}
@Override
public void innerWriteTo(StreamOutput out) throws IOException {
out.writeByte(unit.getId());
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeString(timeZone.getId());
} else {
out.writeString(DateUtils.zoneIdToDateTimeZone(timeZone).getID());
}
}
@Override
public int hashCode() {
return Objects.hash(unit, timeZone);
@ -428,19 +443,11 @@ public abstract class Rounding implements Writeable {
@Override
public String toString() {
return "[" + timeZone + "][" + unit + "]";
return "Rounding[" + unit + " in " + timeZone + "]";
}
}
static class TimeIntervalRounding extends Rounding {
@Override
public String toString() {
return "TimeIntervalRounding{" +
"interval=" + interval +
", timeZone=" + timeZone +
'}';
}
static final byte ID = 2;
/** Since, there is no offset of -1 ms, it is safe to use -1 for non-fixed timezones */
private static final long TZ_OFFSET_NON_FIXED = -1;
@ -460,7 +467,17 @@ public abstract class Rounding implements Writeable {
}
TimeIntervalRounding(StreamInput in) throws IOException {
this(in.readVLong(), DateUtils.of(in.readString()));
this(in.readVLong(), in.readZoneId());
}
@Override
public void innerWriteTo(StreamOutput out) throws IOException {
out.writeVLong(interval);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZoneId(timeZone);
} else {
out.writeString(DateUtils.zoneIdToDateTimeZone(timeZone).getID());
}
}
@Override
@ -537,16 +554,6 @@ public abstract class Rounding implements Writeable {
.toInstant().toEpochMilli();
}
@Override
public void innerWriteTo(StreamOutput out) throws IOException {
out.writeVLong(interval);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeString(timeZone.getId());
} else {
out.writeString(DateUtils.zoneIdToDateTimeZone(timeZone).getID());
}
}
@Override
public int hashCode() {
return Objects.hash(interval, timeZone);
@ -563,21 +570,86 @@ public abstract class Rounding implements Writeable {
TimeIntervalRounding other = (TimeIntervalRounding) obj;
return Objects.equals(interval, other.interval) && Objects.equals(timeZone, other.timeZone);
}
@Override
public String toString() {
return "Rounding[" + interval + " in " + timeZone + "]";
}
}
static class OffsetRounding extends Rounding {
static final byte ID = 3;
private final Rounding delegate;
private final long offset;
OffsetRounding(Rounding delegate, long offset) {
this.delegate = delegate;
this.offset = offset;
}
OffsetRounding(StreamInput in) throws IOException {
// Versions before 7.6.0 will never send this type of rounding.
delegate = Rounding.read(in);
offset = in.readZLong();
}
@Override
public void innerWriteTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_7_6_0)) {
throw new IllegalArgumentException("Offset rounding not supported before 7.6.0");
}
delegate.writeTo(out);
out.writeZLong(offset);
}
@Override
public byte id() {
return ID;
}
@Override
public long round(long value) {
return delegate.round(value - offset) + offset;
}
@Override
public long nextRoundingValue(long value) {
// This isn't needed by the current users. We'll implement it when we migrate other users to it.
throw new UnsupportedOperationException("not yet supported");
}
@Override
public int hashCode() {
return Objects.hash(delegate, offset);
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
OffsetRounding other = (OffsetRounding) obj;
return delegate.equals(other.delegate) && offset == other.offset;
}
@Override
public String toString() {
return delegate + " offset by " + offset;
}
}
public static Rounding read(StreamInput in) throws IOException {
Rounding rounding;
byte id = in.readByte();
switch (id) {
case TimeUnitRounding.ID:
rounding = new TimeUnitRounding(in);
break;
return new TimeUnitRounding(in);
case TimeIntervalRounding.ID:
rounding = new TimeIntervalRounding(in);
break;
return new TimeIntervalRounding(in);
case OffsetRounding.ID:
return new OffsetRounding(in);
default:
throw new ElasticsearchException("unknown rounding id [" + id + "]");
}
return rounding;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
@ -31,9 +32,11 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalConsumer;
import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalWrapper;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
@ -56,6 +59,13 @@ public class DateHistogramValuesSourceBuilder
PARSER = new ObjectParser<>(DateHistogramValuesSourceBuilder.TYPE);
PARSER.declareString(DateHistogramValuesSourceBuilder::format, new ParseField("format"));
DateIntervalWrapper.declareIntervalFields(PARSER);
PARSER.declareField(DateHistogramValuesSourceBuilder::offset, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return p.longValue();
} else {
return DateHistogramAggregationBuilder.parseStringOffset(p.text());
}
}, Histogram.OFFSET_FIELD, ObjectParser.ValueType.LONG);
PARSER.declareField(DateHistogramValuesSourceBuilder::timeZone, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return ZoneId.of(p.text());
@ -71,6 +81,7 @@ public class DateHistogramValuesSourceBuilder
private ZoneId timeZone = null;
private DateIntervalWrapper dateHistogramInterval = new DateIntervalWrapper();
private long offset = 0;
public DateHistogramValuesSourceBuilder(String name) {
super(name, ValueType.DATE);
@ -80,12 +91,18 @@ public class DateHistogramValuesSourceBuilder
super(in);
dateHistogramInterval = new DateIntervalWrapper(in);
timeZone = in.readOptionalZoneId();
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
offset = in.readLong();
}
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
dateHistogramInterval.writeTo(out);
out.writeOptionalZoneId(timeZone);
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeLong(offset);
}
}
@Override
@ -215,9 +232,25 @@ public class DateHistogramValuesSourceBuilder
return timeZone;
}
/**
* Get the offset to use when rounding, which is a number of milliseconds.
*/
public long offset() {
return offset;
}
/**
* Set the offset on this builder, which is a number of milliseconds.
* @return this for chaining
*/
public DateHistogramValuesSourceBuilder offset(long offset) {
this.offset = offset;
return this;
}
@Override
protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig<?> config) throws IOException {
Rounding rounding = dateHistogramInterval.createRounding(timeZone());
Rounding rounding = dateHistogramInterval.createRounding(timeZone(), offset);
ValuesSource orig = config.toValuesSource(queryShardContext);
if (orig == null) {
orig = ValuesSource.Numeric.EMPTY;

View File

@ -291,7 +291,10 @@ public class DateHistogramAggregationBuilder extends ValuesSourceAggregationBuil
return offset(parseStringOffset(offset));
}
static long parseStringOffset(String offset) {
/**
* Parse the string specification of an offset.
*/
public static long parseStringOffset(String offset) {
if (offset.charAt(0) == '-') {
return -TimeValue
.parseTimeValue(offset.substring(1), null, DateHistogramAggregationBuilder.class.getSimpleName() + ".parseOffset")
@ -496,13 +499,14 @@ public class DateHistogramAggregationBuilder extends ValuesSourceAggregationBuil
AggregatorFactory parent,
Builder subFactoriesBuilder) throws IOException {
final ZoneId tz = timeZone();
final Rounding rounding = dateHistogramInterval.createRounding(tz);
// TODO use offset here rather than explicitly in the aggregation
final Rounding rounding = dateHistogramInterval.createRounding(tz, 0);
final ZoneId rewrittenTimeZone = rewriteTimeZone(queryShardContext);
final Rounding shardRounding;
if (tz == rewrittenTimeZone) {
shardRounding = rounding;
} else {
shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone);
shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone, 0);
}
ExtendedBounds roundedBounds = null;

View File

@ -275,7 +275,7 @@ public class DateIntervalWrapper implements ToXContentFragment, Writeable {
}
}
public Rounding createRounding(ZoneId timeZone) {
public Rounding createRounding(ZoneId timeZone, long offset) {
Rounding.Builder tzRoundingBuilder;
if (isEmpty()) {
throw new IllegalArgumentException("Invalid interval specified, must be non-null and non-empty");
@ -302,6 +302,7 @@ public class DateIntervalWrapper implements ToXContentFragment, Writeable {
if (timeZone != null) {
tzRoundingBuilder.timeZone(timeZone);
}
tzRoundingBuilder.offset(offset);
return tzRoundingBuilder.build();
}

View File

@ -195,6 +195,18 @@ public class RoundingTests extends ESTestCase {
assertThat(tzRounding_chg.round(time("2014-11-02T06:01:01", chg)), isDate(time("2014-11-02T06:00:00", chg), chg));
}
public void testOffsetRounding() {
long twoHours = TimeUnit.HOURS.toMillis(2);
long oneDay = TimeUnit.DAYS.toMillis(1);
Rounding rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).offset(twoHours).build();
assertThat(rounding.round(0), equalTo(-oneDay + twoHours));
assertThat(rounding.round(twoHours), equalTo(twoHours));
rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).offset(-twoHours).build();
assertThat(rounding.round(0), equalTo(-twoHours));
assertThat(rounding.round(oneDay - twoHours), equalTo(oneDay - twoHours));
}
/**
* Randomized test on TimeUnitRounding. Test uses random
* {@link DateTimeUnit} and {@link ZoneId} and often (50% of the time)

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import org.elasticsearch.common.Rounding.DateTimeUnit;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class RoundingWireTests extends AbstractWireSerializingTestCase<Rounding> {
@Override
protected Rounding createTestInstance() {
Rounding.Builder builder;
if (randomBoolean()) {
builder = Rounding.builder(randomFrom(DateTimeUnit.values()));
} else {
// The time value's millisecond component must be > 0 so we're limited in the suffixes we can use.
final String tv = randomTimeValue(1, 1000, "d", "h", "ms", "s", "m");
builder = Rounding.builder(TimeValue.parseTimeValue(tv, "test"));
}
if (randomBoolean()) {
builder.timeZone(randomZone());
}
if (randomBoolean()) {
builder.offset(randomLong());
}
return builder.build();
}
@Override
protected Reader<Rounding> instanceReader() {
return Rounding::read;
}
}

View File

@ -90,6 +90,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@ -1020,7 +1021,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
.dateHistogramInterval(DateHistogramInterval.days(1));
.calendarInterval(DateHistogramInterval.days(1));
return new CompositeAggregationBuilder("name", Collections.singletonList(histo));
},
(result) -> {
@ -1044,7 +1045,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
.dateHistogramInterval(DateHistogramInterval.days(1));
.calendarInterval(DateHistogramInterval.days(1));
return new CompositeAggregationBuilder("name", Collections.singletonList(histo))
.aggregateAfter(createAfterKey("date", 1474329600000L));
@ -1058,7 +1059,35 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
}
);
assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
/*
* Tests a four hour offset, which moves the document with
* date 2017-10-20T03:08:45 into 2017-10-19's bucket.
*/
testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
LongPoint.newRangeQuery(
"date",
asLong("2016-09-20T09:00:34"),
asLong("2017-10-20T06:09:24")
)), dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
.calendarInterval(DateHistogramInterval.days(1))
.offset(TimeUnit.HOURS.toMillis(4));
return new CompositeAggregationBuilder("name", Collections.singletonList(histo))
.aggregateAfter(createAfterKey("date", 1474329600000L));
}, (result) -> {
assertEquals(3, result.getBuckets().size());
assertEquals("{date=1508472000000}", result.afterKey().toString());
assertEquals("{date=1474344000000}", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
assertEquals("{date=1508385600000}", result.getBuckets().get(1).getKeyAsString());
assertEquals(2L, result.getBuckets().get(1).getDocCount());
assertEquals("{date=1508472000000}", result.getBuckets().get(2).getKeyAsString());
assertEquals(1L, result.getBuckets().get(2).getDocCount());
}
);
}
public void testWithDateTerms() throws IOException {