diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java index 12aaf13de55..5679c18340d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java @@ -15,7 +15,6 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; -import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Result; import java.util.ArrayDeque; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index d43f1d6af98..0394da6c53c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -21,7 +21,6 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influence; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.AuditMessage; @@ -259,19 +258,6 @@ public class ElasticsearchMappings { .endObject() .endObject() - // per-partition max probabilities mapping - .startObject(PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName()) - .field(TYPE, NESTED) - .startObject(PROPERTIES) - .startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName()) - .field(TYPE, KEYWORD) - .endObject() - .startObject(PerPartitionMaxProbabilities.MAX_RECORD_SCORE.getPreferredName()) - .field(TYPE, DOUBLE) - .endObject() - .endObject() - .endObject() - // Model Plot Output .startObject(ModelPlot.MODEL_FEATURE.getPreferredName()) .field(TYPE, KEYWORD) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 71877549c06..fb63d604138 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -17,7 +17,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequestBuilder; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; @@ -73,7 +72,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import 
org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.security.support.Exceptions; @@ -388,24 +386,9 @@ public class JobProvider { searchSourceBuilder.sort(Result.TIMESTAMP.getPreferredName(), query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC); } searchRequest.source(searchSourceBuilder); + searchRequest.indicesOptions(addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); - - MultiSearchRequest mrequest = new MultiSearchRequest(); - mrequest.indicesOptions(addIgnoreUnavailable(mrequest.indicesOptions())); - mrequest.add(searchRequest); - if (Strings.hasLength(query.getPartitionValue())) { - mrequest.add(createPartitionMaxNormailizedProbabilitiesRequest(jobId, query.getStart(), query.getEnd(), - query.getPartitionValue())); - } - - client.multiSearch(mrequest, ActionListener.wrap(mresponse -> { - MultiSearchResponse.Item item1 = mresponse.getResponses()[0]; - if (item1.isFailure()) { - errorHandler.accept(mapAuthFailure(item1.getFailure(), jobId, GetBucketsAction.NAME)); - return; - } - - SearchResponse searchResponse = item1.getResponse(); + client.search(searchRequest, ActionListener.wrap(searchResponse -> { SearchHits hits = searchResponse.getHits(); if (query.getTimestamp() != null) { if (hits.getTotalHits() == 0) { @@ -433,29 +416,15 @@ public class JobProvider { } QueryPage buckets = new QueryPage<>(results, searchResponse.getHits().getTotalHits(), Bucket.RESULTS_FIELD); - if (Strings.hasLength(query.getPartitionValue())) { - MultiSearchResponse.Item item2 = mresponse.getResponses()[1]; - if (item2.isFailure()) { - errorHandler.accept(item2.getFailure()); - return; - } - if (query.isExpand()) { - Iterator bucketsToExpand = buckets.results().stream() - .filter(bucket -> bucket.getRecordCount() > 0).iterator(); - expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client); - return; - } + if (query.isExpand()) { + Iterator bucketsToExpand = buckets.results().stream() + .filter(bucket -> bucket.getRecordCount() > 0).iterator(); + expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client); } else { - if (query.isExpand()) { - Iterator bucketsToExpand = buckets.results().stream() - .filter(bucket -> bucket.getRecordCount() > 0).iterator(); - expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client); - return; - } + handler.accept(buckets); } - handler.accept(buckets); - }, errorHandler)); + }, e -> { errorHandler.accept(mapAuthFailure(e, jobId, GetBucketsAction.NAME)); })); } private void expandBuckets(String jobId, BucketsQuery query, QueryPage buckets, Iterator bucketsToExpand, @@ -470,28 +439,6 @@ public class JobProvider { } } - private SearchRequest createPartitionMaxNormailizedProbabilitiesRequest(String jobId, Object epochStart, Object epochEnd, - String partitionFieldValue) { - QueryBuilder timeRangeQuery = new ResultsFilterBuilder() - .timeRange(Result.TIMESTAMP.getPreferredName(), epochStart, epochEnd) - .build(); - - QueryBuilder boolQuery = new BoolQueryBuilder() - .filter(timeRangeQuery) - .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), PerPartitionMaxProbabilities.RESULT_TYPE_VALUE)) - .filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue)); - - FieldSortBuilder sb = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC); - String 
indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - sourceBuilder.sort(sb); - sourceBuilder.query(boolQuery); - SearchRequest searchRequest = new SearchRequest(indexName); - searchRequest.source(sourceBuilder); - searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions())); - return searchRequest; - } - /** * Returns a {@link BatchedDocumentsIterator} that allows querying * and iterating over a large number of buckets of the given job. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java index 034c879ca05..dcd62706db4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java @@ -36,11 +36,18 @@ import static org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings.D */ public class JobRenormalizedResultsPersister extends AbstractComponent { + /** + * Execute bulk requests when they reach this size + */ + private static final int BULK_LIMIT = 10000; + + private final String jobId; private final Client client; private BulkRequest bulkRequest; - public JobRenormalizedResultsPersister(Settings settings, Client client) { + public JobRenormalizedResultsPersister(String jobId, Settings settings, Client client) { super(settings); + this.jobId = jobId; this.client = client; bulkRequest = new BulkRequest(); } @@ -70,6 +77,9 @@ public class JobRenormalizedResultsPersister extends AbstractComponent { } catch (IOException e) { logger.error("Error serialising result", e); } + if (bulkRequest.numberOfActions() >= BULK_LIMIT) { + executeRequest(); + } } private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException { @@ -80,10 +90,8 @@ public class JobRenormalizedResultsPersister extends AbstractComponent { /** * Execute the bulk action - * - * @param jobId The job Id */ - public void executeRequest(String jobId) { + public void executeRequest() { if (bulkRequest.numberOfActions() == 0) { return; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index c9afeeca19d..8b305ed6a2f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.Result; import java.io.IOException; @@ -171,28 +170,6 @@ public class JobResultsPersister extends AbstractComponent { return this; } - /** - * Persist {@link PerPartitionMaxProbabilities} - * - * @param partitionProbabilities The probabilities to persist - * @return this - */ - public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities) { - try (XContentBuilder builder = toXContentBuilder(partitionProbabilities)) { - logger.trace("[{}] 
ES API CALL: index result type {} to index {} at timestamp {} with ID {}", - jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(), - partitionProbabilities.getId()); - bulkRequest.add( - new IndexRequest(indexName, DOC_TYPE, partitionProbabilities.getId()).source(builder)); - - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores", - new Object[]{jobId}), e); - } - - return this; - } - /** * Execute the bulk action */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index d75a70a05f6..681f7d1a329 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -293,8 +293,8 @@ public class AutodetectProcessManager extends AbstractComponent { ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(), jobDataCountsPersister); - ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), - normalizerFactory); + ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, + new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory); ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME); Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); @@ -302,13 +302,12 @@ public class AutodetectProcessManager extends AbstractComponent { AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED)); - boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization(); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); ExecutorService autodetectWorkerExecutor; try { autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); - autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization)); + autoDetectExecutorService.submit(() -> processor.process(process)); } catch (EsRejectedExecutionException e) { // If submitting the operation to read the results from the process fails we need to close // the process too, so that other submitted operations to threadpool are stopped. 
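A note on the JobRenormalizedResultsPersister changes above: the persister now captures the job id at construction and, after each result is added, flushes automatically once BULK_LIMIT (10,000) bulk actions have accumulated, so executeRequest() no longer needs a jobId argument and a final explicit call just catches leftovers. The following is a minimal sketch of that self-flushing pattern, with hypothetical names (BulkBuffer, add, flush) standing in for the actual x-pack types:

```java
import java.util.ArrayList;
import java.util.List;

public class BulkBuffer {
    private static final int BULK_LIMIT = 10_000; // flush threshold, mirroring the persister
    private final String jobId;                   // captured once, as in the new constructor
    private final List<String> actions = new ArrayList<>();

    public BulkBuffer(String jobId) {
        this.jobId = jobId;
    }

    public void add(String action) {
        actions.add(action);
        if (actions.size() >= BULK_LIMIT) {
            flush(); // flushing as a side effect of add() bounds memory on long runs
        }
    }

    public void flush() { // no jobId argument needed; catches leftovers at end of run
        if (actions.isEmpty()) {
            return;
        }
        System.out.printf("[%s] executing bulk of %d actions%n", jobId, actions.size());
        actions.clear();
    }

    public static void main(String[] args) {
        BulkBuffer buffer = new BulkBuffer("my-job");
        for (int i = 0; i < 25_000; i++) {
            buffer.add("doc-" + i); // auto-flushes at 10,000 and 20,000
        }
        buffer.flush();             // final call picks up the remaining 5,000
    }
}
```

Flushing inside add() keeps memory bounded during a long renormalization while leaving callers a single end-of-run flush, which is the shape the refactored ScoresUpdater.update() takes later in this diff.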
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index fea20eb954a..c3f77e490d4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -27,7 +27,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import java.time.Duration; import java.util.Iterator; @@ -40,7 +39,7 @@ import java.util.concurrent.TimeoutException; /** * A runnable class that reads the autodetect process output in the - * {@link #process(AutodetectProcess, boolean)} method and persists parsed + * {@link #process(AutodetectProcess)} method and persists parsed * results via the {@linkplain JobResultsPersister} passed in the constructor. *

* Has methods to register and remove alert observers. @@ -89,8 +88,8 @@ public class AutoDetectResultProcessor { this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); } - public void process(AutodetectProcess process, boolean isPerPartitionNormalization) { - Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId)); + public void process(AutodetectProcess process) { + Context context = new Context(jobId, persister.bulkPersisterBuilder(jobId)); // If a function call in this throws for some reason we don't want it // to kill the results reader thread as autodetect will be blocked @@ -175,9 +174,6 @@ public class AutoDetectResultProcessor { List records = result.getRecords(); if (records != null && !records.isEmpty()) { context.bulkResultsPersister.persistRecords(records); - if (context.isPerPartitionNormalization) { - context.bulkResultsPersister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records)); - } } List influencers = result.getInfluencers(); if (influencers != null && !influencers.isEmpty()) { @@ -306,14 +302,12 @@ public class AutoDetectResultProcessor { static class Context { private final String jobId; - private final boolean isPerPartitionNormalization; private JobResultsPersister.Builder bulkResultsPersister; boolean deleteInterimRequired; - Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) { + Context(String jobId, JobResultsPersister.Builder bulkResultsPersister) { this.jobId = jobId; - this.isPerPartitionNormalization = isPerPartitionNormalization; this.deleteInterimRequired = true; this.bulkResultsPersister = bulkResultsPersister; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java index 17e6e965bdd..1eaa550e9f9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java @@ -31,11 +31,11 @@ abstract class AbstractLeafNormalizable extends Normalizable { @Override public final List getChildren(ChildType type) { - throw new IllegalStateException(getClass().getSimpleName() + " has no children"); + throw new UnsupportedOperationException(getClass().getSimpleName() + " has no children"); } @Override public final boolean setMaxChildrenScore(ChildType childrenType, double maxScore) { - throw new IllegalStateException(getClass().getSimpleName() + " has no children"); + throw new UnsupportedOperationException(getClass().getSimpleName() + " has no children"); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java index d78cc89b75c..51af9fb209c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java @@ -11,24 +11,20 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; import static 
org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.BUCKET_INFLUENCER; import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.PARTITION_SCORE; -import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.RECORD; public class BucketNormalizable extends Normalizable { - private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, RECORD, PARTITION_SCORE); + private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, PARTITION_SCORE); private final Bucket bucket; - private List<RecordNormalizable> records = Collections.emptyList(); - public BucketNormalizable(Bucket bucket, String indexName) { super(indexName); this.bucket = Objects.requireNonNull(bucket); @@ -80,7 +76,7 @@ public class BucketNormalizable extends Normalizable { @Override public double getProbability() { - throw new IllegalStateException("Bucket is container only"); + throw new UnsupportedOperationException("Bucket is container only"); } @Override @@ -93,14 +89,6 @@ bucket.setAnomalyScore(normalizedScore); } - public List<RecordNormalizable> getRecords() { - return records; - } - - public void setRecords(List<RecordNormalizable> records) { - this.records = records; - } - @Override public List<ChildType> getChildrenTypes() { return CHILD_TYPES; } @@ -124,9 +112,6 @@ .map(bi -> new BucketInfluencerNormalizable(bi, getOriginatingIndex())) .collect(Collectors.toList())); break; - case RECORD: - children.addAll(records); - break; case PARTITION_SCORE: children.addAll(bucket.getPartitionScores().stream() .map(ps -> new PartitionScoreNormalizable(ps, getOriginatingIndex())) @@ -145,7 +130,6 @@ double oldScore = bucket.getAnomalyScore(); bucket.setAnomalyScore(maxScore); return maxScore != oldScore; - case RECORD: case PARTITION_SCORE: return false; default: @@ -156,7 +140,7 @@ @Override public void setParentScore(double parentScore) { - throw new IllegalStateException("Bucket has no parent"); + throw new UnsupportedOperationException("Bucket has no parent"); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java index afa23dbfe9b..2c929ff4f1a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java @@ -52,7 +52,7 @@ public class Normalizer { * normalizer */ public void normalize(Integer bucketSpan, boolean perPartitionNormalization, - List<Normalizable> results, String quantilesState) { + List<? extends Normalizable> results, String quantilesState) { NormalizerProcess process = processFactory.createNormalizerProcess(jobId, quantilesState, bucketSpan, perPartitionNormalization, executorService); NormalizerResultHandler resultsHandler = process.createNormalizedResultsHandler(); @@ -123,7 +123,7 @@ * Updates the normalized scores on the results. */ private void mergeNormalizedScoresIntoResults(List<NormalizerResult> normalizedScores, - List<Normalizable> results) { + List<? extends Normalizable> results) { Iterator<NormalizerResult> scoresIter = normalizedScores.iterator(); for (Normalizable result : results) { mergeRecursively(scoresIter, null, false, result); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java index 0a7e8662d2b..f8a929f9026 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java @@ -22,7 +22,7 @@ @Override public String getId() { - throw new IllegalStateException("PartitionScore has no ID as is should not be persisted outside of the owning bucket"); + throw new UnsupportedOperationException("PartitionScore has no ID as it should not be persisted outside of the owning bucket"); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java index bac6b8e148f..bfb1f852d31 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.job.process.normalizer; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; @@ -15,7 +14,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersiste import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Influencer; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.Result; import java.util.ArrayList; @@ -32,9 +30,9 @@ public class ScoresUpdater { private static final Logger LOGGER = Loggers.getLogger(ScoresUpdater.class); /** - * Target number of records to renormalize at a time + * Target number of buckets to renormalize at a time */ - private static final int TARGET_RECORDS_TO_RENORMALIZE = 100000; + private static final int TARGET_BUCKETS_TO_RENORMALIZE = 100000; // 30 days private static final long DEFAULT_RENORMALIZATION_WINDOW_MS = 2592000000L; @@ -44,24 +42,22 @@ private static final long SECONDS_IN_DAY = 86400; private static final long MILLISECONDS_IN_SECOND = 1000; - private final Job job; + private final String jobId; private final JobProvider jobProvider; private final JobRenormalizedResultsPersister updatesPersister; private final NormalizerFactory normalizerFactory; private int bucketSpan; private long normalizationWindow; - private boolean perPartitionNormalization; private volatile boolean shutdown; public ScoresUpdater(Job job, JobProvider jobProvider, JobRenormalizedResultsPersister jobRenormalizedResultsPersister, NormalizerFactory normalizerFactory) { - this.job = job; + jobId = job.getId();
this.jobProvider = Objects.requireNonNull(jobProvider); updatesPersister = Objects.requireNonNull(jobRenormalizedResultsPersister); this.normalizerFactory = Objects.requireNonNull(normalizerFactory); bucketSpan = ((Long) job.getAnalysisConfig().getBucketSpan().seconds()).intValue(); normalizationWindow = getNormalizationWindowOrDefault(job); - perPartitionNormalization = getPerPartitionNormalizationOrDefault(job.getAnalysisConfig()); } /** @@ -79,162 +75,118 @@ DEFAULT_BUCKETS_IN_RENORMALIZATION_WINDOW * bucketSpan * MILLISECONDS_IN_SECOND); } - private static boolean getPerPartitionNormalizationOrDefault(AnalysisConfig analysisConfig) { - return (analysisConfig != null) && analysisConfig.getUsePerPartitionNormalization(); - } - /** * Update the anomaly score field on all previously persisted buckets * and all contained records */ public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs, boolean perPartitionNormalization) { - Normalizer normalizer = normalizerFactory.create(job.getId()); + Normalizer normalizer = normalizerFactory.create(jobId); int[] counts = {0, 0}; - updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, - perPartitionNormalization); - updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts); + updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization); + updateRecords(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization); + updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization); - LOGGER.debug("[{}] Normalization resulted in: {} updates, {} no-ops", job.getId(), counts[0], counts[1]); + // The updates will have been persisted in batches throughout the renormalization + // process - this call just catches any leftovers + updatesPersister.executeRequest(); + + LOGGER.debug("[{}] Normalization resulted in: {} updates, {} no-ops", jobId, counts[0], counts[1]); } private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs, long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { BatchedDocumentsIterator<Result<Bucket>> bucketsIterator = - jobProvider.newBatchedBucketsIterator(job.getId()) + jobProvider.newBatchedBucketsIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .includeInterim(false); - // Make a list of buckets to be renormalized. - // This may be shorter than the original list of buckets for two - // reasons: - // 1) We don't bother with buckets that have raw score 0 and no - // records - // 2) We limit the total number of records to be not much more - // than 100000 List<BucketNormalizable> bucketsToRenormalize = new ArrayList<>(); - int batchRecordCount = 0; - int skipped = 0; while (bucketsIterator.hasNext() && shutdown == false) { - // Get a batch of buckets without their records to calculate - // how many buckets can be sensibly retrieved Deque<Result<Bucket>> buckets = bucketsIterator.next(); if (buckets.isEmpty()) { + LOGGER.debug("[{}] No buckets to renormalize for job", jobId); break; } while (!buckets.isEmpty() && shutdown == false) { Result<Bucket> current = buckets.removeFirst(); - Bucket currentBucket = current.result; - if (currentBucket.isNormalizable()) { - BucketNormalizable bucketNormalizable = new BucketNormalizable(current.result, current.index); - List<RecordNormalizable> recordNormalizables = - bucketRecordsAsNormalizables(currentBucket.getTimestamp().getTime()); - batchRecordCount += recordNormalizables.size(); - bucketNormalizable.setRecords(recordNormalizables); - bucketsToRenormalize.add(bucketNormalizable); - - } else { - ++skipped; - } - - if (batchRecordCount >= TARGET_RECORDS_TO_RENORMALIZE) { - normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, - batchRecordCount, skipped, counts, perPartitionNormalization); - - bucketsToRenormalize = new ArrayList<>(); - batchRecordCount = 0; - skipped = 0; + if (current.result.isNormalizable()) { + bucketsToRenormalize.add(new BucketNormalizable(current.result, current.index)); + if (bucketsToRenormalize.size() >= TARGET_BUCKETS_TO_RENORMALIZE) { + normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization); + bucketsToRenormalize.clear(); + } } } } if (!bucketsToRenormalize.isEmpty()) { - normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, batchRecordCount, skipped, counts, - perPartitionNormalization); + normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization); } } - private List<RecordNormalizable> bucketRecordsAsNormalizables(long bucketTimeStamp) { - BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobProvider.newBatchedRecordsIterator(job.getId()) - .timeRange(bucketTimeStamp, bucketTimeStamp + 1) - .includeInterim(false); - - List<RecordNormalizable> recordNormalizables = new ArrayList<>(); - while (recordsIterator.hasNext() && shutdown == false) { - for (Result<AnomalyRecord> record : recordsIterator.next() ) { - recordNormalizables.add(new RecordNormalizable(record.result, record.index)); - } - } - - return recordNormalizables; - } - private long calcNormalizationWindowStart(long endEpochMs, long windowExtensionMs) { return Math.max(0, endEpochMs - normalizationWindow - windowExtensionMs); } private void normalizeBuckets(Normalizer normalizer, List<BucketNormalizable> normalizableBuckets, - String quantilesState, int recordCount, int skipped, int[] counts, - boolean perPartitionNormalization) { - if (shutdown) { - return; + String quantilesState, int[] counts, boolean perPartitionNormalization) { + normalizer.normalize(bucketSpan, perPartitionNormalization, normalizableBuckets, quantilesState); + + for (BucketNormalizable bucketNormalizable : normalizableBuckets) { + if (bucketNormalizable.hadBigNormalizedUpdate()) { + updatesPersister.updateBucket(bucketNormalizable); + ++counts[0]; + } else { + ++counts[1]; + } } - - LOGGER.debug("[{}] Will renormalize a batch of {} buckets with {} records ({} empty buckets skipped)", - job.getId(), normalizableBuckets.size(), recordCount, skipped); - - List<Normalizable> asNormalizables = normalizableBuckets.stream().collect(Collectors.toList()); - normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState); - - for (BucketNormalizable bn : normalizableBuckets) { - updateSingleBucket(counts, bn); - } - - updatesPersister.executeRequest(job.getId()); } - private void updateSingleBucket(int[] counts, BucketNormalizable bucketNormalizable) { - if (bucketNormalizable.hadBigNormalizedUpdate()) { - if (perPartitionNormalization) { - List<AnomalyRecord> anomalyRecords = bucketNormalizable.getRecords().stream() - .map(RecordNormalizable::getRecord).collect(Collectors.toList()); - PerPartitionMaxProbabilities ppProbs = new PerPartitionMaxProbabilities(anomalyRecords); - updatesPersister.updateResult(ppProbs.getId(), bucketNormalizable.getOriginatingIndex(), ppProbs); + private void updateRecords(Normalizer normalizer, String quantilesState, long endBucketEpochMs, + long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { + BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobProvider.newBatchedRecordsIterator(jobId) + .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) + .includeInterim(false); + + while (recordsIterator.hasNext() && shutdown == false) { + Deque<Result<AnomalyRecord>> records = recordsIterator.next(); + if (records.isEmpty()) { + LOGGER.debug("[{}] No records to renormalize for job", jobId); + break; } - updatesPersister.updateBucket(bucketNormalizable); - ++counts[0]; - } else { - ++counts[1]; + LOGGER.debug("[{}] Will renormalize a batch of {} records", jobId, records.size()); + List<Normalizable> asNormalizables = records.stream() + .map(recordResultIndex -> new RecordNormalizable(recordResultIndex.result, recordResultIndex.index)) + .collect(Collectors.toList()); + normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState); + + persistChanged(counts, asNormalizables); } - - persistChanged(counts, bucketNormalizable.getRecords()); } private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs, - long windowExtensionMs, int[] counts) { - BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobProvider.newBatchedInfluencersIterator(job.getId()) + long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { + BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobProvider.newBatchedInfluencersIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .includeInterim(false); while (influencersIterator.hasNext() && shutdown == false) { Deque<Result<Influencer>> influencers = influencersIterator.next(); if (influencers.isEmpty()) { - LOGGER.debug("[{}] No influencers to renormalize for job", job.getId()); + LOGGER.debug("[{}] No influencers to renormalize for job", jobId); break; } - LOGGER.debug("[{}] Will renormalize a batch of {} influencers", job.getId(), influencers.size()); + LOGGER.debug("[{}] Will renormalize a batch of {} influencers", jobId, influencers.size()); List<Normalizable> asNormalizables = influencers.stream() - .map(influencerResultIndex -> - new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.index)) + .map(influencerResultIndex -> new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.index)) .collect(Collectors.toList()); normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState); persistChanged(counts, asNormalizables); } - - updatesPersister.executeRequest(job.getId()); } private
void persistChanged(int[] counts, List<? extends Normalizable> asNormalizables) { @@ -242,7 +194,7 @@ return; } - List<Normalizable> toUpdate = asNormalizables.stream().filter(n -> n.hadBigNormalizedUpdate()).collect(Collectors.toList()); + List<Normalizable> toUpdate = asNormalizables.stream().filter(Normalizable::hadBigNormalizedUpdate).collect(Collectors.toList()); counts[0] += toUpdate.size(); counts[1] += asNormalizables.size() - toUpdate.size(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java index e893bd5ab6f..a27e065682f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.results; +import org.elasticsearch.Version; import org.elasticsearch.action.support.ToXContentToBytes; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -93,7 +94,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { private boolean isInterim; private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to private long processingTimeMs; - private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap(); private List<PartitionScore> partitionScores = Collections.emptyList(); public Bucket(String jobId, Date timestamp, long bucketSpan) { @@ -114,11 +114,9 @@ this.isInterim = other.isInterim; this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers); this.processingTimeMs = other.processingTimeMs; - this.perPartitionMaxProbability = other.perPartitionMaxProbability; this.partitionScores = new ArrayList<>(other.partitionScores); } - @SuppressWarnings("unchecked") public Bucket(StreamInput in) throws IOException { jobId = in.readString(); timestamp = new Date(in.readLong()); @@ -131,7 +129,10 @@ isInterim = in.readBoolean(); bucketInfluencers = in.readList(BucketInfluencer::new); processingTimeMs = in.readLong(); - perPartitionMaxProbability = (Map<String, Double>) in.readGenericValue(); + // bwc for perPartitionMaxProbability + if (in.getVersion().before(Version.V_5_5_0_UNRELEASED)) { + in.readGenericValue(); + } partitionScores = in.readList(PartitionScore::new); } @@ -148,7 +149,10 @@ out.writeBoolean(isInterim); out.writeList(bucketInfluencers); out.writeLong(processingTimeMs); - out.writeGenericValue(perPartitionMaxProbability); + // bwc for perPartitionMaxProbability + if (out.getVersion().before(Version.V_5_5_0_UNRELEASED)) { + out.writeGenericValue(Collections.emptyMap()); + } out.writeList(partitionScores); } @@ -290,14 +294,6 @@ partitionScores = Objects.requireNonNull(scores); } - public Map<String, Double> getPerPartitionMaxProbability() { - return perPartitionMaxProbability; - } - - public void setPerPartitionMaxProbability(Map<String, Double> perPartitionMaxProbability) { - this.perPartitionMaxProbability = Objects.requireNonNull(perPartitionMaxProbability); - } - public double partitionInitialAnomalyScore(String partitionValue) { Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue())) .findFirst(); @@ -315,7 +311,7 @@ public class Bucket extends ToXContentToBytes implements Writeable
{ @Override public int hashCode() { return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, recordCount, records, - isInterim, bucketSpan, bucketInfluencers); + isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs); } /** @@ -338,18 +334,20 @@ && (this.recordCount == that.recordCount) && (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore) && Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim) - && Objects.equals(this.bucketInfluencers, that.bucketInfluencers); + && Objects.equals(this.bucketInfluencers, that.bucketInfluencers) + && Objects.equals(this.partitionScores, that.partitionScores) + && (this.processingTimeMs == that.processingTimeMs); } /** - * This method encapsulated the logic for whether a bucket should be - * normalized. Buckets that have no records and a score of - * zero should not be normalized as their score will not change and they + * This method encapsulates the logic for whether a bucket should be normalized. + * Buckets that have a zero anomaly score themselves and no partition scores with + * non-zero score should not be normalized as their score will not change and they * will just add overhead. * * @return true if the bucket should be normalized or false otherwise */ public boolean isNormalizable() { - return anomalyScore > 0.0 || recordCount > 0; + return anomalyScore > 0.0 || partitionScores.stream().anyMatch(s -> s.getRecordScore() > 0); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java deleted file mode 100644 index f4b06b18203..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.job.results; - -import org.elasticsearch.action.support.ToXContentToBytes; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.utils.time.TimeUtils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collector; -import java.util.stream.Collectors; - -/** - * When per-partition normalization is enabled this class represents - * the max anomalous probabilities of each partition per bucket. These values - * calculated from the bucket's anomaly records.
- */ -public class PerPartitionMaxProbabilities extends ToXContentToBytes implements Writeable { - - /** - * Result type - */ - public static final String RESULT_TYPE_VALUE = "partition_normalized_probs"; - - /* - * Field Names - */ - public static final ParseField PER_PARTITION_MAX_PROBABILITIES = new ParseField("per_partition_max_probabilities"); - public static final ParseField MAX_RECORD_SCORE = new ParseField("max_record_score"); - - @SuppressWarnings("unchecked") - public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> - new PerPartitionMaxProbabilities((String) a[0], (Date) a[1], (long) a[2], (List) a[3])); - - static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); - PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { - if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) { - return new Date(p.longValue()); - } else if (p.currentToken() == XContentParser.Token.VALUE_STRING) { - return new Date(TimeUtils.dateStringToEpoch(p.text())); - } - throw new IllegalArgumentException( - "unexpected token [" + p.currentToken() + "] for [" + Result.TIMESTAMP.getPreferredName() + "]"); - }, Result.TIMESTAMP, ObjectParser.ValueType.VALUE); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), Bucket.BUCKET_SPAN); - PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), PartitionProbability.PARSER, PER_PARTITION_MAX_PROBABILITIES); - PARSER.declareString((p, s) -> {}, Result.RESULT_TYPE); - } - - private final String jobId; - private final Date timestamp; - private final long bucketSpan; - private final List perPartitionMaxProbabilities; - - public PerPartitionMaxProbabilities(String jobId, Date timestamp, long bucketSpan, - List partitionProbabilities) { - this.jobId = jobId; - this.timestamp = timestamp; - this.bucketSpan = bucketSpan; - this.perPartitionMaxProbabilities = partitionProbabilities; - } - - public PerPartitionMaxProbabilities(List records) { - if (records.isEmpty()) { - throw new IllegalArgumentException("PerPartitionMaxProbabilities cannot be created from an empty list of records"); - } - this.jobId = records.get(0).getJobId(); - this.timestamp = records.get(0).getTimestamp(); - this.bucketSpan = records.get(0).getBucketSpan(); - this.perPartitionMaxProbabilities = calcMaxRecordScorePerPartition(records); - } - - public PerPartitionMaxProbabilities(StreamInput in) throws IOException { - jobId = in.readString(); - timestamp = new Date(in.readLong()); - bucketSpan = in.readLong(); - perPartitionMaxProbabilities = in.readList(PartitionProbability::new); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(jobId); - out.writeLong(timestamp.getTime()); - out.writeLong(bucketSpan); - out.writeList(perPartitionMaxProbabilities); - } - - public String getJobId() { - return jobId; - } - - public String getId() { - return jobId + "_" + RESULT_TYPE_VALUE + "_" + timestamp.getTime() + "_" + bucketSpan; - } - - public Date getTimestamp() { - return timestamp; - } - - public List getPerPartitionMaxProbabilities() { - return perPartitionMaxProbabilities; - } - - public double getMaxProbabilityForPartition(String partitionValue) { - Optional first = - perPartitionMaxProbabilities.stream().filter(pp -> partitionValue.equals(pp.getPartitionValue())).findFirst(); - - return first.isPresent() ? 
first.get().getMaxRecordScore() : 0.0; - } - - /** - * Box class for the stream collector function below - */ - private final class DoubleMaxBox { - private double value = 0.0; - - DoubleMaxBox() { - } - - public void accept(double d) { - if (d > value) { - value = d; - } - } - - public DoubleMaxBox combine(DoubleMaxBox other) { - return (this.value > other.value) ? this : other; - } - - public Double value() { - return this.value; - } - } - - private List calcMaxRecordScorePerPartition(List anomalyRecords) { - Map maxValueByPartition = anomalyRecords.stream().collect( - Collectors.groupingBy(AnomalyRecord::getPartitionFieldValue, - Collector.of(DoubleMaxBox::new, (m, ar) -> m.accept(ar.getRecordScore()), - DoubleMaxBox::combine, DoubleMaxBox::value))); - - List pProbs = new ArrayList<>(); - for (Map.Entry entry : maxValueByPartition.entrySet()) { - pProbs.add(new PartitionProbability(entry.getKey(), entry.getValue())); - } - - return pProbs; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(Job.ID.getPreferredName(), jobId); - builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); - builder.field(Bucket.BUCKET_SPAN.getPreferredName(), bucketSpan); - builder.field(PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), perPartitionMaxProbabilities); - builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); - builder.endObject(); - return builder; - } - - @Override - public int hashCode() { - return Objects.hash(jobId, timestamp, perPartitionMaxProbabilities, bucketSpan); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other instanceof PerPartitionMaxProbabilities == false) { - return false; - } - - PerPartitionMaxProbabilities that = (PerPartitionMaxProbabilities) other; - - return Objects.equals(this.jobId, that.jobId) - && Objects.equals(this.timestamp, that.timestamp) - && this.bucketSpan == that.bucketSpan - && Objects.equals(this.perPartitionMaxProbabilities, that.perPartitionMaxProbabilities); - } - - /** - * Class for partitionValue, maxRecordScore pairs - */ - public static class PartitionProbability extends ToXContentToBytes implements Writeable { - - public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("partitionProbability", - a -> new PartitionProbability((String) a[0], (double) a[1])); - - static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_VALUE); - PARSER.declareDouble(ConstructingObjectParser.constructorArg(), MAX_RECORD_SCORE); - } - - private final String partitionValue; - private final double maxRecordScore; - - PartitionProbability(String partitionName, double maxRecordScore) { - this.partitionValue = partitionName; - this.maxRecordScore = maxRecordScore; - } - - public PartitionProbability(StreamInput in) throws IOException { - partitionValue = in.readString(); - maxRecordScore = in.readDouble(); - } - - public String getPartitionValue() { - return partitionValue; - } - - public double getMaxRecordScore() { - return maxRecordScore; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(partitionValue); - out.writeDouble(maxRecordScore); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject() - 
.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionValue) - .field(MAX_RECORD_SCORE.getPreferredName(), maxRecordScore) - .endObject(); - return builder; - } - - @Override - public int hashCode() { - return Objects.hash(partitionValue, maxRecordScore); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other instanceof PartitionProbability == false) { - return false; - } - - PartitionProbability that = (PartitionProbability) other; - - return Objects.equals(this.partitionValue, that.partitionValue) - && this.maxRecordScore == that.maxRecordScore; - } - } -} - - diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index 0fcb31b8c69..a05306de6b5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -141,9 +141,6 @@ public final class ReservedFieldNames { ModelSnapshot.LATEST_RESULT_TIME.getPreferredName(), ModelSnapshot.RETAIN.getPreferredName(), - PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), - PerPartitionMaxProbabilities.MAX_RECORD_SCORE.getPreferredName(), - Result.RESULT_TYPE.getPreferredName(), Result.TIMESTAMP.getPreferredName(), Result.IS_INTERIM.getPreferredName() diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetBucketActionResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetBucketActionResponseTests.java index dac32c3dff2..a8148e02638 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetBucketActionResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetBucketActionResponseTests.java @@ -16,9 +16,7 @@ import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class GetBucketActionResponseTests extends AbstractStreamableTestCase { @@ -64,14 +62,6 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase perPartitionMaxProbability = new HashMap<>(size); - for (int i = 0; i < size; i++) { - perPartitionMaxProbability.put(randomAlphaOfLengthBetween(1, 20), randomDouble()); - } - bucket.setPerPartitionMaxProbability(perPartitionMaxProbability); - } if (randomBoolean()) { bucket.setProcessingTimeMs(randomLong()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 1a1d11489f9..6d2c44bd0f5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -129,7 +129,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { Quantiles quantiles = createQuantiles(); builder.addQuantiles(quantiles); - resultProcessor.process(builder.buildTestProcess(), false); + resultProcessor.process(builder.buildTestProcess()); jobResultsPersister.commitResultWrites(JOB_ID); BucketsQueryBuilder.BucketsQuery bucketsQuery = new BucketsQueryBuilder().includeInterim(true).build(); @@ -179,7 +179,7 @@ public class AutodetectResultProcessorIT 
extends XPackSingleNodeTestCase { .addFlushAcknowledgement(createFlushAcknowledgement()) .addBucket(nonInterimBucket); // and this will delete the interim results - resultProcessor.process(resultBuilder.buildTestProcess(), false); + resultProcessor.process(resultBuilder.buildTestProcess()); jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); @@ -212,7 +212,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { .addRecords(finalAnomalyRecords) .addBucket(finalBucket); // this deletes the previous interim and persists final bucket & records - resultProcessor.process(resultBuilder.buildTestProcess(), false); + resultProcessor.process(resultBuilder.buildTestProcess()); jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); @@ -237,7 +237,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { .addBucket(bucket) // bucket triggers persistence .addRecords(secondSetOfRecords); - resultProcessor.process(resultBuilder.buildTestProcess(), false); + resultProcessor.process(resultBuilder.buildTestProcess()); jobResultsPersister.commitResultWrites(JOB_ID); QueryPage persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java index fa37d65fee8..ce84f4eca7b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java @@ -33,7 +33,7 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase { BucketNormalizable bn = createBucketNormalizable(); JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister(); persister.updateBucket(bn); - persister.executeRequest("foo"); + persister.executeRequest(); assertEquals(0, persister.getBulkRequest().numberOfActions()); } @@ -42,7 +42,7 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase { when(bulkResponse.hasFailures()).thenReturn(false); Client client = new MockClientBuilder("cluster").bulk(bulkResponse).build(); - return new JobRenormalizedResultsPersister(Settings.EMPTY, client); + return new JobRenormalizedResultsPersister("foo", Settings.EMPTY, client); } private BucketNormalizable createBucketNormalizable() { @@ -52,4 +52,4 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase { bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1)); return new BucketNormalizable(bucket, "foo-index"); } -} \ No newline at end of file +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 91691b41bc6..aab18bca312 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import 
org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.junit.Before; import org.mockito.InOrder; @@ -72,7 +71,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(iterator.next()).thenReturn(autodetectResult); AutodetectProcess process = mock(AutodetectProcess.class); when(process.readAutodetectResults()).thenReturn(iterator); - processorUnderTest.process(process, randomBoolean()); + processorUnderTest.process(process); verify(renormalizer, times(1)).waitUntilIdle(); assertEquals(0, processorUnderTest.completionLatch.getCount()); } @@ -82,7 +81,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); Bucket bucket = mock(Bucket.class); @@ -100,7 +99,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = true; AutodetectResult result = mock(AutodetectResult.class); Bucket bucket = mock(Bucket.class); @@ -118,7 +117,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder); + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123); @@ -136,7 +135,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder); + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123); @@ -147,7 +146,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(result.getRecords()).thenReturn(records); processorUnderTest.processResult(context, result); - verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class)); verify(bulkBuilder, times(1)).persistRecords(records); verify(bulkBuilder, 
@@ -157,7 +155,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123);
@@ -175,7 +173,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
@@ -191,7 +189,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@@ -209,7 +207,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@@ -232,7 +230,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_modelPlot() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelPlot modelPlot = mock(ModelPlot.class);
@@ -246,7 +244,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_modelSizeStats() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
@@ -261,7 +259,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_modelSnapshot() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
@@ -281,7 +279,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_quantiles() {
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
 
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Quantiles quantiles = mock(Quantiles.class);
@@ -304,7 +302,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(iterator.next()).thenReturn(autodetectResult);
         AutodetectProcess process = mock(AutodetectProcess.class);
         when(process.readAutodetectResults()).thenReturn(iterator);
-        processorUnderTest.process(process, randomBoolean());
+        processorUnderTest.process(process);
         processorUnderTest.awaitCompletion();
 
         assertEquals(0, processorUnderTest.completionLatch.getCount());
@@ -325,7 +323,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
 
         doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSizeStats(any());
 
-        processorUnderTest.process(process, randomBoolean());
+        processorUnderTest.process(process);
 
         verify(persister, times(2)).persistModelSizeStats(any());
     }
@@ -338,7 +336,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         AutodetectProcess process = mock(AutodetectProcess.class);
         when(process.readAutodetectResults()).thenReturn(iterator);
         processorUnderTest.setProcessKilled();
-        processorUnderTest.process(process, randomBoolean());
+        processorUnderTest.process(process);
         processorUnderTest.awaitCompletion();
 
         assertEquals(0, processorUnderTest.completionLatch.getCount());
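Note: with per-partition normalization removed, AutoDetectResultProcessor.process() loses its boolean flag (presumably the per-partition-normalization switch the tests fed with false/randomBoolean()) and Context is constructed without it, as the hunks above show. A hedged sketch of the new calling convention (the surrounding setup and variable names are assumptions, not from this PR):

    // Hypothetical driver code reflecting the new signatures.
    AutoDetectResultProcessor.Context context =
            new AutoDetectResultProcessor.Context(jobId, persister.bulkPersisterBuilder(jobId));
    resultProcessor.process(process); // formerly process(process, isPerPartitionNormalization)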
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java
index 1ec0af798b1..d39d20f4898 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java
@@ -77,7 +77,7 @@ public class BucketInfluencerNormalizableTests extends ESTestCase {
     }
 
     public void testGetChildren_ByType() {
-        expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
+        expectThrows(UnsupportedOperationException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
                 .getChildren(Normalizable.ChildType.BUCKET_INFLUENCER));
     }
 
@@ -86,7 +86,7 @@ public class BucketInfluencerNormalizableTests extends ESTestCase {
     }
 
     public void testSetMaxChildrenScore() {
-        expectThrows(IllegalStateException.class,
+        expectThrows(UnsupportedOperationException.class,
                 () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
                 .setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0));
     }
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java
index c1eab334a24..ba32d1d258f 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java
@@ -9,7 +9,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
-import java.util.stream.Collectors;
 
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
@@ -83,7 +82,7 @@ public class BucketNormalizableTests extends ESTestCase {
     }
 
     public void testGetProbability() {
-        expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).getProbability());
+        expectThrows(UnsupportedOperationException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).getProbability());
     }
 
     public void testGetNormalizedScore() {
@@ -101,23 +100,17 @@ public class BucketNormalizableTests extends ESTestCase {
 
     public void testGetChildren() {
         BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
-        bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME))
-                .collect(Collectors.toList()));
         List<Normalizable> children = bn.getChildren();
 
-        assertEquals(6, children.size());
+        assertEquals(4, children.size());
         assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
         assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON);
         assertTrue(children.get(1) instanceof BucketInfluencerNormalizable);
         assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(2) instanceof RecordNormalizable);
-        assertEquals(1.0, children.get(2).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(3) instanceof RecordNormalizable);
-        assertEquals(2.0, children.get(3).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(4) instanceof PartitionScoreNormalizable);
-        assertEquals(0.2, children.get(4).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(5) instanceof PartitionScoreNormalizable);
-        assertEquals(0.4, children.get(5).getNormalizedScore(), EPSILON);
+        assertTrue(children.get(2) instanceof PartitionScoreNormalizable);
+        assertEquals(0.2, children.get(2).getNormalizedScore(), EPSILON);
+        assertTrue(children.get(3) instanceof PartitionScoreNormalizable);
+        assertEquals(0.4, children.get(3).getNormalizedScore(), EPSILON);
     }
 
     public void testGetChildren_GivenTypeBucketInfluencer() {
@@ -131,24 +124,11 @@ public class BucketNormalizableTests extends ESTestCase {
         assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
     }
 
-    public void testGetChildren_GivenTypeRecord() {
-        BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
-        bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME))
-                .collect(Collectors.toList()));
-        List<Normalizable> children = bn.getChildren(Normalizable.ChildType.RECORD);
-
-        assertEquals(2, children.size());
-        assertTrue(children.get(0) instanceof RecordNormalizable);
-        assertEquals(1.0, children.get(0).getNormalizedScore(), EPSILON);
-        assertTrue(children.get(1) instanceof RecordNormalizable);
-        assertEquals(2.0, children.get(1).getNormalizedScore(), EPSILON);
-    }
-
     public void testSetMaxChildrenScore_GivenDifferentScores() {
         BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
 
         assertTrue(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 95.0));
-        assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 42.0));
+        assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 42.0));
 
         assertEquals(95.0, bucket.getAnomalyScore(), EPSILON);
     }
@@ -157,13 +137,13 @@ public class BucketNormalizableTests extends ESTestCase {
         BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
 
         assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 88.0));
-        assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 2.0));
+        assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 2.0));
 
         assertEquals(88.0, bucket.getAnomalyScore(), EPSILON);
     }
 
     public void testSetParentScore() {
-        expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).setParentScore(42.0));
+        expectThrows(UnsupportedOperationException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).setParentScore(42.0));
    }
 
     public void testResetBigChangeFlag() {
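Note: the assertions above show that records are no longer children of the bucket normalizable — getChildren() now returns only bucket influencers and partition scores. An illustrative traversal under that assumption (variable names are hypothetical):

    BucketNormalizable bn = new BucketNormalizable(bucket, indexName);
    for (Normalizable child : bn.getChildren()) {
        // Only BucketInfluencerNormalizable and PartitionScoreNormalizable appear here;
        // RecordNormalizable instances are renormalized in their own pass, not via the bucket.
    }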
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java
index b4ae0029f34..ac87ab9bf34 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java
@@ -74,7 +74,7 @@ public class InfluencerNormalizableTests extends ESTestCase {
     }
 
     public void testGetChildren_ByType() {
-        expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
+        expectThrows(UnsupportedOperationException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
                 .getChildren(Normalizable.ChildType.BUCKET_INFLUENCER));
     }
 
@@ -83,7 +83,7 @@ public class InfluencerNormalizableTests extends ESTestCase {
     }
 
     public void testSetMaxChildrenScore() {
-        expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
+        expectThrows(UnsupportedOperationException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
                 .setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0));
     }
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java
index 8cbd149158d..7c005f2c956 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java
@@ -83,6 +83,7 @@ public class ScoresUpdaterTests extends ESTestCase {
         scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);
 
         givenProviderReturnsNoBuckets();
+        givenProviderReturnsNoRecords();
         givenProviderReturnsNoInfluencers();
         givenNormalizerFactoryReturnsMock();
         givenNormalizerRaisesBigChangeFlag();
@@ -153,10 +154,10 @@ public class ScoresUpdaterTests extends ESTestCase {
 
         scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
 
-        verifyNormalizerWasInvoked(1);
+        verifyNormalizerWasInvoked(2);
         verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any());
         verify(jobRenormalizedResultsPersister, times(1)).updateResults(any());
-        verify(jobRenormalizedResultsPersister, times(2)).executeRequest(anyString());
+        verify(jobRenormalizedResultsPersister, times(1)).executeRequest();
     }
 
     public void testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() throws IOException {
@@ -229,7 +230,7 @@ public class ScoresUpdaterTests extends ESTestCase {
 
         verifyNormalizerWasInvoked(1);
         verify(jobRenormalizedResultsPersister, times(1)).updateResults(any());
-        verify(jobRenormalizedResultsPersister, times(1)).executeRequest(anyString());
+        verify(jobRenormalizedResultsPersister, times(1)).executeRequest();
     }
 
     public void testUpdate_GivenShutdown() throws IOException {
@@ -381,6 +382,10 @@ public class ScoresUpdaterTests extends ESTestCase {
         when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter);
     }
 
+    private void givenProviderReturnsNoRecords() {
+        givenProviderReturnsRecords(new ArrayDeque<>());
+    }
+
     private void givenProviderReturnsRecords(Deque<AnomalyRecord> records) {
         Deque<Result<AnomalyRecord>> batch = new ArrayDeque<>();
         List<Deque<Result<AnomalyRecord>>> batches = new ArrayList<>();
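Note: the ScoresUpdaterTests changes encode the new renormalization flow — records are now fetched as their own batch (hence givenProviderReturnsNoRecords() in the setup), so a single quantiles update drives one normalizer pass for buckets and one for records, each flushed through the job-bound persister. An illustrative reading of the assertions above, not code from the PR:

    scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
    // buckets pass + records pass          -> verifyNormalizerWasInvoked(2)
    // one flush per batch, job-bound       -> executeRequest() with no job id argument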
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java
index ab185d55dfb..ec52d5453a0 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java
@@ -13,9 +13,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class BucketTests extends AbstractSerializingTestCase<Bucket> {
 
@@ -61,14 +59,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
             }
             bucket.setPartitionScores(partitionScores);
         }
-        if (randomBoolean()) {
-            int size = randomInt(10);
-            Map<String, Double> perPartitionMaxProbability = new HashMap<>(size);
-            for (int i = 0; i < size; i++) {
-                perPartitionMaxProbability.put(randomAlphaOfLengthBetween(1, 20), randomDouble());
-            }
-            bucket.setPerPartitionMaxProbability(perPartitionMaxProbability);
-        }
         if (randomBoolean()) {
             bucket.setProcessingTimeMs(randomLong());
         }
@@ -244,11 +234,11 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
         assertFalse(bucket.isNormalizable());
     }
 
-    public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsNonZero() {
+    public void testIsNormalizable_GivenAnomalyScoreIsZeroAndPartitionsScoresAreNonZero() {
         Bucket bucket = new Bucket("foo", new Date(123), 123);
         bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123));
         bucket.setAnomalyScore(0.0);
-        bucket.setRecordCount(1);
+        bucket.setPartitionScores(Collections.singletonList(new PartitionScore("n", "v", 50.0, 40.0, 0.01)));
 
         assertTrue(bucket.isNormalizable());
     }
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilitiesTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilitiesTests.java
deleted file mode 100644
index 14ac057d5ba..00000000000
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilitiesTests.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.ml.job.results;
-
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
-import org.joda.time.DateTime;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-
-public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCase<PerPartitionMaxProbabilities> {
-
-    @Override
-    protected PerPartitionMaxProbabilities createTestInstance() {
-        int num = randomIntBetween(1, 10);
-        List<PerPartitionMaxProbabilities.PartitionProbability> pps = new ArrayList<>();
-        for (int i=0; i<num; i++) {
-            pps.add(new PerPartitionMaxProbabilities.PartitionProbability(randomAlphaOfLength(12), randomDouble()));
-        }
-        return new PerPartitionMaxProbabilities(randomAlphaOfLength(20), new DateTime(randomDateTimeZone()).toDate(),
-                randomNonNegativeLong(), pps);
-    }
-
-    @Override
-    protected Writeable.Reader<PerPartitionMaxProbabilities> instanceReader() {
-        return PerPartitionMaxProbabilities::new;
-    }
-
-    @Override
-    protected PerPartitionMaxProbabilities parseInstance(XContentParser parser) {
-        return PerPartitionMaxProbabilities.PARSER.apply(parser, null);
-    }
-
-    public void testCreateFromAListOfRecords() {
-        List<AnomalyRecord> records = new ArrayList<>();
-        records.add(createAnomalyRecord("A", 20.0));
-        records.add(createAnomalyRecord("A", 40.0));
-        records.add(createAnomalyRecord("B", 90.0));
-        records.add(createAnomalyRecord("B", 15.0));
-        records.add(createAnomalyRecord("B", 45.0));
-
-        PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities(records);
-
-        List<PerPartitionMaxProbabilities.PartitionProbability> pProbs = ppMax.getPerPartitionMaxProbabilities();
-        assertEquals(2, pProbs.size());
-        for (PerPartitionMaxProbabilities.PartitionProbability pProb : pProbs) {
-            if (pProb.getPartitionValue().equals("A")) {
-                assertEquals(40.0, pProb.getMaxRecordScore(), 0.0001);
-            } else {
-                assertEquals(90.0, pProb.getMaxRecordScore(), 0.0001);
-            }
-        }
-    }
-
-    public void testMaxProbabilityForPartition() {
-        List<AnomalyRecord> records = new ArrayList<>();
-        records.add(createAnomalyRecord("A", 20.0));
-        records.add(createAnomalyRecord("A", 40.0));
-        records.add(createAnomalyRecord("B", 90.0));
-        records.add(createAnomalyRecord("B", 15.0));
-        records.add(createAnomalyRecord("B", 45.0));
-
-        PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities(records);
-
-        assertEquals(40.0, ppMax.getMaxProbabilityForPartition("A"), 0.0001);
-        assertEquals(90.0, ppMax.getMaxProbabilityForPartition("B"), 0.0001);
-    }
-
-    public void testId() {
-        PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities("job-foo", new Date(100L), 300L, Collections.emptyList());
-        assertEquals("job-foo_partition_normalized_probs_100_300", ppMax.getId());
-    }
-
-    private AnomalyRecord createAnomalyRecord(String partitionFieldValue, double recordScore) {
-        AnomalyRecord record = new AnomalyRecord("foo", new Date(), 600);
-        record.setPartitionFieldValue(partitionFieldValue);
-        record.setRecordScore(recordScore);
-        return record;
-    }
-}
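Note: with the per_partition_max_probabilities result type and its tests deleted, a per-partition maximum record score can still be derived at query time from the record results themselves. A hedged sketch using the standard aggregations API (the index name and client setup are assumptions; the field names match the record fields referenced elsewhere in this PR):

    // Hypothetical query, not part of this PR: max record_score per partition value.
    SearchResponse response = client.prepareSearch(".ml-anomalies-foo")  // index name assumed
            .setQuery(QueryBuilders.termQuery("result_type", "record"))
            .setSize(0)
            .addAggregation(AggregationBuilders.terms("by_partition").field("partition_field_value")
                    .subAggregation(AggregationBuilders.max("max_record_score").field("record_score")))
            .get();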