Mirror of https://github.com/honeymoose/OpenSearch.git, synced 2025-02-17 10:25:15 +00:00
[Rollup] Only allow aggregating on multiples of configured interval (#32052)
We need to limit the search request aggregations to whole multiples of the configured interval for both histogram and date_histogram. Otherwise, agg buckets won't overlap with the rolled up buckets and the results will be incorrect.

For histogram, the validation is simple: the request must be >= the config, and divide it evenly. Dates are trickier:

- If both request and config are fixed dates, we can convert to millis and treat them just like the histo
- If both are calendar, we make sure the request is >= the config with a static lookup map that ranks the calendar values relatively. All calendar units are "singles", so they are evenly divisible already
- We disallow any other combination (one fixed, one calendar, etc)
parent 13880bd8c1
commit d93b2a2e9a
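The validation rules described in the commit message can be sketched standalone. This is a hypothetical, self-contained class (the real logic lives in `RollupJobIdentifierUtils` and operates on `DateHistogramInterval` objects rather than raw strings and millis):

```java
import java.util.Map;

public class IntervalValidation {
    // Relative ranking of calendar units, analogous to the CALENDAR_ORDERING map in the patch
    static final Map<String, Integer> CALENDAR_ORDERING = Map.of(
            "1s", 1, "1m", 2, "1h", 3, "1d", 4, "1w", 5, "1M", 6, "1q", 7, "1y", 8);

    // Fixed intervals: the request must be >= the config and divide it evenly
    static boolean validFixed(long requestMillis, long configMillis) {
        return requestMillis >= configMillis && requestMillis % configMillis == 0;
    }

    // Calendar intervals: both units must be known calendar "singles", and the request
    // must rank >= the config; any fixed/calendar mix is rejected because the fixed
    // side is absent from the map
    static boolean validCalendar(String requestUnit, String configUnit) {
        Integer request = CALENDAR_ORDERING.get(requestUnit);
        Integer config = CALENDAR_ORDERING.get(configUnit);
        return request != null && config != null && request >= config;
    }
}
```

Note that a multiple like `"2h"` is deliberately absent from the calendar map: multiples are always fixed, so they take the millis path instead.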
@@ -685,9 +685,8 @@ setups['sensor_prefab_data'] = '''
       page_size: 1000
       groups:
         date_histogram:
-          delay: "7d"
           field: "timestamp"
-          interval: "1h"
+          interval: "7d"
           time_zone: "UTC"
         terms:
           fields:
@@ -43,6 +43,8 @@ started with the <<rollup-start-job,Start Job API>>.
 `metrics`::
   (object) Defines the metrics that should be collected for each grouping tuple. See <<rollup-job-config,rollup job config>>.
 
+For more details about the job configuration, see <<rollup-job-config>>.
+
 ==== Authorization
 
 You must have `manage` or `manage_rollup` cluster privileges to use this API.
@@ -23,7 +23,7 @@ PUT _xpack/rollup/job/sensor
     "groups" : {
       "date_histogram": {
         "field": "timestamp",
-        "interval": "1h",
+        "interval": "60m",
         "delay": "7d"
       },
       "terms": {
@@ -99,7 +99,7 @@ fields will then be available later for aggregating into buckets. For example,
     "groups" : {
       "date_histogram": {
         "field": "timestamp",
-        "interval": "1h",
+        "interval": "60m",
         "delay": "7d"
       },
       "terms": {
@@ -133,9 +133,9 @@ The `date_histogram` group has several parameters:
 The date field that is to be rolled up.
 
 `interval` (required)::
-The interval of time buckets to be generated when rolling up. E.g. `"1h"` will produce hourly rollups. This follows standard time formatting
-syntax as used elsewhere in Elasticsearch. The `interval` defines the _minimum_ interval that can be aggregated only. If hourly (`"1h"`)
-intervals are configured, <<rollup-search,Rollup Search>> can execute aggregations with 1hr or greater (weekly, monthly, etc) intervals.
+The interval of time buckets to be generated when rolling up. E.g. `"60m"` will produce 60 minute (hourly) rollups. This follows standard time formatting
+syntax as used elsewhere in Elasticsearch. The `interval` defines the _minimum_ interval that can be aggregated only. If hourly (`"60m"`)
+intervals are configured, <<rollup-search,Rollup Search>> can execute aggregations with 60m or greater (weekly, monthly, etc) intervals.
 So define the interval as the smallest unit that you wish to later query.
 
 Note: smaller, more granular intervals take up proportionally more space.
@@ -154,6 +154,46 @@ The `date_histogram` group has several parameters:
 to be stored with a specific timezone. By default, rollup documents are stored in `UTC`, but this can be changed with the `time_zone`
 parameter.
 
+.Calendar vs Fixed time intervals
+**********************************
+Elasticsearch understands both "calendar" and "fixed" time intervals. Fixed time intervals are fairly easy to understand;
+`"60s"` means sixty seconds. But what does `"1M"` mean? One month of time depends on which month we are talking about,
+some months are longer or shorter than others. This is an example of "calendar" time, and the duration of that unit
+depends on context. Calendar units are also affected by leap-seconds, leap-years, etc.
+
+This is important because the buckets generated by Rollup will be in either calendar or fixed intervals, and will limit
+how you can query them later (see <<rollup-search-limitations-intervals, Requests must be multiples of the config>>).
+
+We recommend sticking with "fixed" time intervals, since they are easier to understand and are more flexible at query
+time. It will introduce some drift in your data during leap-events, and you will have to think about months in a fixed
+quantity (30 days) instead of the actual calendar length... but it is often easier than dealing with calendar units
+at query time.
+
+Multiples of units are always "fixed" (e.g. `"2h"` is always the fixed quantity `7200` seconds). Single units can be
+fixed or calendar depending on the unit:
+
+[options="header"]
+|=======
+|Unit |Calendar |Fixed
+|millisecond |NA |`1ms`, `10ms`, etc
+|second |NA |`1s`, `10s`, etc
+|minute |`1m` |`2m`, `10m`, etc
+|hour |`1h` |`2h`, `10h`, etc
+|day |`1d` |`2d`, `10d`, etc
+|week |`1w` |NA
+|month |`1M` |NA
+|quarter |`1q` |NA
+|year |`1y` |NA
+|=======
+
+For some units where there are both fixed and calendar intervals, you may need to express the quantity in terms of the next
+smaller unit. For example, if you want a fixed day (not a calendar day), you should specify `24h` instead of `1d`.
+Similarly, if you want fixed hours, specify `60m` instead of `1h`. This is because the single quantity entails
+calendar time, and limits you to querying by calendar time in the future.
+
+
+**********************************
+
 ===== Terms
 
 The `terms` group can be used on `keyword` or numeric fields, to allow bucketing via the `terms` aggregation at a later point. The `terms`
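The point above, that multiples of units are always "fixed", can be checked with a toy converter. This is a hypothetical helper, not the Elasticsearch `TimeValue` parser, and it only understands the single-letter `s`/`m`/`h`/`d` suffixes:

```java
public class FixedIntervals {
    // Resolve a fixed interval such as "2h" or "60m" to milliseconds;
    // "2h" always comes out as the constant 7200 seconds, regardless of calendar context
    static long fixedMillis(String interval) {
        long value = Long.parseLong(interval.substring(0, interval.length() - 1));
        switch (interval.charAt(interval.length() - 1)) {
            case 's': return value * 1_000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            case 'd': return value * 86_400_000L;
            default:  throw new IllegalArgumentException("unknown unit in: " + interval);
        }
    }
}
```

Writing `60m` instead of `1h` (or `24h` instead of `1d`) yields the same fixed duration while steering clear of the calendar interpretation of the single unit.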
@@ -37,8 +37,7 @@ PUT _xpack/rollup/job/sensor
     "groups" : {
       "date_histogram": {
         "field": "timestamp",
-        "interval": "1h",
-        "delay": "7d"
+        "interval": "60m"
       },
       "terms": {
         "fields": ["node"]
@@ -66,7 +65,7 @@ The `cron` parameter controls when and how often the job activates. When a roll
 from where it left off after the last activation. So if you configure the cron to run every 30 seconds, the job will process the last 30
 seconds worth of data that was indexed into the `sensor-*` indices.
 
-If instead the cron was configured to run once a day at midnight, the job would process the last 24hours worth of data. The choice is largely
+If instead the cron was configured to run once a day at midnight, the job would process the last 24 hours worth of data. The choice is largely
 preference, based on how "realtime" you want the rollups, and if you wish to process continuously or move it to off-peak hours.
 
 Next, we define a set of `groups` and `metrics`. The metrics are fairly straightforward: we want to save the min/max/sum of the `temperature`
@@ -79,7 +78,7 @@ It also allows us to run terms aggregations on the `node` field.
 .Date histogram interval vs cron schedule
 **********************************
 You'll note that the job's cron is configured to run every 30 seconds, but the date_histogram is configured to
-rollup at hourly intervals. How do these relate?
+rollup at 60 minute intervals. How do these relate?
 
 The date_histogram controls the granularity of the saved data. Data will be rolled up into hourly intervals, and you will be unable
 to query with finer granularity. The cron simply controls when the process looks for new data to rollup. Every 30 seconds it will see
@@ -223,70 +222,71 @@ Which returns a corresponding response:
 [source,js]
 ----
 {
     "took" : 93,
     "timed_out" : false,
     "terminated_early" : false,
     "_shards" : ... ,
     "hits" : {
         "total" : 0,
         "max_score" : 0.0,
         "hits" : [ ]
     },
     "aggregations" : {
         "timeline" : {
             "meta" : { },
             "buckets" : [
                 {
                     "key_as_string" : "2018-01-18T00:00:00.000Z",
                     "key" : 1516233600000,
                     "doc_count" : 6,
                     "nodes" : {
                         "doc_count_error_upper_bound" : 0,
                         "sum_other_doc_count" : 0,
                         "buckets" : [
                             {
                                 "key" : "a",
                                 "doc_count" : 2,
                                 "max_temperature" : {
                                     "value" : 202.0
                                 },
                                 "avg_voltage" : {
                                     "value" : 5.1499998569488525
                                 }
                             },
                             {
                                 "key" : "b",
                                 "doc_count" : 2,
                                 "max_temperature" : {
                                     "value" : 201.0
                                 },
                                 "avg_voltage" : {
                                     "value" : 5.700000047683716
                                 }
                             },
                             {
                                 "key" : "c",
                                 "doc_count" : 2,
                                 "max_temperature" : {
                                     "value" : 202.0
                                 },
                                 "avg_voltage" : {
                                     "value" : 4.099999904632568
                                 }
                             }
                         ]
                     }
                 }
             ]
         }
     }
 }
+
 ----
 // TESTRESPONSE[s/"took" : 93/"took" : $body.$_path/]
 // TESTRESPONSE[s/"_shards" : \.\.\. /"_shards" : $body.$_path/]
 
 In addition to being more complicated (date histogram and a terms aggregation, plus an additional average metric), you'll notice
-the date_histogram uses a `7d` interval instead of `1h`.
+the date_histogram uses a `7d` interval instead of `60m`.
 
 [float]
 === Conclusion
@@ -80,9 +80,25 @@ The response will tell you that the field and aggregation were not possible, bec
 [float]
 === Interval Granularity
 
-Rollups are stored at a certain granularity, as defined by the `date_histogram` group in the configuration. If data is rolled up at hourly
-intervals, the <<rollup-search>> API can aggregate on any time interval hourly or greater. Intervals that are less than an hour will throw
-an exception, since the data simply doesn't exist for finer granularities.
+Rollups are stored at a certain granularity, as defined by the `date_histogram` group in the configuration. This means you
+can only search/aggregate the rollup data with an interval that is greater-than or equal to the configured rollup interval.
+
+For example, if data is rolled up at hourly intervals, the <<rollup-search>> API can aggregate on any time interval
+hourly or greater. Intervals that are less than an hour will throw an exception, since the data simply doesn't
+exist for finer granularities.
+
+[[rollup-search-limitations-intervals]]
+.Requests must be multiples of the config
+**********************************
+Perhaps not immediately apparent, but the interval specified in an aggregation request must be a whole
+multiple of the configured interval. If the job was configured to rollup on `3d` intervals, you can only
+query and aggregate on multiples of three (`3d`, `6d`, `9d`, etc).
+
+A non-multiple wouldn't work, since the rolled up data wouldn't cleanly "overlap" with the buckets generated
+by the aggregation, leading to incorrect results.
+
+For that reason, an error is thrown if a whole multiple of the configured interval isn't found.
+**********************************
+
 Because the RollupSearch endpoint can "upsample" intervals, there is no need to configure jobs with multiple intervals (hourly, daily, etc).
 It's recommended to just configure a single job with the smallest granularity that is needed, and allow the search endpoint to upsample
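The whole-multiple rule for fixed intervals reduces to a one-line check. A minimal sketch, with a hypothetical class name (the shipped check in `RollupJobIdentifierUtils` operates on parsed millisecond values):

```java
public class MultipleOfConfig {
    // With a job rolled up on a 3-day fixed interval, a request interval is only
    // queryable when it is >= the config and a whole multiple of it
    static boolean queryable(long requestDays, long configDays) {
        return requestDays >= configDays && requestDays % configDays == 0;
    }
}
```

So for a `3d` config, `3d`, `6d`, and `9d` requests pass, while `4d` is rejected because its buckets would not cleanly overlap the rolled up buckets.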
@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.rollup;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
@@ -17,7 +18,9 @@ import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
 import org.joda.time.DateTimeZone;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +35,29 @@ public class RollupJobIdentifierUtils {
 
     private static final Comparator<RollupJobCaps> COMPARATOR = RollupJobIdentifierUtils.getComparator();
 
+    public static final Map<String, Integer> CALENDAR_ORDERING;
+
+    static {
+        Map<String, Integer> dateFieldUnits = new HashMap<>(16);
+        dateFieldUnits.put("year", 8);
+        dateFieldUnits.put("1y", 8);
+        dateFieldUnits.put("quarter", 7);
+        dateFieldUnits.put("1q", 7);
+        dateFieldUnits.put("month", 6);
+        dateFieldUnits.put("1M", 6);
+        dateFieldUnits.put("week", 5);
+        dateFieldUnits.put("1w", 5);
+        dateFieldUnits.put("day", 4);
+        dateFieldUnits.put("1d", 4);
+        dateFieldUnits.put("hour", 3);
+        dateFieldUnits.put("1h", 3);
+        dateFieldUnits.put("minute", 2);
+        dateFieldUnits.put("1m", 2);
+        dateFieldUnits.put("second", 1);
+        dateFieldUnits.put("1s", 1);
+        CALENDAR_ORDERING = Collections.unmodifiableMap(dateFieldUnits);
+    }
+
     /**
      * Given the aggregation tree and a list of available job capabilities, this method will return a set
      * of the "best" jobs that should be searched.
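A standalone copy of the ordering table shows how the ranks are meant to be consulted. This is a hypothetical class for illustration; in the patch the lookup happens via `getOrDefault` inside `validateCalendarInterval`, after non-calendar units have already been filtered out:

```java
import java.util.Map;

public class CalendarOrdering {
    // Both spellings of each unit ("day" and "1d") share a rank, so they compare as equal
    static final Map<String, Integer> ORDER = Map.ofEntries(
            Map.entry("second", 1), Map.entry("1s", 1),
            Map.entry("minute", 2), Map.entry("1m", 2),
            Map.entry("hour", 3),   Map.entry("1h", 3),
            Map.entry("day", 4),    Map.entry("1d", 4),
            Map.entry("week", 5),   Map.entry("1w", 5),
            Map.entry("month", 6),  Map.entry("1M", 6),
            Map.entry("quarter", 7), Map.entry("1q", 7),
            Map.entry("year", 8),   Map.entry("1y", 8));

    // A request interval can be served by the config when its unit ranks >= the config's
    // unit; unknown units default to Integer.MAX_VALUE, which is why the real code checks
    // isCalendarInterval on both sides before consulting the map
    static boolean requestGteConfig(String request, String config) {
        return ORDER.getOrDefault(request, Integer.MAX_VALUE)
                >= ORDER.getOrDefault(config, Integer.MAX_VALUE);
    }
}
```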
@@ -93,8 +119,9 @@ public class RollupJobIdentifierUtils {
         if (fieldCaps != null) {
             for (Map<String, Object> agg : fieldCaps.getAggs()) {
                 if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
-                    TimeValue interval = TimeValue.parseTimeValue((String)agg.get(RollupField.INTERVAL), "date_histogram.interval");
-                    String thisTimezone = (String) agg.get(DateHistogramGroupConfig.TIME_ZONE);
+                    DateHistogramInterval interval = new DateHistogramInterval((String)agg.get(RollupField.INTERVAL));
+
+                    String thisTimezone = (String)agg.get(DateHistogramGroupConfig.TIME_ZONE);
                     String sourceTimeZone = source.timeZone() == null ? DateTimeZone.UTC.toString() : source.timeZone().toString();
 
                     // Ensure we are working on the same timezone
@@ -102,17 +129,20 @@ public class RollupJobIdentifierUtils {
                         continue;
                     }
                     if (source.dateHistogramInterval() != null) {
-                        TimeValue sourceInterval = TimeValue.parseTimeValue(source.dateHistogramInterval().toString(),
-                            "source.date_histogram.interval");
-                        //TODO should be divisor of interval
-                        if (interval.compareTo(sourceInterval) <= 0) {
+                        // Check if both are calendar and validate if they are.
+                        // If not, check if both are fixed and validate
+                        if (validateCalendarInterval(source.dateHistogramInterval(), interval)) {
+                            localCaps.add(cap);
+                        } else if (validateFixedInterval(source.dateHistogramInterval(), interval)) {
                             localCaps.add(cap);
                         }
                     } else {
-                        if (interval.getMillis() <= source.interval()) {
+                        // check if config is fixed and validate if it is
+                        if (validateFixedInterval(source.interval(), interval)) {
                             localCaps.add(cap);
                         }
                     }
+                    // not a candidate if we get here
                     break;
                 }
             }
@@ -133,6 +163,55 @@ public class RollupJobIdentifierUtils {
         }
     }
 
+    private static boolean isCalendarInterval(DateHistogramInterval interval) {
+        return DateHistogramAggregationBuilder.DATE_FIELD_UNITS.containsKey(interval.toString());
+    }
+
+    static boolean validateCalendarInterval(DateHistogramInterval requestInterval,
+                                            DateHistogramInterval configInterval) {
+        // Both must be calendar intervals
+        if (isCalendarInterval(requestInterval) == false || isCalendarInterval(configInterval) == false) {
+            return false;
+        }
+
+        // The request must be gte the config. The CALENDAR_ORDERING map values are integers representing
+        // relative orders between the calendar units
+        int requestOrder = CALENDAR_ORDERING.getOrDefault(requestInterval.toString(), Integer.MAX_VALUE);
+        int configOrder = CALENDAR_ORDERING.getOrDefault(configInterval.toString(), Integer.MAX_VALUE);
+
+        // All calendar units are multiples naturally, so we just care about gte
+        return requestOrder >= configOrder;
+    }
+
+    static boolean validateFixedInterval(DateHistogramInterval requestInterval,
+                                         DateHistogramInterval configInterval) {
+        // Neither can be calendar intervals
+        if (isCalendarInterval(requestInterval) || isCalendarInterval(configInterval)) {
+            return false;
+        }
+
+        // Both are fixed, good to convert to millis now
+        long configIntervalMillis = TimeValue.parseTimeValue(configInterval.toString(),
+            "date_histo.config.interval").getMillis();
+        long requestIntervalMillis = TimeValue.parseTimeValue(requestInterval.toString(),
+            "date_histo.request.interval").getMillis();
+
+        // Must be a multiple and gte the config
+        return requestIntervalMillis >= configIntervalMillis && requestIntervalMillis % configIntervalMillis == 0;
+    }
+
+    static boolean validateFixedInterval(long requestInterval, DateHistogramInterval configInterval) {
+        // config must not be a calendar interval
+        if (isCalendarInterval(configInterval)) {
+            return false;
+        }
+        long configIntervalMillis = TimeValue.parseTimeValue(configInterval.toString(),
+            "date_histo.config.interval").getMillis();
+
+        // Must be a multiple and gte the config
+        return requestInterval >= configIntervalMillis && requestInterval % configIntervalMillis == 0;
+    }
+
     /**
      * Find the set of histo's with the largest interval
      */
@@ -144,8 +223,8 @@ public class RollupJobIdentifierUtils {
             for (Map<String, Object> agg : fieldCaps.getAggs()) {
                 if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) {
                     Long interval = (long)agg.get(RollupField.INTERVAL);
-                    // TODO should be divisor of interval
-                    if (interval <= source.interval()) {
+                    // query interval must be gte the configured interval, and a whole multiple
+                    if (interval <= source.interval() && source.interval() % interval == 0) {
                         localCaps.add(cap);
                     }
                     break;
@@ -155,8 +234,8 @@ public class RollupJobIdentifierUtils {
         }
 
         if (localCaps.isEmpty()) {
-            throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg on field [" +
-                source.field() + "] which also satisfies all requirements of query.");
+            throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName()
+                + "] agg on field [" + source.field() + "] which also satisfies all requirements of query.");
         }
 
         // We are a leaf, save our best caps
@@ -61,6 +61,32 @@ public class RollupJobIdentifierUtilTests extends ESTestCase {
         assertThat(bestCaps.size(), equalTo(1));
     }
 
+    public void testBiggerButCompatibleFixedInterval() {
+        final GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo", new DateHistogramInterval("100s")));
+        final RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
+        RollupJobCaps cap = new RollupJobCaps(job);
+        Set<RollupJobCaps> caps = singletonSet(cap);
+
+        DateHistogramAggregationBuilder builder = new DateHistogramAggregationBuilder("foo").field("foo")
+            .dateHistogramInterval(new DateHistogramInterval("1000s"));
+
+        Set<RollupJobCaps> bestCaps = RollupJobIdentifierUtils.findBestJobs(builder, caps);
+        assertThat(bestCaps.size(), equalTo(1));
+    }
+
+    public void testBiggerButCompatibleFixedMillisInterval() {
+        final GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo", new DateHistogramInterval("100ms")));
+        final RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
+        RollupJobCaps cap = new RollupJobCaps(job);
+        Set<RollupJobCaps> caps = singletonSet(cap);
+
+        DateHistogramAggregationBuilder builder = new DateHistogramAggregationBuilder("foo").field("foo")
+            .interval(1000);
+
+        Set<RollupJobCaps> bestCaps = RollupJobIdentifierUtils.findBestJobs(builder, caps);
+        assertThat(bestCaps.size(), equalTo(1));
+    }
+
     public void testIncompatibleInterval() {
         final GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo", new DateHistogramInterval("1d")));
         final RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
@@ -75,6 +101,20 @@ public class RollupJobIdentifierUtilTests extends ESTestCase {
             "[foo] which also satisfies all requirements of query."));
     }
 
+    public void testIncompatibleFixedCalendarInterval() {
+        final GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo", new DateHistogramInterval("5d")));
+        final RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
+        RollupJobCaps cap = new RollupJobCaps(job);
+        Set<RollupJobCaps> caps = singletonSet(cap);
+
+        DateHistogramAggregationBuilder builder = new DateHistogramAggregationBuilder("foo").field("foo")
+            .dateHistogramInterval(new DateHistogramInterval("day"));
+
+        RuntimeException e = expectThrows(RuntimeException.class, () -> RollupJobIdentifierUtils.findBestJobs(builder, caps));
+        assertThat(e.getMessage(), equalTo("There is not a rollup job that has a [date_histogram] agg on field " +
+            "[foo] which also satisfies all requirements of query."));
+    }
+
     public void testBadTimeZone() {
         final GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo", new DateHistogramInterval("1h"), null, "EST"));
         final RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
@@ -385,6 +425,27 @@ public class RollupJobIdentifierUtilTests extends ESTestCase {
             "[bar] which also satisfies all requirements of query."));
     }
 
+    public void testHistoIntervalNotMultiple() {
+        HistogramAggregationBuilder histo = new HistogramAggregationBuilder("test_histo");
+        histo.interval(10) // <--- interval is not a multiple of 3
+            .field("bar")
+            .subAggregation(new MaxAggregationBuilder("the_max").field("max_field"))
+            .subAggregation(new AvgAggregationBuilder("the_avg").field("avg_field"));
+
+        final GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo",
+            new DateHistogramInterval("1d"), null, "UTC"),
+            new HistogramGroupConfig(3L, "bar"),
+            null);
+        final RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
+        RollupJobCaps cap = new RollupJobCaps(job);
+        Set<RollupJobCaps> caps = singletonSet(cap);
+
+        Exception e = expectThrows(RuntimeException.class,
+            () -> RollupJobIdentifierUtils.findBestJobs(histo, caps));
+        assertThat(e.getMessage(), equalTo("There is not a rollup job that has a [histogram] agg on field " +
+            "[bar] which also satisfies all requirements of query."));
+    }
+
     public void testMissingMetric() {
         int i = ESTestCase.randomIntBetween(0, 3);
@@ -417,6 +478,105 @@ public class RollupJobIdentifierUtilTests extends ESTestCase {
     }
 
+    public void testValidateFixedInterval() {
+        boolean valid = RollupJobIdentifierUtils.validateFixedInterval(100, new DateHistogramInterval("100ms"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(200, new DateHistogramInterval("100ms"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(1000, new DateHistogramInterval("200ms"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(5*60*1000, new DateHistogramInterval("5m"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(10*5*60*1000, new DateHistogramInterval("5m"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(100, new DateHistogramInterval("500ms"));
+        assertFalse(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(100, new DateHistogramInterval("5m"));
+        assertFalse(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(100, new DateHistogramInterval("minute"));
+        assertFalse(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(100, new DateHistogramInterval("second"));
+        assertFalse(valid);
+
+        // -----------
+        // Same tests, with both being DateHistoIntervals
+        // -----------
+        valid = RollupJobIdentifierUtils.validateFixedInterval(new DateHistogramInterval("100ms"),
+            new DateHistogramInterval("100ms"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(new DateHistogramInterval("200ms"),
+            new DateHistogramInterval("100ms"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(new DateHistogramInterval("1000ms"),
+            new DateHistogramInterval("200ms"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(new DateHistogramInterval("5m"),
+            new DateHistogramInterval("5m"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(new DateHistogramInterval("20m"),
+            new DateHistogramInterval("5m"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(new DateHistogramInterval("100ms"),
+            new DateHistogramInterval("500ms"));
+        assertFalse(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(new DateHistogramInterval("100ms"),
+            new DateHistogramInterval("5m"));
+        assertFalse(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(new DateHistogramInterval("100ms"),
+            new DateHistogramInterval("minute"));
+        assertFalse(valid);
+
+        valid = RollupJobIdentifierUtils.validateFixedInterval(new DateHistogramInterval("100ms"),
+            new DateHistogramInterval("second"));
+        assertFalse(valid);
+    }
+
+    public void testValidateCalendarInterval() {
+        boolean valid = RollupJobIdentifierUtils.validateCalendarInterval(new DateHistogramInterval("second"),
+            new DateHistogramInterval("second"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateCalendarInterval(new DateHistogramInterval("minute"),
+            new DateHistogramInterval("second"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateCalendarInterval(new DateHistogramInterval("month"),
+            new DateHistogramInterval("day"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateCalendarInterval(new DateHistogramInterval("1d"),
+            new DateHistogramInterval("1s"));
+        assertTrue(valid);
+
+        valid = RollupJobIdentifierUtils.validateCalendarInterval(new DateHistogramInterval("second"),
+            new DateHistogramInterval("minute"));
+        assertFalse(valid);
+
+        valid = RollupJobIdentifierUtils.validateCalendarInterval(new DateHistogramInterval("second"),
+            new DateHistogramInterval("1m"));
+        assertFalse(valid);
+
+        // Fails because both are actually fixed
+        valid = RollupJobIdentifierUtils.validateCalendarInterval(new DateHistogramInterval("100ms"),
+            new DateHistogramInterval("100ms"));
+        assertFalse(valid);
+    }
+
     private Set<RollupJobCaps> singletonSet(RollupJobCaps cap) {
         Set<RollupJobCaps> caps = new HashSet<>();
         caps.add(cap);
@@ -173,7 +173,7 @@ public class RollupIT extends ESRestTestCase {
             "      \"date_histo\": {\n" +
             "        \"date_histogram\": {\n" +
             "          \"field\": \"timestamp\",\n" +
-            "          \"interval\": \"1h\",\n" +
+            "          \"interval\": \"60m\",\n" +
             "          \"format\": \"date_time\"\n" +
             "        },\n" +
             "        \"aggs\": {\n" +
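The rules these tests exercise — a search request's interval is only servable by a rollup job when it is a whole multiple of the configured fixed interval, or an equal-or-coarser calendar unit — can be sketched as follows. This is an illustrative approximation, not the actual `RollupJobIdentifierUtils` implementation; the class name, the `fixedOk`/`calendarOk` methods, and the unit-ranking map are hypothetical.

```java
import java.util.Map;

public class IntervalValidationSketch {

    // Static lookup ranking calendar units relative to each other;
    // a larger rank means a coarser unit (hypothetical ordering).
    private static final Map<String, Integer> CALENDAR_ORDER = Map.of(
        "second", 0, "minute", 1, "hour", 2, "day", 3,
        "week", 4, "month", 5, "quarter", 6, "year", 7);

    // Fixed case: request must be >= the config and divide evenly,
    // otherwise agg buckets would not overlap the rolled-up buckets.
    static boolean fixedOk(long requestMillis, long configMillis) {
        return requestMillis >= configMillis && requestMillis % configMillis == 0;
    }

    // Calendar case: all calendar units are "singles", so they are
    // evenly divisible already; the request just has to be the same
    // unit as the config or a coarser one.
    static boolean calendarOk(String request, String config) {
        Integer r = CALENDAR_ORDER.get(request);
        Integer c = CALENDAR_ORDER.get(config);
        return r != null && c != null && r >= c;
    }

    public static void main(String[] args) {
        System.out.println(fixedOk(200, 100));              // true: 200ms is 2x 100ms
        System.out.println(fixedOk(100, 500));              // false: finer than config
        System.out.println(calendarOk("month", "day"));     // true: coarser unit
        System.out.println(calendarOk("second", "minute")); // false: finer unit
    }
}
```

Any other combination (one fixed interval, one calendar interval) is disallowed outright, which is why `validateCalendarInterval("100ms", "100ms")` fails above even though the values match.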