From bc04bda8d62e1d2a44310255d5f2a2a452c8bf80 Mon Sep 17 00:00:00 2001
From: David Kyle
Date: Tue, 17 Jan 2017 13:11:57 +0000
Subject: [PATCH] Remember the index each result came from
 (elastic/elasticsearch#727)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Delete unused batched ModelSnapshot iterator
* Pass source index with normalisable results
* Refactor Normalizable
* Rework persisting renormalised results
* Spell normalize with a ‘z’
* Rename ResultIndex -> ResultWithIndex
* Expand wildcard import
* Make Normalisable child type an enum

Original commit: elastic/x-pack-elasticsearch@52450abafd4341a936d2da82c103a66da2dfc35e
---
 .../org/elasticsearch/xpack/ml/MlPlugin.java  |   3 +-
 .../ElasticsearchBatchedBucketsIterator.java  |   5 +-
 ...asticsearchBatchedInfluencersIterator.java |   7 +-
 .../ElasticsearchBatchedRecordsIterator.java  |  37 ++
 .../ElasticsearchBatchedResultsIterator.java  |  13 +-
 .../xpack/ml/job/persistence/JobProvider.java |  23 +-
 .../JobRenormalizedResultsPersister.java      | 112 +++--
 .../job/persistence/JobResultsPersister.java  |   2 +-
 .../job/persistence/ResultsFilterBuilder.java |   5 +
 .../normalizer/AbstractLeafNormalizable.java  |  13 +-
 .../BucketInfluencerNormalizable.java         |  19 +-
 .../normalizer/BucketNormalizable.java        |  71 ++-
 .../normalizer/InfluencerNormalizable.java    |  19 +-
 .../job/process/normalizer/Normalizable.java  |  70 ++-
 .../ml/job/process/normalizer/Normalizer.java |   2 +-
 .../process/normalizer/NormalizerResult.java  |   2 +-
 .../PartitionScoreNormalizable.java           |  19 +-
 .../normalizer/RecordNormalizable.java        |  19 +-
 .../job/process/normalizer/ScoresUpdater.java | 143 +++---
 .../xpack/ml/job/results/AnomalyRecord.java   |  18 -
 .../xpack/ml/job/results/Bucket.java          |  29 +-
 .../xpack/ml/job/results/Influencer.java      |  18 -
 .../xpack/ml/job/results/PartitionScore.java  |  17 +-
 .../ml/job/persistence/JobProviderTests.java  |   5 +-
 .../JobRenormalizedResultsPersisterTests.java |  38 ++
 .../MockBatchedDocumentsIterator.java         |  16 +-
 .../BucketInfluencerNormalizableTests.java    |  42 +-
 .../normalizer/BucketNormalizableTests.java   |  65 +--
 .../InfluencerNormalizableTests.java          |  45 +-
 .../process/normalizer/NormalizerTests.java   |  12 +-
 .../normalizer/ScoresUpdaterTests.java        | 426 ++++++++----------
 .../xpack/ml/job/results/BucketTests.java     |   8 -
 32 files changed, 690 insertions(+), 633 deletions(-)
 create mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedRecordsIterator.java
 create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java
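
[Reviewer note, not part of the patch] The central change: every result document read back for renormalization now carries the concrete index it was loaded from, so the updated score can be written back to that same index rather than to a name recomputed from the job id. A minimal, self-contained sketch of the idea; the patch's real version is ElasticsearchBatchedResultsIterator.ResultWithIndex below:

// Illustrative only (plain Java, no Elasticsearch types).
final class TaggedResult<T> {
    final String indexName; // e.g. SearchHit.getIndex() at read time
    final T result;         // the parsed Bucket, AnomalyRecord or Influencer

    TaggedResult(String indexName, T result) {
        this.indexName = indexName;
        this.result = result;
    }
}
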
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java
index a706344198f..fbb37897ea2 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java
@@ -171,8 +171,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
                                            NamedXContentRegistry xContentRegistry) {
         JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);
         JobProvider jobProvider = new JobProvider(client, 0);
-        JobRenormalizedResultsPersister jobRenormalizedResultsPersister = new JobRenormalizedResultsPersister(settings,
-                jobResultsPersister);
+        JobRenormalizedResultsPersister jobRenormalizedResultsPersister = new JobRenormalizedResultsPersister(settings, client);
         JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
 
         JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService);
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedBucketsIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedBucketsIterator.java
index 49a9ac338d8..2fec4c7511e 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedBucketsIterator.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedBucketsIterator.java
@@ -23,7 +23,7 @@ class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedResultsIte
     }
 
     @Override
-    protected Bucket map(SearchHit hit) {
+    protected ResultWithIndex<Bucket> map(SearchHit hit) {
         BytesReference source = hit.getSourceRef();
         XContentParser parser;
         try {
@@ -31,6 +31,7 @@ class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedResultsIte
         } catch (IOException e) {
             throw new ElasticsearchParseException("failed to parse bucket", e);
         }
-        return Bucket.PARSER.apply(parser, null);
+        Bucket bucket = Bucket.PARSER.apply(parser, null);
+        return new ResultWithIndex<>(hit.getIndex(), bucket);
     }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedInfluencersIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedInfluencersIterator.java
index da9754c08f7..2e499fbadc0 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedInfluencersIterator.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedInfluencersIterator.java
@@ -22,7 +22,7 @@ class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedResult
     }
 
     @Override
-    protected Influencer map(SearchHit hit) {
+    protected ResultWithIndex<Influencer> map(SearchHit hit) {
         BytesReference source = hit.getSourceRef();
         XContentParser parser;
         try {
@@ -31,6 +31,7 @@ class ElasticsearchBatchedInfluencersIterat
             throw new ElasticsearchParseException("failed to parse influencer", e);
         }
 
-        return Influencer.PARSER.apply(parser, null);
+        Influencer influencer = Influencer.PARSER.apply(parser, null);
+        return new ResultWithIndex<>(hit.getIndex(), influencer);
     }
-}
+}
\ No newline at end of file
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedRecordsIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedRecordsIterator.java
new file mode 100644
index 00000000000..77aef798487
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedRecordsIterator.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job.persistence;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
+
+import java.io.IOException;
+
+class ElasticsearchBatchedRecordsIterator extends ElasticsearchBatchedResultsIterator<AnomalyRecord> {
+
+    public ElasticsearchBatchedRecordsIterator(Client client, String jobId) {
+        super(client, jobId, AnomalyRecord.RESULT_TYPE_VALUE);
+    }
+
+    @Override
+    protected ResultWithIndex<AnomalyRecord> map(SearchHit hit) {
+        BytesReference source = hit.getSourceRef();
+        XContentParser parser;
+        try {
+            parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
+        } catch (IOException e) {
+            throw new ElasticsearchParseException("failed to parse record", e);
+        }
+        AnomalyRecord record = AnomalyRecord.PARSER.apply(parser, null);
+        return new ResultWithIndex<>(hit.getIndex(), record);
+    }
+}
\ No newline at end of file
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedResultsIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedResultsIterator.java
index 6cc0aea7db9..eea8efa96b5 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedResultsIterator.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchBatchedResultsIterator.java
@@ -9,7 +9,8 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.index.query.TermsQueryBuilder;
 import org.elasticsearch.xpack.ml.job.results.Result;
 
-abstract class ElasticsearchBatchedResultsIterator<T> extends ElasticsearchBatchedDocumentsIterator<T> {
+public abstract class ElasticsearchBatchedResultsIterator<T>
+        extends ElasticsearchBatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<T>> {
 
     public ElasticsearchBatchedResultsIterator(Client client, String jobId, String resultType) {
         super(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId),
@@ -20,4 +21,14 @@ abstract class ElasticsearchBatchedResultsIterator extends ElasticsearchBatch
     protected String getType() {
         return Result.TYPE.getPreferredName();
     }
+
+    public static class ResultWithIndex<T> {
+        public final String indexName;
+        public final T result;
+
+        public ResultWithIndex(String indexName, T result) {
+            this.indexName = indexName;
+            this.result = result;
+        }
+    }
 }
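
A hypothetical consumption sketch of the wrapper just defined (jobProvider, jobId, startMs, endMs and process are placeholders; the provider methods appear in the next diff): each drained batch keeps the source index alongside the parsed bucket.

BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> iterator =
        jobProvider.newBatchedBucketsIterator(jobId).timeRange(startMs, endMs);
while (iterator.hasNext()) {
    for (ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket> batched : iterator.next()) {
        process(batched.result, batched.indexName); // placeholder consumer
    }
}
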
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java
index 0d3c9263d57..b3a69cdcba2 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java
@@ -401,6 +401,7 @@ public class JobProvider {
             List<PerPartitionMaxProbabilities> partitionProbs = handlePartitionMaxNormailizedProbabilitiesResponse(item2.getResponse());
             mergePartitionScoresIntoBucket(partitionProbs, buckets.results(), query.getPartitionValue());
+
             if (query.isExpand()) {
                 Iterator<Bucket> bucketsToExpand = buckets.results().stream()
                         .filter(bucket -> bucket.getRecordCount() > 0).iterator();
@@ -484,22 +485,29 @@ public class JobProvider {
     /**
      * Returns a {@link BatchedDocumentsIterator} that allows querying
-     * and iterating over a large number of buckets of the given job
+     * and iterating over a large number of buckets of the given job.
+     * The bucket and source indexes are returned by the iterator.
      *
      * @param jobId the id of the job for which buckets are requested
     * @return a bucket {@link BatchedDocumentsIterator}
      */
-    public BatchedDocumentsIterator<Bucket> newBatchedBucketsIterator(String jobId) {
+    public BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> newBatchedBucketsIterator(String jobId) {
         return new ElasticsearchBatchedBucketsIterator(client, jobId);
     }
 
     /**
-     * Expand a bucket to include the associated records.
+     * Returns a {@link BatchedDocumentsIterator} that allows querying
+     * and iterating over a large number of records in the given job.
+     * The records and source indexes are returned by the iterator.
      *
-     * @param jobId the job id
-     * @param includeInterim Include interim results
-     * @param bucket The bucket to be expanded
+     * @param jobId the id of the job for which records are requested
+     * @return a record {@link BatchedDocumentsIterator}
      */
+    public BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>
+    newBatchedRecordsIterator(String jobId) {
+        return new ElasticsearchBatchedRecordsIterator(client, jobId);
+    }
+
     // TODO (norelease): Use scroll search instead of multiple searches with increasing from
     public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue, int from,
                              Consumer<Integer> consumer, Consumer<Exception> errorHandler) {
@@ -753,7 +761,8 @@ public class JobProvider {
      * @param jobId the id of the job for which influencers are requested
      * @return an influencer {@link BatchedDocumentsIterator}
      */
-    public BatchedDocumentsIterator<Influencer> newBatchedInfluencersIterator(String jobId) {
+    public BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>>
+    newBatchedInfluencersIterator(String jobId) {
         return new ElasticsearchBatchedInfluencersIterator(client, jobId);
     }
 
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java
index 773d0fe07e6..b3109e969f5 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java
@@ -5,75 +5,97 @@
  */
 package org.elasticsearch.xpack.ml.job.persistence;
 
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.ml.job.process.normalizer.BucketNormalizable;
+import org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable;
 import org.elasticsearch.xpack.ml.job.results.Bucket;
-import org.elasticsearch.xpack.ml.job.results.Influencer;
-import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
+import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
+import org.elasticsearch.xpack.ml.job.results.Result;
 
+import java.io.IOException;
 import java.util.List;
 
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
 /**
  * Interface for classes that update {@linkplain Bucket Buckets}
  * for a particular job with new normalized anomaly scores and
  * unusual scores.
- *
+ * <p>
  * Renormalized results must already have an ID.
 */
 public class JobRenormalizedResultsPersister extends AbstractComponent {
 
-    private final JobResultsPersister jobResultsPersister;
+    private final Client client;
+    private BulkRequest bulkRequest;
 
-    public JobRenormalizedResultsPersister(Settings settings, JobResultsPersister jobResultsPersister) {
+    public JobRenormalizedResultsPersister(Settings settings, Client client) {
         super(settings);
-        this.jobResultsPersister = jobResultsPersister;
+        this.client = client;
+        bulkRequest = new BulkRequest();
+    }
+
+    public void updateBucket(BucketNormalizable normalizable) {
+        updateResult(normalizable.getId(), normalizable.getOriginatingIndex(), normalizable.getBucket());
+        updateBucketInfluencersStandalone(normalizable.getOriginatingIndex(), normalizable.getBucket().getBucketInfluencers());
+    }
+
+    private void updateBucketInfluencersStandalone(String indexName, List<BucketInfluencer> bucketInfluencers) {
+        if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
+            for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
+                updateResult(bucketInfluencer.getId(), indexName, bucketInfluencer);
+            }
+        }
+    }
+
+    public void updateResults(List<Normalizable> normalizables) {
+        for (Normalizable normalizable : normalizables) {
+            updateResult(normalizable.getId(), normalizable.getOriginatingIndex(), normalizable);
+        }
+    }
+
+    public void updateResult(String id, String index, ToXContent resultDoc) {
+        try {
+            XContentBuilder content = toXContentBuilder(resultDoc);
+            bulkRequest.add(new IndexRequest(index, Result.TYPE.getPreferredName(), id).source(content));
+        } catch (IOException e) {
+            logger.error("Error serialising result", e);
+        }
+    }
+
+    private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
+        XContentBuilder builder = jsonBuilder();
+        obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        return builder;
     }
 
     /**
-     * Update the bucket with the changes that may result
-     * due to renormalization.
+     * Execute the bulk action
      *
-     * @param bucket the bucket to update
+     * @param jobId The job Id
     */
-    public void updateBucket(Bucket bucket) {
-        jobResultsPersister.bulkPersisterBuilder(bucket.getJobId()).persistBucket(bucket).executeRequest();
+    public void executeRequest(String jobId) {
+        if (bulkRequest.numberOfActions() == 0) {
+            return;
+        }
+        logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions());
+
+        BulkResponse addRecordsResponse = client.bulk(bulkRequest).actionGet();
+        if (addRecordsResponse.hasFailures()) {
+            logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
+        }
     }
 
-    /**
-     * Update the anomaly records for a particular job.
-     * The anomaly records are updated with the values in records and
-     * stored with the ID returned by {@link AnomalyRecord#getId()}
-     *
-     * @param jobId Id of the job to update
-     * @param records The updated records
-     */
-    public void updateRecords(String jobId, List<AnomalyRecord> records) {
-        jobResultsPersister.bulkPersisterBuilder(jobId).persistRecords(records).executeRequest();
-    }
-
-    /**
-     * Create a {@link PerPartitionMaxProbabilities} object from this list of records and persist
-     * with the given ID.
-     *
-     * @param jobId Id of the job to update
-     * @param records Source of the new {@link PerPartitionMaxProbabilities} object
-     */
-    public void updatePerPartitionMaxProbabilities(String jobId, List<AnomalyRecord> records) {
-        PerPartitionMaxProbabilities ppMaxProbs = new PerPartitionMaxProbabilities(records);
-        jobResultsPersister.bulkPersisterBuilder(jobId).persistPerPartitionMaxProbabilities(ppMaxProbs).executeRequest();
-    }
-
-    /**
-     * Update the influencer for a particular job.
-     * The Influencer's are stored with the ID in {@link Influencer#getId()}
-     *
-     * @param jobId Id of the job to update
-     * @param influencers The updated influencers
-     */
-    public void updateInfluencer(String jobId, List<Influencer> influencers) {
-        jobResultsPersister.bulkPersisterBuilder(jobId).persistInfluencers(influencers).executeRequest();
+    BulkRequest getBulkRequest() {
+        return bulkRequest;
     }
 }
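
Sketch of the reworked persister's intended call pattern (settings, client, bucketNormalizable, changedNormalizables and jobId are assumed to exist): updates are buffered into a single BulkRequest and flushed explicitly.

JobRenormalizedResultsPersister persister = new JobRenormalizedResultsPersister(settings, client);
persister.updateBucket(bucketNormalizable);    // queues the bucket plus its bucket influencers
persister.updateResults(changedNormalizables); // queues any other changed results
persister.executeRequest(jobId);               // one bulk call; a no-op when nothing is queued
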
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
index 113296b60e2..710737b9c58 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
@@ -114,7 +114,7 @@ public class JobResultsPersister extends AbstractComponent {
             if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
                 for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
                     XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer);
-                    // Need consistent IDs to ensure overwriting on renormalisation
+                    // Need consistent IDs to ensure overwriting on renormalization
                     String id = bucketInfluencer.getId();
                     logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", jobId,
                             BucketInfluencer.RESULT_TYPE_VALUE, indexName, id);
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ResultsFilterBuilder.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ResultsFilterBuilder.java
index a6d03697dc2..c615ff7af7d 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ResultsFilterBuilder.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ResultsFilterBuilder.java
@@ -11,6 +11,7 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.xpack.ml.job.results.Result;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -88,6 +89,10 @@ class ResultsFilterBuilder {
         return this;
     }
 
+    ResultsFilterBuilder resultType(String resultType) {
+        return term(Result.RESULT_TYPE.getPreferredName(), resultType);
+    }
+
     private void addQuery(QueryBuilder fb) {
         queries.add(fb);
     }
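
For context, the new resultType helper just delegates to term(...). A hedged usage sketch (the builder's other members, including a build()-style terminal call, are not shown in this diff, so the exact finishing call is an assumption):

QueryBuilder query = new ResultsFilterBuilder()
        .resultType(AnomalyRecord.RESULT_TYPE_VALUE) // only anomaly-record results
        .build();                                    // assumed terminal method
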
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java
index 1dc6713d313..4bb3e5f6748 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/AbstractLeafNormalizable.java
@@ -8,14 +8,19 @@
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
 import java.util.Collections;
 import java.util.List;
 
-abstract class AbstractLeafNormalizable implements Normalizable {
+abstract class AbstractLeafNormalizable extends Normalizable {
+
+    public AbstractLeafNormalizable(String indexName) {
+        super(indexName);
+    }
+
     @Override
     public final boolean isContainerOnly() {
         return false;
     }
 
     @Override
-    public final List<Integer> getChildrenTypes() {
+    public final List<ChildType> getChildrenTypes() {
         return Collections.emptyList();
     }
 
@@ -25,12 +30,12 @@ abstract class AbstractLeafNormalizable implements Normalizable {
     }
 
     @Override
-    public final List<Normalizable> getChildren(int type) {
+    public final List<Normalizable> getChildren(ChildType type) {
         throw new IllegalStateException(getClass().getSimpleName() + " has no children");
     }
 
     @Override
-    public final boolean setMaxChildrenScore(int childrenType, double maxScore) {
+    public final boolean setMaxChildrenScore(ChildType childrenType, double maxScore) {
         throw new IllegalStateException(getClass().getSimpleName() + " has no children");
     }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizable.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizable.java
index f761f02a1f5..00412a46944 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizable.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizable.java
@@ -5,18 +5,26 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
 
+import java.io.IOException;
 import java.util.Objects;
 
 class BucketInfluencerNormalizable extends AbstractLeafNormalizable {
     private final BucketInfluencer bucketInfluencer;
 
-    public BucketInfluencerNormalizable(BucketInfluencer influencer) {
+    public BucketInfluencerNormalizable(BucketInfluencer influencer, String indexName) {
+        super(indexName);
         bucketInfluencer = Objects.requireNonNull(influencer);
     }
 
+    @Override
+    public String getId() {
+        return bucketInfluencer.getId();
+    }
+
     @Override
     public Level getLevel() {
         return BucketInfluencer.BUCKET_TIME.equals(bucketInfluencer.getInfluencerFieldName()) ?
@@ -69,12 +77,7 @@ class BucketInfluencerNormalizable extends AbstractLeafNormalizable {
     }
 
     @Override
-    public void resetBigChangeFlag() {
-        // Do nothing
-    }
-
-    @Override
-    public void raiseBigChangeFlag() {
-        // Do nothing
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return bucketInfluencer.toXContent(builder, params);
     }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java
index 2f9c91782d6..0ad9d4da99a 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java
@@ -5,27 +5,44 @@
 */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.Bucket;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.BUCKET_INFLUENCER;
+import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.PARTITION_SCORE;
+import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.RECORD;
 
-class BucketNormalizable implements Normalizable {
-    private static final int BUCKET_INFLUENCER = 0;
-    private static final int RECORD = 1;
-    private static final int PARTITION_SCORE = 2;
-    private static final List<Integer> CHILDREN_TYPES =
-            Arrays.asList(BUCKET_INFLUENCER, RECORD, PARTITION_SCORE);
+public class BucketNormalizable extends Normalizable {
+
+    private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, RECORD, PARTITION_SCORE);
 
     private final Bucket bucket;
 
-    public BucketNormalizable(Bucket bucket) {
+    private List<RecordNormalizable> records = Collections.emptyList();
+
+    public BucketNormalizable(Bucket bucket, String indexName) {
+        super(indexName);
         this.bucket = Objects.requireNonNull(bucket);
     }
 
+    public Bucket getBucket() {
+        return bucket;
+    }
+
+    @Override
+    public String getId() {
+        return bucket.getId();
+    }
+
     @Override
     public boolean isContainerOnly() {
         return true;
@@ -76,35 +93,44 @@ class BucketNormalizable implements Normalizable {
         bucket.setAnomalyScore(normalizedScore);
     }
 
+    public List<RecordNormalizable> getRecords() {
+        return records;
+    }
+
+    public void setRecords(List<RecordNormalizable> records) {
+        this.records = records;
+    }
+
     @Override
-    public List<Integer> getChildrenTypes() {
-        return CHILDREN_TYPES;
+    public List<ChildType> getChildrenTypes() {
+        return CHILD_TYPES;
     }
 
     @Override
     public List<Normalizable> getChildren() {
         List<Normalizable> children = new ArrayList<>();
-        for (Integer type : getChildrenTypes()) {
+        for (ChildType type : getChildrenTypes()) {
             children.addAll(getChildren(type));
         }
         return children;
     }
 
     @Override
-    public List<Normalizable> getChildren(int type) {
+    public List<Normalizable> getChildren(ChildType type) {
         List<Normalizable> children = new ArrayList<>();
         switch (type) {
             case BUCKET_INFLUENCER:
-                bucket.getBucketInfluencers().stream().forEach(
-                        influencer -> children.add(new BucketInfluencerNormalizable(influencer)));
+                children.addAll(bucket.getBucketInfluencers().stream()
+                        .map(bi -> new BucketInfluencerNormalizable(bi, getOriginatingIndex()))
+                        .collect(Collectors.toList()));
                 break;
             case RECORD:
-                bucket.getRecords().stream().forEach(
-                        record -> children.add(new RecordNormalizable(record)));
+                children.addAll(records);
                 break;
             case PARTITION_SCORE:
-                bucket.getPartitionScores().stream().forEach(
-                        partitionScore -> children.add(new PartitionScoreNormalizable(partitionScore)));
+                children.addAll(bucket.getPartitionScores().stream()
+                        .map(ps -> new PartitionScoreNormalizable(ps, getOriginatingIndex()))
+                        .collect(Collectors.toList()));
                 break;
             default:
                 throw new IllegalArgumentException("Invalid type: " + type);
@@ -113,7 +139,7 @@ class BucketNormalizable implements Normalizable {
     }
 
     @Override
-    public boolean setMaxChildrenScore(int childrenType, double maxScore) {
+    public boolean setMaxChildrenScore(ChildType childrenType, double maxScore) {
         double oldScore = 0.0;
         switch (childrenType) {
             case BUCKET_INFLUENCER:
@@ -138,12 +164,7 @@ class BucketNormalizable implements Normalizable {
     }
 
     @Override
-    public void resetBigChangeFlag() {
-        bucket.resetBigNormalizedUpdateFlag();
-    }
-
-    @Override
-    public void raiseBigChangeFlag() {
-        bucket.raiseBigNormalizedUpdateFlag();
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return bucket.toXContent(builder, params);
     }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizable.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizable.java
index d26a2dc1a54..c7aa5e051bd 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizable.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizable.java
@@ -5,17 +5,25 @@
 */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.Influencer;
 
+import java.io.IOException;
 import java.util.Objects;
 
 class InfluencerNormalizable extends AbstractLeafNormalizable {
     private final Influencer influencer;
 
-    public InfluencerNormalizable(Influencer influencer) {
+    public InfluencerNormalizable(Influencer influencer, String indexName) {
+        super(indexName);
         this.influencer = Objects.requireNonNull(influencer);
     }
 
+    @Override
+    public String getId() {
+        return influencer.getId();
+    }
+
     @Override
     public Level getLevel() {
         return Level.INFLUENCER;
@@ -67,12 +75,7 @@ class InfluencerNormalizable extends AbstractLeafNormalizable {
     }
 
     @Override
-    public void resetBigChangeFlag() {
-        influencer.resetBigNormalizedUpdateFlag();
-    }
-
-    @Override
-    public void raiseBigChangeFlag() {
-        influencer.raiseBigNormalizedUpdateFlag();
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return influencer.toXContent(builder, params);
     }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizable.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizable.java
index 834deb97f6e..21784122ee5 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizable.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizable.java
@@ -5,9 +5,27 @@
 */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
-import java.util.List;
+import org.elasticsearch.common.xcontent.ToXContent;
+
+import java.util.List;
+import java.util.Objects;
+
+public abstract class Normalizable implements ToXContent {
+    public enum ChildType {BUCKET_INFLUENCER, RECORD, PARTITION_SCORE};
+
+    private final String indexName;
+    private boolean hadBigNormalizedUpdate;
+
+    public Normalizable(String indexName) {
+        this.indexName = Objects.requireNonNull(indexName);
+    }
+
+    /**
+     * The document ID of the underlying result.
+     * @return The document Id string
+     */
+    public abstract String getId();
 
-interface Normalizable {
     /**
      * A {@code Normalizable} may be the owner of scores or just a
      * container of other {@code Normalizable} objects. A container only
@@ -16,40 +34,40 @@ interface Normalizable {
      *
      * @return true if this {@code Normalizable} is only a container
     */
-    boolean isContainerOnly();
+    abstract boolean isContainerOnly();
 
-    Level getLevel();
+    abstract Level getLevel();
 
-    String getPartitionFieldName();
+    abstract String getPartitionFieldName();
 
-    String getPartitionFieldValue();
+    abstract String getPartitionFieldValue();
 
-    String getPersonFieldName();
+    abstract String getPersonFieldName();
 
-    String getFunctionName();
+    abstract String getFunctionName();
 
-    String getValueFieldName();
+    abstract String getValueFieldName();
 
-    double getProbability();
+    abstract double getProbability();
 
-    double getNormalizedScore();
+    abstract double getNormalizedScore();
 
-    void setNormalizedScore(double normalizedScore);
+    abstract void setNormalizedScore(double normalizedScore);
 
-    List<Integer> getChildrenTypes();
+    abstract List<ChildType> getChildrenTypes();
 
-    List<Normalizable> getChildren();
+    abstract List<Normalizable> getChildren();
 
-    List<Normalizable> getChildren(int type);
+    abstract List<Normalizable> getChildren(ChildType type);
 
     /**
     * Set the aggregate normalized score for a type of children
      *
-     * @param childrenType the integer that corresponds to a children type
+     * @param type the child type
     * @param maxScore the aggregate normalized score of the children
     * @return true if the score has changed or false otherwise
     */
-    boolean setMaxChildrenScore(int childrenType, double maxScore);
+    abstract boolean setMaxChildrenScore(ChildType type, double maxScore);
 
     /**
      * If this {@code Normalizable} holds the score of its parent,
     *
     * @param parentScore the score of the parent {@code Normalizable}
     */
-    void setParentScore(double parentScore);
+    abstract void setParentScore(double parentScore);
 
-    void resetBigChangeFlag();
+    public boolean hadBigNormalizedUpdate() {
+        return hadBigNormalizedUpdate;
+    }
 
-    void raiseBigChangeFlag();
+    public void resetBigChangeFlag() {
+        hadBigNormalizedUpdate = false;
+    }
+
+    public void raiseBigChangeFlag() {
+        hadBigNormalizedUpdate = true;
+    }
+
+    public String getOriginatingIndex() {
+        return indexName;
+    }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java
index 75cb3f8ee51..afa23dbfe9b 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java
@@ -170,7 +170,7 @@ public class Normalizer {
             }
         }
 
-        for (Integer childrenType : result.getChildrenTypes()) {
+        for (Normalizable.ChildType childrenType : result.getChildrenTypes()) {
             List<Normalizable> children = result.getChildren(childrenType);
             if (!children.isEmpty()) {
                 double maxChildrenScore = 0.0;
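
The loop above is the main consumer of the new ChildType enum. In outline (a simplified paraphrase of the surrounding Normalizer code, not the complete logic):

for (Normalizable.ChildType childrenType : result.getChildrenTypes()) {
    List<Normalizable> children = result.getChildren(childrenType);
    if (!children.isEmpty()) {
        double maxChildrenScore = 0.0;
        for (Normalizable child : children) {
            maxChildrenScore = Math.max(maxChildrenScore, child.getNormalizedScore());
        }
        // setMaxChildrenScore returns true when the aggregate score actually changed
        result.setMaxChildrenScore(childrenType, maxChildrenScore);
    }
}
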
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerResult.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerResult.java
index 1c2c17667b8..daac2cdf8eb 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerResult.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerResult.java
@@ -17,7 +17,7 @@ import java.io.IOException;
 import java.util.Objects;
 
 /**
- * Parse the output of the normaliser process, for example:
+ * Parse the output of the normalizer process, for example:
 *
 * {"probability":0.01,"normalized_score":2.2}
 */
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java
index 88bcb0786f1..78c3455cebb 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java
@@ -5,18 +5,26 @@
 */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.PartitionScore;
 
+import java.io.IOException;
 import java.util.Objects;
 
 public class PartitionScoreNormalizable extends AbstractLeafNormalizable {
     private final PartitionScore score;
 
-    public PartitionScoreNormalizable(PartitionScore score) {
+    public PartitionScoreNormalizable(PartitionScore score, String indexName) {
+        super(indexName);
         this.score = Objects.requireNonNull(score);
     }
 
+    @Override
+    public String getId() {
+        throw new IllegalStateException("PartitionScore has no ID as it should not be persisted outside of the owning bucket");
+    }
+
     @Override
     public Level getLevel() {
         return Level.PARTITION;
@@ -68,12 +76,7 @@ public class PartitionScoreNormalizable extends AbstractLeafNormalizable {
     }
 
     @Override
-    public void resetBigChangeFlag() {
-        score.resetBigNormalizedUpdateFlag();
-    }
-
-    @Override
-    public void raiseBigChangeFlag() {
-        score.raiseBigNormalizedUpdateFlag();
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return score.toXContent(builder, params);
     }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/RecordNormalizable.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/RecordNormalizable.java
index 3975a93a8b7..3980f37d6aa 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/RecordNormalizable.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/RecordNormalizable.java
@@ -5,18 +5,26 @@
 */
 package org.elasticsearch.xpack.ml.job.process.normalizer;
 
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
 
+import java.io.IOException;
 import java.util.Objects;
 
 class RecordNormalizable extends AbstractLeafNormalizable {
     private final AnomalyRecord record;
 
-    public RecordNormalizable(AnomalyRecord record) {
+    public RecordNormalizable(AnomalyRecord record, String indexName) {
+        super(indexName);
         this.record = Objects.requireNonNull(record);
     }
 
+    @Override
+    public String getId() {
+        return record.getId();
+    }
+
     @Override
     public Level getLevel() {
         return Level.LEAF;
@@ -69,12 +77,7 @@ class RecordNormalizable extends AbstractLeafNormalizable {
     }
 
     @Override
-    public void resetBigChangeFlag() {
-        record.resetBigNormalizedUpdateFlag();
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return record.toXContent(builder, params);
     }
 
-    @Override
-    public void raiseBigChangeFlag() {
-        record.raiseBigNormalizedUpdateFlag();
+    public AnomalyRecord getRecord() {
+        return record;
     }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java
index 7c53b9b0f0b..7b830a898c6 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java
@@ -10,16 +10,19 @@ import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.xpack.ml.job.AnalysisConfig;
 import org.elasticsearch.xpack.ml.job.Job;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
+import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchBatchedResultsIterator;
 import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
 import org.elasticsearch.xpack.ml.job.results.Bucket;
 import org.elasticsearch.xpack.ml.job.results.Influencer;
+import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
 
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -89,45 +92,49 @@ public class ScoresUpdater {
     public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs, boolean perPartitionNormalization) {
         Normalizer normalizer = normalizerFactory.create(job.getId());
         int[] counts = {0, 0};
-        updateBuckets(normalizer, quantilesState, endBucketEpochMs,
-                windowExtensionMs, counts, perPartitionNormalization);
-        updateInfluencers(normalizer, quantilesState, endBucketEpochMs,
-                windowExtensionMs, counts);
+        updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts,
+                perPartitionNormalization);
+        updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts);
 
         LOGGER.info("[{}] Normalization resulted in: {} updates, {} no-ops", job.getId(), counts[0], counts[1]);
     }
 
     private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
                                long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
-        BatchedDocumentsIterator<Bucket> bucketsIterator =
+        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> bucketsIterator =
                 jobProvider.newBatchedBucketsIterator(job.getId())
                         .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);
 
-        // Make a list of buckets with their records to be renormalized.
+        // Make a list of buckets to be renormalized.
         // This may be shorter than the original list of buckets for two
         // reasons:
         // 1) We don't bother with buckets that have raw score 0 and no
         //    records
         // 2) We limit the total number of records to be not much more
         //    than 100000
-        List<Bucket> bucketsToRenormalize = new ArrayList<>();
+        List<BucketNormalizable> bucketsToRenormalize = new ArrayList<>();
         int batchRecordCount = 0;
         int skipped = 0;
 
         while (bucketsIterator.hasNext()) {
             // Get a batch of buckets without their records to calculate
             // how many buckets can be sensibly retrieved
-            Deque<Bucket> buckets = bucketsIterator.next();
+            Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> buckets = bucketsIterator.next();
             if (buckets.isEmpty()) {
-                LOGGER.debug("[{}] No buckets to renormalize for job", job.getId());
                 break;
             }
 
             while (!buckets.isEmpty()) {
-                Bucket currentBucket = buckets.removeFirst();
+                ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket> current = buckets.removeFirst();
+                Bucket currentBucket = current.result;
                 if (currentBucket.isNormalizable()) {
-                    bucketsToRenormalize.add(currentBucket);
-                    batchRecordCount += jobProvider.expandBucket(job.getId(), false, currentBucket);
+                    BucketNormalizable bucketNormalizable = new BucketNormalizable(current.result, current.indexName);
+                    List<RecordNormalizable> recordNormalizables =
+                            bucketRecordsAsNormalizables(currentBucket.getTimestamp().getTime());
+                    batchRecordCount += recordNormalizables.size();
+                    bucketNormalizable.setRecords(recordNormalizables);
+                    bucketsToRenormalize.add(bucketNormalizable);
+
                 } else {
                     ++skipped;
                 }
@@ -143,78 +150,73 @@ public class ScoresUpdater {
             }
         }
         if (!bucketsToRenormalize.isEmpty()) {
-            normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState,
-                    batchRecordCount, skipped, counts, perPartitionNormalization);
+            normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, batchRecordCount, skipped, counts,
+                    perPartitionNormalization);
         }
     }
 
+    private List<RecordNormalizable> bucketRecordsAsNormalizables(long bucketTimeStamp) {
+        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>
+                recordsIterator = jobProvider.newBatchedRecordsIterator(job.getId())
+                .timeRange(bucketTimeStamp, bucketTimeStamp + 1);
+
+        List<RecordNormalizable> recordNormalizables = new ArrayList<>();
+        while (recordsIterator.hasNext()) {
+            for (ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord> record : recordsIterator.next()) {
+                recordNormalizables.add(new RecordNormalizable(record.result, record.indexName));
+            }
+        }
+
+        return recordNormalizables;
+    }
+
     private long calcNormalizationWindowStart(long endEpochMs, long windowExtensionMs) {
         return Math.max(0, endEpochMs - normalizationWindow - windowExtensionMs);
     }
 
-    private void normalizeBuckets(Normalizer normalizer, List<Bucket> buckets, String quantilesState,
-                                  int recordCount, int skipped, int[] counts, boolean perPartitionNormalization) {
+    private void normalizeBuckets(Normalizer normalizer, List<BucketNormalizable> normalizableBuckets,
+                                  String quantilesState, int recordCount, int skipped, int[] counts,
+                                  boolean perPartitionNormalization) {
         LOGGER.debug("[{}] Will renormalize a batch of {} buckets with {} records ({} empty buckets skipped)",
-                job.getId(), buckets.size(), recordCount, skipped);
+                job.getId(), normalizableBuckets.size(), recordCount, skipped);
 
-        List<Normalizable> asNormalizables = buckets.stream()
-                .map(bucket -> new BucketNormalizable(bucket)).collect(Collectors.toList());
+        List<Normalizable> asNormalizables = normalizableBuckets.stream()
+                .map(Function.identity()).collect(Collectors.toList());
         normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
 
-        for (Bucket bucket : buckets) {
-            updateSingleBucket(bucket, counts, perPartitionNormalization);
+        for (BucketNormalizable bn : normalizableBuckets) {
+            updateSingleBucket(counts, bn);
         }
+
+        updatesPersister.executeRequest(job.getId());
     }
 
-    /**
-     * Update the anomaly score and unsual score fields on the bucket provided
-     * and all contained records
-     *
-     * @param counts Element 0 will be incremented if we update a document and
-     *               element 1 if we don't
-     */
-    private void updateSingleBucket(Bucket bucket, int[] counts, boolean perPartitionNormalization) {
-        updateBucketIfItHasBigChange(bucket, counts, perPartitionNormalization);
-        updateRecordsThatHaveBigChange(bucket, counts);
-    }
-
-    private void updateBucketIfItHasBigChange(Bucket bucket, int[] counts, boolean perPartitionNormalization) {
-        if (bucket.hadBigNormalizedUpdate()) {
+    private void updateSingleBucket(int[] counts, BucketNormalizable bucketNormalizable) {
+        if (bucketNormalizable.hadBigNormalizedUpdate()) {
             if (perPartitionNormalization) {
-                updatesPersister.updatePerPartitionMaxProbabilities(job.getId(), bucket.getRecords());
+                List<AnomalyRecord> anomalyRecords = bucketNormalizable.getRecords().stream()
+                        .map(RecordNormalizable::getRecord).collect(Collectors.toList());
+                PerPartitionMaxProbabilities ppProbs = new PerPartitionMaxProbabilities(anomalyRecords);
+                updatesPersister.updateResult(ppProbs.getId(), bucketNormalizable.getOriginatingIndex(), ppProbs);
             }
-            updatesPersister.updateBucket(bucket);
+            updatesPersister.updateBucket(bucketNormalizable);
 
             ++counts[0];
         } else {
             ++counts[1];
        }
-    }
 
-    private void updateRecordsThatHaveBigChange(Bucket bucket, int[] counts) {
-        List<AnomalyRecord> toUpdate = new ArrayList<>();
-
-        for (AnomalyRecord record : bucket.getRecords()) {
-            if (record.hadBigNormalizedUpdate()) {
-                toUpdate.add(record);
-                ++counts[0];
-            } else {
-                ++counts[1];
-            }
-        }
-
-        if (!toUpdate.isEmpty()) {
-            updatesPersister.updateRecords(job.getId(), toUpdate);
-        }
+        persistChanged(counts, bucketNormalizable.getRecords());
     }
 
     private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
                                    long windowExtensionMs, int[] counts) {
-        BatchedDocumentsIterator<Influencer> influencersIterator = jobProvider
-                .newBatchedInfluencersIterator(job.getId())
+        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> influencersIterator =
+                jobProvider.newBatchedInfluencersIterator(job.getId())
                         .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);
 
         while (influencersIterator.hasNext()) {
-            Deque<Influencer> influencers = influencersIterator.next();
+            Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> influencers = influencersIterator.next();
             if (influencers.isEmpty()) {
                 LOGGER.debug("[{}] No influencers to renormalize for job", job.getId());
                 break;
@@ -222,21 +224,24 @@ public class ScoresUpdater {
 
             LOGGER.debug("[{}] Will renormalize a batch of {} influencers", job.getId(), influencers.size());
             List<Normalizable> asNormalizables = influencers.stream()
-                    .map(bucket -> new InfluencerNormalizable(bucket)).collect(Collectors.toList());
+                    .map(influencerResultIndex ->
+                            new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.indexName))
+                    .collect(Collectors.toList());
             normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
 
-            List<Influencer> toUpdate = new ArrayList<>();
-            for (Influencer influencer : influencers) {
-                if (influencer.hadBigNormalizedUpdate()) {
-                    toUpdate.add(influencer);
-                    ++counts[0];
-                } else {
-                    ++counts[1];
-                }
-            }
-            if (!toUpdate.isEmpty()) {
-                updatesPersister.updateInfluencer(job.getId(), toUpdate);
-            }
+            persistChanged(counts, asNormalizables);
+        }
+
+        updatesPersister.executeRequest(job.getId());
+    }
+
+    private void persistChanged(int[] counts, List<? extends Normalizable> asNormalizables) {
+        List<Normalizable> toUpdate = asNormalizables.stream().filter(n -> n.hadBigNormalizedUpdate()).collect(Collectors.toList());
+
+        counts[0] += toUpdate.size();
+        counts[1] += asNormalizables.size() - toUpdate.size();
+
+        if (!toUpdate.isEmpty()) {
+            updatesPersister.updateResults(toUpdate);
         }
     }
 }
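
Note the record-fetch change buried in the diff above: rather than expanding each bucket via jobProvider.expandBucket(...), the updater now pulls a bucket's records through the new batched records iterator, scoped to a single bucket with a half-open time range (sketch; t is the bucket timestamp in epoch milliseconds):

BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordsIterator =
        jobProvider.newBatchedRecordsIterator(job.getId()).timeRange(t, t + 1);
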
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/AnomalyRecord.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/AnomalyRecord.java
index 68053c71661..d570dbfbb09 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/AnomalyRecord.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/AnomalyRecord.java
@@ -146,8 +146,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
 
     private List<Influencer> influencers;
 
-    private boolean hadBigNormalizedUpdate;
-
     public AnomalyRecord(String jobId, Date timestamp, long bucketSpan, int sequenceNum) {
         this.jobId = jobId;
         this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName());
@@ -487,9 +485,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
 
     @Override
     public int hashCode() {
-
-        // hadBigNormalizedUpdate is deliberately excluded from the hash
-
         return Objects.hash(jobId, detectorIndex, sequenceNum, bucketSpan, probability, anomalyScore,
                 normalizedProbability, initialNormalizedProbability, typical, actual,
                 function, functionDescription, fieldName, byFieldName, byFieldValue, correlatedByFieldValue,
@@ -510,7 +505,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
 
         AnomalyRecord that = (AnomalyRecord) other;
 
-        // hadBigNormalizedUpdate is deliberately excluded from the test
         return Objects.equals(this.jobId, that.jobId)
                 && this.detectorIndex == that.detectorIndex
                 && this.sequenceNum == that.sequenceNum
@@ -536,16 +530,4 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
                 && Objects.equals(this.causes, that.causes)
                 && Objects.equals(this.influencers, that.influencers);
     }
-
-    public boolean hadBigNormalizedUpdate() {
-        return this.hadBigNormalizedUpdate;
-    }
-
-    public void resetBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = false;
-    }
-
-    public void raiseBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = true;
-    }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java
index 21d69feca1d..57c34ff641d 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java
@@ -95,7 +95,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
     private List<AnomalyRecord> records = new ArrayList<>();
     private long eventCount;
     private boolean isInterim;
-    private boolean hadBigNormalizedUpdate;
     private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
     private long processingTimeMs;
     private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap();
@@ -118,7 +117,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
         this.records = new ArrayList<>(other.records);
         this.eventCount = other.eventCount;
         this.isInterim = other.isInterim;
-        this.hadBigNormalizedUpdate = other.hadBigNormalizedUpdate;
         this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
         this.processingTimeMs = other.processingTimeMs;
         this.perPartitionMaxProbability = other.perPartitionMaxProbability;
@@ -330,7 +328,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
 
     @Override
     public int hashCode() {
-        // hadBigNormalizedUpdate is deliberately excluded from the hash
         return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, maxNormalizedProbability,
                 recordCount, records, isInterim, bucketSpan, bucketInfluencers);
     }
@@ -350,7 +347,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
 
         Bucket that = (Bucket) other;
 
-        // hadBigNormalizedUpdate is deliberately excluded from the test
         return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.timestamp, that.timestamp)
                 && (this.eventCount == that.eventCount) && (this.bucketSpan == that.bucketSpan)
                 && (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
@@ -359,38 +355,15 @@ public class Bucket extends ToXContentToBytes implements Writeable {
                 && Objects.equals(this.bucketInfluencers, that.bucketInfluencers);
     }
 
-    public boolean hadBigNormalizedUpdate() {
-        return hadBigNormalizedUpdate;
-    }
-
-    public void resetBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = false;
-    }
-
-    public void raiseBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = true;
-    }
-
     /**
     * This method encapsulates the logic for whether a bucket should be
-     * normalized. The decision depends on two factors.
-     *
-     * The first is whether the bucket has bucket influencers. Since bucket
-     * influencers were introduced, every bucket must have at least one bucket
-     * influencer. If it does not, it means it is a bucket persisted with an
-     * older version and should not be normalized.
-     *
-     * The second factor has to do with minimising the number of buckets that
-     * are sent for normalization. Buckets that have no records and a score of
+     * normalized. Buckets that have no records and a score of
     * zero should not be normalized as their score will not change and they
     * will just add overhead.
     *
     * @return true if the bucket should be normalized or false otherwise
     */
     public boolean isNormalizable() {
-        if (bucketInfluencers.isEmpty()) {
-            return false;
-        }
         return anomalyScore > 0.0 || recordCount > 0;
     }
 }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/Influencer.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/Influencer.java
index 41ba9ca6a82..d5c3f0f20dd 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/Influencer.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/results/Influencer.java
@@ -78,7 +78,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
     private double probability;
     private double initialAnomalyScore;
     private double anomalyScore;
-    private boolean hadBigNormalizedUpdate;
     private boolean isInterim;
 
     public Influencer(String jobId, String fieldName, String fieldValue, Date timestamp, long bucketSpan, int sequenceNum) {
@@ -187,23 +186,8 @@ public class Influencer extends ToXContentToBytes implements Writeable {
         isInterim = value;
     }
 
-    public boolean hadBigNormalizedUpdate() {
-        return this.hadBigNormalizedUpdate;
-    }
-
-    public void resetBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = false;
-    }
-
-    public void raiseBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = true;
-    }
-
     @Override
     public int hashCode() {
-
-        // hadBigNormalizedUpdate is deliberately excluded from the hash
-
         return Objects.hash(jobId, timestamp, influenceField, influenceValue, initialAnomalyScore,
                 anomalyScore, probability, isInterim, bucketSpan, sequenceNum);
     }
@@ -223,8 +207,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
         }
 
         Influencer other = (Influencer) obj;
-
-        // hadBigNormalizedUpdate is deliberately excluded from the test
         return Objects.equals(jobId, other.jobId) && Objects.equals(timestamp, other.timestamp)
                 && Objects.equals(influenceField, other.influenceField)
                 && Objects.equals(influenceValue, other.influenceValue)
it is generated by the datastore return Objects.equals(this.partitionFieldValue, that.partitionFieldValue) && Objects.equals(this.partitionFieldName, that.partitionFieldName) && (this.probability == that.probability) && (this.initialAnomalyScore == that.initialAnomalyScore) && (this.anomalyScore == that.anomalyScore); } - - public boolean hadBigNormalizedUpdate() { - return hadBigNormalizedUpdate; - } - - public void resetBigNormalizedUpdateFlag() { - hadBigNormalizedUpdate = false; - } - - public void raiseBigNormalizedUpdateFlag() { - hadBigNormalizedUpdate = true; - } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index ecb040b7d05..2bf73157836 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -638,6 +638,7 @@ public class JobProviderTests extends ESTestCase { QueryPage recordPage = holder[0]; assertEquals(2L, recordPage.count()); List records = recordPage.results(); + assertEquals(22.4, records.get(0).getTypical().get(0), 0.000001); assertEquals(33.3, records.get(0).getActual().get(0), 0.000001); assertEquals("irritable", records.get(0).getFunction()); @@ -700,9 +701,9 @@ public class JobProviderTests extends ESTestCase { Integer[] holder = new Integer[1]; provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new); int records = holder[0]; + // This is not realistic, but is an artifact of the fact that the mock - // query - // returns all the records, not a subset + // query returns all the records, not a subset assertEquals(1200L, records); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java new file mode 100644 index 00000000000..1779f1dfd97 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index ecb040b7d05..2bf73157836 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -638,6 +638,7 @@ public class JobProviderTests extends ESTestCase { QueryPage<AnomalyRecord> recordPage = holder[0]; assertEquals(2L, recordPage.count()); List<AnomalyRecord> records = recordPage.results(); + assertEquals(22.4, records.get(0).getTypical().get(0), 0.000001); assertEquals(33.3, records.get(0).getActual().get(0), 0.000001); assertEquals("irritable", records.get(0).getFunction()); @@ -700,9 +701,9 @@ public class JobProviderTests extends ESTestCase { Integer[] holder = new Integer[1]; provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new); int records = holder[0]; + // This is not realistic, but is an artifact of the fact that the mock - // query - // returns all the records, not a subset + // query returns all the records, not a subset assertEquals(1200L, records); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java new file mode 100644 index 00000000000..1779f1dfd97 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.job.process.normalizer.BucketNormalizable; +import org.elasticsearch.xpack.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; + +import java.util.Date; + +public class JobRenormalizedResultsPersisterTests extends ESTestCase { + + public void testUpdateBucket() { + Date now = new Date(); + Bucket bucket = new Bucket("foo", now, 1); + int sequenceNum = 0; + bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++)); + bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++)); + BucketNormalizable bn = new BucketNormalizable(bucket, "foo-index"); + + JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister(); + persister.updateBucket(bn); + + assertEquals(3, persister.getBulkRequest().numberOfActions()); + assertEquals("foo-index", persister.getBulkRequest().requests().get(0).index()); + } + + private JobRenormalizedResultsPersister createJobRenormalizedResultsPersister() { + Client client = new MockClientBuilder("cluster").build(); + return new JobRenormalizedResultsPersister(Settings.EMPTY, client); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockBatchedDocumentsIterator.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockBatchedDocumentsIterator.java index f1da78d9b3c..1a57325a970 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockBatchedDocumentsIterator.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockBatchedDocumentsIterator.java @@ -12,34 +12,20 @@ import java.util.NoSuchElementException; import static org.junit.Assert.assertEquals; public class MockBatchedDocumentsIterator<T> implements BatchedDocumentsIterator<T> { - private final Long startEpochMs; - private final Long endEpochMs; private final List<Deque<T>> batches; private int index; private boolean wasTimeRangeCalled; private String interimFieldName; - public MockBatchedDocumentsIterator(long startEpochMs, long endEpochMs, List<Deque<T>> batches) { - this((Long) startEpochMs, (Long) endEpochMs, batches); - } - public MockBatchedDocumentsIterator(List<Deque<T>> batches) { - this(null, null, batches); - } - - private MockBatchedDocumentsIterator(Long startEpochMs, Long endEpochMs, List<Deque<T>> batches) { this.batches = batches; index = 0; wasTimeRangeCalled = false; interimFieldName = ""; - this.startEpochMs = startEpochMs; - this.endEpochMs = endEpochMs; } @Override public BatchedDocumentsIterator<T> timeRange(long startEpochMs, long endEpochMs) { - assertEquals(this.startEpochMs.longValue(), startEpochMs); - assertEquals(this.endEpochMs.longValue(), endEpochMs); wasTimeRangeCalled = true; return this; } @@ -52,7 +38,7 @@ public class MockBatchedDocumentsIterator<T> implements BatchedDocumentsIterator<T> @Override public Deque<T> next() { - if ((startEpochMs != null && !wasTimeRangeCalled) || !hasNext()) { + if ((!wasTimeRangeCalled) || !hasNext()) { throw new NoSuchElementException(); } return batches.get(index++); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java index
f89c0196efe..157020aef68 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketInfluencerNormalizableTests.java @@ -14,6 +14,7 @@ import java.util.Date; public class BucketInfluencerNormalizableTests extends ESTestCase { private static final double EPSILON = 0.0001; + private static final String INDEX_NAME = "foo-index"; private BucketInfluencer bucketInfluencer; @Before @@ -27,43 +28,43 @@ public class BucketInfluencerNormalizableTests extends ESTestCase { } public void testIsContainerOnly() { - assertFalse(new BucketInfluencerNormalizable(bucketInfluencer).isContainerOnly()); + assertFalse(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).isContainerOnly()); } public void testGetLevel() { - assertEquals(Level.BUCKET_INFLUENCER, new BucketInfluencerNormalizable(bucketInfluencer).getLevel()); + assertEquals(Level.BUCKET_INFLUENCER, new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getLevel()); BucketInfluencer timeInfluencer = new BucketInfluencer("foo", new Date(), 600, 1); timeInfluencer.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME); - assertEquals(Level.ROOT, new BucketInfluencerNormalizable(timeInfluencer).getLevel()); + assertEquals(Level.ROOT, new BucketInfluencerNormalizable(timeInfluencer, INDEX_NAME).getLevel()); } public void testGetPartitionFieldName() { - assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getPartitionFieldName()); + assertNull(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getPartitionFieldName()); } public void testGetPersonFieldName() { - assertEquals("airline", new BucketInfluencerNormalizable(bucketInfluencer).getPersonFieldName()); + assertEquals("airline", new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getPersonFieldName()); } public void testGetFunctionName() { - assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getFunctionName()); + assertNull(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getFunctionName()); } public void testGetValueFieldName() { - assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getValueFieldName()); + assertNull(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getValueFieldName()); } public void testGetProbability() { - assertEquals(0.05, new BucketInfluencerNormalizable(bucketInfluencer).getProbability(), EPSILON); + assertEquals(0.05, new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getProbability(), EPSILON); } public void testGetNormalizedScore() { - assertEquals(1.0, new BucketInfluencerNormalizable(bucketInfluencer).getNormalizedScore(), EPSILON); + assertEquals(1.0, new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getNormalizedScore(), EPSILON); } public void testSetNormalizedScore() { - BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer); + BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME); normalizable.setNormalizedScore(99.0); @@ -72,23 +73,26 @@ public class BucketInfluencerNormalizableTests extends ESTestCase { } public void testGetChildrenTypes() { - assertTrue(new BucketInfluencerNormalizable(bucketInfluencer).getChildrenTypes().isEmpty()); + assertTrue(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getChildrenTypes().isEmpty()); } public void testGetChildren_ByType() { - 
expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer).getChildren(0)); + expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME) + .getChildren(Normalizable.ChildType.BUCKET_INFLUENCER)); } public void testGetChildren() { - assertTrue(new BucketInfluencerNormalizable(bucketInfluencer).getChildren().isEmpty()); + assertTrue(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getChildren().isEmpty()); } public void testSetMaxChildrenScore() { - expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer).setMaxChildrenScore(0, 42.0)); + expectThrows(IllegalStateException.class, + () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME) + .setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0)); } public void testSetParentScore() { - new BucketInfluencerNormalizable(bucketInfluencer).setParentScore(42.0); + new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).setParentScore(42.0); assertEquals("airline", bucketInfluencer.getInfluencerFieldName()); assertEquals(1.0, bucketInfluencer.getAnomalyScore(), EPSILON); @@ -98,10 +102,14 @@ public class BucketInfluencerNormalizableTests extends ESTestCase { } public void testResetBigChangeFlag() { - new BucketInfluencerNormalizable(bucketInfluencer).resetBigChangeFlag(); + BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME); + normalizable.resetBigChangeFlag(); + assertFalse(normalizable.hadBigNormalizedUpdate()); } public void testRaiseBigChangeFlag() { - new BucketInfluencerNormalizable(bucketInfluencer).raiseBigChangeFlag(); + BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME); + normalizable.raiseBigChangeFlag(); + assertTrue(normalizable.hadBigNormalizedUpdate()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java index 5de053a6d3c..5531df7e4c5 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; +import java.util.stream.Collectors; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; @@ -19,6 +20,7 @@ import org.junit.Before; public class BucketNormalizableTests extends ESTestCase { + private static final String INDEX_NAME = "foo-index"; private static final double EPSILON = 0.0001; private Bucket bucket; @@ -54,43 +56,43 @@ public class BucketNormalizableTests extends ESTestCase { } public void testIsContainerOnly() { - assertTrue(new BucketNormalizable(bucket).isContainerOnly()); + assertTrue(new BucketNormalizable(bucket, INDEX_NAME).isContainerOnly()); } public void testGetLevel() { - assertEquals(Level.ROOT, new BucketNormalizable(bucket).getLevel()); + assertEquals(Level.ROOT, new BucketNormalizable(bucket, INDEX_NAME).getLevel()); } public void testGetPartitionFieldName() { - assertNull(new BucketNormalizable(bucket).getPartitionFieldName()); + assertNull(new BucketNormalizable(bucket, INDEX_NAME).getPartitionFieldName()); } public 
void testGetPartitionFieldValue() { - assertNull(new BucketNormalizable(bucket).getPartitionFieldValue()); + assertNull(new BucketNormalizable(bucket, INDEX_NAME).getPartitionFieldValue()); } public void testGetPersonFieldName() { - assertNull(new BucketNormalizable(bucket).getPersonFieldName()); + assertNull(new BucketNormalizable(bucket, INDEX_NAME).getPersonFieldName()); } public void testGetFunctionName() { - assertNull(new BucketNormalizable(bucket).getFunctionName()); + assertNull(new BucketNormalizable(bucket, INDEX_NAME).getFunctionName()); } public void testGetValueFieldName() { - assertNull(new BucketNormalizable(bucket).getValueFieldName()); + assertNull(new BucketNormalizable(bucket, INDEX_NAME).getValueFieldName()); } public void testGetProbability() { - expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket).getProbability()); + expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).getProbability()); } public void testGetNormalizedScore() { - assertEquals(88.0, new BucketNormalizable(bucket).getNormalizedScore(), EPSILON); + assertEquals(88.0, new BucketNormalizable(bucket, INDEX_NAME).getNormalizedScore(), EPSILON); } public void testSetNormalizedScore() { - BucketNormalizable normalizable = new BucketNormalizable(bucket); + BucketNormalizable normalizable = new BucketNormalizable(bucket, INDEX_NAME); normalizable.setNormalizedScore(99.0); @@ -99,8 +101,11 @@ } public void testGetChildren() { - List<Normalizable> children = new BucketNormalizable(bucket).getChildren(); + BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME); + bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME)) + .collect(Collectors.toList())); + List<Normalizable> children = bn.getChildren(); assertEquals(6, children.size()); assertTrue(children.get(0) instanceof BucketInfluencerNormalizable); assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON); @@ -117,7 +122,8 @@ } public void testGetChildren_GivenTypeBucketInfluencer() { - List<Normalizable> children = new BucketNormalizable(bucket).getChildren(0); + BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME); + List<Normalizable> children = bn.getChildren(Normalizable.ChildType.BUCKET_INFLUENCER); assertEquals(2, children.size()); assertTrue(children.get(0) instanceof BucketInfluencerNormalizable); @@ -127,7 +133,10 @@ } public void testGetChildren_GivenTypeRecord() { - List<Normalizable> children = new BucketNormalizable(bucket).getChildren(1); + BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME); + bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME)) + .collect(Collectors.toList())); + List<Normalizable> children = bn.getChildren(Normalizable.ChildType.RECORD); assertEquals(2, children.size()); assertTrue(children.get(0) instanceof RecordNormalizable); @@ -136,53 +145,45 @@ assertEquals(2.0, children.get(1).getNormalizedScore(), EPSILON); } - public void testGetChildren_GivenInvalidType() { - expectThrows(IllegalArgumentException.class, () -> new BucketNormalizable(bucket).getChildren(3)); - } - public void testSetMaxChildrenScore_GivenDifferentScores() { - BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket); + BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME); -
assertTrue(bucketNormalizable.setMaxChildrenScore(0, 95.0)); - assertTrue(bucketNormalizable.setMaxChildrenScore(1, 42.0)); + assertTrue(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 95.0)); + assertTrue(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 42.0)); assertEquals(95.0, bucket.getAnomalyScore(), EPSILON); assertEquals(42.0, bucket.getMaxNormalizedProbability(), EPSILON); } public void testSetMaxChildrenScore_GivenSameScores() { - BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket); + BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME); - assertFalse(bucketNormalizable.setMaxChildrenScore(0, 88.0)); - assertFalse(bucketNormalizable.setMaxChildrenScore(1, 2.0)); + assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 88.0)); + assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 2.0)); assertEquals(88.0, bucket.getAnomalyScore(), EPSILON); assertEquals(2.0, bucket.getMaxNormalizedProbability(), EPSILON); } - public void testSetMaxChildrenScore_GivenInvalidType() { - expectThrows(IllegalArgumentException.class, () -> new BucketNormalizable(bucket).setMaxChildrenScore(3, 95.0)); - } - public void testSetParentScore() { - expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket).setParentScore(42.0)); + expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).setParentScore(42.0)); } public void testResetBigChangeFlag() { - BucketNormalizable normalizable = new BucketNormalizable(bucket); + BucketNormalizable normalizable = new BucketNormalizable(bucket, INDEX_NAME); normalizable.raiseBigChangeFlag(); normalizable.resetBigChangeFlag(); - assertFalse(bucket.hadBigNormalizedUpdate()); + assertFalse(normalizable.hadBigNormalizedUpdate()); } public void testRaiseBigChangeFlag() { - BucketNormalizable normalizable = new BucketNormalizable(bucket); + BucketNormalizable normalizable = new BucketNormalizable(bucket, INDEX_NAME); normalizable.resetBigChangeFlag(); normalizable.raiseBigChangeFlag(); - assertTrue(bucket.hadBigNormalizedUpdate()); + assertTrue(normalizable.hadBigNormalizedUpdate()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java index 00777aecf6a..9056a71cb7b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/InfluencerNormalizableTests.java @@ -12,6 +12,7 @@ import org.junit.Before; import java.util.Date; public class InfluencerNormalizableTests extends ESTestCase { + private static final String INDEX_NAME = "foo-index"; private static final double EPSILON = 0.0001; private Influencer influencer; @@ -24,43 +25,43 @@ public class InfluencerNormalizableTests extends ESTestCase { } public void testIsContainerOnly() { - assertFalse(new InfluencerNormalizable(influencer).isContainerOnly()); + assertFalse(new InfluencerNormalizable(influencer, INDEX_NAME).isContainerOnly()); } public void testGetLevel() { - assertEquals(Level.INFLUENCER, new InfluencerNormalizable(influencer).getLevel()); + assertEquals(Level.INFLUENCER, new InfluencerNormalizable(influencer, INDEX_NAME).getLevel()); } public void 
testGetPartitionFieldName() { - assertNull(new InfluencerNormalizable(influencer).getPartitionFieldName()); + assertNull(new InfluencerNormalizable(influencer, INDEX_NAME).getPartitionFieldName()); } public void testGetPartitionFieldValue() { - assertNull(new InfluencerNormalizable(influencer).getPartitionFieldValue()); + assertNull(new InfluencerNormalizable(influencer, INDEX_NAME).getPartitionFieldValue()); } public void testGetPersonFieldName() { - assertEquals("airline", new InfluencerNormalizable(influencer).getPersonFieldName()); + assertEquals("airline", new InfluencerNormalizable(influencer, INDEX_NAME).getPersonFieldName()); } public void testGetFunctionName() { - assertNull(new InfluencerNormalizable(influencer).getFunctionName()); + assertNull(new InfluencerNormalizable(influencer, INDEX_NAME).getFunctionName()); } public void testGetValueFieldName() { - assertNull(new InfluencerNormalizable(influencer).getValueFieldName()); + assertNull(new InfluencerNormalizable(influencer, INDEX_NAME).getValueFieldName()); } public void testGetProbability() { - assertEquals(0.05, new InfluencerNormalizable(influencer).getProbability(), EPSILON); + assertEquals(0.05, new InfluencerNormalizable(influencer, INDEX_NAME).getProbability(), EPSILON); } public void testGetNormalizedScore() { - assertEquals(1.0, new InfluencerNormalizable(influencer).getNormalizedScore(), EPSILON); + assertEquals(1.0, new InfluencerNormalizable(influencer, INDEX_NAME).getNormalizedScore(), EPSILON); } public void testSetNormalizedScore() { - InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer); + InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer, INDEX_NAME); normalizable.setNormalizedScore(99.0); @@ -69,40 +70,38 @@ public class InfluencerNormalizableTests extends ESTestCase { } public void testGetChildrenTypes() { - assertTrue(new InfluencerNormalizable(influencer).getChildrenTypes().isEmpty()); + assertTrue(new InfluencerNormalizable(influencer, INDEX_NAME).getChildrenTypes().isEmpty()); } public void testGetChildren_ByType() { - expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).getChildren(0)); + expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME) + .getChildren(Normalizable.ChildType.BUCKET_INFLUENCER)); } public void testGetChildren() { - assertTrue(new InfluencerNormalizable(influencer).getChildren().isEmpty()); + assertTrue(new InfluencerNormalizable(influencer, INDEX_NAME).getChildren().isEmpty()); } public void testSetMaxChildrenScore() { - expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).setMaxChildrenScore(0, 42.0)); + expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME) + .setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0)); } public void testSetParentScore() { - expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).setParentScore(42.0)); + expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME).setParentScore(42.0)); } public void testResetBigChangeFlag() { - InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer); + InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer, INDEX_NAME); normalizable.raiseBigChangeFlag(); - normalizable.resetBigChangeFlag(); - - assertFalse(influencer.hadBigNormalizedUpdate()); + 
assertFalse(normalizable.hadBigNormalizedUpdate()); } public void testRaiseBigChangeFlag() { - InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer); + InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer, INDEX_NAME); normalizable.resetBigChangeFlag(); - normalizable.raiseBigChangeFlag(); - - assertTrue(influencer.hadBigNormalizedUpdate()); + assertTrue(normalizable.hadBigNormalizedUpdate()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java index 4a8b4c0c887..123e4fc6a0c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java @@ -11,10 +11,7 @@ import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Date; -import java.util.Deque; -import java.util.List; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -28,6 +25,7 @@ import static org.mockito.Mockito.when; public class NormalizerTests extends ESTestCase { private static final String JOB_ID = "foo"; + private static final String INDEX_NAME = "foo-index"; private static final String QUANTILES_STATE = "someState"; private static final int BUCKET_SPAN = 600; private static final double INITIAL_SCORE = 2.0; @@ -57,12 +55,8 @@ Bucket bucket = generateBucket(new Date(0)); bucket.setAnomalyScore(0.0); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.07, INITIAL_SCORE)); - Deque<Bucket> buckets = new ArrayDeque<>(); - buckets.add(bucket); - - List<Normalizable> asNormalizables = buckets.stream() .map(b -> new BucketNormalizable(b)).collect(Collectors.toList()); + List<Normalizable> asNormalizables = Arrays.asList(new BucketNormalizable(bucket, INDEX_NAME)); normalizer.normalize(BUCKET_SPAN, false, asNormalizables, QUANTILES_STATE); assertEquals(1, asNormalizables.size());
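The NormalizerTests change above shows the new calling convention: each result is wrapped in an index-aware normalizable before being handed to the normalizer. A short usage sketch under the same assumptions as that test (a Normalizer instance, a bucket loaded from "foo-index"; bucketSpan and quantilesState come from the job and the autodetect process):

    // Wrap each result with the index it came from, then normalize in place.
    List<Normalizable> asNormalizables = new ArrayList<>();
    asNormalizables.add(new BucketNormalizable(bucket, "foo-index"));
    normalizer.normalize(bucketSpan, false, asNormalizables, quantilesState);
    // Afterwards each normalizable reports whether its score moved enough
    // to be worth re-persisting to its originating index.
    for (Normalizable n : asNormalizables) {
        if (n.hadBigNormalizedUpdate()) {
            // hand off to JobRenormalizedResultsPersister
        }
    }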
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java index 24534a4109e..e213f2254d2 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java @@ -5,18 +5,9 @@ */ package org.elasticsearch.xpack.ml.job.process.normalizer; -import static org.mockito.Matchers.anyListOf; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.Deque; @@ -27,6 +18,7 @@ import org.elasticsearch.xpack.ml.job.AnalysisConfig; import org.elasticsearch.xpack.ml.job.Detector; import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator; +import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchBatchedResultsIterator; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.MockBatchedDocumentsIterator; @@ -36,13 +28,28 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.junit.Before; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.elasticsearch.mock.orig.Mockito.doAnswer; +import static org.elasticsearch.mock.orig.Mockito.never; +import static org.elasticsearch.mock.orig.Mockito.times; +import static org.elasticsearch.mock.orig.Mockito.mock; +import static org.elasticsearch.mock.orig.Mockito.verify; +import static org.elasticsearch.mock.orig.Mockito.when; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; public class ScoresUpdaterTests extends ESTestCase { private static final String JOB_ID = "foo"; private static final String QUANTILES_STATE = "someState"; private static final long DEFAULT_BUCKET_SPAN = 3600; private static final long DEFAULT_START_TIME = 0; - private static final long DEFAULT_END_TIME = 3600; private JobProvider jobProvider = mock(JobProvider.class); private JobRenormalizedResultsPersister jobRenormalizedResultsPersister = mock(JobRenormalizedResultsPersister.class); @@ -57,6 +64,7 @@ } @Before + @SuppressWarnings("unchecked") public void setUpMocks() throws IOException { MockitoAnnotations.initMocks(this); @@ -72,9 +80,10 @@ scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory); - givenProviderReturnsNoBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME); - givenProviderReturnsNoInfluencers(DEFAULT_START_TIME, DEFAULT_END_TIME); + givenProviderReturnsNoBuckets(); + givenProviderReturnsNoInfluencers(); givenNormalizerFactoryReturnsMock(); + givenNormalizerRaisesBigChangeFlag(); } public void testUpdate_GivenBucketWithZeroScoreAndNoRecords() throws IOException { @@ -83,136 +92,70 @@ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.7, 0.0)); Deque<Bucket> buckets = new ArrayDeque<>(); buckets.add(bucket); - givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); + givenProviderReturnsBuckets(buckets); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); verifyNormalizerWasInvoked(0); - verifyBucketWasNotUpdated(bucket); - verifyBucketRecordsWereNotUpdated(bucket.getId()); + verifyNothingWasUpdated(); } - public void testUpdate_GivenBucketWithNonZeroScoreButNoBucketInfluencers() throws IOException { + public void testUpdate_GivenTwoBucketsOnlyOneUpdated() throws IOException { Bucket bucket = generateBucket(new Date(0)); + bucket.setAnomalyScore(30.0); + bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0)); + Deque<Bucket> buckets = new ArrayDeque<>(); + buckets.add(bucket);
+ bucket = generateBucket(new Date(1000)); bucket.setAnomalyScore(0.0); - bucket.setBucketInfluencers(new ArrayList<>()); - Deque<Bucket> buckets = new ArrayDeque<>(); - buckets.add(bucket); - givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); + + givenProviderReturnsBuckets(buckets); + givenProviderReturnsRecords(new ArrayDeque<>()); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); - verifyNormalizerWasInvoked(0); - verifyBucketWasNotUpdated(bucket); - verifyBucketRecordsWereNotUpdated(bucket.getId()); + verifyNormalizerWasInvoked(1); + verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any()); } - public void testUpdate_GivenSingleBucketWithoutBigChangeAndNoRecords() throws IOException { + public void testUpdate_GivenSingleBucketWithAnomalyScoreAndNoRecords() throws IOException { Bucket bucket = generateBucket(new Date(0)); + bucket.setAnomalyScore(42.0); + bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); + bucket.setMaxNormalizedProbability(50.0); + + Deque<Bucket> buckets = new ArrayDeque<>(); + buckets.add(bucket); + givenProviderReturnsBuckets(buckets); + givenProviderReturnsRecords(new ArrayDeque<>()); + + scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); + + verifyNormalizerWasInvoked(1); + verifyBucketWasUpdated(1); + } + + public void testUpdate_GivenSingleBucketAndRecords() throws IOException { + Bucket bucket = generateBucket(new Date(DEFAULT_START_TIME)); bucket.setAnomalyScore(30.0); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0)); - Deque<Bucket> buckets = new ArrayDeque<>(); - buckets.add(bucket); - givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); - - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); - - verifyNormalizerWasInvoked(1); - verifyBucketWasNotUpdated(bucket); - verifyBucketRecordsWereNotUpdated(bucket.getId()); - } - - public void testUpdate_GivenSingleBucketWithoutBigChangeAndRecordsWithoutBigChange() throws IOException { - Bucket bucket = generateBucket(new Date(0)); - bucket.setAnomalyScore(30.0); - bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0)); - List<AnomalyRecord> records = new ArrayList<>(); - AnomalyRecord record1 = createRecordWithoutBigChange(); - AnomalyRecord record2 = createRecordWithoutBigChange(); + Deque<AnomalyRecord> records = new ArrayDeque<>(); + AnomalyRecord record1 = createRecord(); + AnomalyRecord record2 = createRecord(); records.add(record1); records.add(record2); - bucket.setRecords(records); - bucket.setRecordCount(2); Deque<Bucket> buckets = new ArrayDeque<>(); buckets.add(bucket); - givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); + givenProviderReturnsBuckets(buckets); + givenProviderReturnsRecords(records); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); verifyNormalizerWasInvoked(1);
- verifyBucketWasUpdated(bucket); - verifyBucketRecordsWereNotUpdated(bucket.getId()); - } - - public void testUpdate_GivenSingleBucketWithBigChangeAndNoRecords() throws IOException { - Bucket bucket = generateBucket(new Date(0)); - bucket.setAnomalyScore(42.0); - bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); - bucket.setMaxNormalizedProbability(50.0); - bucket.raiseBigNormalizedUpdateFlag(); - - Deque<Bucket> buckets = new ArrayDeque<>(); - buckets.add(bucket); - givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); - - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); - - verifyNormalizerWasInvoked(1); - verifyBucketWasUpdated(bucket); - verifyBucketRecordsWereNotUpdated(bucket.getId()); - } - - public void testUpdate_GivenSingleBucketWithoutBigChangeAndSomeRecordsWithBigChange() throws IOException { - Bucket bucket = generateBucket(new Date(0)); - bucket.setAnomalyScore(42.0); - bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); - bucket.setMaxNormalizedProbability(50.0); - - List<AnomalyRecord> records = new ArrayList<>(); - AnomalyRecord record1 = createRecordWithBigChange(); - AnomalyRecord record2 = createRecordWithoutBigChange(); - AnomalyRecord record3 = createRecordWithBigChange(); - records.add(record1); - records.add(record2); - records.add(record3); - bucket.setRecords(records); - - Deque<Bucket> buckets = new ArrayDeque<>(); - buckets.add(bucket); - givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); - - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); - - verifyNormalizerWasInvoked(1); - verifyBucketWasNotUpdated(bucket); - verifyRecordsWereUpdated(bucket.getJobId(), Arrays.asList(record1, record3)); - } - - public void testUpdate_GivenSingleBucketWithBigChangeAndSomeRecordsWithBigChange() throws IOException { - Bucket bucket = generateBucket(new Date(0)); - bucket.setAnomalyScore(42.0); - bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); - bucket.setMaxNormalizedProbability(50.0); - bucket.raiseBigNormalizedUpdateFlag(); - List<AnomalyRecord> records = new ArrayList<>(); - AnomalyRecord record1 = createRecordWithBigChange(); - AnomalyRecord record2 = createRecordWithoutBigChange(); - AnomalyRecord record3 = createRecordWithBigChange(); - records.add(record1); - records.add(record2); - records.add(record3); - bucket.setRecords(records); - - Deque<Bucket> buckets = new ArrayDeque<>(); - buckets.add(bucket); - givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); - - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); - - verifyNormalizerWasInvoked(1); - verifyBucketWasUpdated(bucket); - verifyRecordsWereUpdated(bucket.getJobId(), Arrays.asList(record1, record3)); + verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any()); + verify(jobRenormalizedResultsPersister, times(1)).updateResults(any()); + verify(jobRenormalizedResultsPersister, times(2)).executeRequest(anyString()); } public void testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() throws IOException { @@ -222,7 +165,6 @@ bucket.setAnomalyScore(42.0); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); bucket.setMaxNormalizedProbability(50.0); - bucket.raiseBigNormalizedUpdateFlag(); batch1.add(bucket); } @@ -230,11 +172,11 @@ secondBatchBucket.addBucketInfluencer(createTimeBucketInfluencer(secondBatchBucket.getTimestamp(), 0.04, 42.0)); secondBatchBucket.setAnomalyScore(42.0); secondBatchBucket.setMaxNormalizedProbability(50.0); - secondBatchBucket.raiseBigNormalizedUpdateFlag(); Deque<Bucket> batch2 = new ArrayDeque<>(); batch2.add(secondBatchBucket); - givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, batch1, batch2); + givenProviderReturnsBuckets(batch1, batch2); + givenProviderReturnsRecords(new ArrayDeque<>()); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); @@ -242,13 +184,7 @@ // Batch 1 - Just verify the update count as Mockito // is forbiddingly slow when trying to verify all 10000 updates individually
- verifyBucketWasUpdated(batch1.getFirst()); - verifyBucketRecordsWereNotUpdated(batch1.getFirst().getId()); - verifyBucketWasUpdated(batch1.getLast()); - verifyBucketRecordsWereNotUpdated(batch1.getLast().getId()); - - verifyBucketWasUpdated(secondBatchBucket); - verifyBucketRecordsWereNotUpdated(secondBatchBucket.getId()); + verifyBucketWasUpdated(10001); } public void testUpdate_GivenTwoBucketsWithFirstHavingEnoughRecordsToForceSecondNormalization() throws IOException { @@ -256,109 +192,101 @@ bucket1.setAnomalyScore(42.0); bucket1.addBucketInfluencer(createTimeBucketInfluencer(bucket1.getTimestamp(), 0.04, 42.0)); bucket1.setMaxNormalizedProbability(50.0); - bucket1.raiseBigNormalizedUpdateFlag(); - when(jobProvider.expandBucket(JOB_ID, false, bucket1)).thenReturn(100000); + List<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> records = new ArrayList<>(); + Date date = new Date(); + for (int i=0; i<100000; i++) { + records.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", new AnomalyRecord("foo", date, 1, i))); + } Bucket bucket2 = generateBucket(new Date(10000 * 1000)); bucket2.addBucketInfluencer(createTimeBucketInfluencer(bucket2.getTimestamp(), 0.04, 42.0)); bucket2.setAnomalyScore(42.0); bucket2.setMaxNormalizedProbability(50.0); - bucket2.raiseBigNormalizedUpdateFlag(); Deque<Bucket> batch = new ArrayDeque<>(); batch.add(bucket1); batch.add(bucket2); - givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, batch); + givenProviderReturnsBuckets(batch); + + + List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> recordBatches = new ArrayList<>(); + recordBatches.add(new ArrayDeque<>(records)); + BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter = + new MockBatchedDocumentsIterator<>(recordBatches); + when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); verifyNormalizerWasInvoked(2); - - verifyBucketWasUpdated(bucket1); - verifyBucketRecordsWereNotUpdated(bucket1.getId()); - verifyBucketWasUpdated(bucket2); - verifyBucketRecordsWereNotUpdated(bucket2.getId()); } public void testUpdate_GivenInfluencerWithBigChange() throws IOException { Influencer influencer = new Influencer(JOB_ID, "n", "v", new Date(DEFAULT_START_TIME), 600, 1); - influencer.raiseBigNormalizedUpdateFlag(); Deque<Influencer> influencers = new ArrayDeque<>(); influencers.add(influencer); - givenProviderReturnsInfluencers(DEFAULT_START_TIME, DEFAULT_END_TIME, influencers); + givenProviderReturnsInfluencers(influencers); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); verifyNormalizerWasInvoked(1); - verifyInfluencerWasUpdated(influencer); + verify(jobRenormalizedResultsPersister, times(1)).updateResults(any()); + verify(jobRenormalizedResultsPersister, times(1)).executeRequest(anyString()); }
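These tests lean heavily on ElasticsearchBatchedResultsIterator.ResultWithIndex, whose definition is not part of this excerpt. From the way it is constructed (new ResultWithIndex<>(hit.getIndex(), bucket) in the iterators, new ResultWithIndex<>("foo", record) here), it is presumably a simple pair along these lines:

    // Hypothetical reconstruction from usage; the real class is nested in
    // ElasticsearchBatchedResultsIterator.
    public static class ResultWithIndex<T> {
        public final String indexName;  // index the search hit came from
        public final T result;          // the parsed result object

        public ResultWithIndex(String indexName, T result) {
            this.indexName = indexName;
            this.result = result;
        }
    }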
public void testDefaultRenormalizationWindowBasedOnTime() throws IOException { - Bucket bucket = generateBucket(new Date(0)); + Bucket bucket = generateBucket(new Date(2509200000L)); bucket.setAnomalyScore(42.0); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); bucket.setMaxNormalizedProbability(50.0); - bucket.raiseBigNormalizedUpdateFlag(); Deque<Bucket> buckets = new ArrayDeque<>(); buckets.add(bucket); - givenProviderReturnsBuckets(2509200000L, 2595600000L, buckets); - givenProviderReturnsNoInfluencers(2509200000L, 2595600000L); + givenProviderReturnsBuckets(buckets); + givenProviderReturnsRecords(new ArrayDeque<>()); + givenProviderReturnsNoInfluencers(); scoresUpdater.update(QUANTILES_STATE, 2595600000L, 0, false); verifyNormalizerWasInvoked(1); - verifyBucketWasUpdated(bucket); - verifyBucketRecordsWereNotUpdated(bucket.getId()); + verifyBucketWasUpdated(1); } public void testManualRenormalizationWindow() throws IOException { - - Bucket bucket = generateBucket(new Date(0)); + Bucket bucket = generateBucket(new Date(3600000)); bucket.setAnomalyScore(42.0); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); bucket.setMaxNormalizedProbability(50.0); - bucket.raiseBigNormalizedUpdateFlag(); Deque<Bucket> buckets = new ArrayDeque<>(); buckets.add(bucket); - givenProviderReturnsBuckets(3600000, 90000000L, buckets); - givenProviderReturnsNoInfluencers(3600000, 90000000L); + givenProviderReturnsBuckets(buckets); + givenProviderReturnsRecords(new ArrayDeque<>()); + givenProviderReturnsNoInfluencers(); scoresUpdater.update(QUANTILES_STATE, 90000000L, 0, false); verifyNormalizerWasInvoked(1); - verifyBucketWasUpdated(bucket); - verifyBucketRecordsWereNotUpdated(bucket.getId()); + verifyBucketWasUpdated(1); } public void testManualRenormalizationWindow_GivenExtension() throws IOException { - Bucket bucket = generateBucket(new Date(0)); + Bucket bucket = generateBucket(new Date(2700000)); bucket.setAnomalyScore(42.0); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); bucket.setMaxNormalizedProbability(50.0); - bucket.raiseBigNormalizedUpdateFlag(); Deque<Bucket> buckets = new ArrayDeque<>(); buckets.add(bucket); - givenProviderReturnsBuckets(2700000, 90000000L, buckets); - givenProviderReturnsNoInfluencers(2700000, 90000000L); + givenProviderReturnsBuckets(buckets); + givenProviderReturnsRecords(new ArrayDeque<>()); + givenProviderReturnsNoInfluencers(); scoresUpdater.update(QUANTILES_STATE, 90000000L, 900000, false); verifyNormalizerWasInvoked(1); - verifyBucketWasUpdated(bucket); - verifyBucketRecordsWereNotUpdated(bucket.getId()); - } - - private void verifyNormalizerWasInvoked(int times) throws IOException {
- int bucketSpan = job.getAnalysisConfig() == null ? 0 - : job.getAnalysisConfig().getBucketSpan().intValue(); - verify(normalizer, times(times)).normalize( - eq(bucketSpan), eq(false), anyListOf(Normalizable.class), - eq(QUANTILES_STATE)); + verifyBucketWasUpdated(1); } private BucketInfluencer createTimeBucketInfluencer(Date timestamp, double probability, double anomalyScore) { @@ -369,80 +297,106 @@ return influencer; } - private void givenNormalizerFactoryReturnsMock() { - when(normalizerFactory.create(JOB_ID)).thenReturn(normalizer); - } - private void givenProviderReturnsNoBuckets(long startTime, long endTime) { - givenBuckets(startTime, endTime, Collections.emptyList()); - } - - private void givenProviderReturnsBuckets(long startTime, long endTime, Deque<Bucket> batch1, Deque<Bucket> batch2) { - List<Deque<Bucket>> batches = new ArrayList<>(); - batches.add(new ArrayDeque<>(batch1)); - batches.add(new ArrayDeque<>(batch2)); - givenBuckets(startTime, endTime, batches); - } - - private void givenProviderReturnsBuckets(long startTime, long endTime, Deque<Bucket> buckets) { - List<Deque<Bucket>> batches = new ArrayList<>(); - batches.add(new ArrayDeque<>(buckets)); - givenBuckets(startTime, endTime, batches); - } - - private void givenBuckets(long startTime, long endTime, List<Deque<Bucket>> batches) { - BatchedDocumentsIterator<Bucket> iterator = new MockBatchedDocumentsIterator<>(startTime, - endTime, batches); - when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(iterator); - } - - private void givenProviderReturnsNoInfluencers(long startTime, long endTime) { - givenProviderReturnsInfluencers(startTime, endTime, new ArrayDeque<>()); - } - - private void givenProviderReturnsInfluencers(long startTime, long endTime, - Deque<Influencer> influencers) { - List<Deque<Influencer>> batches = new ArrayList<>(); - batches.add(new ArrayDeque<>(influencers)); - BatchedDocumentsIterator<Influencer> iterator = new MockBatchedDocumentsIterator<>( - startTime, endTime, batches); - when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator); - } - - private void verifyBucketWasUpdated(Bucket bucket) { - verify(jobRenormalizedResultsPersister).updateBucket(bucket); - } - - private void verifyRecordsWereUpdated(String bucketId, List<AnomalyRecord> records) { - verify(jobRenormalizedResultsPersister).updateRecords(bucketId, records); - } - - private void verifyBucketWasNotUpdated(Bucket bucket) { - verify(jobRenormalizedResultsPersister, never()).updateBucket(bucket); - } - - private void verifyBucketRecordsWereNotUpdated(String bucketId) { - verify(jobRenormalizedResultsPersister, never()).updateRecords(eq(bucketId), - anyListOf(AnomalyRecord.class)); - } - - private static AnomalyRecord createRecordWithoutBigChange() { - return createRecord(false); - } - - private static AnomalyRecord createRecordWithBigChange() { - return createRecord(true); - } - - private static AnomalyRecord createRecord(boolean hadBigChange) { + private static AnomalyRecord createRecord() { AnomalyRecord anomalyRecord = mock(AnomalyRecord.class); - when(anomalyRecord.hadBigNormalizedUpdate()).thenReturn(hadBigChange); when(anomalyRecord.getId()).thenReturn("someId"); return anomalyRecord; } - private void verifyInfluencerWasUpdated(Influencer influencer) { - List<Influencer> list = new ArrayList<>(); - list.add(influencer); - verify(jobRenormalizedResultsPersister).updateInfluencer(eq(JOB_ID), eq(list)); + private void givenNormalizerFactoryReturnsMock() { + when(normalizerFactory.create(JOB_ID)).thenReturn(normalizer); + } + private void givenProviderReturnsNoBuckets() { + givenBuckets(Collections.emptyList()); + }
+ @SuppressWarnings("unchecked") + private void givenNormalizerRaisesBigChangeFlag() { + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + List<Normalizable> normalizables = (List<Normalizable>) invocationOnMock.getArguments()[2]; + for (Normalizable normalizable : normalizables) { + normalizable.raiseBigChangeFlag(); + for (Normalizable child : normalizable.getChildren()) { + child.raiseBigChangeFlag(); + } + } + return null; + } + }).when(normalizer).normalize(anyInt(), anyBoolean(), anyList(), anyString()); + } + + private void givenProviderReturnsBuckets(Deque<Bucket> batch1, Deque<Bucket> batch2) { + List<Deque<Bucket>> batches = new ArrayList<>(); + batches.add(new ArrayDeque<>(batch1)); + batches.add(new ArrayDeque<>(batch2)); + givenBuckets(batches); + } + + private void givenProviderReturnsBuckets(Deque<Bucket> buckets) { + List<Deque<Bucket>> batches = new ArrayList<>(); + batches.add(new ArrayDeque<>(buckets)); + givenBuckets(batches); + } + + private void givenBuckets(List<Deque<Bucket>> batches) { + List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>>> batchesWithIndex = new ArrayList<>(); + for (Deque<Bucket> deque : batches) { + Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> queueWithIndex = new ArrayDeque<>(); + for (Bucket bucket : deque) { + queueWithIndex.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", bucket)); + } + batchesWithIndex.add(queueWithIndex); + } + + BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> bucketIter = + new MockBatchedDocumentsIterator<>(batchesWithIndex); + when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter); + } + + private void givenProviderReturnsRecords(Deque<AnomalyRecord> records) { + Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> batch = new ArrayDeque<>(); + List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> batches = new ArrayList<>(); + for (AnomalyRecord record : records) { + batch.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", record)); + } + batches.add(batch); + + BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter = + new MockBatchedDocumentsIterator<>(batches); + when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); + } + + private void givenProviderReturnsNoInfluencers() { + givenProviderReturnsInfluencers(new ArrayDeque<>()); + } + + private void givenProviderReturnsInfluencers(Deque<Influencer> influencers) { + List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>>> batches = new ArrayList<>(); + Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> queue = new ArrayDeque<>(); + for (Influencer inf : influencers) { + queue.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", inf)); + } + batches.add(queue); + BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> iterator = + new MockBatchedDocumentsIterator<>(batches); + when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator); + } + + private void verifyNormalizerWasInvoked(int times) throws IOException { + int bucketSpan = job.getAnalysisConfig() == null ? 0 : job.getAnalysisConfig().getBucketSpan().intValue(); + verify(normalizer, times(times)).normalize( + eq(bucketSpan), eq(false), anyListOf(Normalizable.class), + eq(QUANTILES_STATE)); + } + + private void verifyNothingWasUpdated() { + verify(jobRenormalizedResultsPersister, never()).updateBucket(any()); + verify(jobRenormalizedResultsPersister, never()).updateResults(any()); + } + + private void verifyBucketWasUpdated(int bucketCount) { + verify(jobRenormalizedResultsPersister, times(bucketCount)).updateBucket(any()); } }
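The verifications in these tests (updateBucket, updateResults, executeRequest) outline the new write path: renormalized results are buffered into a bulk request, each action targeting the index its result was read from, and the request is flushed once per pass. An illustrative driver loop under those assumptions (only the three persister methods are confirmed by the tests above; the loop itself is hypothetical):

    // Persist only results whose normalized scores moved significantly.
    for (BucketNormalizable bn : bucketNormalizables) {
        if (bn.hadBigNormalizedUpdate()) {
            persister.updateBucket(bn);          // bucket plus its bucket influencers
        }
    }
    persister.updateResults(otherNormalizables); // records, influencers, etc.
    persister.executeRequest(jobId);             // flush the accumulated bulk request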
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java index 076a5ed4a71..33b5105c612 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java @@ -251,14 +251,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> { assertEquals(bucket1.hashCode(), bucket2.hashCode()); } - public void testIsNormalizable_GivenEmptyBucketInfluencers() { - Bucket bucket = new Bucket("foo", new Date(123), 123); - bucket.setBucketInfluencers(Collections.emptyList()); - bucket.setAnomalyScore(90.0); - - assertFalse(bucket.isNormalizable()); - } - public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() { Bucket bucket = new Bucket("foo", new Date(123), 123); bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));