From 1bfc864193136ffe223c5abf1259ed83782a4a1f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 25 May 2017 08:50:56 +0100 Subject: [PATCH] [ML] Normalize records and buckets separately (elastic/x-pack-elasticsearch#1524) Previously we normalized records together with their buckets. This required nested scrolls: an outer scroll over buckets, then a nested scroll for the records in each bucket. This was fragile. The new approach is simply to scroll through buckets first, then through records. This is possible because we no longer store max_record_score on buckets or the bucket anomaly_score on records. While making these changes I noticed that the PerPartitionMaxProbabilities class was redundant (it merely stored max_record_score in the per-partition normalization case), so I removed it. I also removed a redundant Map from the Bucket class and fixed its equals() and hashCode() methods. relates elastic/x-pack-elasticsearch#1115 Original commit: elastic/x-pack-elasticsearch@efbee635736a7bb4978e89b1c10a02f0d8df5a6a --- .../persistence/BatchedDocumentsIterator.java | 1 - .../persistence/ElasticsearchMappings.java | 14 - .../xpack/ml/job/persistence/JobProvider.java | 69 +---- .../JobRenormalizedResultsPersister.java | 16 +- .../job/persistence/JobResultsPersister.java | 23 -- .../autodetect/AutodetectProcessManager.java | 7 +- .../output/AutoDetectResultProcessor.java | 14 +- .../normalizer/AbstractLeafNormalizable.java | 4 +- .../normalizer/BucketNormalizable.java | 22 +- .../ml/job/process/normalizer/Normalizer.java | 4 +- .../PartitionScoreNormalizable.java | 2 +- .../job/process/normalizer/ScoresUpdater.java | 160 ++++------ .../xpack/ml/job/results/Bucket.java | 36 ++- .../results/PerPartitionMaxProbabilities.java | 276 ------------------ .../ml/job/results/ReservedFieldNames.java | 3 - .../action/GetBucketActionResponseTests.java | 10 - .../AutodetectResultProcessorIT.java | 8 +- .../JobRenormalizedResultsPersisterTests.java | 6 +- .../AutoDetectResultProcessorTests.java | 34 +-- .../BucketInfluencerNormalizableTests.java | 4 +- .../normalizer/BucketNormalizableTests.java | 38 +-- .../InfluencerNormalizableTests.java | 4 +- .../normalizer/ScoresUpdaterTests.java | 11 +- .../xpack/ml/job/results/BucketTests.java | 14 +- .../PerPartitionMaxProbabilitiesTests.java | 88 ------ 25 files changed, 154 insertions(+), 714 deletions(-) delete mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java delete mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilitiesTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java index 12aaf13de55..5679c18340d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java @@ -15,7 +15,6 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; -import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Result; import java.util.ArrayDeque; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java
b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index d43f1d6af98..0394da6c53c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -21,7 +21,6 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influence; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.AuditMessage; @@ -259,19 +258,6 @@ public class ElasticsearchMappings { .endObject() .endObject() - // per-partition max probabilities mapping - .startObject(PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName()) - .field(TYPE, NESTED) - .startObject(PROPERTIES) - .startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName()) - .field(TYPE, KEYWORD) - .endObject() - .startObject(PerPartitionMaxProbabilities.MAX_RECORD_SCORE.getPreferredName()) - .field(TYPE, DOUBLE) - .endObject() - .endObject() - .endObject() - // Model Plot Output .startObject(ModelPlot.MODEL_FEATURE.getPreferredName()) .field(TYPE, KEYWORD) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 71877549c06..fb63d604138 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -17,7 +17,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequestBuilder; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; @@ -73,7 +72,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.security.support.Exceptions; @@ -388,24 +386,9 @@ public class JobProvider { searchSourceBuilder.sort(Result.TIMESTAMP.getPreferredName(), query.isSortDescending() ? 
SortOrder.DESC : SortOrder.ASC); } searchRequest.source(searchSourceBuilder); + searchRequest.indicesOptions(addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); - - MultiSearchRequest mrequest = new MultiSearchRequest(); - mrequest.indicesOptions(addIgnoreUnavailable(mrequest.indicesOptions())); - mrequest.add(searchRequest); - if (Strings.hasLength(query.getPartitionValue())) { - mrequest.add(createPartitionMaxNormailizedProbabilitiesRequest(jobId, query.getStart(), query.getEnd(), - query.getPartitionValue())); - } - - client.multiSearch(mrequest, ActionListener.wrap(mresponse -> { - MultiSearchResponse.Item item1 = mresponse.getResponses()[0]; - if (item1.isFailure()) { - errorHandler.accept(mapAuthFailure(item1.getFailure(), jobId, GetBucketsAction.NAME)); - return; - } - - SearchResponse searchResponse = item1.getResponse(); + client.search(searchRequest, ActionListener.wrap(searchResponse -> { SearchHits hits = searchResponse.getHits(); if (query.getTimestamp() != null) { if (hits.getTotalHits() == 0) { @@ -433,29 +416,15 @@ public class JobProvider { } QueryPage<Bucket> buckets = new QueryPage<>(results, searchResponse.getHits().getTotalHits(), Bucket.RESULTS_FIELD); - if (Strings.hasLength(query.getPartitionValue())) { - MultiSearchResponse.Item item2 = mresponse.getResponses()[1]; - if (item2.isFailure()) { - errorHandler.accept(item2.getFailure()); - return; - } - if (query.isExpand()) { - Iterator<Bucket> bucketsToExpand = buckets.results().stream() - .filter(bucket -> bucket.getRecordCount() > 0).iterator(); - expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client); - return; - } + if (query.isExpand()) { + Iterator<Bucket> bucketsToExpand = buckets.results().stream() + .filter(bucket -> bucket.getRecordCount() > 0).iterator(); + expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client); } else { - if (query.isExpand()) { - Iterator<Bucket> bucketsToExpand = buckets.results().stream() - .filter(bucket -> bucket.getRecordCount() > 0).iterator(); - expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client); - return; - } + handler.accept(buckets); } - handler.accept(buckets); - }, errorHandler)); + }, e -> { errorHandler.accept(mapAuthFailure(e, jobId, GetBucketsAction.NAME)); })); } private void expandBuckets(String jobId, BucketsQuery query, QueryPage<Bucket> buckets, Iterator<Bucket> bucketsToExpand, @@ -470,28 +439,6 @@ public class JobProvider { } } - private SearchRequest createPartitionMaxNormailizedProbabilitiesRequest(String jobId, Object epochStart, Object epochEnd, - String partitionFieldValue) { - QueryBuilder timeRangeQuery = new ResultsFilterBuilder() - .timeRange(Result.TIMESTAMP.getPreferredName(), epochStart, epochEnd) - .build(); - - QueryBuilder boolQuery = new BoolQueryBuilder() - .filter(timeRangeQuery) - .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), PerPartitionMaxProbabilities.RESULT_TYPE_VALUE)) - .filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue)); - - FieldSortBuilder sb = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC); - String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - sourceBuilder.sort(sb); - sourceBuilder.query(boolQuery); - SearchRequest searchRequest = new SearchRequest(indexName); - searchRequest.source(sourceBuilder); - searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions())); -
return searchRequest; - } - /** * Returns a {@link BatchedDocumentsIterator} that allows querying * and iterating over a large number of buckets of the given job. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java index 034c879ca05..dcd62706db4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java @@ -36,11 +36,18 @@ import static org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings.D */ public class JobRenormalizedResultsPersister extends AbstractComponent { + /** + * Execute bulk requests when they reach this size + */ + private static final int BULK_LIMIT = 10000; + + private final String jobId; private final Client client; private BulkRequest bulkRequest; - public JobRenormalizedResultsPersister(Settings settings, Client client) { + public JobRenormalizedResultsPersister(String jobId, Settings settings, Client client) { super(settings); + this.jobId = jobId; this.client = client; bulkRequest = new BulkRequest(); } @@ -70,6 +77,9 @@ public class JobRenormalizedResultsPersister extends AbstractComponent { } catch (IOException e) { logger.error("Error serialising result", e); } + if (bulkRequest.numberOfActions() >= BULK_LIMIT) { + executeRequest(); + } } private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException { @@ -80,10 +90,8 @@ public class JobRenormalizedResultsPersister extends AbstractComponent { /** * Execute the bulk action - * - * @param jobId The job Id */ - public void executeRequest(String jobId) { + public void executeRequest() { if (bulkRequest.numberOfActions() == 0) { return; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index c9afeeca19d..8b305ed6a2f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.Result; import java.io.IOException; @@ -171,28 +170,6 @@ public class JobResultsPersister extends AbstractComponent { return this; } - /** - * Persist {@link PerPartitionMaxProbabilities} - * - * @param partitionProbabilities The probabilities to persist - * @return this - */ - public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities) { - try (XContentBuilder builder = toXContentBuilder(partitionProbabilities)) { - logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}", - jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(), - partitionProbabilities.getId()); - bulkRequest.add( - new IndexRequest(indexName, DOC_TYPE, partitionProbabilities.getId()).source(builder)); - - } catch (IOException e) { - logger.error(new 
ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores", - new Object[]{jobId}), e); - } - - return this; - } - /** * Execute the bulk action */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index d75a70a05f6..681f7d1a329 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -293,8 +293,8 @@ public class AutodetectProcessManager extends AbstractComponent { ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(), jobDataCountsPersister); - ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), - normalizerFactory); + ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, + new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory); ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME); Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); @@ -302,13 +302,12 @@ public class AutodetectProcessManager extends AbstractComponent { AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED)); - boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization(); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); ExecutorService autodetectWorkerExecutor; try { autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); - autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization)); + autoDetectExecutorService.submit(() -> processor.process(process)); } catch (EsRejectedExecutionException e) { // If submitting the operation to read the results from the process fails we need to close // the process too, so that other submitted operations to threadpool are stopped. 
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index fea20eb954a..c3f77e490d4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -27,7 +27,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import java.time.Duration; import java.util.Iterator; @@ -40,7 +39,7 @@ import java.util.concurrent.TimeoutException; /** * A runnable class that reads the autodetect process output in the - * {@link #process(AutodetectProcess, boolean)} method and persists parsed + * {@link #process(AutodetectProcess)} method and persists parsed * results via the {@linkplain JobResultsPersister} passed in the constructor. *
<p>
* Has methods to register and remove alert observers. @@ -89,8 +88,8 @@ public class AutoDetectResultProcessor { this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); } - public void process(AutodetectProcess process, boolean isPerPartitionNormalization) { - Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId)); + public void process(AutodetectProcess process) { + Context context = new Context(jobId, persister.bulkPersisterBuilder(jobId)); // If a function call in this throws for some reason we don't want it // to kill the results reader thread as autodetect will be blocked @@ -175,9 +174,6 @@ public class AutoDetectResultProcessor { List<AnomalyRecord> records = result.getRecords(); if (records != null && !records.isEmpty()) { context.bulkResultsPersister.persistRecords(records); - if (context.isPerPartitionNormalization) { - context.bulkResultsPersister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records)); - } } List<Influencer> influencers = result.getInfluencers(); if (influencers != null && !influencers.isEmpty()) { @@ -306,14 +302,12 @@ public class AutoDetectResultProcessor { static class Context { private final String jobId; - private final boolean isPerPartitionNormalization; private JobResultsPersister.Builder bulkResultsPersister; boolean deleteInterimRequired; - Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) { + Context(String jobId, JobResultsPersister.Builder bulkResultsPersister) { this.jobId = jobId; - this.isPerPartitionNormalization = isPerPartitionNormalization; this.deleteInterimRequired = true; this.bulkResultsPersister = bulkResultsPersister; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java index 17e6e965bdd..1eaa550e9f9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java @@ -31,11 +31,11 @@ abstract class AbstractLeafNormalizable extends Normalizable { @Override public final List<Normalizable> getChildren(ChildType type) { - throw new IllegalStateException(getClass().getSimpleName() + " has no children"); + throw new UnsupportedOperationException(getClass().getSimpleName() + " has no children"); } @Override public final boolean setMaxChildrenScore(ChildType childrenType, double maxScore) { - throw new IllegalStateException(getClass().getSimpleName() + " has no children"); + throw new UnsupportedOperationException(getClass().getSimpleName() + " has no children"); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java index d78cc89b75c..51af9fb209c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java @@ -11,24 +11,20 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.BUCKET_INFLUENCER; import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.PARTITION_SCORE; -import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.RECORD; public class BucketNormalizable extends Normalizable { - private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, RECORD, PARTITION_SCORE); + private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, PARTITION_SCORE); private final Bucket bucket; - private List<RecordNormalizable> records = Collections.emptyList(); - public BucketNormalizable(Bucket bucket, String indexName) { super(indexName); this.bucket = Objects.requireNonNull(bucket); @@ -80,7 +76,7 @@ public class BucketNormalizable extends Normalizable { @Override public double getProbability() { - throw new IllegalStateException("Bucket is container only"); + throw new UnsupportedOperationException("Bucket is container only"); } @Override @@ -93,14 +89,6 @@ public class BucketNormalizable extends Normalizable { bucket.setAnomalyScore(normalizedScore); } - public List<RecordNormalizable> getRecords() { - return records; - } - - public void setRecords(List<RecordNormalizable> records) { - this.records = records; - } - @Override public List<ChildType> getChildrenTypes() { return CHILD_TYPES; } @@ -124,9 +112,6 @@ public class BucketNormalizable extends Normalizable { .map(bi -> new BucketInfluencerNormalizable(bi, getOriginatingIndex())) .collect(Collectors.toList())); break; - case RECORD: - children.addAll(records); - break; case PARTITION_SCORE: children.addAll(bucket.getPartitionScores().stream() .map(ps -> new PartitionScoreNormalizable(ps, getOriginatingIndex())) .collect(Collectors.toList())); @@ -145,7 +130,6 @@ public class BucketNormalizable extends Normalizable { double oldScore = bucket.getAnomalyScore(); bucket.setAnomalyScore(maxScore); return maxScore != oldScore; - case RECORD: case PARTITION_SCORE: return false; default: @@ -156,7 +140,7 @@ public class BucketNormalizable extends Normalizable { @Override public void setParentScore(double parentScore) { - throw new IllegalStateException("Bucket has no parent"); + throw new UnsupportedOperationException("Bucket has no parent"); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java index afa23dbfe9b..2c929ff4f1a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java @@ -52,7 +52,7 @@ public class Normalizer { * normalizer */ public void normalize(Integer bucketSpan, boolean perPartitionNormalization, - List<Normalizable> results, String quantilesState) { + List<? extends Normalizable> results, String quantilesState) { NormalizerProcess process = processFactory.createNormalizerProcess(jobId, quantilesState, bucketSpan, perPartitionNormalization, executorService); NormalizerResultHandler resultsHandler = process.createNormalizedResultsHandler(); @@ -123,7 +123,7 @@ public class Normalizer { * Updates the normalized scores on the results. */ private void mergeNormalizedScoresIntoResults(List<NormalizerResult> normalizedScores, - List<Normalizable> results) { + List<? extends Normalizable> results) { Iterator<NormalizerResult> scoresIter = normalizedScores.iterator(); for (Normalizable result : results) { mergeRecursively(scoresIter, null, false, result); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java index 0a7e8662d2b..f8a929f9026 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java @@ -22,7 +22,7 @@ public class PartitionScoreNormalizable extends AbstractLeafNormalizable { @Override public String getId() { - throw new IllegalStateException("PartitionScore has no ID as is should not be persisted outside of the owning bucket"); + throw new UnsupportedOperationException("PartitionScore has no ID as it should not be persisted outside of the owning bucket"); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java index bac6b8e148f..bfb1f852d31 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.job.process.normalizer; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; @@ -15,7 +14,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersiste import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Influencer; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.Result; import java.util.ArrayList; @@ -32,9 +30,9 @@ public class ScoresUpdater { private static final Logger LOGGER = Loggers.getLogger(ScoresUpdater.class); /** - * Target number of records to renormalize at a time + * Target number of buckets to renormalize at a time */ - private static final int TARGET_RECORDS_TO_RENORMALIZE = 100000; + private static final int TARGET_BUCKETS_TO_RENORMALIZE = 100000; // 30 days private static final long DEFAULT_RENORMALIZATION_WINDOW_MS = 2592000000L; @@ -44,24 +42,22 @@ public class ScoresUpdater { private static final long SECONDS_IN_DAY = 86400; private static final long MILLISECONDS_IN_SECOND = 1000; - private final Job job; + private final String jobId; private final JobProvider jobProvider; private final JobRenormalizedResultsPersister updatesPersister; private final NormalizerFactory normalizerFactory; private int bucketSpan; private long normalizationWindow; - private boolean perPartitionNormalization; private volatile boolean shutdown; public ScoresUpdater(Job job, JobProvider jobProvider, JobRenormalizedResultsPersister jobRenormalizedResultsPersister, NormalizerFactory normalizerFactory) { - this.job = job; + jobId = job.getId();
this.jobProvider = Objects.requireNonNull(jobProvider); updatesPersister = Objects.requireNonNull(jobRenormalizedResultsPersister); this.normalizerFactory = Objects.requireNonNull(normalizerFactory); bucketSpan = ((Long) job.getAnalysisConfig().getBucketSpan().seconds()).intValue(); normalizationWindow = getNormalizationWindowOrDefault(job); - perPartitionNormalization = getPerPartitionNormalizationOrDefault(job.getAnalysisConfig()); } /** @@ -79,162 +75,118 @@ public class ScoresUpdater { DEFAULT_BUCKETS_IN_RENORMALIZATION_WINDOW * bucketSpan * MILLISECONDS_IN_SECOND); } - private static boolean getPerPartitionNormalizationOrDefault(AnalysisConfig analysisConfig) { - return (analysisConfig != null) && analysisConfig.getUsePerPartitionNormalization(); - } - /** * Update the anomaly score field on all previously persisted buckets * and all contained records */ public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs, boolean perPartitionNormalization) { - Normalizer normalizer = normalizerFactory.create(job.getId()); + Normalizer normalizer = normalizerFactory.create(jobId); int[] counts = {0, 0}; - updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, - perPartitionNormalization); - updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts); + updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization); + updateRecords(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization); + updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization); - LOGGER.debug("[{}] Normalization resulted in: {} updates, {} no-ops", job.getId(), counts[0], counts[1]); + // The updates will have been persisted in batches throughout the renormalization + // process - this call just catches any leftovers + updatesPersister.executeRequest(); + + LOGGER.debug("[{}] Normalization resulted in: {} updates, {} no-ops", jobId, counts[0], counts[1]); } private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs, long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { BatchedDocumentsIterator<Result<Bucket>> bucketsIterator = - jobProvider.newBatchedBucketsIterator(job.getId()) + jobProvider.newBatchedBucketsIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .includeInterim(false); - // Make a list of buckets to be renormalized. - // This may be shorter than the original list of buckets for two - // reasons: - // 1) We don't bother with buckets that have raw score 0 and no - // records - // 2) We limit the total number of records to be not much more - // than 100000 List<BucketNormalizable> bucketsToRenormalize = new ArrayList<>(); - int batchRecordCount = 0; - int skipped = 0; while (bucketsIterator.hasNext() && shutdown == false) { - // Get a batch of buckets without their records to calculate - // how many buckets can be sensibly retrieved Deque<Result<Bucket>> buckets = bucketsIterator.next(); if (buckets.isEmpty()) { + LOGGER.debug("[{}] No buckets to renormalize for job", jobId); break; } while (!buckets.isEmpty() && shutdown == false) { Result<Bucket> current = buckets.removeFirst(); - Bucket currentBucket = current.result; - if (currentBucket.isNormalizable()) { - BucketNormalizable bucketNormalizable = new BucketNormalizable(current.result, current.index); - List<RecordNormalizable> recordNormalizables = - bucketRecordsAsNormalizables(currentBucket.getTimestamp().getTime()); - batchRecordCount += recordNormalizables.size(); - bucketNormalizable.setRecords(recordNormalizables); - bucketsToRenormalize.add(bucketNormalizable); - - } else { - ++skipped; - } - - if (batchRecordCount >= TARGET_RECORDS_TO_RENORMALIZE) { - normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, - batchRecordCount, skipped, counts, perPartitionNormalization); - - bucketsToRenormalize = new ArrayList<>(); - batchRecordCount = 0; - skipped = 0; + if (current.result.isNormalizable()) { + bucketsToRenormalize.add(new BucketNormalizable(current.result, current.index)); + if (bucketsToRenormalize.size() >= TARGET_BUCKETS_TO_RENORMALIZE) { + normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization); + bucketsToRenormalize.clear(); + } } } } if (!bucketsToRenormalize.isEmpty()) { - normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, batchRecordCount, skipped, counts, - perPartitionNormalization); + normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization); } } - private List<RecordNormalizable> bucketRecordsAsNormalizables(long bucketTimeStamp) { - BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobProvider.newBatchedRecordsIterator(job.getId()) - .timeRange(bucketTimeStamp, bucketTimeStamp + 1) - .includeInterim(false); - - List<RecordNormalizable> recordNormalizables = new ArrayList<>(); - while (recordsIterator.hasNext() && shutdown == false) { - for (Result<AnomalyRecord> record : recordsIterator.next() ) { - recordNormalizables.add(new RecordNormalizable(record.result, record.index)); - } - } - - return recordNormalizables; - } - private long calcNormalizationWindowStart(long endEpochMs, long windowExtensionMs) { return Math.max(0, endEpochMs - normalizationWindow - windowExtensionMs); } private void normalizeBuckets(Normalizer normalizer, List<BucketNormalizable> normalizableBuckets, - String quantilesState, int recordCount, int skipped, int[] counts, - boolean perPartitionNormalization) { - if (shutdown) { - return; + String quantilesState, int[] counts, boolean perPartitionNormalization) { + normalizer.normalize(bucketSpan, perPartitionNormalization, normalizableBuckets, quantilesState); + + for (BucketNormalizable bucketNormalizable : normalizableBuckets) { + if (bucketNormalizable.hadBigNormalizedUpdate()) { + updatesPersister.updateBucket(bucketNormalizable); + ++counts[0]; + } else { + ++counts[1]; + } } - - LOGGER.debug("[{}] Will renormalize a batch of {} buckets with {} records ({} empty buckets skipped)", - job.getId(), normalizableBuckets.size(), recordCount, skipped); - - List<Normalizable> asNormalizables = normalizableBuckets.stream().collect(Collectors.toList()); - normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState); - - for (BucketNormalizable bn : normalizableBuckets) { - updateSingleBucket(counts, bn); - } - - updatesPersister.executeRequest(job.getId()); } - private void updateSingleBucket(int[] counts, BucketNormalizable bucketNormalizable) { - if (bucketNormalizable.hadBigNormalizedUpdate()) { - if (perPartitionNormalization) { - List<AnomalyRecord> anomalyRecords = bucketNormalizable.getRecords().stream() - .map(RecordNormalizable::getRecord).collect(Collectors.toList()); - PerPartitionMaxProbabilities ppProbs = new PerPartitionMaxProbabilities(anomalyRecords); - updatesPersister.updateResult(ppProbs.getId(), bucketNormalizable.getOriginatingIndex(), ppProbs); + private void updateRecords(Normalizer normalizer, String quantilesState, long endBucketEpochMs, + long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { + BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobProvider.newBatchedRecordsIterator(jobId) + .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) + .includeInterim(false); + + while (recordsIterator.hasNext() && shutdown == false) { + Deque<Result<AnomalyRecord>> records = recordsIterator.next(); + if (records.isEmpty()) { + LOGGER.debug("[{}] No records to renormalize for job", jobId); + break; } - updatesPersister.updateBucket(bucketNormalizable); - ++counts[0]; - } else { - ++counts[1]; + LOGGER.debug("[{}] Will renormalize a batch of {} records", jobId, records.size()); + List<Normalizable> asNormalizables = records.stream() + .map(recordResultIndex -> new RecordNormalizable(recordResultIndex.result, recordResultIndex.index)) + .collect(Collectors.toList()); + normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState); + + persistChanged(counts, asNormalizables); } - - persistChanged(counts, bucketNormalizable.getRecords()); } private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs, - long windowExtensionMs, int[] counts) { - BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobProvider.newBatchedInfluencersIterator(job.getId()) + long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { + BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobProvider.newBatchedInfluencersIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .includeInterim(false); while (influencersIterator.hasNext() && shutdown == false) { Deque<Result<Influencer>> influencers = influencersIterator.next(); if (influencers.isEmpty()) { - LOGGER.debug("[{}] No influencers to renormalize for job", job.getId()); + LOGGER.debug("[{}] No influencers to renormalize for job", jobId); break; } - LOGGER.debug("[{}] Will renormalize a batch of {} influencers", job.getId(), influencers.size()); + LOGGER.debug("[{}] Will renormalize a batch of {} influencers", jobId, influencers.size()); List<Normalizable> asNormalizables = influencers.stream() - .map(influencerResultIndex -> - new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.index)) + .map(influencerResultIndex -> new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.index)) .collect(Collectors.toList()); normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState); persistChanged(counts, asNormalizables); } - - updatesPersister.executeRequest(job.getId()); } private
void persistChanged(int[] counts, List<? extends Normalizable> asNormalizables) { @@ -242,7 +194,7 @@ public class ScoresUpdater { return; } - List<? extends Normalizable> toUpdate = asNormalizables.stream().filter(n -> n.hadBigNormalizedUpdate()).collect(Collectors.toList()); + List<? extends Normalizable> toUpdate = asNormalizables.stream().filter(Normalizable::hadBigNormalizedUpdate).collect(Collectors.toList()); counts[0] += toUpdate.size(); counts[1] += asNormalizables.size() - toUpdate.size(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java index e893bd5ab6f..a27e065682f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.results; +import org.elasticsearch.Version; import org.elasticsearch.action.support.ToXContentToBytes; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -93,7 +94,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { private boolean isInterim; private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to private long processingTimeMs; - private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap(); private List<PartitionScore> partitionScores = Collections.emptyList(); public Bucket(String jobId, Date timestamp, long bucketSpan) { @@ -114,11 +114,9 @@ public class Bucket extends ToXContentToBytes implements Writeable { this.isInterim = other.isInterim; this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers); this.processingTimeMs = other.processingTimeMs; - this.perPartitionMaxProbability = other.perPartitionMaxProbability; this.partitionScores = new ArrayList<>(other.partitionScores); } - @SuppressWarnings("unchecked") public Bucket(StreamInput in) throws IOException { jobId = in.readString(); timestamp = new Date(in.readLong()); @@ -131,7 +129,10 @@ public class Bucket extends ToXContentToBytes implements Writeable { isInterim = in.readBoolean(); bucketInfluencers = in.readList(BucketInfluencer::new); processingTimeMs = in.readLong(); - perPartitionMaxProbability = (Map<String, Double>) in.readGenericValue(); + // bwc for perPartitionMaxProbability + if (in.getVersion().before(Version.V_5_5_0_UNRELEASED)) { + in.readGenericValue(); + } partitionScores = in.readList(PartitionScore::new); } @@ -148,7 +149,10 @@ out.writeBoolean(isInterim); out.writeList(bucketInfluencers); out.writeLong(processingTimeMs); - out.writeGenericValue(perPartitionMaxProbability); + // bwc for perPartitionMaxProbability + if (out.getVersion().before(Version.V_5_5_0_UNRELEASED)) { + out.writeGenericValue(Collections.emptyMap()); + } out.writeList(partitionScores); } @@ -290,14 +294,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { partitionScores = Objects.requireNonNull(scores); } - public Map<String, Double> getPerPartitionMaxProbability() { - return perPartitionMaxProbability; - } - - public void setPerPartitionMaxProbability(Map<String, Double> perPartitionMaxProbability) { - this.perPartitionMaxProbability = Objects.requireNonNull(perPartitionMaxProbability); - } - public double partitionInitialAnomalyScore(String partitionValue) { Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue())) .findFirst(); @@ -315,7 +311,7 @@ public class Bucket extends ToXContentToBytes implements Writeable
{ @Override public int hashCode() { return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, recordCount, records, - isInterim, bucketSpan, bucketInfluencers); + isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs); } /** @@ -338,18 +334,20 @@ public class Bucket extends ToXContentToBytes implements Writeable { && (this.recordCount == that.recordCount) && (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore) && Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim) - && Objects.equals(this.bucketInfluencers, that.bucketInfluencers); + && Objects.equals(this.bucketInfluencers, that.bucketInfluencers) + && Objects.equals(this.partitionScores, that.partitionScores) + && (this.processingTimeMs == that.processingTimeMs); } /** - * This method encapsulated the logic for whether a bucket should be - * normalized. Buckets that have no records and a score of - * zero should not be normalized as their score will not change and they + * This method encapsulates the logic for whether a bucket should be normalized. + * Buckets that have a zero anomaly score themselves and no partition scores with + * non-zero score should not be normalized as their score will not change and they * will just add overhead. * * @return true if the bucket should be normalized or false otherwise */ public boolean isNormalizable() { - return anomalyScore > 0.0 || recordCount > 0; + return anomalyScore > 0.0 || partitionScores.stream().anyMatch(s -> s.getRecordScore() > 0); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java deleted file mode 100644 index f4b06b18203..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.job.results; - -import org.elasticsearch.action.support.ToXContentToBytes; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.utils.time.TimeUtils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collector; -import java.util.stream.Collectors; - -/** - * When per-partition normalization is enabled this class represents - * the max anomalous probabilities of each partition per bucket. These values - * calculated from the bucket's anomaly records.
- */ -public class PerPartitionMaxProbabilities extends ToXContentToBytes implements Writeable { - - /** - * Result type - */ - public static final String RESULT_TYPE_VALUE = "partition_normalized_probs"; - - /* - * Field Names - */ - public static final ParseField PER_PARTITION_MAX_PROBABILITIES = new ParseField("per_partition_max_probabilities"); - public static final ParseField MAX_RECORD_SCORE = new ParseField("max_record_score"); - - @SuppressWarnings("unchecked") - public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> - new PerPartitionMaxProbabilities((String) a[0], (Date) a[1], (long) a[2], (List) a[3])); - - static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); - PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { - if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) { - return new Date(p.longValue()); - } else if (p.currentToken() == XContentParser.Token.VALUE_STRING) { - return new Date(TimeUtils.dateStringToEpoch(p.text())); - } - throw new IllegalArgumentException( - "unexpected token [" + p.currentToken() + "] for [" + Result.TIMESTAMP.getPreferredName() + "]"); - }, Result.TIMESTAMP, ObjectParser.ValueType.VALUE); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), Bucket.BUCKET_SPAN); - PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), PartitionProbability.PARSER, PER_PARTITION_MAX_PROBABILITIES); - PARSER.declareString((p, s) -> {}, Result.RESULT_TYPE); - } - - private final String jobId; - private final Date timestamp; - private final long bucketSpan; - private final List perPartitionMaxProbabilities; - - public PerPartitionMaxProbabilities(String jobId, Date timestamp, long bucketSpan, - List partitionProbabilities) { - this.jobId = jobId; - this.timestamp = timestamp; - this.bucketSpan = bucketSpan; - this.perPartitionMaxProbabilities = partitionProbabilities; - } - - public PerPartitionMaxProbabilities(List records) { - if (records.isEmpty()) { - throw new IllegalArgumentException("PerPartitionMaxProbabilities cannot be created from an empty list of records"); - } - this.jobId = records.get(0).getJobId(); - this.timestamp = records.get(0).getTimestamp(); - this.bucketSpan = records.get(0).getBucketSpan(); - this.perPartitionMaxProbabilities = calcMaxRecordScorePerPartition(records); - } - - public PerPartitionMaxProbabilities(StreamInput in) throws IOException { - jobId = in.readString(); - timestamp = new Date(in.readLong()); - bucketSpan = in.readLong(); - perPartitionMaxProbabilities = in.readList(PartitionProbability::new); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(jobId); - out.writeLong(timestamp.getTime()); - out.writeLong(bucketSpan); - out.writeList(perPartitionMaxProbabilities); - } - - public String getJobId() { - return jobId; - } - - public String getId() { - return jobId + "_" + RESULT_TYPE_VALUE + "_" + timestamp.getTime() + "_" + bucketSpan; - } - - public Date getTimestamp() { - return timestamp; - } - - public List getPerPartitionMaxProbabilities() { - return perPartitionMaxProbabilities; - } - - public double getMaxProbabilityForPartition(String partitionValue) { - Optional first = - perPartitionMaxProbabilities.stream().filter(pp -> partitionValue.equals(pp.getPartitionValue())).findFirst(); - - return first.isPresent() ? 
first.get().getMaxRecordScore() : 0.0; - } - - /** - * Box class for the stream collector function below - */ - private final class DoubleMaxBox { - private double value = 0.0; - - DoubleMaxBox() { - } - - public void accept(double d) { - if (d > value) { - value = d; - } - } - - public DoubleMaxBox combine(DoubleMaxBox other) { - return (this.value > other.value) ? this : other; - } - - public Double value() { - return this.value; - } - } - - private List calcMaxRecordScorePerPartition(List anomalyRecords) { - Map maxValueByPartition = anomalyRecords.stream().collect( - Collectors.groupingBy(AnomalyRecord::getPartitionFieldValue, - Collector.of(DoubleMaxBox::new, (m, ar) -> m.accept(ar.getRecordScore()), - DoubleMaxBox::combine, DoubleMaxBox::value))); - - List pProbs = new ArrayList<>(); - for (Map.Entry entry : maxValueByPartition.entrySet()) { - pProbs.add(new PartitionProbability(entry.getKey(), entry.getValue())); - } - - return pProbs; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(Job.ID.getPreferredName(), jobId); - builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); - builder.field(Bucket.BUCKET_SPAN.getPreferredName(), bucketSpan); - builder.field(PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), perPartitionMaxProbabilities); - builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); - builder.endObject(); - return builder; - } - - @Override - public int hashCode() { - return Objects.hash(jobId, timestamp, perPartitionMaxProbabilities, bucketSpan); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other instanceof PerPartitionMaxProbabilities == false) { - return false; - } - - PerPartitionMaxProbabilities that = (PerPartitionMaxProbabilities) other; - - return Objects.equals(this.jobId, that.jobId) - && Objects.equals(this.timestamp, that.timestamp) - && this.bucketSpan == that.bucketSpan - && Objects.equals(this.perPartitionMaxProbabilities, that.perPartitionMaxProbabilities); - } - - /** - * Class for partitionValue, maxRecordScore pairs - */ - public static class PartitionProbability extends ToXContentToBytes implements Writeable { - - public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("partitionProbability", - a -> new PartitionProbability((String) a[0], (double) a[1])); - - static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_VALUE); - PARSER.declareDouble(ConstructingObjectParser.constructorArg(), MAX_RECORD_SCORE); - } - - private final String partitionValue; - private final double maxRecordScore; - - PartitionProbability(String partitionName, double maxRecordScore) { - this.partitionValue = partitionName; - this.maxRecordScore = maxRecordScore; - } - - public PartitionProbability(StreamInput in) throws IOException { - partitionValue = in.readString(); - maxRecordScore = in.readDouble(); - } - - public String getPartitionValue() { - return partitionValue; - } - - public double getMaxRecordScore() { - return maxRecordScore; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(partitionValue); - out.writeDouble(maxRecordScore); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject() - 
.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionValue) - .field(MAX_RECORD_SCORE.getPreferredName(), maxRecordScore) - .endObject(); - return builder; - } - - @Override - public int hashCode() { - return Objects.hash(partitionValue, maxRecordScore); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other instanceof PartitionProbability == false) { - return false; - } - - PartitionProbability that = (PartitionProbability) other; - - return Objects.equals(this.partitionValue, that.partitionValue) - && this.maxRecordScore == that.maxRecordScore; - } - } -} - - diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index 0fcb31b8c69..a05306de6b5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -141,9 +141,6 @@ public final class ReservedFieldNames { ModelSnapshot.LATEST_RESULT_TIME.getPreferredName(), ModelSnapshot.RETAIN.getPreferredName(), - PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), - PerPartitionMaxProbabilities.MAX_RECORD_SCORE.getPreferredName(), - Result.RESULT_TYPE.getPreferredName(), Result.TIMESTAMP.getPreferredName(), Result.IS_INTERIM.getPreferredName() diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetBucketActionResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetBucketActionResponseTests.java index dac32c3dff2..a8148e02638 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetBucketActionResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetBucketActionResponseTests.java @@ -16,9 +16,7 @@ import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class GetBucketActionResponseTests extends AbstractStreamableTestCase { @@ -64,14 +62,6 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase perPartitionMaxProbability = new HashMap<>(size); - for (int i = 0; i < size; i++) { - perPartitionMaxProbability.put(randomAlphaOfLengthBetween(1, 20), randomDouble()); - } - bucket.setPerPartitionMaxProbability(perPartitionMaxProbability); - } if (randomBoolean()) { bucket.setProcessingTimeMs(randomLong()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 1a1d11489f9..6d2c44bd0f5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -129,7 +129,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { Quantiles quantiles = createQuantiles(); builder.addQuantiles(quantiles); - resultProcessor.process(builder.buildTestProcess(), false); + resultProcessor.process(builder.buildTestProcess()); jobResultsPersister.commitResultWrites(JOB_ID); BucketsQueryBuilder.BucketsQuery bucketsQuery = new BucketsQueryBuilder().includeInterim(true).build(); @@ -179,7 +179,7 @@ public class AutodetectResultProcessorIT 
extends XPackSingleNodeTestCase { .addFlushAcknowledgement(createFlushAcknowledgement()) .addBucket(nonInterimBucket); // and this will delete the interim results - resultProcessor.process(resultBuilder.buildTestProcess(), false); + resultProcessor.process(resultBuilder.buildTestProcess()); jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); @@ -212,7 +212,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { .addRecords(finalAnomalyRecords) .addBucket(finalBucket); // this deletes the previous interim and persists final bucket & records - resultProcessor.process(resultBuilder.buildTestProcess(), false); + resultProcessor.process(resultBuilder.buildTestProcess()); jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); @@ -237,7 +237,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { .addBucket(bucket) // bucket triggers persistence .addRecords(secondSetOfRecords); - resultProcessor.process(resultBuilder.buildTestProcess(), false); + resultProcessor.process(resultBuilder.buildTestProcess()); jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java index fa37d65fee8..ce84f4eca7b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java @@ -33,7 +33,7 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase { BucketNormalizable bn = createBucketNormalizable(); JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister(); persister.updateBucket(bn); - persister.executeRequest("foo"); + persister.executeRequest(); assertEquals(0, persister.getBulkRequest().numberOfActions()); } @@ -42,7 +42,7 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase { when(bulkResponse.hasFailures()).thenReturn(false); Client client = new MockClientBuilder("cluster").bulk(bulkResponse).build(); - return new JobRenormalizedResultsPersister(Settings.EMPTY, client); + return new JobRenormalizedResultsPersister("foo", Settings.EMPTY, client); } private BucketNormalizable createBucketNormalizable() { @@ -52,4 +52,4 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase { bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1)); return new BucketNormalizable(bucket, "foo-index"); } -} \ No newline at end of file +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 91691b41bc6..aab18bca312 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import 
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
index 91691b41bc6..aab18bca312 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
@@ -22,7 +22,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
 import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
 import org.elasticsearch.xpack.ml.job.results.Influencer;
 import org.elasticsearch.xpack.ml.job.results.ModelPlot;
-import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
 import org.junit.Before;
 import org.mockito.InOrder;
@@ -72,7 +71,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(iterator.next()).thenReturn(autodetectResult);
         AutodetectProcess process = mock(AutodetectProcess.class);
         when(process.readAutodetectResults()).thenReturn(iterator);
-        processorUnderTest.process(process, randomBoolean());
+        processorUnderTest.process(process);
         verify(renormalizer, times(1)).waitUntilIdle();
         assertEquals(0, processorUnderTest.completionLatch.getCount());
     }
@@ -82,7 +81,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Bucket bucket = mock(Bucket.class);
@@ -100,7 +99,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = true;
         AutodetectResult result = mock(AutodetectResult.class);
         Bucket bucket = mock(Bucket.class);
@@ -118,7 +117,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123);
@@ -136,7 +135,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123);
@@ -147,7 +146,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(result.getRecords()).thenReturn(records);
 
         processorUnderTest.processResult(context, result);
-        verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class));
         verify(bulkBuilder, times(1)).persistRecords(records);
         verify(bulkBuilder, never()).executeRequest();
         verifyNoMoreInteractions(persister);
@@ -157,7 +155,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123);
@@ -175,7 +173,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
@@ -191,7 +189,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@@ -209,7 +207,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@@ -232,7 +230,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_modelPlot() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelPlot modelPlot = mock(ModelPlot.class);
@@ -246,7 +244,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_modelSizeStats() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
@@ -261,7 +259,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_modelSnapshot() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
@@ -281,7 +279,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_quantiles() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Quantiles quantiles = mock(Quantiles.class);
@@ -304,7 +302,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(iterator.next()).thenReturn(autodetectResult);
         AutodetectProcess process = mock(AutodetectProcess.class);
         when(process.readAutodetectResults()).thenReturn(iterator);
-        processorUnderTest.process(process, randomBoolean());
+        processorUnderTest.process(process);
         processorUnderTest.awaitCompletion();
 
         assertEquals(0, processorUnderTest.completionLatch.getCount());
@@ -325,7 +323,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSizeStats(any());
 
-        processorUnderTest.process(process, randomBoolean());
+        processorUnderTest.process(process);
         verify(persister, times(2)).persistModelSizeStats(any());
     }
@@ -338,7 +336,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         AutodetectProcess process = mock(AutodetectProcess.class);
         when(process.readAutodetectResults()).thenReturn(iterator);
         processorUnderTest.setProcessKilled();
-        processorUnderTest.process(process, randomBoolean());
+        processorUnderTest.process(process);
         processorUnderTest.awaitCompletion();
 
         assertEquals(0, processorUnderTest.completionLatch.getCount());
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java
index 1ec0af798b1..d39d20f4898 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java
@@ -77,7 +77,7 @@ public class BucketInfluencerNormalizableTests extends ESTestCase {
     }
 
     public void testGetChildren_ByType() {
-        expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
+        expectThrows(UnsupportedOperationException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
                 .getChildren(Normalizable.ChildType.BUCKET_INFLUENCER));
     }
 
@@ -86,7 +86,7 @@ public class BucketInfluencerNormalizableTests extends ESTestCase {
     }
 
     public void testSetMaxChildrenScore() {
-        expectThrows(IllegalStateException.class,
+        expectThrows(UnsupportedOperationException.class,
                 () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
                 .setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0));
     }
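The exception change in these tests, from IllegalStateException to UnsupportedOperationException (here and in the influencer tests below), matches Java convention: asking a leaf normalizable for children is an operation the type simply does not support, not a broken internal state. A simplified, self-contained sketch of a leaf type satisfying what the updated tests expect; the class shape is illustrative, only the exception type and the expectThrows pattern come from the diff:

    import java.util.Collections;
    import java.util.List;

    // Simplified leaf normalizable: it has no children, so child-related
    // operations throw UnsupportedOperationException, as the tests expect.
    class LeafNormalizableSketch {
        enum ChildType { BUCKET_INFLUENCER, PARTITION_SCORE }

        List<LeafNormalizableSketch> getChildren() {
            return Collections.emptyList(); // a leaf never has children
        }

        List<LeafNormalizableSketch> getChildren(ChildType type) {
            throw new UnsupportedOperationException("leaf nodes have no children");
        }

        boolean setMaxChildrenScore(ChildType type, double maxScore) {
            throw new UnsupportedOperationException("leaf nodes have no children");
        }
    }

In a test this behaviour is asserted with expectThrows(UnsupportedOperationException.class, () -> sketch.getChildren(ChildType.BUCKET_INFLUENCER)).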
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java
index c1eab334a24..ba32d1d258f 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java
@@ -9,7 +9,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
-import java.util.stream.Collectors;
 
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
@@ -83,7 +82,7 @@ public class BucketNormalizableTests extends ESTestCase {
     }
 
     public void testGetProbability() {
-        expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).getProbability());
+        expectThrows(UnsupportedOperationException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).getProbability());
    }
 
     public void testGetNormalizedScore() {
@@ -101,23 +100,17 @@ public class BucketNormalizableTests extends ESTestCase {
 
     public void testGetChildren() {
         BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
-        bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME))
-                .collect(Collectors.toList()));
 
         List<Normalizable> children = bn.getChildren();
 
-        assertEquals(6, children.size());
+        assertEquals(4, children.size());
         assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
         assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON);
         assertTrue(children.get(1) instanceof BucketInfluencerNormalizable);
         assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(2) instanceof RecordNormalizable);
-        assertEquals(1.0, children.get(2).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(3) instanceof RecordNormalizable);
-        assertEquals(2.0, children.get(3).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(4) instanceof PartitionScoreNormalizable);
-        assertEquals(0.2, children.get(4).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(5) instanceof PartitionScoreNormalizable);
-        assertEquals(0.4, children.get(5).getNormalizedScore(), EPSILON);
+        assertTrue(children.get(2) instanceof PartitionScoreNormalizable);
+        assertEquals(0.2, children.get(2).getNormalizedScore(), EPSILON);
+        assertTrue(children.get(3) instanceof PartitionScoreNormalizable);
+        assertEquals(0.4, children.get(3).getNormalizedScore(), EPSILON);
     }
 
     public void testGetChildren_GivenTypeBucketInfluencer() {
@@ -131,24 +124,11 @@ public class BucketNormalizableTests extends ESTestCase {
         assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
     }
 
-    public void testGetChildren_GivenTypeRecord() {
-        BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
-        bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME))
-                .collect(Collectors.toList()));
-
-        List<Normalizable> children = bn.getChildren(Normalizable.ChildType.RECORD);
-
-        assertEquals(2, children.size());
-        assertTrue(children.get(0) instanceof RecordNormalizable);
-        assertEquals(1.0, children.get(0).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(1) instanceof RecordNormalizable);
-        assertEquals(2.0, children.get(1).getNormalizedScore(), EPSILON);
-    }
-
     public void testSetMaxChildrenScore_GivenDifferentScores() {
         BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
 
         assertTrue(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 95.0));
-        assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 42.0));
+        assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 42.0));
 
         assertEquals(95.0, bucket.getAnomalyScore(), EPSILON);
     }
@@ -157,13 +137,13 @@ public class BucketNormalizableTests extends ESTestCase {
         BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
 
         assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 88.0));
-        assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 2.0));
+        assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 2.0));
 
         assertEquals(88.0, bucket.getAnomalyScore(), EPSILON);
     }
 
     public void testSetParentScore() {
-        expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).setParentScore(42.0));
+        expectThrows(UnsupportedOperationException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).setParentScore(42.0));
     }
 
     public void testResetBigChangeFlag() {
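The rewritten testGetChildren above pins down the bucket's new child layout: two BucketInfluencerNormalizable children followed by two PartitionScoreNormalizable children, with records gone from the list, and only the BUCKET_INFLUENCER type able to raise the bucket's anomaly score. A much-simplified sketch of that containment; names and structure are invented for illustration and are not the real BucketNormalizable:

    import java.util.ArrayList;
    import java.util.List;

    // Simplified bucket: influencer scores first, partition scores second,
    // and only influencer children can drive the bucket's anomaly score.
    class BucketChildrenSketch {
        final List<Double> bucketInfluencerScores = new ArrayList<>();
        final List<Double> partitionScores = new ArrayList<>();
        double anomalyScore;

        List<Double> children() {
            List<Double> children = new ArrayList<>(bucketInfluencerScores);
            children.addAll(partitionScores); // records are no longer children
            return children;
        }

        // Returns true only when the bucket's score actually changed,
        // mirroring the assertTrue/assertFalse pairs in the tests above.
        boolean setMaxChildrenScore(boolean bucketInfluencerType, double maxScore) {
            if (bucketInfluencerType && maxScore != anomalyScore) {
                anomalyScore = maxScore; // e.g. 95.0 replacing 88.0
                return true;
            }
            return false;
        }
    }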
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java
index b4ae0029f34..ac87ab9bf34 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java
@@ -74,7 +74,7 @@ public class InfluencerNormalizableTests extends ESTestCase {
     }
 
     public void testGetChildren_ByType() {
-        expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
+        expectThrows(UnsupportedOperationException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
                 .getChildren(Normalizable.ChildType.BUCKET_INFLUENCER));
     }
 
@@ -83,7 +83,7 @@ public class InfluencerNormalizableTests extends ESTestCase {
     }
 
     public void testSetMaxChildrenScore() {
-        expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
+        expectThrows(UnsupportedOperationException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
                 .setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0));
     }
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java
index 8cbd149158d..7c005f2c956 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java
@@ -83,6 +83,7 @@ public class ScoresUpdaterTests extends ESTestCase {
         scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);
 
         givenProviderReturnsNoBuckets();
+        givenProviderReturnsNoRecords();
         givenProviderReturnsNoInfluencers();
         givenNormalizerFactoryReturnsMock();
         givenNormalizerRaisesBigChangeFlag();
@@ -153,10 +154,10 @@ public class ScoresUpdaterTests extends ESTestCase {
 
         scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
 
-        verifyNormalizerWasInvoked(1);
+        verifyNormalizerWasInvoked(2);
         verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any());
         verify(jobRenormalizedResultsPersister, times(1)).updateResults(any());
-        verify(jobRenormalizedResultsPersister, times(2)).executeRequest(anyString());
+        verify(jobRenormalizedResultsPersister, times(1)).executeRequest();
     }
 
     public void testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() throws IOException {
@@ -229,7 +230,7 @@ public class ScoresUpdaterTests extends ESTestCase {
 
         verifyNormalizerWasInvoked(1);
         verify(jobRenormalizedResultsPersister, times(1)).updateResults(any());
-        verify(jobRenormalizedResultsPersister, times(1)).executeRequest(anyString());
+        verify(jobRenormalizedResultsPersister, times(1)).executeRequest();
     }
 
     public void testUpdate_GivenShutdown() throws IOException {
@@ -381,6 +382,10 @@ public class ScoresUpdaterTests extends ESTestCase {
         when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter);
     }
 
+    private void givenProviderReturnsNoRecords() {
+        givenProviderReturnsRecords(new ArrayDeque<>());
+    }
+
     private void givenProviderReturnsRecords(Deque<AnomalyRecord> records) {
         Deque<Result<AnomalyRecord>> batch = new ArrayDeque<>();
         List<Deque<Result<AnomalyRecord>>> batches = new ArrayList<>();
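Because buckets and records are now renormalized in separate passes, the ScoresUpdaterTests setup must stub the records iterator even in bucket-only scenarios; that is what the new givenProviderReturnsNoRecords() default does, by feeding givenProviderReturnsRecords an empty deque. The helper wraps whatever it is given into a single batch, mimicking one scroll page. A self-contained sketch of that batch shape using plain collections, with String standing in for the Result wrapper the real batched iterator yields:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    class RecordBatchesSketch {
        // One scroll "page" (a Deque) per batch; a list of pages overall.
        static List<Deque<String>> singleBatch(Deque<String> records) {
            Deque<String> batch = new ArrayDeque<>(records);
            List<Deque<String>> batches = new ArrayList<>();
            batches.add(batch);
            return batches;
        }

        public static void main(String[] args) {
            // An empty deque models the givenProviderReturnsNoRecords() case.
            System.out.println(singleBatch(new ArrayDeque<>())); // prints [[]]
        }
    }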
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java
index ab185d55dfb..ec52d5453a0 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java
@@ -13,9 +13,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class BucketTests extends AbstractSerializingTestCase<Bucket> {
 
@@ -61,14 +59,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
             }
             bucket.setPartitionScores(partitionScores);
         }
-        if (randomBoolean()) {
-            int size = randomInt(10);
-            Map<String, Double> perPartitionMaxProbability = new HashMap<>(size);
-            for (int i = 0; i < size; i++) {
-                perPartitionMaxProbability.put(randomAlphaOfLengthBetween(1, 20), randomDouble());
-            }
-            bucket.setPerPartitionMaxProbability(perPartitionMaxProbability);
-        }
         if (randomBoolean()) {
             bucket.setProcessingTimeMs(randomLong());
         }
@@ -244,11 +234,11 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
         assertFalse(bucket.isNormalizable());
     }
 
-    public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsNonZero() {
+    public void testIsNormalizable_GivenAnomalyScoreIsZeroAndPartitionsScoresAreNonZero() {
         Bucket bucket = new Bucket("foo", new Date(123), 123);
         bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123));
         bucket.setAnomalyScore(0.0);
-        bucket.setRecordCount(1);
+        bucket.setPartitionScores(Collections.singletonList(new PartitionScore("n", "v", 50.0, 40.0, 0.01)));
 
         assertTrue(bucket.isNormalizable());
     }
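The renamed test above documents the adjusted normalizability rule: record count no longer matters, but a bucket whose own anomaly score is zero still needs renormalizing when it carries a non-zero partition score. A hypothetical restatement of the predicate the test exercises; the field names are invented and the real isNormalizable() may differ in detail:

    import java.util.ArrayList;
    import java.util.List;

    class BucketNormalizabilitySketch {
        int bucketInfluencerCount;
        double anomalyScore;
        List<Double> partitionScores = new ArrayList<>();

        // Normalizable when there is at least one bucket influencer and
        // either the bucket score or any partition score is non-zero.
        boolean isNormalizable() {
            if (bucketInfluencerCount == 0) {
                return false;
            }
            return anomalyScore > 0.0 || partitionScores.stream().anyMatch(s -> s > 0.0);
        }
    }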
a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilitiesTests.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.ml.job.results;
-
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
-import org.joda.time.DateTime;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-
-public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCase<PerPartitionMaxProbabilities> {
-
-    @Override
-    protected PerPartitionMaxProbabilities createTestInstance() {
-        int num = randomIntBetween(1, 10);
-        List<PerPartitionMaxProbabilities.PartitionProbability> pps = new ArrayList<>();
-        for (int i=0; i<num; i++) {
-            pps.add(new PerPartitionMaxProbabilities.PartitionProbability(randomAlphaOfLengthBetween(1, 20), randomDouble()));
-        }
-
-        return new PerPartitionMaxProbabilities(randomAlphaOfLength(20), new DateTime(randomDateTimeZone()).toDate(),
-                randomNonNegativeLong(), pps);
-    }
-
-    @Override
-    protected Writeable.Reader<PerPartitionMaxProbabilities> instanceReader() {
-        return PerPartitionMaxProbabilities::new;
-    }
-
-    @Override
-    protected PerPartitionMaxProbabilities parseInstance(XContentParser parser) {
-        return PerPartitionMaxProbabilities.PARSER.apply(parser, null);
-    }
-
-    public void testCreateFromAListOfRecords() {
-        List<AnomalyRecord> records = new ArrayList<>();
-        records.add(createAnomalyRecord("A", 20.0));
-        records.add(createAnomalyRecord("A", 40.0));
-        records.add(createAnomalyRecord("B", 90.0));
-        records.add(createAnomalyRecord("B", 15.0));
-        records.add(createAnomalyRecord("B", 45.0));
-
-        PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities(records);
-
-        List<PerPartitionMaxProbabilities.PartitionProbability> pProbs = ppMax.getPerPartitionMaxProbabilities();
-        assertEquals(2, pProbs.size());
-        for (PerPartitionMaxProbabilities.PartitionProbability pProb : pProbs) {
-            if (pProb.getPartitionValue().equals("A")) {
-                assertEquals(40.0, pProb.getMaxRecordScore(), 0.0001);
-            } else {
-                assertEquals(90.0, pProb.getMaxRecordScore(), 0.0001);
-            }
-        }
-    }
-
-    public void testMaxProbabilityForPartition() {
-        List<AnomalyRecord> records = new ArrayList<>();
-        records.add(createAnomalyRecord("A", 20.0));
-        records.add(createAnomalyRecord("A", 40.0));
-        records.add(createAnomalyRecord("B", 90.0));
-        records.add(createAnomalyRecord("B", 15.0));
-        records.add(createAnomalyRecord("B", 45.0));
-
-        PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities(records);
-
-        assertEquals(40.0, ppMax.getMaxProbabilityForPartition("A"), 0.0001);
-        assertEquals(90.0, ppMax.getMaxProbabilityForPartition("B"), 0.0001);
-    }
-
-    public void testId() {
-        PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities("job-foo", new Date(100L), 300L, Collections.emptyList());
-        assertEquals("job-foo_partition_normalized_probs_100_300", ppMax.getId());
-    }
-
-    private AnomalyRecord createAnomalyRecord(String partitionFieldValue, double recordScore) {
-        AnomalyRecord record = new AnomalyRecord("foo", new Date(), 600);
-        record.setPartitionFieldValue(partitionFieldValue);
-        record.setRecordScore(recordScore);
-        return record;
-    }
-}
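For readers who want the semantics of the removed class without digging through history: the deleted tests above pin PerPartitionMaxProbabilities down as a reduction of anomaly records to the maximum record score per partition value (A -> 40.0, B -> 90.0 in the fixtures). A standalone sketch of that reduction; the ScoredRecord type is invented for illustration:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class PerPartitionMaxSketch {
        static class ScoredRecord {
            final String partitionFieldValue;
            final double recordScore;

            ScoredRecord(String partitionFieldValue, double recordScore) {
                this.partitionFieldValue = partitionFieldValue;
                this.recordScore = recordScore;
            }
        }

        // Maximum record score per partition value, the quantity the deleted
        // testCreateFromAListOfRecords asserted.
        static Map<String, Double> maxScorePerPartition(List<ScoredRecord> records) {
            Map<String, Double> maxima = new LinkedHashMap<>();
            for (ScoredRecord r : records) {
                maxima.merge(r.partitionFieldValue, r.recordScore, Math::max);
            }
            return maxima;
        }

        public static void main(String[] args) {
            List<ScoredRecord> records = Arrays.asList(
                    new ScoredRecord("A", 20.0), new ScoredRecord("A", 40.0),
                    new ScoredRecord("B", 90.0), new ScoredRecord("B", 15.0),
                    new ScoredRecord("B", 45.0));
            System.out.println(maxScorePerPartition(records)); // {A=40.0, B=90.0}
        }
    }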