From c7e94b3b4c4728a046a3984d403e8ed33c738496 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 27 Oct 2017 11:14:13 +0100 Subject: [PATCH] [ML] Enable overall buckets aggregation at a custom bucket span (elastic/x-pack-elasticsearch#2782) For the purpose of getting this API consumed by our UI, returning overall buckets that match the job's largest `bucket_span` can result in too much data. The UI only ever displays a few buckets in the swimlane. Their span depends on the time range selected and the screen resolution, but it will only ever be a relatively low number. This PR adds the ability to aggregate overall buckets in a user specified `bucket_span`. That `bucket_span` may be equal or greater to the largest job's `bucket_span`. The `overall_score` of the result overall buckets is the max score of the corresponding overall buckets with a span equal to the job's largest `bucket_span`. The implementation is now chunking the bucket requests as otherwise the aggregation would fail when too many buckets are matching. Original commit: elastic/x-pack-elasticsearch@981f7a40e5d733c755992a83171891cc332d366d --- .../rest-api/ml/get-overall-buckets.asciidoc | 9 + .../ml/action/GetOverallBucketsAction.java | 291 ++++++++++++------ .../OverallBucketsAggregator.java | 89 ++++++ .../OverallBucketsCollector.java | 31 ++ .../OverallBucketsProcessor.java | 17 + .../OverallBucketsProvider.java | 90 ++++++ .../xpack/ml/job/results/OverallBucket.java | 23 +- .../results/RestGetOverallBucketsAction.java | 3 + .../GetOverallBucketsActionRequestTests.java | 6 +- .../ml/integration/OverallBucketsIT.java | 107 +++++++ .../OverallBucketsAggregatorTests.java | 88 ++++++ .../OverallBucketsCollectorTests.java | 39 +++ .../OverallBucketsProviderTests.java} | 6 +- .../ml/job/results/OverallBucketTests.java | 49 +++ .../api/xpack.ml.get_overall_buckets.json | 5 +- .../ml/jobs_get_result_overall_buckets.yml | 96 ++++++ qa/smoke-test-ml-with-security/build.gradle | 1 + 17 files changed, 857 insertions(+), 93 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsAggregator.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsCollector.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProcessor.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProvider.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsAggregatorTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsCollectorTests.java rename plugin/src/test/java/org/elasticsearch/xpack/ml/{action/GetOverallBucketsActionTests.java => job/persistence/overallbuckets/OverallBucketsProviderTests.java} (81%) create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/OverallBucketTests.java diff --git a/docs/en/rest-api/ml/get-overall-buckets.asciidoc b/docs/en/rest-api/ml/get-overall-buckets.asciidoc index 68115265a00..5ced48c12bd 100644 --- a/docs/en/rest-api/ml/get-overall-buckets.asciidoc +++ b/docs/en/rest-api/ml/get-overall-buckets.asciidoc @@ -32,6 +32,11 @@ score in the overall bucket. 
Alternatively, if you set `top_n` to the number of jobs, the `overall_score` is high only when all jobs detect anomalies in that overall bucket. +In addition, the optional parameter `bucket_span` may be used in order +to request overall buckets that span longer than the largest job's `bucket_span`. +When set, the `overall_score` will be the max `overall_score` of the corresponding +overall buckets with a span equal to the largest job's `bucket_span`. + ==== Path Parameters `job_id`:: @@ -44,6 +49,10 @@ overall bucket. (boolean) If `false` and the `job_id` does not match any job an error will be returned. The default value is `true`. +`bucket_span`:: + (string) The span of the overall buckets. Must be greater or equal + to the largest job's `bucket_span`. Defaults to the largest job's `bucket_span`. + `end`:: (string) Returns overall buckets with timestamps earlier than this time. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsAction.java index 73935a22f09..01e8c7d0c26 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsAction.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.action; -import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -14,7 +13,6 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; @@ -38,11 +36,12 @@ import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.max.Max; +import org.elasticsearch.search.aggregations.metrics.min.Min; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; @@ -50,26 +49,37 @@ import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsAggregator; +import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsCollector; +import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProcessor; +import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProvider; import org.elasticsearch.xpack.ml.job.results.Bucket; import 
org.elasticsearch.xpack.ml.job.results.OverallBucket; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.Intervals; -import org.joda.time.DateTime; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.Date; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.function.LongSupplier; /** + *
<p>
 * This action returns summarized bucket results over multiple jobs.
 * Overall buckets have the span of the largest job's bucket_span.
 * Their score is calculated by finding the max anomaly score per job
 * and then averaging the top N.
+ * </p>
+ * <p>
+ * Overall buckets can be optionally aggregated into larger intervals
+ * by setting the bucket_span parameter. When that is the case, the
+ * overall_score is the max of the overall buckets that are within
+ * the interval.
+ * </p>
*/ public class GetOverallBucketsAction extends Action { @@ -94,6 +104,7 @@ public class GetOverallBucketsAction public static class Request extends ActionRequest implements ToXContentObject { public static final ParseField TOP_N = new ParseField("top_n"); + public static final ParseField BUCKET_SPAN = new ParseField("bucket_span"); public static final ParseField OVERALL_SCORE = new ParseField("overall_score"); public static final ParseField EXCLUDE_INTERIM = new ParseField("exclude_interim"); public static final ParseField START = new ParseField("start"); @@ -105,6 +116,7 @@ public class GetOverallBucketsAction static { PARSER.declareString((request, jobId) -> request.jobId = jobId, Job.ID); PARSER.declareInt(Request::setTopN, TOP_N); + PARSER.declareString(Request::setBucketSpan, BUCKET_SPAN); PARSER.declareDouble(Request::setOverallScore, OVERALL_SCORE); PARSER.declareBoolean(Request::setExcludeInterim, EXCLUDE_INTERIM); PARSER.declareString((request, startTime) -> request.setStart(parseDateOrThrow( @@ -135,6 +147,7 @@ public class GetOverallBucketsAction private String jobId; private int topN = 1; + private TimeValue bucketSpan; private double overallScore = 0.0; private boolean excludeInterim = false; private Long start; @@ -163,6 +176,18 @@ public class GetOverallBucketsAction this.topN = topN; } + public TimeValue getBucketSpan() { + return bucketSpan; + } + + public void setBucketSpan(TimeValue bucketSpan) { + this.bucketSpan = bucketSpan; + } + + public void setBucketSpan(String bucketSpan) { + this.bucketSpan = TimeValue.parseTimeValue(bucketSpan, BUCKET_SPAN.getPreferredName()); + } + public double getOverallScore() { return overallScore; } @@ -221,6 +246,7 @@ public class GetOverallBucketsAction super.readFrom(in); jobId = in.readString(); topN = in.readVInt(); + bucketSpan = in.readOptionalWriteable(TimeValue::new); overallScore = in.readDouble(); excludeInterim = in.readBoolean(); start = in.readOptionalLong(); @@ -233,6 +259,7 @@ public class GetOverallBucketsAction super.writeTo(out); out.writeString(jobId); out.writeVInt(topN); + out.writeOptionalWriteable(bucketSpan); out.writeDouble(overallScore); out.writeBoolean(excludeInterim); out.writeOptionalLong(start); @@ -245,6 +272,9 @@ public class GetOverallBucketsAction builder.startObject(); builder.field(Job.ID.getPreferredName(), jobId); builder.field(TOP_N.getPreferredName(), topN); + if (bucketSpan != null) { + builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan.getStringRep()); + } builder.field(OVERALL_SCORE.getPreferredName(), overallScore); builder.field(EXCLUDE_INTERIM.getPreferredName(), excludeInterim); if (start != null) { @@ -260,7 +290,7 @@ public class GetOverallBucketsAction @Override public int hashCode() { - return Objects.hash(jobId, topN, overallScore, excludeInterim, start, end, allowNoJobs); + return Objects.hash(jobId, topN, bucketSpan, overallScore, excludeInterim, start, end, allowNoJobs); } @Override @@ -274,6 +304,7 @@ public class GetOverallBucketsAction Request that = (Request) other; return Objects.equals(jobId, that.jobId) && this.topN == that.topN && + Objects.equals(bucketSpan, that.bucketSpan) && this.excludeInterim == that.excludeInterim && this.overallScore == that.overallScore && Objects.equals(start, that.start) && @@ -301,6 +332,10 @@ public class GetOverallBucketsAction this.overallBuckets = overallBuckets; } + public QueryPage getOverallBuckets() { + return overallBuckets; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -346,6 
+381,9 @@ public class GetOverallBucketsAction public static class TransportAction extends HandledTransportAction { + private static final String EARLIEST_TIME = "earliest_time"; + private static final String LATEST_TIME = "latest_time"; + private final Client client; private final ClusterService clusterService; private final JobManager jobManager; @@ -368,113 +406,192 @@ public class GetOverallBucketsAction return; } - List indices = new ArrayList<>(); - TimeValue maxBucketSpan = TimeValue.ZERO; - for (Job job : jobsPage.results()) { - indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); - TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan(); - if (maxBucketSpan.compareTo(bucketSpan) < 0) { - maxBucketSpan = bucketSpan; + // As computing and potentially aggregating overall buckets might take a while, + // we run in a different thread to avoid blocking the network thread. + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { + try { + getOverallBuckets(request, jobsPage.results(), listener); + } catch (Exception e) { + listener.onFailure(e); } - } - final long maxBucketSpanSeconds = maxBucketSpan.seconds(); + }); + } - SearchRequest searchRequest = buildSearchRequest(request, maxBucketSpan.millis(), indices); - client.search(searchRequest, ActionListener.wrap(searchResponse -> { - List overallBuckets = computeOverallBuckets(request, maxBucketSpanSeconds, searchResponse); + private void getOverallBuckets(Request request, List jobs, ActionListener listener) { + JobsContext jobsContext = JobsContext.build(jobs, request); + + ActionListener> overallBucketsListener = ActionListener.wrap(overallBuckets -> { listener.onResponse(new Response(new QueryPage<>(overallBuckets, overallBuckets.size(), OverallBucket.RESULTS_FIELD))); + }, listener::onFailure); + + ActionListener chunkedBucketSearcherListener = ActionListener.wrap(searcher -> { + if (searcher == null) { + listener.onResponse(new Response()); + return; + } + searcher.searchAndComputeOverallBuckets(overallBucketsListener); + }, listener::onFailure); + + OverallBucketsProvider overallBucketsProvider = new OverallBucketsProvider(jobsContext.maxBucketSpan, request.getTopN(), + request.getOverallScore()); + OverallBucketsProcessor overallBucketsProcessor = requiresAggregation(request, jobsContext.maxBucketSpan) ? 
+ new OverallBucketsAggregator(request.getBucketSpan()): new OverallBucketsCollector(); + initChunkedBucketSearcher(request, jobsContext, overallBucketsProvider, overallBucketsProcessor, chunkedBucketSearcherListener); + } + + private static boolean requiresAggregation(Request request, TimeValue maxBucketSpan) { + return request.getBucketSpan() != null && !request.getBucketSpan().equals(maxBucketSpan); + } + + private static void checkValidBucketSpan(TimeValue bucketSpan, TimeValue maxBucketSpan) { + if (bucketSpan != null && bucketSpan.compareTo(maxBucketSpan) < 0) { + throw ExceptionsHelper.badRequestException("Param [{}] must be greater or equal to the max bucket_span [{}]", + Request.BUCKET_SPAN, maxBucketSpan.getStringRep()); + } + } + + private void initChunkedBucketSearcher(Request request, JobsContext jobsContext, OverallBucketsProvider overallBucketsProvider, + OverallBucketsProcessor overallBucketsProcessor, + ActionListener listener) { + long maxBucketSpanMillis = jobsContext.maxBucketSpan.millis(); + SearchRequest searchRequest = buildSearchRequest(request.getStart(), request.getEnd(), request.isExcludeInterim(), + maxBucketSpanMillis, jobsContext.indices); + searchRequest.source().aggregation(AggregationBuilders.min(EARLIEST_TIME).field(Result.TIMESTAMP.getPreferredName())); + searchRequest.source().aggregation(AggregationBuilders.max(LATEST_TIME).field(Result.TIMESTAMP.getPreferredName())); + client.search(searchRequest, ActionListener.wrap(searchResponse -> { + long totalHits = searchResponse.getHits().getTotalHits(); + if (totalHits > 0) { + Aggregations aggregations = searchResponse.getAggregations(); + Min min = aggregations.get(EARLIEST_TIME); + long earliestTime = Intervals.alignToFloor((long) min.getValue(), maxBucketSpanMillis); + Max max = aggregations.get(LATEST_TIME); + long latestTime = Intervals.alignToCeil((long) max.getValue() + 1, maxBucketSpanMillis); + listener.onResponse(new ChunkedBucketSearcher(jobsContext, earliestTime, latestTime, request.isExcludeInterim(), + overallBucketsProvider, overallBucketsProcessor)); + } else { + listener.onResponse(null); + } }, listener::onFailure)); } - private static SearchRequest buildSearchRequest(Request request, long bucketSpanMillis, List indices) { - String startTime = request.getStart() == null ? null : String.valueOf( - Intervals.alignToCeil(request.getStart(), bucketSpanMillis)); - String endTime = request.getEnd() == null ? 
null : String.valueOf(Intervals.alignToFloor(request.getEnd(), bucketSpanMillis)); + private static class JobsContext { + private final int jobCount; + private final String[] indices; + private final TimeValue maxBucketSpan; + + private JobsContext(int jobCount, String[] indices, TimeValue maxBucketSpan) { + this.jobCount = jobCount; + this.indices = indices; + this.maxBucketSpan = maxBucketSpan; + } + + private static JobsContext build(List jobs, Request request) { + Set indices = new HashSet<>(); + TimeValue maxBucketSpan = TimeValue.ZERO; + for (Job job : jobs) { + indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); + TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan(); + if (maxBucketSpan.compareTo(bucketSpan) < 0) { + maxBucketSpan = bucketSpan; + } + } + checkValidBucketSpan(request.getBucketSpan(), maxBucketSpan); + + // If top_n is 1, we can use the request bucket_span in order to optimize the aggregations + if (request.getBucketSpan() != null && (request.getTopN() == 1 || jobs.size() <= 1)) { + maxBucketSpan = request.getBucketSpan(); + } + + return new JobsContext(jobs.size(), indices.toArray(new String[indices.size()]), maxBucketSpan); + } + } + + private class ChunkedBucketSearcher { + + private static final int BUCKETS_PER_CHUNK = 1000; + private static final int MAX_RESULT_COUNT = 10000; + + private final String[] indices; + private final long maxBucketSpanMillis; + private final boolean excludeInterim; + private final long chunkMillis; + private final long endTime; + private volatile long curTime; + private final AggregationBuilder aggs; + private final OverallBucketsProvider overallBucketsProvider; + private final OverallBucketsProcessor overallBucketsProcessor; + + ChunkedBucketSearcher(JobsContext jobsContext, long startTime, long endTime, + boolean excludeInterim, OverallBucketsProvider overallBucketsProvider, + OverallBucketsProcessor overallBucketsProcessor) { + this.indices = jobsContext.indices; + this.maxBucketSpanMillis = jobsContext.maxBucketSpan.millis(); + this.chunkMillis = BUCKETS_PER_CHUNK * maxBucketSpanMillis; + this.endTime = endTime; + this.curTime = startTime; + this.excludeInterim = excludeInterim; + this.aggs = buildAggregations(maxBucketSpanMillis, jobsContext.jobCount); + this.overallBucketsProvider = overallBucketsProvider; + this.overallBucketsProcessor = overallBucketsProcessor; + } + + void searchAndComputeOverallBuckets(ActionListener> listener) { + if (curTime >= endTime) { + listener.onResponse(overallBucketsProcessor.finish()); + return; + } + client.search(nextSearch(), ActionListener.wrap(searchResponse -> { + Histogram histogram = searchResponse.getAggregations().get(Result.TIMESTAMP.getPreferredName()); + overallBucketsProcessor.process(overallBucketsProvider.computeOverallBuckets(histogram)); + if (overallBucketsProcessor.size() > MAX_RESULT_COUNT) { + listener.onFailure(ExceptionsHelper.badRequestException("Unable to return more than [{}] results; please use " + + "parameters [{}] and [{}] to limit the time range", MAX_RESULT_COUNT, Request.START, Request.END)); + return; + } + searchAndComputeOverallBuckets(listener); + }, listener::onFailure)); + } + + SearchRequest nextSearch() { + long curEnd = Math.min(curTime + chunkMillis, endTime); + logger.debug("Search for buckets in: [{}, {})", curTime, curEnd); + SearchRequest searchRequest = buildSearchRequest(curTime, curEnd, excludeInterim, maxBucketSpanMillis, indices); + searchRequest.source().aggregation(aggs); + curTime += chunkMillis; + return 
searchRequest; + } + } + + private static SearchRequest buildSearchRequest(Long start, Long end, boolean excludeInterim, long bucketSpanMillis, + String[] indices) { + String startTime = start == null ? null : String.valueOf(Intervals.alignToCeil(start, bucketSpanMillis)); + String endTime = end == null ? null : String.valueOf(Intervals.alignToFloor(end, bucketSpanMillis)); SearchSourceBuilder searchSourceBuilder = new BucketsQueryBuilder() .size(0) - .includeInterim(request.isExcludeInterim() == false) + .includeInterim(excludeInterim == false) .start(startTime) .end(endTime) .build(); - searchSourceBuilder.aggregation(buildAggregations(bucketSpanMillis)); - SearchRequest searchRequest = new SearchRequest(indices.toArray(new String[indices.size()])); + SearchRequest searchRequest = new SearchRequest(indices); searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); searchRequest.source(searchSourceBuilder); return searchRequest; } - private static AggregationBuilder buildAggregations(long bucketSpanMillis) { + private static AggregationBuilder buildAggregations(long maxBucketSpanMillis, int jobCount) { AggregationBuilder overallScoreAgg = AggregationBuilders.max(OverallBucket.OVERALL_SCORE.getPreferredName()) .field(Bucket.ANOMALY_SCORE.getPreferredName()); AggregationBuilder jobsAgg = AggregationBuilders.terms(Job.ID.getPreferredName()) - .field(Job.ID.getPreferredName()).subAggregation(overallScoreAgg); + .field(Job.ID.getPreferredName()).size(jobCount).subAggregation(overallScoreAgg); AggregationBuilder interimAgg = AggregationBuilders.max(Result.IS_INTERIM.getPreferredName()) .field(Result.IS_INTERIM.getPreferredName()); return AggregationBuilders.dateHistogram(Result.TIMESTAMP.getPreferredName()) .field(Result.TIMESTAMP.getPreferredName()) - .interval(bucketSpanMillis) + .interval(maxBucketSpanMillis) .subAggregation(jobsAgg) .subAggregation(interimAgg); } - - private List computeOverallBuckets(Request request, long bucketSpanSeconds, SearchResponse searchResponse) { - List overallBuckets = new ArrayList<>(); - Histogram histogram = searchResponse.getAggregations().get(Result.TIMESTAMP.getPreferredName()); - for (Histogram.Bucket histogramBucket : histogram.getBuckets()) { - Aggregations histogramBucketAggs = histogramBucket.getAggregations(); - Terms jobsAgg = histogramBucketAggs.get(Job.ID.getPreferredName()); - int jobsCount = jobsAgg.getBuckets().size(); - int topN = Math.min(request.getTopN(), jobsCount); - List jobs = new ArrayList<>(jobsCount); - TopNScores topNScores = new TopNScores(topN); - for (Terms.Bucket jobsBucket : jobsAgg.getBuckets()) { - Max maxScore = jobsBucket.getAggregations().get(OverallBucket.OVERALL_SCORE.getPreferredName()); - topNScores.insertWithOverflow(maxScore.getValue()); - jobs.add(new OverallBucket.JobInfo((String) jobsBucket.getKey(), maxScore.getValue())); - } - - double overallScore = topNScores.overallScore(); - if (overallScore < request.getOverallScore()) { - continue; - } - - Max interimAgg = histogramBucketAggs.get(Result.IS_INTERIM.getPreferredName()); - boolean isInterim = interimAgg.getValue() > 0; - if (request.isExcludeInterim() && isInterim) { - continue; - } - - overallBuckets.add(new OverallBucket(getHistogramBucketTimestamp(histogramBucket), - bucketSpanSeconds, overallScore, jobs, isInterim)); - } - return overallBuckets; - } - - private static Date getHistogramBucketTimestamp(Histogram.Bucket bucket) { - DateTime bucketTimestamp = (DateTime) bucket.getKey(); - return new 
Date(bucketTimestamp.getMillis()); - } - - static class TopNScores extends PriorityQueue { - - TopNScores(int n) { - super(n, false); - } - - @Override - protected boolean lessThan(Double a, Double b) { - return a < b; - } - - double overallScore() { - double overallScore = 0.0; - for (double score : this) { - overallScore += score; - } - return size() > 0 ? overallScore / size() : 0.0; - } - } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsAggregator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsAggregator.java new file mode 100644 index 00000000000..67253396510 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsAggregator.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence.overallbuckets; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.ml.job.results.OverallBucket; +import org.elasticsearch.xpack.ml.utils.Intervals; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class OverallBucketsAggregator implements OverallBucketsProcessor { + + private final long bucketSpanSeconds; + private final long bucketSpanMillis; + private double maxOverallScore = 0.0; + private Map maxScoreByJob = new TreeMap<>(); + private boolean isInterim = false; + private Long startTime; + private final List aggregated = new ArrayList<>(); + + public OverallBucketsAggregator(TimeValue bucketSpan) { + bucketSpanSeconds = bucketSpan.seconds(); + bucketSpanMillis = bucketSpan.millis(); + } + + @Override + public synchronized void process(List buckets) { + if (buckets.isEmpty()) { + return; + } + if (startTime == null) { + startTime = Intervals.alignToFloor(buckets.get(0).getTimestamp().getTime(), bucketSpanMillis); + } + long bucketTime; + for (OverallBucket bucket : buckets) { + bucketTime = bucket.getTimestamp().getTime(); + if (bucketTime >= startTime + bucketSpanMillis) { + aggregated.add(outputBucket()); + startNextBucket(bucketTime); + } + processBucket(bucket); + } + } + + private OverallBucket outputBucket() { + List jobs = new ArrayList<>(maxScoreByJob.size()); + maxScoreByJob.entrySet().stream().forEach(entry -> jobs.add( + new OverallBucket.JobInfo(entry.getKey(), entry.getValue()))); + return new OverallBucket(new Date(startTime), bucketSpanSeconds, maxOverallScore, jobs, isInterim); + } + + private void startNextBucket(long bucketTime) { + maxOverallScore = 0.0; + maxScoreByJob.clear(); + isInterim = false; + startTime = Intervals.alignToFloor(bucketTime, bucketSpanMillis); + } + + private void processBucket(OverallBucket bucket) { + maxOverallScore = Math.max(maxOverallScore, bucket.getOverallScore()); + bucket.getJobs().stream().forEach(j -> { + double currentMax = maxScoreByJob.computeIfAbsent(j.getJobId(), k -> 0.0); + if (j.getMaxAnomalyScore() > currentMax) { + maxScoreByJob.put(j.getJobId(), j.getMaxAnomalyScore()); + } + }); + isInterim |= bucket.isInterim(); + } + + @Override + public synchronized List finish() { + if (startTime != null) { + aggregated.add(outputBucket()); + } + return aggregated; + } + + @Override + public synchronized 
int size() { + return aggregated.size(); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsCollector.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsCollector.java new file mode 100644 index 00000000000..acf47831f3c --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsCollector.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence.overallbuckets; + +import org.elasticsearch.xpack.ml.job.results.OverallBucket; + +import java.util.ArrayList; +import java.util.List; + +public class OverallBucketsCollector implements OverallBucketsProcessor { + + private final List collected = new ArrayList<>(); + + @Override + public synchronized void process(List overallBuckets) { + collected.addAll(overallBuckets); + } + + @Override + public synchronized List finish() { + return collected; + } + + @Override + public synchronized int size() { + return collected.size(); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProcessor.java new file mode 100644 index 00000000000..ca4dbb80969 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProcessor.java @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence.overallbuckets; + +import org.elasticsearch.xpack.ml.job.results.OverallBucket; + +import java.util.List; + +public interface OverallBucketsProcessor { + + void process(List overallBuckets); + List finish(); + int size(); +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProvider.java new file mode 100644 index 00000000000..fe853c5890a --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProvider.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.job.persistence.overallbuckets; + +import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.max.Max; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.results.OverallBucket; +import org.elasticsearch.xpack.ml.job.results.Result; +import org.joda.time.DateTime; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +public class OverallBucketsProvider { + + private final long maxJobBucketSpanSeconds; + private final int topN; + private final double minOverallScore; + + public OverallBucketsProvider(TimeValue maxJobBucketSpan, int topN, double minOverallScore) { + this.maxJobBucketSpanSeconds = maxJobBucketSpan.seconds(); + this.topN = topN; + this.minOverallScore = minOverallScore; + } + + public List computeOverallBuckets(Histogram histogram) { + List overallBuckets = new ArrayList<>(); + for (Histogram.Bucket histogramBucket : histogram.getBuckets()) { + Aggregations histogramBucketAggs = histogramBucket.getAggregations(); + Terms jobsAgg = histogramBucketAggs.get(Job.ID.getPreferredName()); + int jobsCount = jobsAgg.getBuckets().size(); + int bucketTopN = Math.min(topN, jobsCount); + Set jobs = new TreeSet<>(); + TopNScores topNScores = new TopNScores(bucketTopN); + for (Terms.Bucket jobsBucket : jobsAgg.getBuckets()) { + Max maxScore = jobsBucket.getAggregations().get(OverallBucket.OVERALL_SCORE.getPreferredName()); + topNScores.insertWithOverflow(maxScore.getValue()); + jobs.add(new OverallBucket.JobInfo((String) jobsBucket.getKey(), maxScore.getValue())); + } + + double overallScore = topNScores.overallScore(); + if (overallScore < minOverallScore) { + continue; + } + + Max interimAgg = histogramBucketAggs.get(Result.IS_INTERIM.getPreferredName()); + boolean isInterim = interimAgg.getValue() > 0; + + overallBuckets.add(new OverallBucket(getHistogramBucketTimestamp(histogramBucket), + maxJobBucketSpanSeconds, overallScore, new ArrayList<>(jobs), isInterim)); + } + return overallBuckets; + } + + private static Date getHistogramBucketTimestamp(Histogram.Bucket bucket) { + DateTime bucketTimestamp = (DateTime) bucket.getKey(); + return new Date(bucketTimestamp.getMillis()); + } + + static class TopNScores extends PriorityQueue { + + TopNScores(int n) { + super(n, false); + } + + @Override + protected boolean lessThan(Double a, Double b) { + return a < b; + } + + double overallScore() { + double overallScore = 0.0; + for (double score : this) { + overallScore += score; + } + return size() > 0 ? 
overallScore / size() : 0.0; + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/OverallBucket.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/OverallBucket.java index ee2d17d3d88..3a2aa328f4c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/OverallBucket.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/OverallBucket.java @@ -95,6 +95,10 @@ public class OverallBucket implements ToXContentObject, Writeable { return overallScore; } + public List getJobs() { + return jobs; + } + public boolean isInterim() { return isInterim; } @@ -126,7 +130,7 @@ public class OverallBucket implements ToXContentObject, Writeable { && this.isInterim == that.isInterim; } - public static class JobInfo implements ToXContentObject, Writeable { + public static class JobInfo implements ToXContentObject, Writeable, Comparable { private static final ParseField MAX_ANOMALY_SCORE = new ParseField("max_anomaly_score"); @@ -143,6 +147,14 @@ public class OverallBucket implements ToXContentObject, Writeable { maxAnomalyScore = in.readDouble(); } + public String getJobId() { + return jobId; + } + + public double getMaxAnomalyScore() { + return maxAnomalyScore; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); @@ -174,5 +186,14 @@ public class OverallBucket implements ToXContentObject, Writeable { JobInfo that = (JobInfo) other; return Objects.equals(this.jobId, that.jobId) && this.maxAnomalyScore == that.maxAnomalyScore; } + + @Override + public int compareTo(JobInfo other) { + int result = this.jobId.compareTo(other.jobId); + if (result == 0) { + result = Double.compare(this.maxAnomalyScore, other.maxAnomalyScore); + } + return result; + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetOverallBucketsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetOverallBucketsAction.java index fad85b1d67b..287d5ba1e6b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetOverallBucketsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetOverallBucketsAction.java @@ -44,6 +44,9 @@ public class RestGetOverallBucketsAction extends BaseRestHandler { } else { request = new Request(jobId); request.setTopN(restRequest.paramAsInt(Request.TOP_N.getPreferredName(), request.getTopN())); + if (restRequest.hasParam(Request.BUCKET_SPAN.getPreferredName())) { + request.setBucketSpan(restRequest.param(Request.BUCKET_SPAN.getPreferredName())); + } request.setOverallScore(Double.parseDouble(restRequest.param(Request.OVERALL_SCORE.getPreferredName(), "0.0"))); request.setExcludeInterim(restRequest.paramAsBoolean(Request.EXCLUDE_INTERIM.getPreferredName(), request.isExcludeInterim())); if (restRequest.hasParam(Request.START.getPreferredName())) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsActionRequestTests.java index 901fb8e0c52..40aa875787b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsActionRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import 
org.elasticsearch.test.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.action.GetOverallBucketsAction.Request; @@ -18,7 +19,9 @@ public class GetOverallBucketsActionRequestTests extends AbstractStreamableXCont if (randomBoolean()) { request.setTopN(randomIntBetween(1, 1000)); } - request.setAllowNoJobs(randomBoolean()); + if (randomBoolean()) { + request.setBucketSpan(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000))); + } if (randomBoolean()) { request.setStart(randomNonNegativeLong()); } @@ -31,6 +34,7 @@ public class GetOverallBucketsActionRequestTests extends AbstractStreamableXCont if (randomBoolean()) { request.setEnd(randomNonNegativeLong()); } + request.setAllowNoJobs(randomBoolean()); return request; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java new file mode 100644 index 00000000000..3b63a5b1e99 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.ml.action.GetOverallBucketsAction; +import org.elasticsearch.xpack.ml.action.util.PageParams; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.junit.After; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests that overall bucket results are calculated correctly + * for jobs that have many buckets. 
+ */ +public class OverallBucketsIT extends MlNativeAutodetectIntegTestCase { + + private static final String JOB_ID = "overall-buckets-test"; + private static final long BUCKET_SPAN_SECONDS = 3600; + + @After + public void cleanUpTest() throws Exception { + cleanUp(); + } + + public void test() throws Exception { + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder( + Collections.singletonList(new Detector.Builder("count", null).build())); + analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS)); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeFormat("epoch"); + Job.Builder job = new Job.Builder(JOB_ID); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + long timestamp = 1483228800000L; // 2017-01-01T00:00:00Z + List data = new ArrayList<>(); + for (int i = 0; i < 3000; i++) { + data.add(createJsonRecord(createRecord(timestamp))); + if (i % 1000 == 0) { + data.add(createJsonRecord(createRecord(timestamp))); + data.add(createJsonRecord(createRecord(timestamp))); + data.add(createJsonRecord(createRecord(timestamp))); + } + timestamp += BUCKET_SPAN_SECONDS; + } + + postData(job.getId(), data.stream().collect(Collectors.joining())); + flushJob(job.getId(), true); + closeJob(job.getId()); + + GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId()); + request.setPageParams(new PageParams(0, 3000)); + assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(3000L)); + + { + // Check we get equal number of overall buckets on a default request + GetOverallBucketsAction.Request overallBucketsRequest = new GetOverallBucketsAction.Request(job.getId()); + GetOverallBucketsAction.Response overallBucketsResponse = client().execute( + GetOverallBucketsAction.INSTANCE, overallBucketsRequest).actionGet(); + assertThat(overallBucketsResponse.getOverallBuckets().count(), equalTo(3000L)); + } + + { + // Check overall buckets are half when the bucket_span is set to double the job bucket span + GetOverallBucketsAction.Request aggregatedOverallBucketsRequest = new GetOverallBucketsAction.Request(job.getId()); + aggregatedOverallBucketsRequest.setBucketSpan(TimeValue.timeValueSeconds(2 * BUCKET_SPAN_SECONDS)); + GetOverallBucketsAction.Response aggregatedOverallBucketsResponse = client().execute( + GetOverallBucketsAction.INSTANCE, aggregatedOverallBucketsRequest).actionGet(); + assertThat(aggregatedOverallBucketsResponse.getOverallBuckets().count(), equalTo(1500L)); + } + + { + // Check overall score filtering works when chunking takes place + GetOverallBucketsAction.Request filteredOverallBucketsRequest = new GetOverallBucketsAction.Request(job.getId()); + filteredOverallBucketsRequest.setOverallScore(0.1); + GetOverallBucketsAction.Response filteredOverallBucketsResponse = client().execute( + GetOverallBucketsAction.INSTANCE, filteredOverallBucketsRequest).actionGet(); + assertThat(filteredOverallBucketsResponse.getOverallBuckets().count(), equalTo(2L)); + } + } + + private static Map createRecord(long timestamp) { + Map record = new HashMap<>(); + record.put("time", timestamp); + return record; + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsAggregatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsAggregatorTests.java new file mode 
100644 index 00000000000..f71ebc01551 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsAggregatorTests.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence.overallbuckets; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.job.results.OverallBucket; +import org.elasticsearch.xpack.ml.job.results.OverallBucket.JobInfo; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class OverallBucketsAggregatorTests extends ESTestCase { + + public void testProcess_GivenEmpty() { + OverallBucketsAggregator aggregator = new OverallBucketsAggregator(TimeValue.timeValueHours(1)); + aggregator.process(Collections.emptyList()); + List aggregated = aggregator.finish(); + assertThat(aggregated.isEmpty(), is(true)); + } + + public void testProcess_GivenAggSpanIsTwiceTheBucketSpan() { + // Monday, October 16, 2017 12:00:00 AM UTC + long startTime = 1508112000000L; + + List rawBuckets1 = new ArrayList<>(); + List rawBuckets2 = new ArrayList<>(); + rawBuckets1.add(new OverallBucket(new Date(startTime), 3600L, 10.0, + Arrays.asList(new OverallBucket.JobInfo("job_1", 10.0), + new OverallBucket.JobInfo("job_2", 6.0)), + false)); + rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(1).millis()), 3600L, 20.0, + Arrays.asList(new JobInfo("job_1", 20.0), new JobInfo("job_2", 2.0)), + false)); + rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(2).millis()), 3600L, 30.0, + Arrays.asList(new JobInfo("job_1", 30.0), new JobInfo("job_2", 7.0)), + false)); + rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(3).millis()), 3600L, 40.0, + Arrays.asList(new JobInfo("job_1", 10.0), new JobInfo("job_2", 40.0)), + false)); + rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(4).millis()), 3600L, 50.0, + Collections.singletonList(new JobInfo("job_1", 50.0)), false)); + rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(5).millis()), 3600L, 60.0, + Collections.singletonList(new JobInfo("job_1", 60.0)), true)); + rawBuckets1.add(new OverallBucket(new Date(startTime + TimeValue.timeValueHours(6).millis()), 3600L, 70.0, + Arrays.asList(new JobInfo("job_1", 70.0), new JobInfo("job_2", 0.0)), + true)); + + + TimeValue bucketSpan = TimeValue.timeValueHours(2); + OverallBucketsAggregator aggregator = new OverallBucketsAggregator(bucketSpan); + aggregator.process(rawBuckets1); + aggregator.process(rawBuckets2); + List aggregated = aggregator.finish(); + + assertThat(aggregated.size(), equalTo(4)); + assertThat(aggregated.get(0).getTimestamp().getTime(), equalTo(startTime)); + assertThat(aggregated.get(0).getOverallScore(), equalTo(20.0)); + assertThat(aggregated.get(0).getJobs(), contains(new JobInfo("job_1", 20.0), + new JobInfo("job_2", 6.0))); + assertThat(aggregated.get(0).isInterim(), is(false)); + assertThat(aggregated.get(1).getTimestamp().getTime(), 
equalTo(startTime + bucketSpan.millis())); + assertThat(aggregated.get(1).getOverallScore(), equalTo(40.0)); + assertThat(aggregated.get(1).getJobs(), contains(new JobInfo("job_1", 30.0), + new JobInfo("job_2", 40.0))); + assertThat(aggregated.get(1).isInterim(), is(false)); + assertThat(aggregated.get(2).getTimestamp().getTime(), equalTo(startTime + 2 * bucketSpan.millis())); + assertThat(aggregated.get(2).getOverallScore(), equalTo(60.0)); + assertThat(aggregated.get(2).getJobs().size(), equalTo(1)); + assertThat(aggregated.get(2).getJobs(), contains(new JobInfo("job_1", 60.0))); + assertThat(aggregated.get(2).isInterim(), is(true)); + assertThat(aggregated.get(3).getTimestamp().getTime(), equalTo(startTime + 3 * bucketSpan.millis())); + assertThat(aggregated.get(3).getOverallScore(), equalTo(70.0)); + assertThat(aggregated.get(3).getJobs(), contains(new JobInfo("job_1", 70.0), + new JobInfo("job_2", 0.0))); + assertThat(aggregated.get(3).isInterim(), is(true)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsCollectorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsCollectorTests.java new file mode 100644 index 00000000000..809bb4c118d --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsCollectorTests.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence.overallbuckets; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.job.results.OverallBucket; + +import java.util.Arrays; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; + +public class OverallBucketsCollectorTests extends ESTestCase { + + public void testProcess_GivenEmpty() { + OverallBucketsCollector collector = new OverallBucketsCollector(); + collector.process(Collections.emptyList()); + assertThat(collector.finish().isEmpty(), is(true)); + } + + public void testProcess_GivenTwoBatches() { + OverallBucket b1 = mock(OverallBucket.class); + OverallBucket b2 = mock(OverallBucket.class); + OverallBucket b3 = mock(OverallBucket.class); + OverallBucket b4 = mock(OverallBucket.class); + OverallBucket b5 = mock(OverallBucket.class); + + OverallBucketsCollector collector = new OverallBucketsCollector(); + collector.process(Arrays.asList(b1, b2, b3)); + collector.process(Arrays.asList(b4, b5)); + + assertThat(collector.finish(), equalTo(Arrays.asList(b1, b2, b3, b4, b5))); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProviderTests.java similarity index 81% rename from plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsActionTests.java rename to plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProviderTests.java index a0cd9a62459..4fca6431bf4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetOverallBucketsActionTests.java +++ 
b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/overallbuckets/OverallBucketsProviderTests.java @@ -3,14 +3,14 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.action; +package org.elasticsearch.xpack.ml.job.persistence.overallbuckets; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ml.action.GetOverallBucketsAction.TransportAction.TopNScores; +import org.elasticsearch.xpack.ml.job.persistence.overallbuckets.OverallBucketsProvider.TopNScores; import static org.hamcrest.Matchers.equalTo; -public class GetOverallBucketsActionTests extends ESTestCase { +public class OverallBucketsProviderTests extends ESTestCase { public void testTopNScores() { TopNScores topNScores = new TopNScores(3); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/OverallBucketTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/OverallBucketTests.java new file mode 100644 index 00000000000..c16d8c23ff1 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/OverallBucketTests.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.results; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.ml.job.config.JobTests; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +public class OverallBucketTests extends AbstractWireSerializingTestCase { + + @Override + protected OverallBucket createTestInstance() { + int jobCount = randomIntBetween(0, 10); + List jobs = new ArrayList<>(jobCount); + for (int i = 0; i < jobCount; ++i) { + jobs.add(new OverallBucket.JobInfo(JobTests.randomValidJobId(), randomDoubleBetween(0.0, 100.0, true))); + } + return new OverallBucket(new Date(randomNonNegativeLong()), + randomIntBetween(60, 24 * 3600), + randomDoubleBetween(0.0, 100.0, true), + jobs, + randomBoolean()); + } + + @Override + protected Writeable.Reader instanceReader() { + return OverallBucket::new; + } + + public void testCompareTo() { + OverallBucket.JobInfo jobInfo1 = new OverallBucket.JobInfo("aaa", 1.0); + OverallBucket.JobInfo jobInfo2 = new OverallBucket.JobInfo("aaa", 3.0); + OverallBucket.JobInfo jobInfo3 = new OverallBucket.JobInfo("bbb", 1.0); + assertThat(jobInfo1.compareTo(jobInfo1), equalTo(0)); + assertThat(jobInfo1.compareTo(jobInfo2), lessThan(0)); + assertThat(jobInfo1.compareTo(jobInfo3), lessThan(0)); + assertThat(jobInfo2.compareTo(jobInfo3), lessThan(0)); + } +} \ No newline at end of file diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_overall_buckets.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_overall_buckets.json index f0df1388242..eaf3b2a233f 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_overall_buckets.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.get_overall_buckets.json @@ -19,6 +19,10 @@ "type": "int", "description": "The number of top job bucket scores to be used in the overall_score calculation" }, + 
"bucket_span": { + "type": "string", + "description": "The span of the overall buckets. Defaults to the longest job bucket_span" + }, "overall_score": { "type": "double", "description": "Returns overall buckets with overall scores higher than this value" @@ -37,7 +41,6 @@ }, "allow_no_jobs": { "type": "boolean", - "required": false, "description": "Whether to ignore if a wildcard expression matches no jobs. (This includes `_all` string or when no jobs have been specified)" } } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_overall_buckets.yml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_overall_buckets.yml index 65851e4c726..75f35f31117 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_overall_buckets.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_overall_buckets.yml @@ -518,3 +518,99 @@ setup: xpack.ml.get_overall_buckets: job_id: "jobs-get-result-overall-buckets-*" end: "invalid" + +--- +"Test overall buckets given bucket_span": + - do: + xpack.ml.get_overall_buckets: + job_id: "jobs-get-result-overall-buckets-*" + bucket_span: "2h" + - match: { count: 2 } + - match: { overall_buckets.0.timestamp: 1464739200000 } + - match: { overall_buckets.0.bucket_span: 7200 } + - match: { overall_buckets.0.overall_score: 30.0 } + - length: { overall_buckets.0.jobs: 3} + - match: {overall_buckets.0.jobs.0.job_id: jobs-get-result-overall-buckets-17 } + - match: {overall_buckets.0.jobs.0.max_anomaly_score: 0.0 } + - match: {overall_buckets.0.jobs.1.job_id: jobs-get-result-overall-buckets-30 } + - match: {overall_buckets.0.jobs.1.max_anomaly_score: 0.0 } + - match: {overall_buckets.0.jobs.2.job_id: jobs-get-result-overall-buckets-60 } + - match: {overall_buckets.0.jobs.2.max_anomaly_score: 30.0 } + - match: { overall_buckets.0.is_interim: false } + - match: { overall_buckets.0.result_type: overall_bucket } + - match: { overall_buckets.1.timestamp: 1464746400000 } + - match: { overall_buckets.1.bucket_span: 7200 } + - match: { overall_buckets.1.overall_score: 60.0 } + - length: { overall_buckets.1.jobs: 3} + - match: {overall_buckets.1.jobs.0.job_id: jobs-get-result-overall-buckets-17 } + - match: {overall_buckets.1.jobs.0.max_anomaly_score: 60.0 } + - match: {overall_buckets.1.jobs.1.job_id: jobs-get-result-overall-buckets-30 } + - match: {overall_buckets.1.jobs.1.max_anomaly_score: 40.0 } + - match: {overall_buckets.1.jobs.2.job_id: jobs-get-result-overall-buckets-60 } + - match: {overall_buckets.1.jobs.2.max_anomaly_score: 20.0 } + - match: { overall_buckets.1.is_interim: true } + - match: { overall_buckets.1.result_type: overall_bucket } + +--- +"Test overall buckets given bucket_span and top_n is 2": + - do: + xpack.ml.get_overall_buckets: + job_id: "jobs-get-result-overall-buckets-*" + top_n: 2 + bucket_span: "2h" + + - match: { count: 2 } + - match: { overall_buckets.0.timestamp: 1464739200000 } + - match: { overall_buckets.0.bucket_span: 7200 } + - match: { overall_buckets.0.overall_score: 30.0 } + - length: { overall_buckets.0.jobs: 3} + - match: {overall_buckets.0.jobs.0.job_id: jobs-get-result-overall-buckets-17 } + - match: {overall_buckets.0.jobs.0.max_anomaly_score: 0.0 } + - match: {overall_buckets.0.jobs.1.job_id: jobs-get-result-overall-buckets-30 } + - match: {overall_buckets.0.jobs.1.max_anomaly_score: 0.0 } + - match: {overall_buckets.0.jobs.2.job_id: jobs-get-result-overall-buckets-60 } + - match: {overall_buckets.0.jobs.2.max_anomaly_score: 30.0 } + - match: { 
overall_buckets.0.is_interim: false } + - match: { overall_buckets.0.result_type: overall_bucket } + - match: { overall_buckets.1.timestamp: 1464746400000 } + - match: { overall_buckets.1.bucket_span: 7200 } + - match: { overall_buckets.1.overall_score: 50.0 } + - length: { overall_buckets.1.jobs: 3} + - match: {overall_buckets.1.jobs.0.job_id: jobs-get-result-overall-buckets-17 } + - match: {overall_buckets.1.jobs.0.max_anomaly_score: 60.0 } + - match: {overall_buckets.1.jobs.1.job_id: jobs-get-result-overall-buckets-30 } + - match: {overall_buckets.1.jobs.1.max_anomaly_score: 40.0 } + - match: {overall_buckets.1.jobs.2.job_id: jobs-get-result-overall-buckets-60 } + - match: {overall_buckets.1.jobs.2.max_anomaly_score: 20.0 } + - match: { overall_buckets.1.is_interim: true } + - match: { overall_buckets.1.result_type: overall_bucket } + +--- +"Test overall buckets given bucket_span and overall_score filter": + - do: + xpack.ml.get_overall_buckets: + job_id: "jobs-get-result-overall-buckets-*" + bucket_span: "2h" + overall_score: "41.0" + + - match: { count: 1 } + - match: { overall_buckets.0.timestamp: 1464746400000 } + - match: { overall_buckets.0.bucket_span: 7200 } + - match: { overall_buckets.0.overall_score: 60.0 } + - length: { overall_buckets.0.jobs: 3} + - match: {overall_buckets.0.jobs.0.job_id: jobs-get-result-overall-buckets-17 } + - match: {overall_buckets.0.jobs.0.max_anomaly_score: 60.0 } + - match: {overall_buckets.0.jobs.1.job_id: jobs-get-result-overall-buckets-30 } + - match: {overall_buckets.0.jobs.1.max_anomaly_score: 40.0 } + - match: {overall_buckets.0.jobs.2.job_id: jobs-get-result-overall-buckets-60 } + - match: {overall_buckets.0.jobs.2.max_anomaly_score: 20.0 } + - match: { overall_buckets.0.is_interim: true } + - match: { overall_buckets.0.result_type: overall_bucket } + +--- +"Test overall buckets given bucket_span is smaller than max job bucket_span": + - do: + catch: /.*Param \[bucket_span\] must be greater or equal to the max bucket_span \[60m\]*/ + xpack.ml.get_overall_buckets: + job_id: "jobs-get-result-overall-buckets-*" + bucket_span: "59m" diff --git a/qa/smoke-test-ml-with-security/build.gradle b/qa/smoke-test-ml-with-security/build.gradle index be6725c674c..d34c99af35a 100644 --- a/qa/smoke-test-ml-with-security/build.gradle +++ b/qa/smoke-test-ml-with-security/build.gradle @@ -52,6 +52,7 @@ integTestRunner { 'ml/jobs_get_result_overall_buckets/Test overall buckets given top_n is negative', 'ml/jobs_get_result_overall_buckets/Test overall buckets given invalid start param', 'ml/jobs_get_result_overall_buckets/Test overall buckets given invalid end param', + 'ml/jobs_get_result_overall_buckets/Test overall buckets given bucket_span is smaller than max job bucket_span', 'ml/jobs_get_stats/Test get job stats given missing job', 'ml/jobs_get_stats/Test no exception on get job stats with missing index', 'ml/job_groups/Test put job with empty group',
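
For reference, the two scoring steps this change combines — computing an `overall_score` per raw overall bucket as the average of the `top_n` highest per-job maximum anomaly scores, and then rolling those buckets up into the requested `bucket_span` by taking the maximum — can be pictured with a small, self-contained sketch. This is not the plugin code: the `RawBucket`/`SpanBucket` types, the epoch-millisecond alignment and the `main` method below are hypothetical stand-ins for `OverallBucket`, `OverallBucketsProvider` and `OverallBucketsAggregator`, and the sketch leaves out interim-flag handling, per-job score merging in the output, and the chunked searching that the patch adds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the overall-bucket scoring; illustrative names only.
public class OverallBucketsSketch {

    // A raw overall bucket at the largest job bucket_span:
    // a timestamp plus the max anomaly score of each job in that bucket.
    static final class RawBucket {
        final long epochMillis;
        final double[] jobMaxScores;

        RawBucket(long epochMillis, double... jobMaxScores) {
            this.epochMillis = epochMillis;
            this.jobMaxScores = jobMaxScores;
        }
    }

    // An aggregated bucket at the requested bucket_span.
    static final class SpanBucket {
        final long startMillis;      // aligned to the requested span
        double overallScore;         // max overall_score of the contained raw buckets

        SpanBucket(long startMillis) {
            this.startMillis = startMillis;
        }
    }

    // overall_score = average of the top_n highest per-job max anomaly scores.
    static double overallScore(double[] jobMaxScores, int topN) {
        double[] sorted = jobMaxScores.clone();
        Arrays.sort(sorted); // ascending
        int n = Math.min(topN, sorted.length);
        double sum = 0.0;
        for (int i = 0; i < n; i++) {
            sum += sorted[sorted.length - 1 - i]; // sum the n largest scores
        }
        return n > 0 ? sum / n : 0.0;
    }

    // Rolls raw overall buckets (sorted by time) up into the requested span:
    // each output bucket starts on a boundary aligned to spanMillis and its
    // score is the max overall_score of the raw buckets that fall inside it.
    static List<SpanBucket> aggregate(List<RawBucket> raw, int topN, long spanMillis) {
        List<SpanBucket> result = new ArrayList<>();
        SpanBucket current = null;
        for (RawBucket bucket : raw) {
            long alignedStart = (bucket.epochMillis / spanMillis) * spanMillis; // align to floor
            if (current == null || current.startMillis != alignedStart) {
                current = new SpanBucket(alignedStart);
                result.add(current);
            }
            current.overallScore = Math.max(current.overallScore, overallScore(bucket.jobMaxScores, topN));
        }
        return result;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        List<RawBucket> raw = Arrays.asList(
                new RawBucket(0 * hour, 10.0, 6.0),
                new RawBucket(1 * hour, 20.0, 2.0),
                new RawBucket(2 * hour, 30.0, 7.0),
                new RawBucket(3 * hour, 10.0, 40.0));
        // With top_n=1 and a requested bucket_span of 2h, the hourly overall scores
        // (10, 20, 30, 40) collapse into two overall buckets scoring 20 and 40.
        for (SpanBucket bucket : aggregate(raw, 1, 2 * hour)) {
            System.out.println(bucket.startMillis + " -> " + bucket.overallScore);
        }
    }
}
```

In the patch itself, the top-N averaging lives in `OverallBucketsProvider.TopNScores` and the span roll-up in `OverallBucketsAggregator`, which additionally keeps per-job maximum scores in the output and rejects a requested `bucket_span` smaller than the largest job's `bucket_span`.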