[ML] Enable overall buckets aggregation at a custom bucket span (elastic/x-pack-elasticsearch#2782)
Returning overall buckets at the largest job's `bucket_span` can produce too much data for the UI that consumes this API. The UI only ever displays a few buckets in its swimlane; their span depends on the selected time range and the screen resolution, but the count stays relatively low. This PR adds the ability to aggregate overall buckets into a user-specified `bucket_span`, which must be equal to or greater than the largest job's `bucket_span`. The `overall_score` of each resulting overall bucket is the max score of the corresponding overall buckets whose span equals the largest job's `bucket_span`. The implementation now chunks the bucket requests, as the aggregation would otherwise fail when too many buckets match. Original commit: elastic/x-pack-elasticsearch@981f7a40e5
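Before the diff itself, a quick sketch of how a caller might exercise the new parameter. This is not part of the commit; it only uses the request setters the commit adds, and the job id pattern is a placeholder:

    // Hypothetical caller, sketched against the Request API added below
    GetOverallBucketsAction.Request request = new GetOverallBucketsAction.Request("my-jobs-*");
    request.setTopN(1);            // average only the single highest per-job score
    request.setBucketSpan("2h");   // must be equal to or greater than the largest job's bucket_span
    request.setOverallScore(50.0); // drop overall buckets scoring below 50.0

Over REST, the same options map onto the top_n, bucket_span and overall_score query parameters handled by RestGetOverallBucketsAction further down.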
This commit is contained in:
parent e028716bec
commit c7e94b3b4c
@@ -32,6 +32,11 @@ score in the overall bucket. Alternatively, if you set `top_n` to the number of
jobs, the `overall_score` is high only when all jobs detect anomalies in that
overall bucket.

In addition, the optional parameter `bucket_span` may be used in order
to request overall buckets that span longer than the largest job's `bucket_span`.
When set, the `overall_score` will be the max `overall_score` of the corresponding
overall buckets with a span equal to the largest job's `bucket_span`.

==== Path Parameters

`job_id`::

@@ -44,6 +49,10 @@ overall bucket.
  (boolean) If `false` and the `job_id` does not match any job, an error will
  be returned. The default value is `true`.

`bucket_span`::
  (string) The span of the overall buckets. Must be greater than or equal
  to the largest job's `bucket_span`. Defaults to the largest job's `bucket_span`.

`end`::
  (string) Returns overall buckets with timestamps earlier than this time.
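To make the semantics concrete: if the largest job `bucket_span` is 1h and the request asks for `bucket_span: 2h`, two consecutive hourly overall buckets scoring 30.0 and 60.0 collapse into a single two-hour overall bucket whose `overall_score` is max(30.0, 60.0) = 60.0. The YAML tests near the end of this commit assert exactly this behaviour.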
@@ -5,7 +5,6 @@
 */
package org.elasticsearch.xpack.ml.action;

import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;

@@ -14,7 +13,6 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;

@@ -38,11 +36,12 @@ import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;

@@ -50,26 +49,37 @@ import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsAggregator;
import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsCollector;
import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProcessor;
import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProvider;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.OverallBucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.Intervals;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.LongSupplier;

/**
 * <p>
 * This action returns summarized bucket results over multiple jobs.
 * Overall buckets have the span of the largest job's bucket_span.
 * Their score is calculated by finding the max anomaly score per job
 * and then averaging the top N.
 * </p>
 * <p>
 * Overall buckets can be optionally aggregated into larger intervals
 * by setting the bucket_span parameter. When that is the case, the
 * overall_score is the max of the overall buckets that are within
 * the interval.
 * </p>
 */
public class GetOverallBucketsAction
        extends Action<GetOverallBucketsAction.Request, GetOverallBucketsAction.Response, GetOverallBucketsAction.RequestBuilder> {

@@ -94,6 +104,7 @@ public class GetOverallBucketsAction
    public static class Request extends ActionRequest implements ToXContentObject {

        public static final ParseField TOP_N = new ParseField("top_n");
        public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
        public static final ParseField OVERALL_SCORE = new ParseField("overall_score");
        public static final ParseField EXCLUDE_INTERIM = new ParseField("exclude_interim");
        public static final ParseField START = new ParseField("start");

@@ -105,6 +116,7 @@ public class GetOverallBucketsAction
        static {
            PARSER.declareString((request, jobId) -> request.jobId = jobId, Job.ID);
            PARSER.declareInt(Request::setTopN, TOP_N);
            PARSER.declareString(Request::setBucketSpan, BUCKET_SPAN);
            PARSER.declareDouble(Request::setOverallScore, OVERALL_SCORE);
            PARSER.declareBoolean(Request::setExcludeInterim, EXCLUDE_INTERIM);
            PARSER.declareString((request, startTime) -> request.setStart(parseDateOrThrow(

@@ -135,6 +147,7 @@ public class GetOverallBucketsAction

        private String jobId;
        private int topN = 1;
        private TimeValue bucketSpan;
        private double overallScore = 0.0;
        private boolean excludeInterim = false;
        private Long start;

@@ -163,6 +176,18 @@ public class GetOverallBucketsAction
            this.topN = topN;
        }

        public TimeValue getBucketSpan() {
            return bucketSpan;
        }

        public void setBucketSpan(TimeValue bucketSpan) {
            this.bucketSpan = bucketSpan;
        }

        public void setBucketSpan(String bucketSpan) {
            this.bucketSpan = TimeValue.parseTimeValue(bucketSpan, BUCKET_SPAN.getPreferredName());
        }

        public double getOverallScore() {
            return overallScore;
        }

@@ -221,6 +246,7 @@ public class GetOverallBucketsAction
            super.readFrom(in);
            jobId = in.readString();
            topN = in.readVInt();
            bucketSpan = in.readOptionalWriteable(TimeValue::new);
            overallScore = in.readDouble();
            excludeInterim = in.readBoolean();
            start = in.readOptionalLong();

@@ -233,6 +259,7 @@ public class GetOverallBucketsAction
            super.writeTo(out);
            out.writeString(jobId);
            out.writeVInt(topN);
            out.writeOptionalWriteable(bucketSpan);
            out.writeDouble(overallScore);
            out.writeBoolean(excludeInterim);
            out.writeOptionalLong(start);

@@ -245,6 +272,9 @@ public class GetOverallBucketsAction
            builder.startObject();
            builder.field(Job.ID.getPreferredName(), jobId);
            builder.field(TOP_N.getPreferredName(), topN);
            if (bucketSpan != null) {
                builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan.getStringRep());
            }
            builder.field(OVERALL_SCORE.getPreferredName(), overallScore);
            builder.field(EXCLUDE_INTERIM.getPreferredName(), excludeInterim);
            if (start != null) {

@@ -260,7 +290,7 @@ public class GetOverallBucketsAction

        @Override
        public int hashCode() {
            return Objects.hash(jobId, topN, overallScore, excludeInterim, start, end, allowNoJobs);
            return Objects.hash(jobId, topN, bucketSpan, overallScore, excludeInterim, start, end, allowNoJobs);
        }

        @Override

@@ -274,6 +304,7 @@ public class GetOverallBucketsAction
            Request that = (Request) other;
            return Objects.equals(jobId, that.jobId) &&
                    this.topN == that.topN &&
                    Objects.equals(bucketSpan, that.bucketSpan) &&
                    this.excludeInterim == that.excludeInterim &&
                    this.overallScore == that.overallScore &&
                    Objects.equals(start, that.start) &&

@@ -301,6 +332,10 @@ public class GetOverallBucketsAction
            this.overallBuckets = overallBuckets;
        }

        public QueryPage<OverallBucket> getOverallBuckets() {
            return overallBuckets;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);

@@ -346,6 +381,9 @@ public class GetOverallBucketsAction

    public static class TransportAction extends HandledTransportAction<Request, Response> {

        private static final String EARLIEST_TIME = "earliest_time";
        private static final String LATEST_TIME = "latest_time";

        private final Client client;
        private final ClusterService clusterService;
        private final JobManager jobManager;

@@ -368,113 +406,192 @@ public class GetOverallBucketsAction
                return;
            }

            List<String> indices = new ArrayList<>();
            TimeValue maxBucketSpan = TimeValue.ZERO;
            for (Job job : jobsPage.results()) {
                indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
                TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
                if (maxBucketSpan.compareTo(bucketSpan) < 0) {
                    maxBucketSpan = bucketSpan;
            // As computing and potentially aggregating overall buckets might take a while,
            // we run in a different thread to avoid blocking the network thread.
            threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
                try {
                    getOverallBuckets(request, jobsPage.results(), listener);
                } catch (Exception e) {
                    listener.onFailure(e);
                }
                }
            final long maxBucketSpanSeconds = maxBucketSpan.seconds();
            });
        }

            SearchRequest searchRequest = buildSearchRequest(request, maxBucketSpan.millis(), indices);
            client.search(searchRequest, ActionListener.wrap(searchResponse -> {
                List<OverallBucket> overallBuckets = computeOverallBuckets(request, maxBucketSpanSeconds, searchResponse);
        private void getOverallBuckets(Request request, List<Job> jobs, ActionListener<Response> listener) {
            JobsContext jobsContext = JobsContext.build(jobs, request);

            ActionListener<List<OverallBucket>> overallBucketsListener = ActionListener.wrap(overallBuckets -> {
                listener.onResponse(new Response(new QueryPage<>(overallBuckets, overallBuckets.size(), OverallBucket.RESULTS_FIELD)));
            }, listener::onFailure);

            ActionListener<ChunkedBucketSearcher> chunkedBucketSearcherListener = ActionListener.wrap(searcher -> {
                if (searcher == null) {
                    listener.onResponse(new Response());
                    return;
                }
                searcher.searchAndComputeOverallBuckets(overallBucketsListener);
            }, listener::onFailure);

            OverallBucketsProvider overallBucketsProvider = new OverallBucketsProvider(jobsContext.maxBucketSpan, request.getTopN(),
                    request.getOverallScore());
            OverallBucketsProcessor overallBucketsProcessor = requiresAggregation(request, jobsContext.maxBucketSpan) ?
                    new OverallBucketsAggregator(request.getBucketSpan()): new OverallBucketsCollector();
            initChunkedBucketSearcher(request, jobsContext, overallBucketsProvider, overallBucketsProcessor, chunkedBucketSearcherListener);
        }

        private static boolean requiresAggregation(Request request, TimeValue maxBucketSpan) {
            return request.getBucketSpan() != null && !request.getBucketSpan().equals(maxBucketSpan);
        }

        private static void checkValidBucketSpan(TimeValue bucketSpan, TimeValue maxBucketSpan) {
            if (bucketSpan != null && bucketSpan.compareTo(maxBucketSpan) < 0) {
                throw ExceptionsHelper.badRequestException("Param [{}] must be greater or equal to the max bucket_span [{}]",
                        Request.BUCKET_SPAN, maxBucketSpan.getStringRep());
            }
        }

        private void initChunkedBucketSearcher(Request request, JobsContext jobsContext, OverallBucketsProvider overallBucketsProvider,
                                               OverallBucketsProcessor overallBucketsProcessor,
                                               ActionListener<ChunkedBucketSearcher> listener) {
            long maxBucketSpanMillis = jobsContext.maxBucketSpan.millis();
            SearchRequest searchRequest = buildSearchRequest(request.getStart(), request.getEnd(), request.isExcludeInterim(),
                    maxBucketSpanMillis, jobsContext.indices);
            searchRequest.source().aggregation(AggregationBuilders.min(EARLIEST_TIME).field(Result.TIMESTAMP.getPreferredName()));
            searchRequest.source().aggregation(AggregationBuilders.max(LATEST_TIME).field(Result.TIMESTAMP.getPreferredName()));
            client.search(searchRequest, ActionListener.wrap(searchResponse -> {
                long totalHits = searchResponse.getHits().getTotalHits();
                if (totalHits > 0) {
                    Aggregations aggregations = searchResponse.getAggregations();
                    Min min = aggregations.get(EARLIEST_TIME);
                    long earliestTime = Intervals.alignToFloor((long) min.getValue(), maxBucketSpanMillis);
                    Max max = aggregations.get(LATEST_TIME);
                    long latestTime = Intervals.alignToCeil((long) max.getValue() + 1, maxBucketSpanMillis);
                    listener.onResponse(new ChunkedBucketSearcher(jobsContext, earliestTime, latestTime, request.isExcludeInterim(),
                            overallBucketsProvider, overallBucketsProcessor));
                } else {
                    listener.onResponse(null);
                }
            }, listener::onFailure));
        }

        private static SearchRequest buildSearchRequest(Request request, long bucketSpanMillis, List<String> indices) {
            String startTime = request.getStart() == null ? null : String.valueOf(
                    Intervals.alignToCeil(request.getStart(), bucketSpanMillis));
            String endTime = request.getEnd() == null ? null : String.valueOf(Intervals.alignToFloor(request.getEnd(), bucketSpanMillis));
        private static class JobsContext {
            private final int jobCount;
            private final String[] indices;
            private final TimeValue maxBucketSpan;

            private JobsContext(int jobCount, String[] indices, TimeValue maxBucketSpan) {
                this.jobCount = jobCount;
                this.indices = indices;
                this.maxBucketSpan = maxBucketSpan;
            }

            private static JobsContext build(List<Job> jobs, Request request) {
                Set<String> indices = new HashSet<>();
                TimeValue maxBucketSpan = TimeValue.ZERO;
                for (Job job : jobs) {
                    indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
                    TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
                    if (maxBucketSpan.compareTo(bucketSpan) < 0) {
                        maxBucketSpan = bucketSpan;
                    }
                }
                checkValidBucketSpan(request.getBucketSpan(), maxBucketSpan);

                // If top_n is 1, we can use the request bucket_span in order to optimize the aggregations
                if (request.getBucketSpan() != null && (request.getTopN() == 1 || jobs.size() <= 1)) {
                    maxBucketSpan = request.getBucketSpan();
                }

                return new JobsContext(jobs.size(), indices.toArray(new String[indices.size()]), maxBucketSpan);
            }
        }

        private class ChunkedBucketSearcher {

            private static final int BUCKETS_PER_CHUNK = 1000;
            private static final int MAX_RESULT_COUNT = 10000;

            private final String[] indices;
            private final long maxBucketSpanMillis;
            private final boolean excludeInterim;
            private final long chunkMillis;
            private final long endTime;
            private volatile long curTime;
            private final AggregationBuilder aggs;
            private final OverallBucketsProvider overallBucketsProvider;
            private final OverallBucketsProcessor overallBucketsProcessor;

            ChunkedBucketSearcher(JobsContext jobsContext, long startTime, long endTime,
                                  boolean excludeInterim, OverallBucketsProvider overallBucketsProvider,
                                  OverallBucketsProcessor overallBucketsProcessor) {
                this.indices = jobsContext.indices;
                this.maxBucketSpanMillis = jobsContext.maxBucketSpan.millis();
                this.chunkMillis = BUCKETS_PER_CHUNK * maxBucketSpanMillis;
                this.endTime = endTime;
                this.curTime = startTime;
                this.excludeInterim = excludeInterim;
                this.aggs = buildAggregations(maxBucketSpanMillis, jobsContext.jobCount);
                this.overallBucketsProvider = overallBucketsProvider;
                this.overallBucketsProcessor = overallBucketsProcessor;
            }

            void searchAndComputeOverallBuckets(ActionListener<List<OverallBucket>> listener) {
                if (curTime >= endTime) {
                    listener.onResponse(overallBucketsProcessor.finish());
                    return;
                }
                client.search(nextSearch(), ActionListener.wrap(searchResponse -> {
                    Histogram histogram = searchResponse.getAggregations().get(Result.TIMESTAMP.getPreferredName());
                    overallBucketsProcessor.process(overallBucketsProvider.computeOverallBuckets(histogram));
                    if (overallBucketsProcessor.size() > MAX_RESULT_COUNT) {
                        listener.onFailure(ExceptionsHelper.badRequestException("Unable to return more than [{}] results; please use " +
                                "parameters [{}] and [{}] to limit the time range", MAX_RESULT_COUNT, Request.START, Request.END));
                        return;
                    }
                    searchAndComputeOverallBuckets(listener);
                }, listener::onFailure));
            }

            SearchRequest nextSearch() {
                long curEnd = Math.min(curTime + chunkMillis, endTime);
                logger.debug("Search for buckets in: [{}, {})", curTime, curEnd);
                SearchRequest searchRequest = buildSearchRequest(curTime, curEnd, excludeInterim, maxBucketSpanMillis, indices);
                searchRequest.source().aggregation(aggs);
                curTime += chunkMillis;
                return searchRequest;
            }
        }

        private static SearchRequest buildSearchRequest(Long start, Long end, boolean excludeInterim, long bucketSpanMillis,
                                                        String[] indices) {
            String startTime = start == null ? null : String.valueOf(Intervals.alignToCeil(start, bucketSpanMillis));
            String endTime = end == null ? null : String.valueOf(Intervals.alignToFloor(end, bucketSpanMillis));

            SearchSourceBuilder searchSourceBuilder = new BucketsQueryBuilder()
                    .size(0)
                    .includeInterim(request.isExcludeInterim() == false)
                    .includeInterim(excludeInterim == false)
                    .start(startTime)
                    .end(endTime)
                    .build();
            searchSourceBuilder.aggregation(buildAggregations(bucketSpanMillis));

            SearchRequest searchRequest = new SearchRequest(indices.toArray(new String[indices.size()]));
            SearchRequest searchRequest = new SearchRequest(indices);
            searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
            searchRequest.source(searchSourceBuilder);
            return searchRequest;
        }

        private static AggregationBuilder buildAggregations(long bucketSpanMillis) {
        private static AggregationBuilder buildAggregations(long maxBucketSpanMillis, int jobCount) {
            AggregationBuilder overallScoreAgg = AggregationBuilders.max(OverallBucket.OVERALL_SCORE.getPreferredName())
                    .field(Bucket.ANOMALY_SCORE.getPreferredName());
            AggregationBuilder jobsAgg = AggregationBuilders.terms(Job.ID.getPreferredName())
                    .field(Job.ID.getPreferredName()).subAggregation(overallScoreAgg);
                    .field(Job.ID.getPreferredName()).size(jobCount).subAggregation(overallScoreAgg);
            AggregationBuilder interimAgg = AggregationBuilders.max(Result.IS_INTERIM.getPreferredName())
                    .field(Result.IS_INTERIM.getPreferredName());
            return AggregationBuilders.dateHistogram(Result.TIMESTAMP.getPreferredName())
                    .field(Result.TIMESTAMP.getPreferredName())
                    .interval(bucketSpanMillis)
                    .interval(maxBucketSpanMillis)
                    .subAggregation(jobsAgg)
                    .subAggregation(interimAgg);
        }

        private List<OverallBucket> computeOverallBuckets(Request request, long bucketSpanSeconds, SearchResponse searchResponse) {
            List<OverallBucket> overallBuckets = new ArrayList<>();
            Histogram histogram = searchResponse.getAggregations().get(Result.TIMESTAMP.getPreferredName());
            for (Histogram.Bucket histogramBucket : histogram.getBuckets()) {
                Aggregations histogramBucketAggs = histogramBucket.getAggregations();
                Terms jobsAgg = histogramBucketAggs.get(Job.ID.getPreferredName());
                int jobsCount = jobsAgg.getBuckets().size();
                int topN = Math.min(request.getTopN(), jobsCount);
                List<OverallBucket.JobInfo> jobs = new ArrayList<>(jobsCount);
                TopNScores topNScores = new TopNScores(topN);
                for (Terms.Bucket jobsBucket : jobsAgg.getBuckets()) {
                    Max maxScore = jobsBucket.getAggregations().get(OverallBucket.OVERALL_SCORE.getPreferredName());
                    topNScores.insertWithOverflow(maxScore.getValue());
                    jobs.add(new OverallBucket.JobInfo((String) jobsBucket.getKey(), maxScore.getValue()));
                }

                double overallScore = topNScores.overallScore();
                if (overallScore < request.getOverallScore()) {
                    continue;
                }

                Max interimAgg = histogramBucketAggs.get(Result.IS_INTERIM.getPreferredName());
                boolean isInterim = interimAgg.getValue() > 0;
                if (request.isExcludeInterim() && isInterim) {
                    continue;
                }

                overallBuckets.add(new OverallBucket(getHistogramBucketTimestamp(histogramBucket),
                        bucketSpanSeconds, overallScore, jobs, isInterim));
            }
            return overallBuckets;
        }

        private static Date getHistogramBucketTimestamp(Histogram.Bucket bucket) {
            DateTime bucketTimestamp = (DateTime) bucket.getKey();
            return new Date(bucketTimestamp.getMillis());
        }

        static class TopNScores extends PriorityQueue<Double> {

            TopNScores(int n) {
                super(n, false);
            }

            @Override
            protected boolean lessThan(Double a, Double b) {
                return a < b;
            }

            double overallScore() {
                double overallScore = 0.0;
                for (double score : this) {
                    overallScore += score;
                }
                return size() > 0 ? overallScore / size() : 0.0;
            }
        }
    }
}
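To make the top_n averaging concrete, here is a minimal sketch of how TopNScores (which this commit moves into OverallBucketsProvider) combines per-job max anomaly scores; the numbers are made up:

    TopNScores topNScores = new TopNScores(2);       // keep only the two highest scores
    topNScores.insertWithOverflow(30.0);             // job_1 max anomaly score
    topNScores.insertWithOverflow(60.0);             // job_2 max anomaly score
    topNScores.insertWithOverflow(10.0);             // job_3 score, discarded: not in the top 2
    double overallScore = topNScores.overallScore(); // (30.0 + 60.0) / 2 = 45.0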
@@ -0,0 +1,89 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.persistence.overallbuckets;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.job.results.OverallBucket;
import org.elasticsearch.xpack.ml.utils.Intervals;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OverallBucketsAggregator implements OverallBucketsProcessor {

    private final long bucketSpanSeconds;
    private final long bucketSpanMillis;
    private double maxOverallScore = 0.0;
    private Map<String, Double> maxScoreByJob = new TreeMap<>();
    private boolean isInterim = false;
    private Long startTime;
    private final List<OverallBucket> aggregated = new ArrayList<>();

    public OverallBucketsAggregator(TimeValue bucketSpan) {
        bucketSpanSeconds = bucketSpan.seconds();
        bucketSpanMillis = bucketSpan.millis();
    }

    @Override
    public synchronized void process(List<OverallBucket> buckets) {
        if (buckets.isEmpty()) {
            return;
        }
        if (startTime == null) {
            startTime = Intervals.alignToFloor(buckets.get(0).getTimestamp().getTime(), bucketSpanMillis);
        }
        long bucketTime;
        for (OverallBucket bucket : buckets) {
            bucketTime = bucket.getTimestamp().getTime();
            if (bucketTime >= startTime + bucketSpanMillis) {
                aggregated.add(outputBucket());
                startNextBucket(bucketTime);
            }
            processBucket(bucket);
        }
    }

    private OverallBucket outputBucket() {
        List<OverallBucket.JobInfo> jobs = new ArrayList<>(maxScoreByJob.size());
        maxScoreByJob.entrySet().stream().forEach(entry -> jobs.add(
                new OverallBucket.JobInfo(entry.getKey(), entry.getValue())));
        return new OverallBucket(new Date(startTime), bucketSpanSeconds, maxOverallScore, jobs, isInterim);
    }

    private void startNextBucket(long bucketTime) {
        maxOverallScore = 0.0;
        maxScoreByJob.clear();
        isInterim = false;
        startTime = Intervals.alignToFloor(bucketTime, bucketSpanMillis);
    }

    private void processBucket(OverallBucket bucket) {
        maxOverallScore = Math.max(maxOverallScore, bucket.getOverallScore());
        bucket.getJobs().stream().forEach(j -> {
            double currentMax = maxScoreByJob.computeIfAbsent(j.getJobId(), k -> 0.0);
            if (j.getMaxAnomalyScore() > currentMax) {
                maxScoreByJob.put(j.getJobId(), j.getMaxAnomalyScore());
            }
        });
        isInterim |= bucket.isInterim();
    }

    @Override
    public synchronized List<OverallBucket> finish() {
        if (startTime != null) {
            aggregated.add(outputBucket());
        }
        return aggregated;
    }

    @Override
    public synchronized int size() {
        return aggregated.size();
    }
}
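A minimal usage sketch for the aggregator, assuming hourly overall buckets arriving in ascending time order (the list variables are placeholders; in production the chunked searcher above is the only caller):

    OverallBucketsAggregator aggregator = new OverallBucketsAggregator(TimeValue.timeValueHours(2));
    aggregator.process(firstBatchOfHourlyOverallBuckets);     // hypothetical List<OverallBucket>, oldest first
    aggregator.process(secondBatchOfHourlyOverallBuckets);    // later batches must not go back in time
    List<OverallBucket> twoHourBuckets = aggregator.finish(); // also emits the trailing partial bucket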
@@ -0,0 +1,31 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.persistence.overallbuckets;

import org.elasticsearch.xpack.ml.job.results.OverallBucket;

import java.util.ArrayList;
import java.util.List;

public class OverallBucketsCollector implements OverallBucketsProcessor {

    private final List<OverallBucket> collected = new ArrayList<>();

    @Override
    public synchronized void process(List<OverallBucket> overallBuckets) {
        collected.addAll(overallBuckets);
    }

    @Override
    public synchronized List<OverallBucket> finish() {
        return collected;
    }

    @Override
    public synchronized int size() {
        return collected.size();
    }
}
@@ -0,0 +1,17 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.persistence.overallbuckets;

import org.elasticsearch.xpack.ml.job.results.OverallBucket;

import java.util.List;

public interface OverallBucketsProcessor {

    void process(List<OverallBucket> overallBuckets);
    List<OverallBucket> finish();
    int size();
}
@@ -0,0 +1,90 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.persistence.overallbuckets;

import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.results.OverallBucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.joda.time.DateTime;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class OverallBucketsProvider {

    private final long maxJobBucketSpanSeconds;
    private final int topN;
    private final double minOverallScore;

    public OverallBucketsProvider(TimeValue maxJobBucketSpan, int topN, double minOverallScore) {
        this.maxJobBucketSpanSeconds = maxJobBucketSpan.seconds();
        this.topN = topN;
        this.minOverallScore = minOverallScore;
    }

    public List<OverallBucket> computeOverallBuckets(Histogram histogram) {
        List<OverallBucket> overallBuckets = new ArrayList<>();
        for (Histogram.Bucket histogramBucket : histogram.getBuckets()) {
            Aggregations histogramBucketAggs = histogramBucket.getAggregations();
            Terms jobsAgg = histogramBucketAggs.get(Job.ID.getPreferredName());
            int jobsCount = jobsAgg.getBuckets().size();
            int bucketTopN = Math.min(topN, jobsCount);
            Set<OverallBucket.JobInfo> jobs = new TreeSet<>();
            TopNScores topNScores = new TopNScores(bucketTopN);
            for (Terms.Bucket jobsBucket : jobsAgg.getBuckets()) {
                Max maxScore = jobsBucket.getAggregations().get(OverallBucket.OVERALL_SCORE.getPreferredName());
                topNScores.insertWithOverflow(maxScore.getValue());
                jobs.add(new OverallBucket.JobInfo((String) jobsBucket.getKey(), maxScore.getValue()));
            }

            double overallScore = topNScores.overallScore();
            if (overallScore < minOverallScore) {
                continue;
            }

            Max interimAgg = histogramBucketAggs.get(Result.IS_INTERIM.getPreferredName());
            boolean isInterim = interimAgg.getValue() > 0;

            overallBuckets.add(new OverallBucket(getHistogramBucketTimestamp(histogramBucket),
                    maxJobBucketSpanSeconds, overallScore, new ArrayList<>(jobs), isInterim));
        }
        return overallBuckets;
    }

    private static Date getHistogramBucketTimestamp(Histogram.Bucket bucket) {
        DateTime bucketTimestamp = (DateTime) bucket.getKey();
        return new Date(bucketTimestamp.getMillis());
    }

    static class TopNScores extends PriorityQueue<Double> {

        TopNScores(int n) {
            super(n, false);
        }

        @Override
        protected boolean lessThan(Double a, Double b) {
            return a < b;
        }

        double overallScore() {
            double overallScore = 0.0;
            for (double score : this) {
                overallScore += score;
            }
            return size() > 0 ? overallScore / size() : 0.0;
        }
    }
}
@@ -95,6 +95,10 @@ public class OverallBucket implements ToXContentObject, Writeable {
        return overallScore;
    }

    public List<JobInfo> getJobs() {
        return jobs;
    }

    public boolean isInterim() {
        return isInterim;
    }

@@ -126,7 +130,7 @@ public class OverallBucket implements ToXContentObject, Writeable {
                && this.isInterim == that.isInterim;
    }

    public static class JobInfo implements ToXContentObject, Writeable {
    public static class JobInfo implements ToXContentObject, Writeable, Comparable<JobInfo> {

        private static final ParseField MAX_ANOMALY_SCORE = new ParseField("max_anomaly_score");

@@ -143,6 +147,14 @@ public class OverallBucket implements ToXContentObject, Writeable {
            maxAnomalyScore = in.readDouble();
        }

        public String getJobId() {
            return jobId;
        }

        public double getMaxAnomalyScore() {
            return maxAnomalyScore;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeString(jobId);

@@ -174,5 +186,14 @@ public class OverallBucket implements ToXContentObject, Writeable {
            JobInfo that = (JobInfo) other;
            return Objects.equals(this.jobId, that.jobId) && this.maxAnomalyScore == that.maxAnomalyScore;
        }

        @Override
        public int compareTo(JobInfo other) {
            int result = this.jobId.compareTo(other.jobId);
            if (result == 0) {
                result = Double.compare(this.maxAnomalyScore, other.maxAnomalyScore);
            }
            return result;
        }
    }
}
@@ -44,6 +44,9 @@ public class RestGetOverallBucketsAction extends BaseRestHandler {
        } else {
            request = new Request(jobId);
            request.setTopN(restRequest.paramAsInt(Request.TOP_N.getPreferredName(), request.getTopN()));
            if (restRequest.hasParam(Request.BUCKET_SPAN.getPreferredName())) {
                request.setBucketSpan(restRequest.param(Request.BUCKET_SPAN.getPreferredName()));
            }
            request.setOverallScore(Double.parseDouble(restRequest.param(Request.OVERALL_SCORE.getPreferredName(), "0.0")));
            request.setExcludeInterim(restRequest.paramAsBoolean(Request.EXCLUDE_INTERIM.getPreferredName(), request.isExcludeInterim()));
            if (restRequest.hasParam(Request.START.getPreferredName())) {
@@ -5,6 +5,7 @@
 */
package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.action.GetOverallBucketsAction.Request;

@@ -18,7 +19,9 @@ public class GetOverallBucketsActionRequestTests extends AbstractStreamableXCont
        if (randomBoolean()) {
            request.setTopN(randomIntBetween(1, 1000));
        }
        request.setAllowNoJobs(randomBoolean());
        if (randomBoolean()) {
            request.setBucketSpan(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
        }
        if (randomBoolean()) {
            request.setStart(randomNonNegativeLong());
        }

@@ -31,6 +34,7 @@ public class GetOverallBucketsActionRequestTests extends AbstractStreamableXCont
        if (randomBoolean()) {
            request.setEnd(randomNonNegativeLong());
        }
        request.setAllowNoJobs(randomBoolean());
        return request;
    }
@@ -0,0 +1,107 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.ml.action.GetOverallBucketsAction;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.junit.After;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;

/**
 * Tests that overall bucket results are calculated correctly
 * for jobs that have many buckets.
 */
public class OverallBucketsIT extends MlNativeAutodetectIntegTestCase {

    private static final String JOB_ID = "overall-buckets-test";
    private static final long BUCKET_SPAN_SECONDS = 3600;

    @After
    public void cleanUpTest() throws Exception {
        cleanUp();
    }

    public void test() throws Exception {
        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
                Collections.singletonList(new Detector.Builder("count", null).build()));
        analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS));
        DataDescription.Builder dataDescription = new DataDescription.Builder();
        dataDescription.setTimeFormat("epoch");
        Job.Builder job = new Job.Builder(JOB_ID);
        job.setAnalysisConfig(analysisConfig);
        job.setDataDescription(dataDescription);

        registerJob(job);
        putJob(job);
        openJob(job.getId());

        long timestamp = 1483228800000L; // 2017-01-01T00:00:00Z
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 3000; i++) {
            data.add(createJsonRecord(createRecord(timestamp)));
            if (i % 1000 == 0) {
                data.add(createJsonRecord(createRecord(timestamp)));
                data.add(createJsonRecord(createRecord(timestamp)));
                data.add(createJsonRecord(createRecord(timestamp)));
            }
            timestamp += BUCKET_SPAN_SECONDS;
        }

        postData(job.getId(), data.stream().collect(Collectors.joining()));
        flushJob(job.getId(), true);
        closeJob(job.getId());

        GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
        request.setPageParams(new PageParams(0, 3000));
        assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(3000L));

        {
            // Check we get equal number of overall buckets on a default request
            GetOverallBucketsAction.Request overallBucketsRequest = new GetOverallBucketsAction.Request(job.getId());
            GetOverallBucketsAction.Response overallBucketsResponse = client().execute(
                    GetOverallBucketsAction.INSTANCE, overallBucketsRequest).actionGet();
            assertThat(overallBucketsResponse.getOverallBuckets().count(), equalTo(3000L));
        }

        {
            // Check overall buckets are half when the bucket_span is set to double the job bucket span
            GetOverallBucketsAction.Request aggregatedOverallBucketsRequest = new GetOverallBucketsAction.Request(job.getId());
            aggregatedOverallBucketsRequest.setBucketSpan(TimeValue.timeValueSeconds(2 * BUCKET_SPAN_SECONDS));
            GetOverallBucketsAction.Response aggregatedOverallBucketsResponse = client().execute(
                    GetOverallBucketsAction.INSTANCE, aggregatedOverallBucketsRequest).actionGet();
            assertThat(aggregatedOverallBucketsResponse.getOverallBuckets().count(), equalTo(1500L));
        }

        {
            // Check overall score filtering works when chunking takes place
            GetOverallBucketsAction.Request filteredOverallBucketsRequest = new GetOverallBucketsAction.Request(job.getId());
            filteredOverallBucketsRequest.setOverallScore(0.1);
            GetOverallBucketsAction.Response filteredOverallBucketsResponse = client().execute(
                    GetOverallBucketsAction.INSTANCE, filteredOverallBucketsRequest).actionGet();
            assertThat(filteredOverallBucketsResponse.getOverallBuckets().count(), equalTo(2L));
        }
    }

    private static Map<String, Object> createRecord(long timestamp) {
        Map<String, Object> record = new HashMap<>();
        record.put("time", timestamp);
        return record;
    }
}
@@ -0,0 +1,88 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.persistence.overallbuckets;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.results.OverallBucket;
import org.elasticsearch.xpack.ml.job.results.OverallBucket.JobInfo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class OverallBucketsAggregatorTests extends ESTestCase {

    public void testProcess_GivenEmpty() {
        OverallBucketsAggregator aggregator = new OverallBucketsAggregator(TimeValue.timeValueHours(1));
        aggregator.process(Collections.emptyList());
        List<OverallBucket> aggregated = aggregator.finish();
        assertThat(aggregated.isEmpty(), is(true));
    }

    public void testProcess_GivenAggSpanIsTwiceTheBucketSpan() {
        // Monday, October 16, 2017 12:00:00 AM UTC
        long startTime = 1508112000000L;

        List<OverallBucket> rawBuckets1 = new ArrayList<>();
        List<OverallBucket> rawBuckets2 = new ArrayList<>();
        rawBuckets1.add(new OverallBucket(new Date(startTime), 3600L, 10.0,
                Arrays.asList(new OverallBucket.JobInfo("job_1", 10.0),
                        new OverallBucket.JobInfo("job_2", 6.0)),
                false));
        rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(1).millis()), 3600L, 20.0,
                Arrays.asList(new JobInfo("job_1", 20.0), new JobInfo("job_2", 2.0)),
                false));
        rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(2).millis()), 3600L, 30.0,
                Arrays.asList(new JobInfo("job_1", 30.0), new JobInfo("job_2", 7.0)),
                false));
        rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(3).millis()), 3600L, 40.0,
                Arrays.asList(new JobInfo("job_1", 10.0), new JobInfo("job_2", 40.0)),
                false));
        rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(4).millis()), 3600L, 50.0,
                Collections.singletonList(new JobInfo("job_1", 50.0)), false));
        rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(5).millis()), 3600L, 60.0,
                Collections.singletonList(new JobInfo("job_1", 60.0)), true));
        rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(6).millis()), 3600L, 70.0,
                Arrays.asList(new JobInfo("job_1", 70.0), new JobInfo("job_2", 0.0)),
                true));

        TimeValue bucketSpan = TimeValue.timeValueHours(2);
        OverallBucketsAggregator aggregator = new OverallBucketsAggregator(bucketSpan);
        aggregator.process(rawBuckets1);
        aggregator.process(rawBuckets2);
        List<OverallBucket> aggregated = aggregator.finish();

        assertThat(aggregated.size(), equalTo(4));
        assertThat(aggregated.get(0).getTimestamp().getTime(), equalTo(startTime));
        assertThat(aggregated.get(0).getOverallScore(), equalTo(20.0));
        assertThat(aggregated.get(0).getJobs(), contains(new JobInfo("job_1", 20.0),
                new JobInfo("job_2", 6.0)));
        assertThat(aggregated.get(0).isInterim(), is(false));
        assertThat(aggregated.get(1).getTimestamp().getTime(), equalTo(startTime + bucketSpan.millis()));
        assertThat(aggregated.get(1).getOverallScore(), equalTo(40.0));
        assertThat(aggregated.get(1).getJobs(), contains(new JobInfo("job_1", 30.0),
                new JobInfo("job_2", 40.0)));
        assertThat(aggregated.get(1).isInterim(), is(false));
        assertThat(aggregated.get(2).getTimestamp().getTime(), equalTo(startTime + 2 * bucketSpan.millis()));
        assertThat(aggregated.get(2).getOverallScore(), equalTo(60.0));
        assertThat(aggregated.get(2).getJobs().size(), equalTo(1));
        assertThat(aggregated.get(2).getJobs(), contains(new JobInfo("job_1", 60.0)));
        assertThat(aggregated.get(2).isInterim(), is(true));
        assertThat(aggregated.get(3).getTimestamp().getTime(), equalTo(startTime + 3 * bucketSpan.millis()));
        assertThat(aggregated.get(3).getOverallScore(), equalTo(70.0));
        assertThat(aggregated.get(3).getJobs(), contains(new JobInfo("job_1", 70.0),
                new JobInfo("job_2", 0.0)));
        assertThat(aggregated.get(3).isInterim(), is(true));
    }
}
@@ -0,0 +1,39 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.persistence.overallbuckets;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.results.OverallBucket;

import java.util.Arrays;
import java.util.Collections;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;

public class OverallBucketsCollectorTests extends ESTestCase {

    public void testProcess_GivenEmpty() {
        OverallBucketsCollector collector = new OverallBucketsCollector();
        collector.process(Collections.emptyList());
        assertThat(collector.finish().isEmpty(), is(true));
    }

    public void testProcess_GivenTwoBatches() {
        OverallBucket b1 = mock(OverallBucket.class);
        OverallBucket b2 = mock(OverallBucket.class);
        OverallBucket b3 = mock(OverallBucket.class);
        OverallBucket b4 = mock(OverallBucket.class);
        OverallBucket b5 = mock(OverallBucket.class);

        OverallBucketsCollector collector = new OverallBucketsCollector();
        collector.process(Arrays.asList(b1, b2, b3));
        collector.process(Arrays.asList(b4, b5));

        assertThat(collector.finish(), equalTo(Arrays.asList(b1, b2, b3, b4, b5)));
    }
}
@@ -3,14 +3,14 @@
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.action;
package org.elasticsearch.xpack.ml.job.persistence.overallbuckets;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.GetOverallBucketsAction.TransportAction.TopNScores;
import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProvider.TopNScores;

import static org.hamcrest.Matchers.equalTo;

public class GetOverallBucketsActionTests extends ESTestCase {
public class OverallBucketsProviderTests extends ESTestCase {

    public void testTopNScores() {
        TopNScores topNScores = new TopNScores(3);
@@ -0,0 +1,49 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.results;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.ml.job.config.JobTests;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;

public class OverallBucketTests extends AbstractWireSerializingTestCase<OverallBucket> {

    @Override
    protected OverallBucket createTestInstance() {
        int jobCount = randomIntBetween(0, 10);
        List<OverallBucket.JobInfo> jobs = new ArrayList<>(jobCount);
        for (int i = 0; i < jobCount; ++i) {
            jobs.add(new OverallBucket.JobInfo(JobTests.randomValidJobId(), randomDoubleBetween(0.0, 100.0, true)));
        }
        return new OverallBucket(new Date(randomNonNegativeLong()),
                randomIntBetween(60, 24 * 3600),
                randomDoubleBetween(0.0, 100.0, true),
                jobs,
                randomBoolean());
    }

    @Override
    protected Writeable.Reader<OverallBucket> instanceReader() {
        return OverallBucket::new;
    }

    public void testCompareTo() {
        OverallBucket.JobInfo jobInfo1 = new OverallBucket.JobInfo("aaa", 1.0);
        OverallBucket.JobInfo jobInfo2 = new OverallBucket.JobInfo("aaa", 3.0);
        OverallBucket.JobInfo jobInfo3 = new OverallBucket.JobInfo("bbb", 1.0);
        assertThat(jobInfo1.compareTo(jobInfo1), equalTo(0));
        assertThat(jobInfo1.compareTo(jobInfo2), lessThan(0));
        assertThat(jobInfo1.compareTo(jobInfo3), lessThan(0));
        assertThat(jobInfo2.compareTo(jobInfo3), lessThan(0));
    }
}
@@ -19,6 +19,10 @@
        "type": "int",
        "description": "The number of top job bucket scores to be used in the overall_score calculation"
      },
      "bucket_span": {
        "type": "string",
        "description": "The span of the overall buckets. Defaults to the longest job bucket_span"
      },
      "overall_score": {
        "type": "double",
        "description": "Returns overall buckets with overall scores higher than this value"

@@ -37,7 +41,6 @@
      },
      "allow_no_jobs": {
        "type": "boolean",
        "required": false,
        "description": "Whether to ignore if a wildcard expression matches no jobs. (This includes `_all` string or when no jobs have been specified)"
      }
    }
@@ -518,3 +518,99 @@ setup:
      xpack.ml.get_overall_buckets:
        job_id: "jobs-get-result-overall-buckets-*"
        end: "invalid"

---
"Test overall buckets given bucket_span":
  - do:
      xpack.ml.get_overall_buckets:
        job_id: "jobs-get-result-overall-buckets-*"
        bucket_span: "2h"
  - match: { count: 2 }
  - match: { overall_buckets.0.timestamp: 1464739200000 }
  - match: { overall_buckets.0.bucket_span: 7200 }
  - match: { overall_buckets.0.overall_score: 30.0 }
  - length: { overall_buckets.0.jobs: 3 }
  - match: { overall_buckets.0.jobs.0.job_id: jobs-get-result-overall-buckets-17 }
  - match: { overall_buckets.0.jobs.0.max_anomaly_score: 0.0 }
  - match: { overall_buckets.0.jobs.1.job_id: jobs-get-result-overall-buckets-30 }
  - match: { overall_buckets.0.jobs.1.max_anomaly_score: 0.0 }
  - match: { overall_buckets.0.jobs.2.job_id: jobs-get-result-overall-buckets-60 }
  - match: { overall_buckets.0.jobs.2.max_anomaly_score: 30.0 }
  - match: { overall_buckets.0.is_interim: false }
  - match: { overall_buckets.0.result_type: overall_bucket }
  - match: { overall_buckets.1.timestamp: 1464746400000 }
  - match: { overall_buckets.1.bucket_span: 7200 }
  - match: { overall_buckets.1.overall_score: 60.0 }
  - length: { overall_buckets.1.jobs: 3 }
  - match: { overall_buckets.1.jobs.0.job_id: jobs-get-result-overall-buckets-17 }
  - match: { overall_buckets.1.jobs.0.max_anomaly_score: 60.0 }
  - match: { overall_buckets.1.jobs.1.job_id: jobs-get-result-overall-buckets-30 }
  - match: { overall_buckets.1.jobs.1.max_anomaly_score: 40.0 }
  - match: { overall_buckets.1.jobs.2.job_id: jobs-get-result-overall-buckets-60 }
  - match: { overall_buckets.1.jobs.2.max_anomaly_score: 20.0 }
  - match: { overall_buckets.1.is_interim: true }
  - match: { overall_buckets.1.result_type: overall_bucket }

---
"Test overall buckets given bucket_span and top_n is 2":
  - do:
      xpack.ml.get_overall_buckets:
        job_id: "jobs-get-result-overall-buckets-*"
        top_n: 2
        bucket_span: "2h"

  - match: { count: 2 }
  - match: { overall_buckets.0.timestamp: 1464739200000 }
  - match: { overall_buckets.0.bucket_span: 7200 }
  - match: { overall_buckets.0.overall_score: 30.0 }
  - length: { overall_buckets.0.jobs: 3 }
  - match: { overall_buckets.0.jobs.0.job_id: jobs-get-result-overall-buckets-17 }
  - match: { overall_buckets.0.jobs.0.max_anomaly_score: 0.0 }
  - match: { overall_buckets.0.jobs.1.job_id: jobs-get-result-overall-buckets-30 }
  - match: { overall_buckets.0.jobs.1.max_anomaly_score: 0.0 }
  - match: { overall_buckets.0.jobs.2.job_id: jobs-get-result-overall-buckets-60 }
  - match: { overall_buckets.0.jobs.2.max_anomaly_score: 30.0 }
  - match: { overall_buckets.0.is_interim: false }
  - match: { overall_buckets.0.result_type: overall_bucket }
  - match: { overall_buckets.1.timestamp: 1464746400000 }
  - match: { overall_buckets.1.bucket_span: 7200 }
  - match: { overall_buckets.1.overall_score: 50.0 }
  - length: { overall_buckets.1.jobs: 3 }
  - match: { overall_buckets.1.jobs.0.job_id: jobs-get-result-overall-buckets-17 }
  - match: { overall_buckets.1.jobs.0.max_anomaly_score: 60.0 }
  - match: { overall_buckets.1.jobs.1.job_id: jobs-get-result-overall-buckets-30 }
  - match: { overall_buckets.1.jobs.1.max_anomaly_score: 40.0 }
  - match: { overall_buckets.1.jobs.2.job_id: jobs-get-result-overall-buckets-60 }
  - match: { overall_buckets.1.jobs.2.max_anomaly_score: 20.0 }
  - match: { overall_buckets.1.is_interim: true }
  - match: { overall_buckets.1.result_type: overall_bucket }

---
"Test overall buckets given bucket_span and overall_score filter":
  - do:
      xpack.ml.get_overall_buckets:
        job_id: "jobs-get-result-overall-buckets-*"
        bucket_span: "2h"
        overall_score: "41.0"

  - match: { count: 1 }
  - match: { overall_buckets.0.timestamp: 1464746400000 }
  - match: { overall_buckets.0.bucket_span: 7200 }
  - match: { overall_buckets.0.overall_score: 60.0 }
  - length: { overall_buckets.0.jobs: 3 }
  - match: { overall_buckets.0.jobs.0.job_id: jobs-get-result-overall-buckets-17 }
  - match: { overall_buckets.0.jobs.0.max_anomaly_score: 60.0 }
  - match: { overall_buckets.0.jobs.1.job_id: jobs-get-result-overall-buckets-30 }
  - match: { overall_buckets.0.jobs.1.max_anomaly_score: 40.0 }
  - match: { overall_buckets.0.jobs.2.job_id: jobs-get-result-overall-buckets-60 }
  - match: { overall_buckets.0.jobs.2.max_anomaly_score: 20.0 }
  - match: { overall_buckets.0.is_interim: true }
  - match: { overall_buckets.0.result_type: overall_bucket }

---
"Test overall buckets given bucket_span is smaller than max job bucket_span":
  - do:
      catch: /.*Param \[bucket_span\] must be greater or equal to the max bucket_span \[60m\]*/
      xpack.ml.get_overall_buckets:
        job_id: "jobs-get-result-overall-buckets-*"
        bucket_span: "59m"
@@ -52,6 +52,7 @@ integTestRunner {
    'ml/jobs_get_result_overall_buckets/Test overall buckets given top_n is negative',
    'ml/jobs_get_result_overall_buckets/Test overall buckets given invalid start param',
    'ml/jobs_get_result_overall_buckets/Test overall buckets given invalid end param',
    'ml/jobs_get_result_overall_buckets/Test overall buckets given bucket_span is smaller than max job bucket_span',
    'ml/jobs_get_stats/Test get job stats given missing job',
    'ml/jobs_get_stats/Test no exception on get job stats with missing index',
    'ml/job_groups/Test put job with empty group',