240 lines
7.8 KiB
Plaintext
240 lines
7.8 KiB
Plaintext
[role="xpack"]
|
|
[[ml-configuring-aggregation]]
|
|
=== Aggregating data for faster performance
|
|
|
|
By default, {dfeeds} fetch data from {es} using search and scroll requests.
|
|
It can be significantly more efficient, however, to aggregate data in {es}
|
|
and to configure your jobs to analyze aggregated data.
|
|
|
|
One of the benefits of aggregating data this way is that {es} automatically
|
|
distributes these calculations across your cluster. You can then feed this
|
|
aggregated data into {xpackml} instead of raw results, which
|
|
reduces the volume of data that must be considered while detecting anomalies.
|
|
|
|
There are some limitations to using aggregations in {dfeeds}, however.
|
|
Your aggregation must include a `date_histogram` aggregation, which in turn must
|
|
contain a `max` aggregation on the time field. This requirement ensures that the
|
|
aggregated data is a time series and the timestamp of each bucket is the time
|
|
of the last record in the bucket. If you use a terms aggregation and the
|
|
cardinality of a term is high, then the aggregation might not be effective and
|
|
you might want to just use the default search and scroll behavior.
|
|
|
|
When you create or update a job, you can include the names of aggregations, for
|
|
example:
|
|
|
|
[source,js]
|
|
----------------------------------
|
|
PUT _ml/anomaly_detectors/farequote
|
|
{
|
|
"analysis_config": {
|
|
"bucket_span": "60m",
|
|
"detectors": [{
|
|
"function": "mean",
|
|
"field_name": "responsetime",
|
|
"by_field_name": "airline"
|
|
}],
|
|
"summary_count_field_name": "doc_count"
|
|
},
|
|
"data_description": {
|
|
"time_field":"time"
|
|
}
|
|
}
|
|
----------------------------------
|
|
// CONSOLE
|
|
// TEST[skip:setup:farequote_data]
|
|
|
|
In this example, the `airline`, `responsetime`, and `time` fields are
|
|
aggregations.
|
|
|
|
NOTE: When the `summary_count_field_name` property is set to a non-null value,
|
|
the job expects to receive aggregated input. The property must be set to the
|
|
name of the field that contains the count of raw data points that have been
|
|
aggregated. It applies to all detectors in the job.
|
|
|
|
The aggregations are defined in the {dfeed} as follows:
|
|
|
|
[source,js]
|
|
----------------------------------
|
|
PUT _ml/datafeeds/datafeed-farequote
|
|
{
|
|
"job_id":"farequote",
|
|
"indices": ["farequote"],
|
|
"aggregations": {
|
|
"buckets": {
|
|
"date_histogram": {
|
|
"field": "time",
|
|
"interval": "360s",
|
|
"time_zone": "UTC"
|
|
},
|
|
"aggregations": {
|
|
"time": {
|
|
"max": {"field": "time"}
|
|
},
|
|
"airline": {
|
|
"terms": {
|
|
"field": "airline",
|
|
"size": 100
|
|
},
|
|
"aggregations": {
|
|
"responsetime": {
|
|
"avg": {
|
|
"field": "responsetime"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
----------------------------------
|
|
// CONSOLE
|
|
// TEST[skip:setup:farequote_job]
|
|
|
|
In this example, the aggregations have names that match the fields that they
|
|
operate on. That is to say, the `max` aggregation is named `time` and its
|
|
field is also `time`. The same is true for the aggregations with the names
|
|
`airline` and `responsetime`. Since you must create the job before you can
|
|
create the {dfeed}, synchronizing your aggregation and field names can simplify
|
|
these configuration steps.
|
|
|
|
IMPORTANT: If you use a `max` aggregation on a time field, the aggregation name
|
|
in the {dfeed} must match the name of the time field, as in the previous example.
|
|
For all other aggregations, if the aggregation name doesn't match the field name,
|
|
there are limitations in the drill-down functionality within the {ml} page in
|
|
{kib}.
|
|
|
|
{dfeeds-cap} support complex nested aggregations, this example uses the `derivative`
|
|
pipeline aggregation to find the first order derivative of the counter
|
|
`system.network.out.bytes` for each value of the field `beat.name`.
|
|
|
|
[source,js]
|
|
----------------------------------
|
|
"aggregations": {
|
|
"beat.name": {
|
|
"terms": {
|
|
"field": "beat.name"
|
|
},
|
|
"aggregations": {
|
|
"buckets": {
|
|
"date_histogram": {
|
|
"field": "@timestamp",
|
|
"interval": "5m"
|
|
},
|
|
"aggregations": {
|
|
"@timestamp": {
|
|
"max": {
|
|
"field": "@timestamp"
|
|
}
|
|
},
|
|
"bytes_out_average": {
|
|
"avg": {
|
|
"field": "system.network.out.bytes"
|
|
}
|
|
},
|
|
"bytes_out_derivative": {
|
|
"derivative": {
|
|
"buckets_path": "bytes_out_average"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
----------------------------------
|
|
// NOTCONSOLE
|
|
|
|
When you define an aggregation in a {dfeed}, it must have the following form:
|
|
|
|
[source,js]
|
|
----------------------------------
|
|
"aggregations": {
|
|
["bucketing_aggregation": {
|
|
"bucket_agg": {
|
|
...
|
|
},
|
|
"aggregations": {]
|
|
"data_histogram_aggregation": {
|
|
"date_histogram": {
|
|
"field": "time",
|
|
},
|
|
"aggregations": {
|
|
"timestamp": {
|
|
"max": {
|
|
"field": "time"
|
|
}
|
|
},
|
|
[,"<first_term>": {
|
|
"terms":{...
|
|
}
|
|
[,"aggregations" : {
|
|
[<sub_aggregation>]+
|
|
} ]
|
|
}]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
----------------------------------
|
|
// NOTCONSOLE
|
|
|
|
The top level aggregation must be either a {ref}/search-aggregations-bucket.html[Bucket Aggregation]
|
|
containing as single sub-aggregation that is a `date_histogram` or the top level aggregation
|
|
is the required `date_histogram`. There must be exactly 1 `date_histogram` aggregation.
|
|
For more information, see
|
|
{ref}/search-aggregations-bucket-datehistogram-aggregation.html[Date Histogram Aggregation].
|
|
|
|
NOTE: The `time_zone` parameter in the date histogram aggregation must be set to `UTC`,
|
|
which is the default value.
|
|
|
|
Each histogram bucket has a key, which is the bucket start time. This key cannot
|
|
be used for aggregations in {dfeeds}, however, because they need to know the
|
|
time of the latest record within a bucket. Otherwise, when you restart a {dfeed},
|
|
it continues from the start time of the histogram bucket and possibly fetches
|
|
the same data twice. The max aggregation for the time field is therefore
|
|
necessary to provide the time of the latest record within a bucket.
|
|
|
|
You can optionally specify a terms aggregation, which creates buckets for
|
|
different values of a field.
|
|
|
|
IMPORTANT: If you use a terms aggregation, by default it returns buckets for
|
|
the top ten terms. Thus if the cardinality of the term is greater than 10, not
|
|
all terms are analyzed.
|
|
|
|
You can change this behavior by setting the `size` parameter. To
|
|
determine the cardinality of your data, you can run searches such as:
|
|
|
|
[source,js]
|
|
--------------------------------------------------
|
|
GET .../_search {
|
|
"aggs": {
|
|
"service_cardinality": {
|
|
"cardinality": {
|
|
"field": "service"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
--------------------------------------------------
|
|
// NOTCONSOLE
|
|
|
|
By default, {es} limits the maximum number of terms returned to 10000. For high
|
|
cardinality fields, the query might not run. It might return errors related to
|
|
circuit breaking exceptions that indicate that the data is too large. In such
|
|
cases, do not use aggregations in your {dfeed}. For more
|
|
information, see {ref}/search-aggregations-bucket-terms-aggregation.html[Terms Aggregation].
|
|
|
|
You can also optionally specify multiple sub-aggregations.
|
|
The sub-aggregations are aggregated for the buckets that were created by their
|
|
parent aggregation. For more information, see
|
|
{ref}/search-aggregations.html[Aggregations].
|
|
|
|
TIP: If your detectors use metric or sum analytical functions, set the
|
|
`interval` of the date histogram aggregation to a tenth of the `bucket_span`
|
|
that was defined in the job. This suggestion creates finer, more granular time
|
|
buckets, which are ideal for this type of analysis. If your detectors use count
|
|
or rare functions, set `interval` to the same value as `bucket_span`. For more
|
|
information about analytical functions, see <<ml-functions>>.
|