[7.x] Add rate aggregation (#61369) (#61554)

Adds a new rate aggregation that can calculate a document rate for buckets
of a date_histogram.

Closes #60674
This commit is contained in:
Igor Motov 2020-08-25 17:39:00 -04:00 committed by GitHub
parent 82585107aa
commit f70a59971a
25 changed files with 1839 additions and 42 deletions


@ -51,6 +51,7 @@ include::metrics/valuecount-aggregation.asciidoc[]
include::metrics/t-test-aggregation.asciidoc[]
include::metrics/rate-aggregation.asciidoc[]


@ -0,0 +1,257 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-metrics-rate-aggregation]]
=== Rate Aggregation
A `rate` metrics aggregation can be used only inside a `date_histogram` and calculates a rate of documents or field values in each
`date_histogram` bucket.
==== Syntax
A `rate` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"rate": {
"unit": "month",
"field": "requests"
}
}
--------------------------------------------------
// NOTCONSOLE
The following request will group all sales records into monthly buckets and then convert the number of sales transactions in each bucket
into an annual sales rate.
[source,console]
--------------------------------------------------
GET sales/_search
{
"size": 0,
"aggs": {
"by_date": {
"date_histogram": {
"field": "date",
"calendar_interval": "month" <1>
},
"aggs": {
"my_rate": {
"rate": {
"unit": "year" <2>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]
<1> Histogram is grouped by month.
<2> But the rate is converted into an annual rate.
The response will return the annual rate of transactions in each bucket. Since there are 12 months per year, the annual rate is
calculated by multiplying the monthly rate by 12; for example, the January bucket contains 3 documents, so its annual rate is 3 * 12 = 36.
[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"by_date" : {
"buckets" : [
{
"key_as_string" : "2015/01/01 00:00:00",
"key" : 1420070400000,
"doc_count" : 3,
"my_rate" : {
"value" : 36.0
}
},
{
"key_as_string" : "2015/02/01 00:00:00",
"key" : 1422748800000,
"doc_count" : 2,
"my_rate" : {
"value" : 24.0
}
},
{
"key_as_string" : "2015/03/01 00:00:00",
"key" : 1425168000000,
"doc_count" : 2,
"my_rate" : {
"value" : 24.0
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
Instead of counting the number of documents, it is also possible to calculate a sum of all values of a field in the documents in each
bucket. The following request will group all sales records into monthly buckets and then calculate the total monthly sales and convert them
into average daily sales.
[source,console]
--------------------------------------------------
GET sales/_search
{
"size": 0,
"aggs": {
"by_date": {
"date_histogram": {
"field": "date",
"calendar_interval": "month" <1>
},
"aggs": {
"avg_price": {
"rate": {
"field": "price", <2>
"unit": "day" <3>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]
<1> Histogram is grouped by month.
<2> Calculate the sum of all sale prices
<3> Convert to average daily sales
The response will contain the average daily sale prices for each month; for example, the January total of 550 divided by January's 31 days
yields the 17.74 shown below.
[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"by_date" : {
"buckets" : [
{
"key_as_string" : "2015/01/01 00:00:00",
"key" : 1420070400000,
"doc_count" : 3,
"avg_price" : {
"value" : 17.741935483870968
}
},
{
"key_as_string" : "2015/02/01 00:00:00",
"key" : 1422748800000,
"doc_count" : 2,
"avg_price" : {
"value" : 2.142857142857143
}
},
{
"key_as_string" : "2015/03/01 00:00:00",
"key" : 1425168000000,
"doc_count" : 2,
"avg_price" : {
"value" : 12.096774193548388
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
==== Relationship between bucket sizes and rate
The `rate` aggregation supports all rate units that can be used in the <<calendar_intervals,`calendar_intervals` parameter>> of the
`date_histogram` aggregation. The specified unit should be compatible with the `date_histogram` aggregation interval, i.e. it should be
possible to convert the bucket size into the unit. By default the interval of the `date_histogram` is used (see the example after the
list below).
`"rate": "second"`:: compatible with all intervals
`"rate": "minute"`:: compatible with all intervals
`"rate": "hour"`:: compatible with all intervals
`"rate": "day"`:: compatible with all intervals
`"rate": "week"`:: compatible with all intervals
`"rate": "month"`:: compatible with only with `month`, `quarter` and `year` calendar intervals
`"rate": "quarter"`:: compatible with only with `month`, `quarter` and `year` calendar intervals
`"rate": "year"`:: compatible with only with `month`, `quarter` and `year` calendar intervals
==== Script
The `rate` aggregation also supports scripting. For example, if we need to adjust our prices before calculating rates, we could use
a script to recalculate them on the fly:
[source,console]
--------------------------------------------------
GET sales/_search
{
"size": 0,
"aggs": {
"by_date": {
"date_histogram": {
"field": "date",
"calendar_interval": "month"
},
"aggs": {
"avg_price": {
"rate": {
"script": { <1>
"lang": "painless",
"source": "doc['price'].value * params.adjustment",
"params": {
"adjustment": 0.9 <2>
}
}
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]
<1> The `field` parameter is replaced with a `script` parameter, which uses the
script to generate values that the rate is calculated on.
<2> Scripting supports parameterized input just like any other script.
[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"by_date" : {
"buckets" : [
{
"key_as_string" : "2015/01/01 00:00:00",
"key" : 1420070400000,
"doc_count" : 3,
"avg_price" : {
"value" : 495.0
}
},
{
"key_as_string" : "2015/02/01 00:00:00",
"key" : 1422748800000,
"doc_count" : 2,
"avg_price" : {
"value" : 54.0
}
},
{
"key_as_string" : "2015/03/01 00:00:00",
"key" : 1425168000000,
"doc_count" : 2,
"avg_price" : {
"value" : 337.5
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]


@ -279,7 +279,16 @@ GET /_xpack/usage
"analytics" : {
"available" : true,
"enabled" : true,
...
"stats": {
"boxplot_usage" : 0,
"top_metrics_usage" : 0,
"normalize_usage" : 0,
"cumulative_cardinality_usage" : 0,
"t_test_usage" : 0,
"rate_usage" : 0,
"string_stats_usage" : 0,
"moving_percentiles_usage" : 0
}
},
"data_streams" : {
"available" : true,
@ -294,7 +303,6 @@ GET /_xpack/usage
// TESTRESPONSE[s/"eql" : \{[^\}]*\},/"eql" : $body.$_path,/]
// TESTRESPONSE[s/"ilm" : \{[^\}]*\},/"ilm" : $body.$_path,/]
// TESTRESPONSE[s/"slm" : \{[^\}]*\},/"slm" : $body.$_path,/]
// TESTRESPONSE[s/"analytics" : \{[^\}]*\}/"analytics" : $body.$_path/]
// TESTRESPONSE[s/ : true/ : $body.$_path/]
// TESTRESPONSE[s/ : false/ : $body.$_path/]
// TESTRESPONSE[s/ : (\-)?[0-9]+/ : $body.$_path/]


@ -45,6 +45,7 @@ import java.time.temporal.TemporalQueries;
import java.time.zone.ZoneOffsetTransition;
import java.time.zone.ZoneRules;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@ -61,7 +62,13 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class Rounding implements Writeable {
public enum DateTimeUnit {
WEEK_OF_WEEKYEAR((byte) 1, IsoFields.WEEK_OF_WEEK_BASED_YEAR) {
WEEK_OF_WEEKYEAR(
(byte) 1,
"week",
IsoFields.WEEK_OF_WEEK_BASED_YEAR,
true,
TimeUnit.DAYS.toMillis(7)
) {
private final long extraLocalOffsetLookup = TimeUnit.DAYS.toMillis(7);
long roundFloor(long utcMillis) {
@ -73,7 +80,13 @@ public abstract class Rounding implements Writeable {
return extraLocalOffsetLookup;
}
},
YEAR_OF_CENTURY((byte) 2, ChronoField.YEAR_OF_ERA) {
YEAR_OF_CENTURY(
(byte) 2,
"year",
ChronoField.YEAR_OF_ERA,
false,
12
) {
private final long extraLocalOffsetLookup = TimeUnit.DAYS.toMillis(366);
long roundFloor(long utcMillis) {
@ -84,7 +97,13 @@ public abstract class Rounding implements Writeable {
return extraLocalOffsetLookup;
}
},
QUARTER_OF_YEAR((byte) 3, IsoFields.QUARTER_OF_YEAR) {
QUARTER_OF_YEAR(
(byte) 3,
"quarter",
IsoFields.QUARTER_OF_YEAR,
false,
3
) {
private final long extraLocalOffsetLookup = TimeUnit.DAYS.toMillis(92);
long roundFloor(long utcMillis) {
@ -95,7 +114,13 @@ public abstract class Rounding implements Writeable {
return extraLocalOffsetLookup;
}
},
MONTH_OF_YEAR((byte) 4, ChronoField.MONTH_OF_YEAR) {
MONTH_OF_YEAR(
(byte) 4,
"month",
ChronoField.MONTH_OF_YEAR,
false,
1
) {
private final long extraLocalOffsetLookup = TimeUnit.DAYS.toMillis(31);
long roundFloor(long utcMillis) {
@ -106,53 +131,82 @@ public abstract class Rounding implements Writeable {
return extraLocalOffsetLookup;
}
},
DAY_OF_MONTH((byte) 5, ChronoField.DAY_OF_MONTH) {
final long unitMillis = ChronoField.DAY_OF_MONTH.getBaseUnit().getDuration().toMillis();
DAY_OF_MONTH(
(byte) 5,
"day",
ChronoField.DAY_OF_MONTH,
true,
ChronoField.DAY_OF_MONTH.getBaseUnit().getDuration().toMillis()
) {
long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, unitMillis);
return DateUtils.roundFloor(utcMillis, this.ratio);
}
long extraLocalOffsetLookup() {
return unitMillis;
return ratio;
}
},
HOUR_OF_DAY((byte) 6, ChronoField.HOUR_OF_DAY) {
final long unitMillis = ChronoField.HOUR_OF_DAY.getBaseUnit().getDuration().toMillis();
HOUR_OF_DAY(
(byte) 6,
"hour",
ChronoField.HOUR_OF_DAY,
true,
ChronoField.HOUR_OF_DAY.getBaseUnit().getDuration().toMillis()
) {
long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, unitMillis);
return DateUtils.roundFloor(utcMillis, ratio);
}
long extraLocalOffsetLookup() {
return unitMillis;
return ratio;
}
},
MINUTES_OF_HOUR((byte) 7, ChronoField.MINUTE_OF_HOUR) {
final long unitMillis = ChronoField.MINUTE_OF_HOUR.getBaseUnit().getDuration().toMillis();
MINUTES_OF_HOUR(
(byte) 7,
"minute",
ChronoField.MINUTE_OF_HOUR,
true,
ChronoField.MINUTE_OF_HOUR.getBaseUnit().getDuration().toMillis()
) {
long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, unitMillis);
return DateUtils.roundFloor(utcMillis, ratio);
}
long extraLocalOffsetLookup() {
return unitMillis;
return ratio;
}
},
SECOND_OF_MINUTE((byte) 8, ChronoField.SECOND_OF_MINUTE) {
final long unitMillis = ChronoField.SECOND_OF_MINUTE.getBaseUnit().getDuration().toMillis();
SECOND_OF_MINUTE(
(byte) 8,
"second",
ChronoField.SECOND_OF_MINUTE,
true,
ChronoField.SECOND_OF_MINUTE.getBaseUnit().getDuration().toMillis()
) {
long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, unitMillis);
return DateUtils.roundFloor(utcMillis, ratio);
}
long extraLocalOffsetLookup() {
return unitMillis;
return ratio;
}
};
private final byte id;
private final TemporalField field;
private final boolean isMillisBased;
private final String shortName;
/**
* Ratio to milliseconds if isMillisBased == true, or to months otherwise
*/
protected final long ratio;
DateTimeUnit(byte id, TemporalField field) {
DateTimeUnit(byte id, String shortName, TemporalField field, boolean isMillisBased, long ratio) {
this.id = id;
this.shortName = shortName;
this.field = field;
this.isMillisBased = isMillisBased;
this.ratio = ratio;
}
/**
@ -168,7 +222,7 @@ public abstract class Rounding implements Writeable {
* When looking up {@link LocalTimeOffset} go this many milliseconds
* in the past from the minimum millis since epoch that we plan to
* look up so that we can see transitions that we might have rounded
* down beyond.
*/
abstract long extraLocalOffsetLookup();
@ -180,6 +234,14 @@ public abstract class Rounding implements Writeable {
return field;
}
public static DateTimeUnit resolve(String name) {
return DateTimeUnit.valueOf(name.toUpperCase(Locale.ROOT));
}
public String shortName() {
return shortName;
}
public static DateTimeUnit resolve(byte id) {
switch (id) {
case 1: return WEEK_OF_WEEKYEAR;
@ -220,6 +282,11 @@ public abstract class Rounding implements Writeable {
* 3, {@code nextRoundValue(6) = 9}.
*/
long nextRoundingValue(long utcMillis);
/**
* Given a rounded value, returns the size of the interval between it and the
* next rounded value, expressed in the specified unit, if possible.
*/
double roundingSize(long utcMillis, DateTimeUnit timeUnit);
}
/**
* Prepare to round many times.
@ -324,7 +391,6 @@ public abstract class Rounding implements Writeable {
return this;
}
public Rounding build() {
Rounding rounding;
if (unit != null) {
@ -416,7 +482,7 @@ public abstract class Rounding implements Writeable {
/*
* Units that round to midnight can round down from two
* units worth of millis in the future to find the
* nextRoundingValue.
*/
unitMillis = unit.field.getBaseUnit().getDuration().toMillis();
maxLookup += 2 * unitMillis;
@ -494,7 +560,24 @@ public abstract class Rounding implements Writeable {
return "Rounding[" + unit + " in " + timeZone + "]";
}
private class FixedToMidnightRounding implements Prepared {
private abstract class TimeUnitPreparedRounding implements Prepared {
@Override
public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
if (timeUnit.isMillisBased == unit.isMillisBased) {
return (double) unit.ratio / timeUnit.ratio;
} else {
if (unit.isMillisBased == false) {
return (double) (nextRoundingValue(utcMillis) - utcMillis) / timeUnit.ratio;
} else {
throw new IllegalArgumentException("Cannot use month-based rate unit [" + timeUnit.shortName +
"] with non-month based calendar interval histogram [" + unit.shortName +
"] only week, day, hour, minute and second are supported for this histogram");
}
}
}
}
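A usage sketch of the calendar conversion above (the `time()` helper that parses an ISO timestamp to epoch millis is borrowed from the tests further down):
Rounding rounding = Rounding.builder(Rounding.DateTimeUnit.MONTH_OF_YEAR).build();
Rounding.Prepared prepared = rounding.prepare(time("2010-01-01T00:00:00.000Z"), time("2020-01-01T00:00:00.000Z"));
long janStart = prepared.round(time("2015-01-15T00:00:00.000Z"));
// months are not millis-based, so the size comes from the real bucket length:
// January 2015 spans 31 days, i.e. 31 * 24 = 744.0 hours
double hours = prepared.roundingSize(janStart, Rounding.DateTimeUnit.HOUR_OF_DAY);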
private class FixedToMidnightRounding extends TimeUnitPreparedRounding {
private final LocalTimeOffset offset;
FixedToMidnightRounding(LocalTimeOffset offset) {
@ -513,7 +596,7 @@ public abstract class Rounding implements Writeable {
}
}
private class FixedNotToMidnightRounding implements Prepared {
private class FixedNotToMidnightRounding extends TimeUnitPreparedRounding {
private final LocalTimeOffset offset;
private final long unitMillis;
@ -533,7 +616,7 @@ public abstract class Rounding implements Writeable {
}
}
private class ToMidnightRounding implements Prepared, LocalTimeOffset.Strategy {
private class ToMidnightRounding extends TimeUnitPreparedRounding implements LocalTimeOffset.Strategy {
private final LocalTimeOffset.Lookup lookup;
ToMidnightRounding(LocalTimeOffset.Lookup lookup) {
@ -614,7 +697,7 @@ public abstract class Rounding implements Writeable {
}
}
private class JavaTimeToMidnightRounding implements Prepared {
private class JavaTimeToMidnightRounding extends TimeUnitPreparedRounding {
@Override
public long round(long utcMillis) {
LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(utcMillis), timeZone);
@ -723,7 +806,7 @@ public abstract class Rounding implements Writeable {
}
}
private abstract class AbstractNotToMidnightRounding implements Prepared {
private abstract class AbstractNotToMidnightRounding extends TimeUnitPreparedRounding {
protected final long unitMillis;
AbstractNotToMidnightRounding(long unitMillis) {
@ -845,6 +928,19 @@ public abstract class Rounding implements Writeable {
}
}
private abstract class TimeIntervalPreparedRounding implements Prepared {
@Override
public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
if (timeUnit.isMillisBased) {
return (double) interval / timeUnit.ratio;
} else {
throw new IllegalArgumentException("Cannot use month-based rate unit [" + timeUnit.shortName +
"] with fixed interval based histogram, only week, day, hour, minute and second are supported for " +
"this histogram");
}
}
}
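For fixed intervals the conversion is a plain ratio, e.g. for the 10-hour interval used in the tests below (`time()` as above, `TimeValue` from `org.elasticsearch.common.unit`):
Rounding fixed = Rounding.builder(TimeValue.timeValueHours(10)).build();
Rounding.Prepared preparedFixed = fixed.prepare(time("2010-01-01T00:00:00.000Z"), time("2020-01-01T00:00:00.000Z"));
// 10 hours expressed in days: 10.0 / 24.0
double days = preparedFixed.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.DAY_OF_MONTH);
// month, quarter and year units throw IllegalArgumentException here, since a fixed
// interval has no single length in variable-size calendar units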
/**
* Rounds down inside of a time zone with an "effectively fixed"
* time zone. A time zone can be "effectively fixed" if:
@ -854,7 +950,7 @@ public abstract class Rounding implements Writeable {
* <li>It is fixed over the entire range of dates that will be rounded</li>
* </ul>
*/
private class FixedRounding implements Prepared {
private class FixedRounding extends TimeIntervalPreparedRounding {
private final LocalTimeOffset offset;
FixedRounding(LocalTimeOffset offset) {
@ -878,7 +974,7 @@ public abstract class Rounding implements Writeable {
* "effectively fixed". See {@link FixedRounding} for a description of
* "effectively fixed".
*/
private class VariableRounding implements Prepared, LocalTimeOffset.Strategy {
private class VariableRounding extends TimeIntervalPreparedRounding implements LocalTimeOffset.Strategy {
private final LocalTimeOffset.Lookup lookup;
VariableRounding(LocalTimeOffset.Lookup lookup) {
@ -933,7 +1029,7 @@ public abstract class Rounding implements Writeable {
* of dates with the same {@link Prepared} instance.</li>
* </ul>
*/
private class JavaTimeRounding implements Prepared {
private class JavaTimeRounding extends TimeIntervalPreparedRounding {
@Override
public long round(long utcMillis) {
final Instant utcInstant = Instant.ofEpochMilli(utcMillis);
@ -1049,6 +1145,11 @@ public abstract class Rounding implements Writeable {
public long nextRoundingValue(long utcMillis) {
return delegatePrepared.nextRoundingValue(utcMillis - offset) + offset;
}
@Override
public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
return delegatePrepared.roundingSize(utcMillis, timeUnit);
}
};
}


@ -50,7 +50,7 @@ import java.util.function.BiConsumer;
*
* @see Rounding
*/
class DateHistogramAggregator extends BucketsAggregator {
class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {
private final ValuesSource.Numeric valuesSource;
private final DocValueFormat formatter;
@ -182,4 +182,18 @@ class DateHistogramAggregator extends BucketsAggregator {
public void collectDebugInfo(BiConsumer<String, Object> add) {
add.accept("total_buckets", bucketOrds.size());
}
/**
* Returns the size of the bucket in the specified unit.
*
* If unitSize is null, returns 1.0.
*/
@Override
public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
if (unitSize != null) {
return preparedRounding.roundingSize(bucketOrds.get(bucket), unitSize);
} else {
return 1.0;
}
}
}
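In effect, for the January 2015 bucket of a calendar `month` histogram:
// bucketSize(ord, Rounding.DateTimeUnit.DAY_OF_MONTH) == 31.0
// bucketSize(ord, null) == 1.0 (the rate is then per histogram interval)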


@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.elasticsearch.common.Rounding;
/**
* An aggregator capable of reporting the size of its buckets in a requested {@link Rounding.DateTimeUnit}. Used by RateAggregator for calendar-based buckets.
*/
public interface SizedBucketAggregator {
double bucketSize(long bucket, Rounding.DateTimeUnit unit);
}


@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.histogram;
import java.util.concurrent.TimeUnit;
/**
* An aggregation builder capable of reporting a calendar-based divider for a given {@link TimeUnit}. Used by RateAggregator for calendar-based buckets.
*/
public interface SizedBucketAggregatorBuilder {
double calendarDivider(TimeUnit timeUnit);
}


@ -60,6 +60,11 @@ public abstract class LongKeyedBucketOrds implements Releasable {
*/
public abstract long find(long owningBucketOrd, long value);
/**
* Returns the value currently associated with the bucket ordinal
*/
public abstract long get(long ordinal);
/**
* The number of collected buckets.
*/
@ -98,7 +103,7 @@ public abstract class LongKeyedBucketOrds implements Releasable {
long value();
/**
* An {@linkplain BucketOrdsEnum} that is empty.
*/
BucketOrdsEnum EMPTY = new BucketOrdsEnum() {
@Override
@ -133,6 +138,12 @@ public abstract class LongKeyedBucketOrds implements Releasable {
return ords.find(value);
}
@Override
public long get(long ordinal) {
return ords.get(ordinal);
}
@Override
public long bucketsInOrd(long owningBucketOrd) {
assert owningBucketOrd == 0;
@ -205,6 +216,11 @@ public abstract class LongKeyedBucketOrds implements Releasable {
return ords.find(owningBucketOrd, value);
}
@Override
public long get(long ordinal) {
return ords.getKey2(ordinal);
}
@Override
public long bucketsInOrd(long owningBucketOrd) {
// TODO it'd be faster to count the number of buckets in a list of these ords rather than one at a time


@ -46,6 +46,12 @@ public abstract class ValuesSourceAggregationBuilder<AB extends ValuesSourceAggr
public static <T> void declareFields(
AbstractObjectParser<? extends ValuesSourceAggregationBuilder<?>, T> objectParser,
boolean scriptable, boolean formattable, boolean timezoneAware) {
declareFields(objectParser, scriptable, formattable, timezoneAware, true);
}
public static <T> void declareFields(
AbstractObjectParser<? extends ValuesSourceAggregationBuilder<?>, T> objectParser,
boolean scriptable, boolean formattable, boolean timezoneAware, boolean fieldRequired) {
objectParser.declareField(ValuesSourceAggregationBuilder::field, XContentParser::text,
@ -72,10 +78,15 @@ public abstract class ValuesSourceAggregationBuilder<AB extends ValuesSourceAggr
objectParser.declareField(ValuesSourceAggregationBuilder::script,
(parser, context) -> Script.parse(parser),
Script.SCRIPT_PARSE_FIELD, ObjectParser.ValueType.OBJECT_OR_STRING);
String[] fields = new String[]{ParseField.CommonFields.FIELD.getPreferredName(), Script.SCRIPT_PARSE_FIELD.getPreferredName()};
objectParser.declareRequiredFieldSet(fields);
if (fieldRequired) {
String[] fields = new String[]{ParseField.CommonFields.FIELD.getPreferredName(),
Script.SCRIPT_PARSE_FIELD.getPreferredName()};
objectParser.declareRequiredFieldSet(fields);
}
} else {
objectParser.declareRequiredFieldSet(ParseField.CommonFields.FIELD.getPreferredName());
if (fieldRequired) {
objectParser.declareRequiredFieldSet(ParseField.CommonFields.FIELD.getPreferredName());
}
}
if (timezoneAware) {


@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -831,6 +832,76 @@ public class RoundingTests extends ESTestCase {
assertThat(rounding.round(time("1982-11-10T02:51:22.662Z")), isDate(time("1982-03-23T05:00:00Z"), tz));
}
public void testFixedIntervalRoundingSize() {
Rounding unitRounding = Rounding.builder(TimeValue.timeValueHours(10)).build();
Rounding.Prepared prepared = unitRounding.prepare(time("2010-01-01T00:00:00.000Z"), time("2020-01-01T00:00:00.000Z"));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.SECOND_OF_MINUTE),
closeTo(36000.0, 0.000001));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.MINUTES_OF_HOUR),
closeTo(600.0, 0.000001));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.HOUR_OF_DAY),
closeTo(10.0, 0.000001));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.DAY_OF_MONTH),
closeTo(10.0 / 24.0, 0.000001));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.WEEK_OF_WEEKYEAR),
closeTo(10.0 / 168.0, 0.000001));
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.MONTH_OF_YEAR));
assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [month] with fixed interval based histogram, " +
"only week, day, hour, minute and second are supported for this histogram"));
ex = expectThrows(IllegalArgumentException.class,
() -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.QUARTER_OF_YEAR));
assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [quarter] with fixed interval based histogram, " +
"only week, day, hour, minute and second are supported for this histogram"));
ex = expectThrows(IllegalArgumentException.class,
() -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.YEAR_OF_CENTURY));
assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [year] with fixed interval based histogram, " +
"only week, day, hour, minute and second are supported for this histogram"));
}
public void testMillisecondsBasedUnitCalendarRoundingSize() {
Rounding unitRounding = Rounding.builder(Rounding.DateTimeUnit.HOUR_OF_DAY).build();
Rounding.Prepared prepared = unitRounding.prepare(time("2010-01-01T00:00:00.000Z"), time("2020-01-01T00:00:00.000Z"));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.SECOND_OF_MINUTE),
closeTo(3600.0, 0.000001));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.MINUTES_OF_HOUR), closeTo(60.0, 0.000001));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.HOUR_OF_DAY), closeTo(1.0, 0.000001));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.DAY_OF_MONTH),
closeTo(1 / 24.0, 0.000001));
assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.WEEK_OF_WEEKYEAR),
closeTo(1 / 168.0, 0.000001));
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.MONTH_OF_YEAR));
assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [month] with non-month based calendar interval " +
"histogram [hour], only week, day, hour, minute and second are supported for this histogram"));
ex = expectThrows(IllegalArgumentException.class,
() -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.QUARTER_OF_YEAR));
assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [quarter] with non-month based calendar interval " +
"histogram [hour], only week, day, hour, minute and second are supported for this histogram"));
ex = expectThrows(IllegalArgumentException.class,
() -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.YEAR_OF_CENTURY));
assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [year] with non-month based calendar interval " +
"histogram [hour], only week, day, hour, minute and second are supported for this histogram"));
}
public void testNonMillisecondsBasedUnitCalendarRoundingSize() {
Rounding unitRounding = Rounding.builder(Rounding.DateTimeUnit.QUARTER_OF_YEAR).build();
Rounding.Prepared prepared = unitRounding.prepare(time("2010-01-01T00:00:00.000Z"), time("2020-01-01T00:00:00.000Z"));
long firstQuarter = prepared.round(time("2015-01-01T00:00:00.000Z"));
// Ratio based
assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.MONTH_OF_YEAR), closeTo(3.0, 0.000001));
assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.QUARTER_OF_YEAR), closeTo(1.0, 0.000001));
assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.YEAR_OF_CENTURY), closeTo(0.25, 0.000001));
// Real interval based
assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.SECOND_OF_MINUTE), closeTo(7776000.0, 0.000001));
assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.MINUTES_OF_HOUR), closeTo(129600.0, 0.000001));
assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.HOUR_OF_DAY), closeTo(2160.0, 0.000001));
assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.DAY_OF_MONTH), closeTo(90.0, 0.000001));
long thirdQuarter = prepared.round(time("2015-07-01T00:00:00.000Z"));
assertThat(prepared.roundingSize(thirdQuarter, Rounding.DateTimeUnit.DAY_OF_MONTH), closeTo(92.0, 0.000001));
assertThat(prepared.roundingSize(thirdQuarter, Rounding.DateTimeUnit.HOUR_OF_DAY), closeTo(2208.0, 0.000001));
}
private void assertInterval(long rounded, long nextRoundingValue, Rounding rounding, int minutes,
ZoneId tz) {
assertInterval(rounded, dateBetween(rounded, nextRoundingValue), nextRoundingValue, rounding, tz);


@ -38,6 +38,8 @@ import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
import org.elasticsearch.xpack.analytics.movingPercentiles.MovingPercentilesPipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.rate.InternalRate;
import org.elasticsearch.xpack.analytics.rate.RateAggregationBuilder;
import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics;
@ -118,7 +120,13 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
TTestAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.T_TEST, checkLicense(TTestAggregationBuilder.PARSER)))
.addResultReader(InternalTTest::new)
.setAggregatorRegistrar(TTestAggregationBuilder::registerUsage)
.setAggregatorRegistrar(TTestAggregationBuilder::registerUsage),
new AggregationSpec(
RateAggregationBuilder.NAME,
RateAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.RATE, checkLicense(RateAggregationBuilder.PARSER)))
.addResultReader(InternalRate::new)
.setAggregatorRegistrar(RateAggregationBuilder::registerAggregators)
);
}


@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.rate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class InternalRate extends InternalNumericMetricsAggregation.SingleValue implements Rate {
final double sum;
final double divisor;
public InternalRate(String name, double sum, double divisor, DocValueFormat formatter, Map<String, Object> metadata) {
super(name, metadata);
this.sum = sum;
this.divisor = divisor;
this.format = formatter;
}
/**
* Read from a stream.
*/
public InternalRate(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
sum = in.readDouble();
divisor = in.readDouble();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeDouble(sum);
out.writeDouble(divisor);
}
@Override
public String getWriteableName() {
return RateAggregationBuilder.NAME;
}
@Override
public double value() {
return sum / divisor;
}
@Override
public double getValue() {
return sum / divisor;
}
// for testing only
DocValueFormat format() {
return format;
}
@Override
public InternalRate reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
Double divisor = null;
for (InternalAggregation aggregation : aggregations) {
double value = ((InternalRate) aggregation).sum;
kahanSummation.add(value);
if (divisor == null) {
divisor = ((InternalRate) aggregation).divisor;
}
}
return new InternalRate(name, kahanSummation.value(), divisor, format, getMetadata());
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(CommonFields.VALUE.getPreferredName(), value());
if (format != DocValueFormat.RAW) {
builder.field(CommonFields.VALUE_AS_STRING.getPreferredName(), format.format(value()).toString());
}
return builder;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), sum, divisor);
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
InternalRate that = (InternalRate) obj;
return Objects.equals(sum, that.sum) && Objects.equals(divisor, that.divisor);
}
}
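A reduction sketch with two hypothetical shard-level results for the same January bucket: partial sums add up while the divisor (the bucket size) is shared.
InternalRate shard1 = new InternalRate("my_rate", 300.0, 31.0, DocValueFormat.RAW, null);
InternalRate shard2 = new InternalRate("my_rate", 250.0, 31.0, DocValueFormat.RAW, null);
// reduce() Kahan-sums the partial sums and reuses the first divisor, so the
// merged value() is (300.0 + 250.0) / 31.0 = 550.0 / 31.0 ≈ 17.74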


@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.rate;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
/**
* An aggregation that computes the rate of the values in the current bucket by adding all values in the bucket and dividing
* it by the size of the bucket.
*/
public interface Rate extends NumericMetricsAggregation.SingleValue {
/**
* The rate.
*/
double getValue();
}


@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.rate;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, RateAggregationBuilder> {
public static final String NAME = "rate";
public static final ParseField UNIT_FIELD = new ParseField("unit");
public static final ValuesSourceRegistry.RegistryKey<RateAggregatorSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
NAME,
RateAggregatorSupplier.class
);
public static final ObjectParser<RateAggregationBuilder, String> PARSER = ObjectParser.fromBuilder(NAME, RateAggregationBuilder::new);
static {
ValuesSourceAggregationBuilder.declareFields(PARSER, true, true, false, false);
PARSER.declareString(RateAggregationBuilder::rateUnit, UNIT_FIELD);
}
Rounding.DateTimeUnit rateUnit;
public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
RateAggregatorFactory.registerAggregators(builder);
}
public RateAggregationBuilder(String name) {
super(name);
}
protected RateAggregationBuilder(
RateAggregationBuilder clone,
AggregatorFactories.Builder factoriesBuilder,
Map<String, Object> metadata
) {
super(clone, factoriesBuilder, metadata);
}
@Override
protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metadata) {
return new RateAggregationBuilder(this, factoriesBuilder, metadata);
}
/**
* Read from a stream.
*/
public RateAggregationBuilder(StreamInput in) throws IOException {
super(in);
byte b = in.readByte();
if (b > 0) {
rateUnit = Rounding.DateTimeUnit.resolve(b);
} else {
rateUnit = null;
}
}
@Override
protected ValuesSourceType defaultValueSourceType() {
return CoreValuesSourceType.NUMERIC;
}
@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
if (rateUnit != null) {
out.writeByte(rateUnit.getId());
} else {
out.writeByte((byte) 0);
}
}
@Override
protected ValuesSourceRegistry.RegistryKey<?> getRegistryKey() {
return REGISTRY_KEY;
}
@Override
protected RateAggregatorFactory innerBuild(
QueryShardContext queryShardContext,
ValuesSourceConfig config,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder
) throws IOException {
return new RateAggregatorFactory(name, config, rateUnit, queryShardContext, parent, subFactoriesBuilder, metadata);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (rateUnit != null) {
builder.field(UNIT_FIELD.getPreferredName(), rateUnit.shortName());
}
return builder;
}
@Override
public String getType() {
return NAME;
}
public RateAggregationBuilder rateUnit(String rateUnit) {
return rateUnit(parse(rateUnit));
}
public RateAggregationBuilder rateUnit(Rounding.DateTimeUnit rateUnit) {
this.rateUnit = rateUnit;
return this;
}
static Rounding.DateTimeUnit parse(String rateUnit) {
Rounding.DateTimeUnit parsedRate = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(rateUnit);
if (parsedRate == null) {
throw new IllegalArgumentException("Unsupported unit " + rateUnit);
}
return parsedRate;
}
@Override
protected ValuesSourceConfig resolveConfig(QueryShardContext queryShardContext) {
if (field() == null && script() == null) {
return new ValuesSourceConfig(
CoreValuesSourceType.NUMERIC,
null,
true,
null,
null,
1.0,
null,
DocValueFormat.RAW,
queryShardContext::nowInMillis
);
} else {
return super.resolveConfig(queryShardContext);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
RateAggregationBuilder that = (RateAggregationBuilder) o;
return rateUnit == that.rateUnit;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), rateUnit);
}
}
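A client-side sketch that builds the first docs example programmatically (assuming the standard `DateHistogramAggregationBuilder` API):
DateHistogramAggregationBuilder byDate = new DateHistogramAggregationBuilder("by_date")
    .field("date")
    .calendarInterval(DateHistogramInterval.MONTH)
    .subAggregation(new RateAggregationBuilder("my_rate").rateUnit("year"));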


@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.rate;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.histogram.SizedBucketAggregator;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Map;
public class RateAggregator extends NumericMetricsAggregator.SingleValue {
private final ValuesSource.Numeric valuesSource;
private final DocValueFormat format;
private final Rounding.DateTimeUnit rateUnit;
private final SizedBucketAggregator sizedBucketAggregator;
private DoubleArray sums;
private DoubleArray compensations;
public RateAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
this.format = valuesSourceConfig.format();
if (valuesSource != null) {
sums = context.bigArrays().newDoubleArray(1, true);
compensations = context.bigArrays().newDoubleArray(1, true);
}
this.rateUnit = rateUnit;
this.sizedBucketAggregator = findSizedBucketAncestor();
}
private SizedBucketAggregator findSizedBucketAncestor() {
SizedBucketAggregator sizedBucketAggregator = null;
for (Aggregator ancestor = parent; ancestor != null; ancestor = ancestor.parent()) {
if (ancestor instanceof SizedBucketAggregator) {
sizedBucketAggregator = (SizedBucketAggregator) ancestor;
break;
}
}
if (sizedBucketAggregator == null) {
throw new IllegalArgumentException("The rate aggregation can only be used inside a date histogram");
}
return sizedBucketAggregator;
}
@Override
public ScoreMode scoreMode() {
return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
sums = bigArrays.grow(sums, bucket + 1);
compensations = bigArrays.grow(compensations, bucket + 1);
if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);
for (int i = 0; i < valuesCount; i++) {
double value = values.nextValue();
kahanSummation.add(value);
}
compensations.set(bucket, kahanSummation.delta());
sums.set(bucket, kahanSummation.value());
}
}
};
}
@Override
public double metric(long owningBucketOrd) {
if (sizedBucketAggregator == null || valuesSource == null || owningBucketOrd >= sums.size()) {
return 0.0;
}
return sums.get(owningBucketOrd) / sizedBucketAggregator.bucketSize(owningBucketOrd, rateUnit);
}
@Override
public InternalAggregation buildAggregation(long bucket) {
if (valuesSource == null || bucket >= sums.size()) {
return buildEmptyAggregation();
}
return new InternalRate(name, sums.get(bucket), sizedBucketAggregator.bucketSize(bucket, rateUnit), format, metadata());
}
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalRate(name, 0.0, 1.0, format, metadata());
}
@Override
public void doClose() {
Releasables.close(sums, compensations);
}
}
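Tying the aggregator back to the `avg_price` docs example, a worked illustration for the January bucket:
// sums.get(bucket) == 550.0 (sum of January sale prices)
// sizedBucketAggregator.bucketSize(bucket, DAY_OF_MONTH) == 31.0 (days in January)
// reported value == 550.0 / 31.0 ≈ 17.7419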


@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.rate;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
class RateAggregatorFactory extends ValuesSourceAggregatorFactory {
private final Rounding.DateTimeUnit rateUnit;
RateAggregatorFactory(
String name,
ValuesSourceConfig config,
Rounding.DateTimeUnit rateUnit,
QueryShardContext queryShardContext,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata
) throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata);
this.rateUnit = rateUnit;
}
static void registerAggregators(ValuesSourceRegistry.Builder builder) {
builder.register(
RateAggregationBuilder.REGISTRY_KEY,
Arrays.asList(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN),
RateAggregator::new,
true
);
}
@Override
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
return new RateAggregator(name, config, rateUnit, searchContext, parent, metadata) {
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
};
}
@Override
protected Aggregator doCreateInternal(
SearchContext searchContext,
Aggregator parent,
CardinalityUpperBound bucketCardinality,
Map<String, Object> metadata
) throws IOException {
return queryShardContext.getValuesSourceRegistry()
.getAggregator(RateAggregationBuilder.REGISTRY_KEY, config)
.build(name, config, rateUnit, searchContext, parent, metadata);
}
}


@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.rate;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Map;
public interface RateAggregatorSupplier {
Aggregator build(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException;
}


@ -45,6 +45,7 @@ public class AnalyticsStatsActionNodeResponseTests extends AbstractWireSerializi
assertThat(AnalyticsStatsAction.Item.T_TEST.ordinal(), equalTo(i++));
assertThat(AnalyticsStatsAction.Item.MOVING_PERCENTILES.ordinal(), equalTo(i++));
assertThat(AnalyticsStatsAction.Item.NORMALIZE.ordinal(), equalTo(i++));
assertThat(AnalyticsStatsAction.Item.RATE.ordinal(), equalTo(i++));
// Please add tests for newly added items here
assertThat(AnalyticsStatsAction.Item.values().length, equalTo(i));
}


@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.rate;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class InternalRateTests extends InternalAggregationTestCase<InternalRate> {
@Override
protected SearchPlugin registerPlugin() {
return new AnalyticsPlugin(Settings.EMPTY);
}
@Override
protected InternalRate createTestInstance(String name, Map<String, Object> metadata) {
double sum = randomDouble();
double divider = randomDoubleBetween(0.0, 100000.0, false);
DocValueFormat formatter = randomNumericDocValueFormat();
return new InternalRate(name, sum, divider, formatter, metadata);
}
@Override
protected List<InternalRate> randomResultsToReduce(String name, int size) {
double divider = randomDoubleBetween(0.0, 100000.0, false);
List<InternalRate> inputs = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
// All results in a single reduction must share the same divisor (the bucket size)
DocValueFormat formatter = randomNumericDocValueFormat();
inputs.add(new InternalRate(name, randomDouble(), divider, formatter, null));
}
return inputs;
}
@Override
protected void assertReduced(InternalRate reduced, List<InternalRate> inputs) {
double expected = inputs.stream().mapToDouble(a -> a.sum).sum() / reduced.divisor;
assertEquals(expected, reduced.getValue(), 0.00001);
}
@Override
protected void assertFromXContent(InternalRate min, ParsedAggregation parsedAggregation) {
// There is no ParsedRate yet so we cannot test it here
}
@Override
protected InternalRate mutateInstance(InternalRate instance) {
String name = instance.getName();
double sum = instance.sum;
double divider = instance.divisor;
DocValueFormat formatter = instance.format();
Map<String, Object> metadata = instance.getMetadata();
switch (between(0, 3)) {
case 0:
name += randomAlphaOfLength(5);
break;
case 1:
sum = randomDouble();
break;
case 2:
divider = randomDouble();
break;
case 3:
if (metadata == null) {
metadata = new HashMap<>(1);
} else {
metadata = new HashMap<>(instance.getMetadata());
}
metadata.put(randomAlphaOfLength(15), randomInt());
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new InternalRate(name, sum, divider, formatter, metadata);
}
@Override
protected List<NamedXContentRegistry.Entry> getNamedXContents() {
List<NamedXContentRegistry.Entry> extendedNamedXContents = new ArrayList<>(super.getNamedXContents());
extendedNamedXContents.add(
new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(RateAggregationBuilder.NAME), (p, c) -> {
assumeTrue("There is no ParsedRate yet", false);
return null;
})
);
return extendedNamedXContents;
}
}


@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.rate;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.hasSize;
public class RateAggregationBuilderTests extends AbstractSerializingTestCase<RateAggregationBuilder> {
String aggregationName;
@Before
public void setupName() {
aggregationName = randomAlphaOfLength(10);
}
@Override
protected RateAggregationBuilder doParseInstance(XContentParser parser) throws IOException {
assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
AggregatorFactories.Builder parsed = AggregatorFactories.parseAggregators(parser);
assertThat(parsed.getAggregatorFactories(), hasSize(1));
assertThat(parsed.getPipelineAggregatorFactories(), hasSize(0));
RateAggregationBuilder agg = (RateAggregationBuilder) parsed.getAggregatorFactories().iterator().next();
assertNull(parser.nextToken());
assertNotNull(agg);
return agg;
}
@Override
protected RateAggregationBuilder createTestInstance() {
RateAggregationBuilder aggregationBuilder = new RateAggregationBuilder(aggregationName);
if (randomBoolean()) {
if (randomBoolean()) {
aggregationBuilder.field(randomAlphaOfLength(10));
} else {
aggregationBuilder.script(new Script(randomAlphaOfLength(10)));
}
}
if (randomBoolean()) {
aggregationBuilder.rateUnit(randomFrom(Rounding.DateTimeUnit.values()));
}
return aggregationBuilder;
}
@Override
protected Writeable.Reader<RateAggregationBuilder> instanceReader() {
return RateAggregationBuilder::new;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedWriteables());
}
@Override
protected NamedXContentRegistry xContentRegistry() {
List<NamedXContentRegistry.Entry> namedXContent = new ArrayList<>();
namedXContent.add(
new NamedXContentRegistry.Entry(
BaseAggregationBuilder.class,
new ParseField(RateAggregationBuilder.NAME),
(p, n) -> RateAggregationBuilder.PARSER.apply(p, (String) n)
)
);
namedXContent.addAll(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents());
return new NamedXContentRegistry(namedXContent);
}
}


@ -0,0 +1,445 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.rate;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.lookup.LeafDocLookup;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
public class RateAggregatorTests extends AggregatorTestCase {
/**
 * Script that returns the doc value of the field named by the {@code fieldname} parameter, incremented by one.
 */
public static final String ADD_ONE_SCRIPT = "add_one";
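/** Script that returns the {@code field} value only for documents whose {@code term} value matches the supplied term. */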
public static final String TERM_FILTERING = "term_filtering";
public static final String DATE_FIELD = "t";
@Override
protected ScriptService getMockScriptService() {
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
scripts.put(ADD_ONE_SCRIPT, vars -> {
LeafDocLookup leafDocLookup = (LeafDocLookup) vars.get("doc");
String fieldname = (String) vars.get("fieldname");
ScriptDocValues<?> scriptDocValues = leafDocLookup.get(fieldname);
return ((Number) scriptDocValues.get(0)).doubleValue() + 1.0;
});
scripts.put(TERM_FILTERING, vars -> {
LeafDocLookup leafDocLookup = (LeafDocLookup) vars.get("doc");
int term = (Integer) vars.get("term");
ScriptDocValues<?> termDocValues = leafDocLookup.get("term");
int currentTerm = ((Number) termDocValues.get(0)).intValue();
if (currentTerm == term) {
return ((Number) leafDocLookup.get("field").get(0)).doubleValue();
}
return null;
});
MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, scripts, Collections.emptyMap());
Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
return new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
}
public void testNoMatchingField() throws IOException {
testCase(new MatchAllDocsQuery(), "month", true, "month", "val", iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("wrong_val", 102)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("wrong_val", 103)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("wrong_val", 103)));
}, dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(dh.getBuckets().get(0).getAggregations().asList(), hasSize(1));
assertThat(dh.getBuckets().get(0).getAggregations().asList().get(0), instanceOf(InternalRate.class));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(0.0, 0.000001));
assertThat(dh.getBuckets().get(1).getAggregations().asList(), hasSize(1));
assertThat(dh.getBuckets().get(1).getAggregations().asList().get(0), instanceOf(InternalRate.class));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(0.0, 0.000001));
});
}
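// Month buckets reported at a month unit need no conversion: the rate is the per-bucket sum,
// 1 + 2 = 3.0 for March and 3 + 4 + 5 = 12.0 for April.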
public void testSortedNumericDocValuesMonthToMonth() throws IOException {
testCase(new MatchAllDocsQuery(), "month", true, "month", "val", iw -> {
iw.addDocument(
doc("2010-03-12T01:07:45", new SortedNumericDocValuesField("val", 1), new SortedNumericDocValuesField("val", 2))
);
iw.addDocument(doc("2010-04-01T03:43:34", new SortedNumericDocValuesField("val", 3)));
iw.addDocument(
doc("2010-04-27T03:43:34", new SortedNumericDocValuesField("val", 4), new SortedNumericDocValuesField("val", 5))
);
}, dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(12.0, 0.000001));
});
}
public void testDocValuesMonthToMonth() throws IOException {
testCase(new MatchAllDocsQuery(), "month", true, "month", "val", iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
}, dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1.0, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(7.0, 0.000001));
});
}
public void testDocValuesMonthToMonthDefaultRate() throws IOException {
testCase(new MatchAllDocsQuery(), "month", true, null, "val", iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
}, dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1.0, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(7.0, 0.000001));
});
}
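// A year bucket converted to a monthly rate is divided by 12: (1 + 3 + 8) / 12 = 1.0.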
public void testDocValuesYearToMonth() throws IOException {
testCase(new MatchAllDocsQuery(), "year", true, "month", "val", iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
}, dh -> {
assertThat(dh.getBuckets(), hasSize(1));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1.0, 0.000001));
});
}
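// Month buckets converted to a yearly rate are multiplied by 12: March (1) -> 12.0, April (3 + 8) -> 132.0.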
public void testDocValuesMonthToYear() throws IOException {
testCase(new MatchAllDocsQuery(), "month", true, "year", "val", iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
}, dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(12.0, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(132.0, 0.000001));
});
}
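// Fixed 50-day buckets at a day unit are divided by 50: 1 / 50 = 0.02 and (3 + 8) / 50 = 0.22.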
public void testDocValues50DaysToDays() throws IOException {
testCase(new MatchAllDocsQuery(), "50d", false, "day", "val", iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
}, dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(0.02, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(0.22, 0.000001));
});
}
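// Month-based rate units (month, quarter, year) vary in length, so they are only compatible
// with month-based calendar intervals.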
public void testIncompatibleCalendarRate() {
String interval = randomFrom("second", "minute", "hour", "day", "week", "1s", "1m", "1h", "1d", "1w");
String rate = randomFrom("month", "quarter", "year", "1M", "1q", "1y");
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> testCase(new MatchAllDocsQuery(), interval, true, rate, "val", iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
}, dh -> { fail("Shouldn't be here"); })
);
assertEquals(
"Cannot use month-based rate unit ["
+ RateAggregationBuilder.parse(rate).shortName()
+ "] with non-month based calendar interval histogram ["
+ RateAggregationBuilder.parse(interval).shortName()
+ "] only week, day, hour, minute and second are supported for this histogram",
ex.getMessage()
);
}
public void testIncompatibleIntervalRate() {
String interval = randomFrom("1s", "2m", "4h", "5d");
String rate = randomFrom("month", "quarter", "year", "1M", "1q", "1y");
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> testCase(new MatchAllDocsQuery(), interval, false, rate, "val", iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
}, dh -> { fail("Shouldn't be here"); })
);
assertEquals(
"Cannot use month-based rate unit ["
+ RateAggregationBuilder.parse(rate).shortName()
+ "] with fixed interval based histogram, only week, day, hour, minute and second are supported for this histogram",
ex.getMessage()
);
}
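// With no field or script the rate counts documents: 1 doc / 31 days in March, 2 docs / 30 days in April.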
public void testNoFieldMonthToDay() throws IOException {
testCase(new MatchAllDocsQuery(), "month", true, "day", null, iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
}, dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1 / 31.0, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(2 / 30.0, 0.000001));
});
}
public void testNoWrapping() throws IOException {
MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
MappedFieldType dateType = dateFieldType(DATE_FIELD);
RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("day");
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> testCase(rateAggregationBuilder, new MatchAllDocsQuery(), iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
}, h -> { fail("Shouldn't be here"); }, dateType, numType)
);
assertEquals("The rate aggregation can only be used inside a date histogram", ex.getMessage());
}
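// With nested date histograms the rate picks up the innermost histogram's interval: monthly sums,
// unchanged at a month unit.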
public void testDoubleWrapping() throws IOException {
MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
MappedFieldType dateType = dateFieldType(DATE_FIELD);
RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
.calendarInterval(new DateHistogramInterval("month"))
.subAggregation(rateAggregationBuilder);
DateHistogramAggregationBuilder topDateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date");
topDateHistogramAggregationBuilder.field(DATE_FIELD)
.calendarInterval(new DateHistogramInterval("year"))
.subAggregation(dateHistogramAggregationBuilder);
testCase(topDateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
iw.addDocument(doc("2009-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 2)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
}, (Consumer<InternalDateHistogram>) tdh -> {
assertThat(tdh.getBuckets(), hasSize(2));
InternalDateHistogram dh1 = (InternalDateHistogram) tdh.getBuckets().get(0).getAggregations().asList().get(0);
assertThat(dh1.getBuckets(), hasSize(1));
assertThat(((InternalRate) dh1.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1.0, 0.000001));
InternalDateHistogram dh2 = (InternalDateHistogram) tdh.getBuckets().get(1).getAggregations().asList().get(0);
assertThat(dh2.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh2.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(2.0, 0.000001));
assertThat(((InternalRate) dh2.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(7.0, 0.000001));
}, dateType, numType);
}
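// A terms aggregation between the date_histogram and the rate is allowed; the rate still resolves
// the monthly interval from the enclosing histogram.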
public void testKeywordSandwich() throws IOException {
MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
MappedFieldType dateType = dateFieldType(DATE_FIELD);
MappedFieldType keywordType = new KeywordFieldMapper.KeywordFieldType("term");
RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
TermsAggregationBuilder termsAggregationBuilder = new TermsAggregationBuilder("my_term").field("term")
.subAggregation(rateAggregationBuilder);
DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
.calendarInterval(new DateHistogramInterval("month"))
.subAggregation(termsAggregationBuilder);
testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
iw.addDocument(
doc("2010-03-11T01:07:45", new NumericDocValuesField("val", 1), new SortedSetDocValuesField("term", new BytesRef("a")))
);
iw.addDocument(
doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 2), new SortedSetDocValuesField("term", new BytesRef("a")))
);
iw.addDocument(
doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3), new SortedSetDocValuesField("term", new BytesRef("a")))
);
iw.addDocument(
doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4), new SortedSetDocValuesField("term", new BytesRef("b")))
);
}, (Consumer<InternalDateHistogram>) dh -> {
assertThat(dh.getBuckets(), hasSize(2));
StringTerms st1 = (StringTerms) dh.getBuckets().get(0).getAggregations().asList().get(0);
assertThat(st1.getBuckets(), hasSize(1));
assertThat(((InternalRate) st1.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
StringTerms st2 = (StringTerms) dh.getBuckets().get(1).getAggregations().asList().get(0);
assertThat(st2.getBuckets(), hasSize(2));
assertThat(((InternalRate) st2.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
assertThat(((InternalRate) st2.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(4.0, 0.000001));
}, dateType, numType, keywordType);
}
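// The add_one script increments every value, so March is (1 + 1) / 31 and April is (3 + 1) + (4 + 1) = 9 / 30.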
public void testScriptMonthToDay() throws IOException {
testCase(
new MatchAllDocsQuery(),
"month",
true,
"day",
new Script(ScriptType.INLINE, MockScriptEngine.NAME, ADD_ONE_SCRIPT, Collections.singletonMap("fieldname", "val")),
iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
},
dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(2 / 31.0, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(9 / 30.0, 0.000001));
}
);
}
public void testFilter() throws IOException {
MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
MappedFieldType dateType = dateFieldType(DATE_FIELD);
MappedFieldType keywordType = new KeywordFieldMapper.KeywordFieldType("term");
RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
.calendarInterval(new DateHistogramInterval("month"))
.subAggregation(rateAggregationBuilder);
testCase(dateHistogramAggregationBuilder, new TermQuery(new Term("term", "a")), iw -> {
iw.addDocument(doc("2010-03-11T01:07:45", new NumericDocValuesField("val", 1), new StringField("term", "a", Field.Store.NO)));
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 2), new StringField("term", "a", Field.Store.NO)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3), new StringField("term", "a", Field.Store.NO)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4), new StringField("term", "b", Field.Store.NO)));
}, (Consumer<InternalDateHistogram>) dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
}, dateType, numType, keywordType);
}
public void testFormatter() throws IOException {
MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
MappedFieldType dateType = dateFieldType(DATE_FIELD);
RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month")
.field("val")
.format("00.0/M");
DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
.calendarInterval(new DateHistogramInterval("month"))
.subAggregation(rateAggregationBuilder);
testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
iw.addDocument(doc("2010-03-11T01:07:45", new NumericDocValuesField("val", 1)));
iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 2)));
iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
}, (Consumer<InternalDateHistogram>) dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).getValueAsString(), equalTo("03.0/M"));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).getValueAsString(), equalTo("07.0/M"));
}, dateType, numType);
}
private void testCase(
Query query,
String interval,
boolean isCalendar,
String unit,
Object field,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<InternalDateHistogram> verify
) throws IOException {
MappedFieldType dateType = dateFieldType(DATE_FIELD);
MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate");
if (unit != null) {
rateAggregationBuilder.rateUnit(unit);
}
if (field != null) {
if (field instanceof Script) {
rateAggregationBuilder.script((Script) field);
} else {
rateAggregationBuilder.field((String) field);
}
}
DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date");
dateHistogramAggregationBuilder.field(DATE_FIELD);
if (isCalendar) {
dateHistogramAggregationBuilder.calendarInterval(new DateHistogramInterval(interval));
} else {
dateHistogramAggregationBuilder.fixedInterval(new DateHistogramInterval(interval));
}
dateHistogramAggregationBuilder.subAggregation(rateAggregationBuilder);
testCase(dateHistogramAggregationBuilder, query, buildIndex, verify, dateType, numType);
}
@Override
protected List<SearchPlugin> getSearchPlugins() {
return Collections.singletonList(new AnalyticsPlugin(Settings.EMPTY));
}
private DateFieldMapper.DateFieldType dateFieldType(String name) {
return new DateFieldMapper.DateFieldType(
name,
true,
true,
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER,
DateFieldMapper.Resolution.MILLISECONDS,
Collections.emptyMap()
);
}
private Iterable<IndexableField> doc(String date, IndexableField... fields) {
List<IndexableField> indexableFields = new ArrayList<>();
long instant = dateFieldType(DATE_FIELD).parse(date);
indexableFields.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
indexableFields.addAll(Arrays.asList(fields));
return indexableFields;
}
}
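As a hedged companion to the verify consumers above, here is one way the reduced aggregation might be walked outside a test. InternalDateHistogram and InternalRate are the concrete classes asserted on above; the surrounding class and method are purely illustrative:

import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.xpack.analytics.rate.InternalRate;

public class RateResponseSketch {
    // 'dh' is assumed to come from a reduced search response, as in the verify callbacks above.
    public static void printRates(InternalDateHistogram dh) {
        for (InternalDateHistogram.Bucket bucket : dh.getBuckets()) {
            InternalRate rate = bucket.getAggregations().get("my_rate");
            // value() is the numeric rate; getValueAsString() would apply any configured format.
            System.out.println(bucket.getKeyAsString() + " -> " + rate.value());
        }
    }
}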

View File

@ -45,7 +45,8 @@ public class AnalyticsStatsAction extends ActionType<AnalyticsStatsAction.Respon
TOP_METRICS,
T_TEST,
MOVING_PERCENTILES,
NORMALIZE;
NORMALIZE,
RATE;
}
public static class Request extends BaseNodesRequest<Request> implements ToXContentObject {

View File

@ -0,0 +1,36 @@
---
setup:
- do:
bulk:
index: test
refresh: true
body:
- '{"index": {}}'
- '{"timestamp": "2020-02-03T10:00:00Z", "val": 3}'
- '{"index": {}}'
- '{"timestamp": "2020-02-04T10:00:00Z", "val": 4}'
- '{"index": {}}'
- '{"timestamp": "2020-02-11T10:00:00Z", "val": 6}'
- '{"index": {}}'
- '{"timestamp": "2020-02-12T10:00:00Z", "val": 8}'
---
"value rate":
- do:
search:
size: 0
index: "test"
body:
aggs:
by_date:
date_histogram:
field: timestamp
calendar_interval: week
aggs:
rate:
rate:
field: val
unit: day
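# The two calendar weeks sum to 3 + 4 = 7 and 6 + 8 = 14; at a per-day
# unit over 7-day weeks that is 7 / 7 = 1.0 and 14 / 7 = 2.0.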
- length: { aggregations.by_date.buckets: 2 }
- match: { aggregations.by_date.buckets.0.rate.value: 1.0 }
- match: { aggregations.by_date.buckets.1.rate.value: 2.0 }

View File

@ -28,6 +28,7 @@ setup:
- set: {analytics.stats.string_stats_usage: string_stats_usage}
- set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}
- set: { analytics.stats.normalize_usage: normalize_usage }
- set: { analytics.stats.rate_usage: rate_usage }
# use boxplot agg
- do:
@ -54,6 +55,7 @@ setup:
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
- match: { analytics.stats.rate_usage: $rate_usage }
# use top_metrics agg
- do:
@ -83,6 +85,7 @@ setup:
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
- match: { analytics.stats.rate_usage: $rate_usage }
# use cumulative_cardinality agg
- do:
@ -116,6 +119,7 @@ setup:
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
- match: { analytics.stats.rate_usage: $rate_usage }
# use t-test agg
- do:
@ -143,6 +147,7 @@ setup:
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
- match: { analytics.stats.rate_usage: $rate_usage }
- do:
search:
@ -166,6 +171,7 @@ setup:
- set: {analytics.stats.string_stats_usage: string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
- match: { analytics.stats.rate_usage: $rate_usage }
# use moving_percentile agg
- do:
@ -200,6 +206,7 @@ setup:
- gt: { analytics.stats.moving_percentiles_usage: $moving_percentiles_usage }
- set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
- match: { analytics.stats.rate_usage: $rate_usage }
# use normalize agg
- do:
@ -234,3 +241,35 @@ setup:
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- gt: { analytics.stats.normalize_usage: $normalize_usage }
- set: {analytics.stats.normalize_usage: normalize_usage}
- match: { analytics.stats.rate_usage: $rate_usage }
# use rate agg
- do:
search:
index: "test"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
calendar_interval: "day"
aggs:
avg_users:
rate:
field: "s"
unit: "hour"
- length: { aggregations.histo.buckets: 1 }
- do: {xpack.usage: {}}
- match: { analytics.available: true }
- match: { analytics.enabled: true }
- match: {analytics.stats.boxplot_usage: $boxplot_usage}
- match: {analytics.stats.top_metrics_usage: $top_metrics_usage}
- match: {analytics.stats.cumulative_cardinality_usage: $cumulative_cardinality_usage}
- match: {analytics.stats.t_test_usage: $t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- gt: { analytics.stats.rate_usage: $rate_usage }
- set: {analytics.stats.rate_usage: rate_usage}

View File

@ -79,7 +79,8 @@ public final class TransformAggregations {
"top_hits",
"top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
"t_test", // https://github.com/elastic/elasticsearch/issues/54503,
"variable_width_histogram" // https://github.com/elastic/elasticsearch/issues/58140
"variable_width_histogram", // https://github.com/elastic/elasticsearch/issues/58140
"rate" // https://github.com/elastic/elasticsearch/issues/61351
);
private TransformAggregations() {}