Remember the index each result came from (elastic/elasticsearch#727)

* Delete unused batched ModelSnapshot iterator

* Pass source index with normalisable results

* Refactor Normalizable

* Rework persisting renormalised results

* Spell normalize with a ‘z’

* Rename ResultIndex -> ResultWithIndex

* Expand wildcard import

* Make Normalisable child type an enum

Original commit: elastic/x-pack-elasticsearch@52450abafd
David Kyle 2017-01-17 13:11:57 +00:00 committed by GitHub
parent 1d891965c1
commit bc04bda8d6
32 changed files with 690 additions and 633 deletions


@@ -171,8 +171,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
                                        NamedXContentRegistry xContentRegistry) {
         JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);
         JobProvider jobProvider = new JobProvider(client, 0);
-        JobRenormalizedResultsPersister jobRenormalizedResultsPersister = new JobRenormalizedResultsPersister(settings,
-                jobResultsPersister);
+        JobRenormalizedResultsPersister jobRenormalizedResultsPersister = new JobRenormalizedResultsPersister(settings, client);
         JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
         JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService);


@@ -23,7 +23,7 @@ class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedResultsIte
     }

     @Override
-    protected Bucket map(SearchHit hit) {
+    protected ResultWithIndex<Bucket> map(SearchHit hit) {
         BytesReference source = hit.getSourceRef();
         XContentParser parser;
         try {
@@ -31,6 +31,7 @@ class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedResultsIte
         } catch (IOException e) {
             throw new ElasticsearchParseException("failed to parse bucket", e);
         }
-        return Bucket.PARSER.apply(parser, null);
+        Bucket bucket = Bucket.PARSER.apply(parser, null);
+        return new ResultWithIndex<>(hit.getIndex(), bucket);
     }
 }


@@ -22,7 +22,7 @@ class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedResult
     }

     @Override
-    protected Influencer map(SearchHit hit) {
+    protected ResultWithIndex<Influencer> map(SearchHit hit) {
         BytesReference source = hit.getSourceRef();
         XContentParser parser;
         try {
@@ -31,6 +31,7 @@ class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedResult
             throw new ElasticsearchParseException("failed to parser influencer", e);
         }
-        return Influencer.PARSER.apply(parser, null);
+        Influencer influencer = Influencer.PARSER.apply(parser, null);
+        return new ResultWithIndex<>(hit.getIndex(), influencer);
     }
 }


@@ -0,0 +1,37 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;

import java.io.IOException;

class ElasticsearchBatchedRecordsIterator extends ElasticsearchBatchedResultsIterator<AnomalyRecord> {

    public ElasticsearchBatchedRecordsIterator(Client client, String jobId) {
        super(client, jobId, AnomalyRecord.RESULT_TYPE_VALUE);
    }

    @Override
    protected ResultWithIndex<AnomalyRecord> map(SearchHit hit) {
        BytesReference source = hit.getSourceRef();
        XContentParser parser;
        try {
            parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
        } catch (IOException e) {
            throw new ElasticsearchParseException("failed to parse record", e);
        }
        AnomalyRecord record = AnomalyRecord.PARSER.apply(parser, null);
        return new ResultWithIndex<>(hit.getIndex(), record);
    }
}


@@ -9,7 +9,8 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.index.query.TermsQueryBuilder;
 import org.elasticsearch.xpack.ml.job.results.Result;

-abstract class ElasticsearchBatchedResultsIterator<T> extends ElasticsearchBatchedDocumentsIterator<T> {
+public abstract class ElasticsearchBatchedResultsIterator<T>
+        extends ElasticsearchBatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<T>> {

     public ElasticsearchBatchedResultsIterator(Client client, String jobId, String resultType) {
         super(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId),
@@ -20,4 +21,14 @@ abstract class ElasticsearchBatchedResultsIterator<T> extends ElasticsearchBatch
     protected String getType() {
         return Result.TYPE.getPreferredName();
     }
+
+    public static class ResultWithIndex<T> {
+        public final String indexName;
+        public final T result;
+
+        public ResultWithIndex(String indexName, T result) {
+            this.indexName = indexName;
+            this.result = result;
+        }
+    }
 }


@@ -401,6 +401,7 @@ public class JobProvider {
             List<PerPartitionMaxProbabilities> partitionProbs =
                     handlePartitionMaxNormailizedProbabilitiesResponse(item2.getResponse());
             mergePartitionScoresIntoBucket(partitionProbs, buckets.results(), query.getPartitionValue());
+
             if (query.isExpand()) {
                 Iterator<Bucket> bucketsToExpand = buckets.results().stream()
                         .filter(bucket -> bucket.getRecordCount() > 0).iterator();
@@ -484,22 +485,29 @@ public class JobProvider {
     /**
      * Returns a {@link BatchedDocumentsIterator} that allows querying
-     * and iterating over a large number of buckets of the given job
+     * and iterating over a large number of buckets of the given job.
+     * The bucket and source indexes are returned by the iterator.
      *
      * @param jobId the id of the job for which buckets are requested
      * @return a bucket {@link BatchedDocumentsIterator}
      */
-    public BatchedDocumentsIterator<Bucket> newBatchedBucketsIterator(String jobId) {
+    public BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> newBatchedBucketsIterator(String jobId) {
         return new ElasticsearchBatchedBucketsIterator(client, jobId);
     }

     /**
-     * Expand a bucket to include the associated records.
+     * Returns a {@link BatchedDocumentsIterator} that allows querying
+     * and iterating over a large number of records in the given job
+     * The records and source indexes are returned by the iterator.
      *
-     * @param jobId the job id
-     * @param includeInterim Include interim results
-     * @param bucket The bucket to be expanded
+     * @param jobId the id of the job for which buckets are requested
+     * @return a record {@link BatchedDocumentsIterator}
      */
+    public BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>
+            newBatchedRecordsIterator(String jobId) {
+        return new ElasticsearchBatchedRecordsIterator(client, jobId);
+    }

     // TODO (norelease): Use scroll search instead of multiple searches with increasing from
     public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue, int from,
                              Consumer<Integer> consumer, Consumer<Exception> errorHandler) {
@@ -753,7 +761,8 @@ public class JobProvider {
      * @param jobId the id of the job for which influencers are requested
      * @return an influencer {@link BatchedDocumentsIterator}
      */
-    public BatchedDocumentsIterator<Influencer> newBatchedInfluencersIterator(String jobId) {
+    public BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>>
+            newBatchedInfluencersIterator(String jobId) {
         return new ElasticsearchBatchedInfluencersIterator(client, jobId);
     }
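
The batched iterators now return each result paired with the index it was read from. A minimal sketch of how a caller is expected to drive the new record iterator (the job id and timestamp values below are placeholders, not values from this change):

    BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordsIterator =
            jobProvider.newBatchedRecordsIterator("my-job")                    // placeholder job id
                    .timeRange(bucketTimestampMs, bucketTimestampMs + 1);      // records of a single bucket
    while (recordsIterator.hasNext()) {
        for (ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord> r : recordsIterator.next()) {
            AnomalyRecord record = r.result;    // the parsed result
            String sourceIndex = r.indexName;   // the index the hit came from, used when writing back
        }
    }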


@@ -5,75 +5,97 @@
  */
 package org.elasticsearch.xpack.ml.job.persistence;

+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.ml.job.process.normalizer.BucketNormalizable;
+import org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable;
 import org.elasticsearch.xpack.ml.job.results.Bucket;
-import org.elasticsearch.xpack.ml.job.results.Influencer;
-import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
+import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
+import org.elasticsearch.xpack.ml.job.results.Result;

+import java.io.IOException;
 import java.util.List;

+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

 /**
  * Interface for classes that update {@linkplain Bucket Buckets}
  * for a particular job with new normalized anomaly scores and
  * unusual scores.
- *
+ * <p>
+ * Renormalized results must already have an ID.
  */
 public class JobRenormalizedResultsPersister extends AbstractComponent {

-    private final JobResultsPersister jobResultsPersister;
+    private final Client client;
+    private BulkRequest bulkRequest;

-    public JobRenormalizedResultsPersister(Settings settings, JobResultsPersister jobResultsPersister) {
+    public JobRenormalizedResultsPersister(Settings settings, Client client) {
         super(settings);
-        this.jobResultsPersister = jobResultsPersister;
+        this.client = client;
+        bulkRequest = new BulkRequest();
     }

-    /**
-     * Update the bucket with the changes that may result
-     * due to renormalization.
-     *
-     * @param bucket the bucket to update
-     */
-    public void updateBucket(Bucket bucket) {
-        jobResultsPersister.bulkPersisterBuilder(bucket.getJobId()).persistBucket(bucket).executeRequest();
-    }
+    public void updateBucket(BucketNormalizable normalizable) {
+        updateResult(normalizable.getId(), normalizable.getOriginatingIndex(), normalizable.getBucket());
+        updateBucketInfluencersStandalone(normalizable.getOriginatingIndex(), normalizable.getBucket().getBucketInfluencers());
+    }

-    /**
-     * Update the anomaly records for a particular job.
-     * The anomaly records are updated with the values in <code>records</code> and
-     * stored with the ID returned by {@link AnomalyRecord#getId()}
-     *
-     * @param jobId Id of the job to update
-     * @param records The updated records
-     */
-    public void updateRecords(String jobId, List<AnomalyRecord> records) {
-        jobResultsPersister.bulkPersisterBuilder(jobId).persistRecords(records).executeRequest();
-    }
+    private void updateBucketInfluencersStandalone(String indexName, List<BucketInfluencer> bucketInfluencers) {
+        if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
+            for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
+                updateResult(bucketInfluencer.getId(), indexName, bucketInfluencer);
+            }
+        }
+    }

-    /**
-     * Create a {@link PerPartitionMaxProbabilities} object from this list of records and persist
-     * with the given ID.
-     *
-     * @param jobId Id of the job to update
-     * @param records Source of the new {@link PerPartitionMaxProbabilities} object
-     */
-    public void updatePerPartitionMaxProbabilities(String jobId, List<AnomalyRecord> records) {
-        PerPartitionMaxProbabilities ppMaxProbs = new PerPartitionMaxProbabilities(records);
-        jobResultsPersister.bulkPersisterBuilder(jobId).persistPerPartitionMaxProbabilities(ppMaxProbs).executeRequest();
-    }
+    public void updateResults(List<Normalizable> normalizables) {
+        for (Normalizable normalizable : normalizables) {
+            updateResult(normalizable.getId(), normalizable.getOriginatingIndex(), normalizable);
+        }
+    }

-    /**
-     * Update the influencer for a particular job.
-     * The Influencer's are stored with the ID in {@link Influencer#getId()}
-     *
-     * @param jobId Id of the job to update
-     * @param influencers The updated influencers
-     */
-    public void updateInfluencer(String jobId, List<Influencer> influencers) {
-        jobResultsPersister.bulkPersisterBuilder(jobId).persistInfluencers(influencers).executeRequest();
-    }
+    public void updateResult(String id, String index, ToXContent resultDoc) {
+        try {
+            XContentBuilder content = toXContentBuilder(resultDoc);
+            bulkRequest.add(new IndexRequest(index, Result.TYPE.getPreferredName(), id).source(content));
+        } catch (IOException e) {
+            logger.error("Error serialising result", e);
+        }
+    }
+
+    private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
+        XContentBuilder builder = jsonBuilder();
+        obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        return builder;
+    }
+
+    /**
+     * Execute the bulk action
+     *
+     * @param jobId The job Id
+     */
+    public void executeRequest(String jobId) {
+        if (bulkRequest.numberOfActions() == 0) {
+            return;
+        }
+        logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions());
+        BulkResponse addRecordsResponse = client.bulk(bulkRequest).actionGet();
+        if (addRecordsResponse.hasFailures()) {
+            logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
+        }
+    }
+
+    BulkRequest getBulkRequest() {
+        return bulkRequest;
+    }
 }
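
The reworked persister buffers index requests against each result's originating index and flushes them in a single bulk call. A hedged sketch of the intended call pattern (the settings, client and normalizable values are placeholders):

    JobRenormalizedResultsPersister persister = new JobRenormalizedResultsPersister(settings, client);
    persister.updateBucket(bucketNormalizable);    // queues the bucket and its bucket influencers
    persister.updateResults(changedNormalizables); // queues any other renormalized results
    persister.executeRequest("my-job");            // one bulk request; a no-op if nothing was queued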


@@ -114,7 +114,7 @@ public class JobResultsPersister extends AbstractComponent {
             if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
                 for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
                     XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer);
-                    // Need consistent IDs to ensure overwriting on renormalisation
+                    // Need consistent IDs to ensure overwriting on renormalization
                     String id = bucketInfluencer.getId();
                     logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
                             jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id);


@@ -11,6 +11,7 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.xpack.ml.job.results.Result;

 import java.util.ArrayList;
 import java.util.List;
@@ -88,6 +89,10 @@ class ResultsFilterBuilder {
         return this;
     }

+    ResultsFilterBuilder resultType(String resultType) {
+        return term(Result.RESULT_TYPE.getPreferredName(), resultType);
+    }
+
     private void addQuery(QueryBuilder fb) {
         queries.add(fb);
     }
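
The new resultType helper just adds a term filter on the result type field. Conceptually it contributes the equivalent of the following clause, shown here with the public QueryBuilders API and an illustrative result type:

    QueryBuilder resultTypeFilter =
            QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), AnomalyRecord.RESULT_TYPE_VALUE);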


@@ -8,14 +8,19 @@ package org.elasticsearch.xpack.ml.job.process.normalizer;
 import java.util.Collections;
 import java.util.List;

-abstract class AbstractLeafNormalizable implements Normalizable {
+abstract class AbstractLeafNormalizable extends Normalizable {
+
+    public AbstractLeafNormalizable(String indexName) {
+        super(indexName);
+    }

     @Override
     public final boolean isContainerOnly() {
         return false;
     }

     @Override
-    public final List<Integer> getChildrenTypes() {
+    public final List<ChildType> getChildrenTypes() {
         return Collections.emptyList();
     }
@@ -25,12 +30,12 @@ abstract class AbstractLeafNormalizable implements Normalizable {
     }

     @Override
-    public final List<Normalizable> getChildren(int type) {
+    public final List<Normalizable> getChildren(ChildType type) {
         throw new IllegalStateException(getClass().getSimpleName() + " has no children");
     }

     @Override
-    public final boolean setMaxChildrenScore(int childrenType, double maxScore) {
+    public final boolean setMaxChildrenScore(ChildType childrenType, double maxScore) {
         throw new IllegalStateException(getClass().getSimpleName() + " has no children");
     }
 }


@@ -5,18 +5,26 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;

+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;

+import java.io.IOException;
 import java.util.Objects;

 class BucketInfluencerNormalizable extends AbstractLeafNormalizable {
     private final BucketInfluencer bucketInfluencer;

-    public BucketInfluencerNormalizable(BucketInfluencer influencer) {
+    public BucketInfluencerNormalizable(BucketInfluencer influencer, String indexName) {
+        super(indexName);
         bucketInfluencer = Objects.requireNonNull(influencer);
     }

+    @Override
+    public String getId() {
+        return bucketInfluencer.getId();
+    }
+
     @Override
     public Level getLevel() {
         return BucketInfluencer.BUCKET_TIME.equals(bucketInfluencer.getInfluencerFieldName()) ?
@@ -69,12 +77,7 @@ class BucketInfluencerNormalizable extends AbstractLeafNormalizable {
     }

     @Override
-    public void resetBigChangeFlag() {
-        // Do nothing
-    }
-
-    @Override
-    public void raiseBigChangeFlag() {
-        // Do nothing
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return bucketInfluencer.toXContent(builder, params);
     }
 }


@@ -5,27 +5,44 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;

+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.Bucket;

+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;

+import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.BUCKET_INFLUENCER;
+import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.PARTITION_SCORE;
+import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.RECORD;

-class BucketNormalizable implements Normalizable {
-    private static final int BUCKET_INFLUENCER = 0;
-    private static final int RECORD = 1;
-    private static final int PARTITION_SCORE = 2;
-    private static final List<Integer> CHILDREN_TYPES =
-            Arrays.asList(BUCKET_INFLUENCER, RECORD, PARTITION_SCORE);
+public class BucketNormalizable extends Normalizable {
+
+    private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, RECORD, PARTITION_SCORE);

     private final Bucket bucket;

-    public BucketNormalizable(Bucket bucket) {
+    private List<RecordNormalizable> records = Collections.emptyList();
+
+    public BucketNormalizable(Bucket bucket, String indexName) {
+        super(indexName);
         this.bucket = Objects.requireNonNull(bucket);
     }

+    public Bucket getBucket() {
+        return bucket;
+    }
+
+    @Override
+    public String getId() {
+        return bucket.getId();
+    }
+
     @Override
     public boolean isContainerOnly() {
         return true;
@@ -76,35 +93,44 @@ class BucketNormalizable implements Normalizable {
         bucket.setAnomalyScore(normalizedScore);
     }

+    public List<RecordNormalizable> getRecords() {
+        return records;
+    }
+
+    public void setRecords(List<RecordNormalizable> records) {
+        this.records = records;
+    }
+
     @Override
-    public List<Integer> getChildrenTypes() {
-        return CHILDREN_TYPES;
+    public List<ChildType> getChildrenTypes() {
+        return CHILD_TYPES;
     }

     @Override
     public List<Normalizable> getChildren() {
         List<Normalizable> children = new ArrayList<>();
-        for (Integer type : getChildrenTypes()) {
+        for (ChildType type : getChildrenTypes()) {
             children.addAll(getChildren(type));
         }
         return children;
     }

     @Override
-    public List<Normalizable> getChildren(int type) {
+    public List<Normalizable> getChildren(ChildType type) {
         List<Normalizable> children = new ArrayList<>();
         switch (type) {
             case BUCKET_INFLUENCER:
-                bucket.getBucketInfluencers().stream().forEach(
-                        influencer -> children.add(new BucketInfluencerNormalizable(influencer)));
+                children.addAll(bucket.getBucketInfluencers().stream()
+                        .map(bi -> new BucketInfluencerNormalizable(bi, getOriginatingIndex()))
+                        .collect(Collectors.toList()));
                 break;
             case RECORD:
-                bucket.getRecords().stream().forEach(
-                        record -> children.add(new RecordNormalizable(record)));
+                children.addAll(records);
                 break;
             case PARTITION_SCORE:
-                bucket.getPartitionScores().stream().forEach(
-                        partitionScore -> children.add(new PartitionScoreNormalizable(partitionScore)));
+                children.addAll(bucket.getPartitionScores().stream()
+                        .map(ps -> new PartitionScoreNormalizable(ps, getOriginatingIndex()))
+                        .collect(Collectors.toList()));
                 break;
             default:
                 throw new IllegalArgumentException("Invalid type: " + type);
@@ -113,7 +139,7 @@ class BucketNormalizable implements Normalizable {
     }

     @Override
-    public boolean setMaxChildrenScore(int childrenType, double maxScore) {
+    public boolean setMaxChildrenScore(ChildType childrenType, double maxScore) {
         double oldScore = 0.0;
         switch (childrenType) {
             case BUCKET_INFLUENCER:
@@ -138,12 +164,7 @@ class BucketNormalizable implements Normalizable {
     }

     @Override
-    public void resetBigChangeFlag() {
-        bucket.resetBigNormalizedUpdateFlag();
-    }
-
-    @Override
-    public void raiseBigChangeFlag() {
-        bucket.raiseBigNormalizedUpdateFlag();
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return bucket.toXContent(builder, params);
     }
 }
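
A bucket's records are now attached to its normalizable up front rather than re-read through the bucket itself. A short assembly sketch, with the index name as a placeholder:

    BucketNormalizable bn = new BucketNormalizable(bucket, ".ml-anomalies-my-job");                // placeholder index name
    bn.setRecords(Collections.singletonList(new RecordNormalizable(record, ".ml-anomalies-my-job")));
    List<Normalizable> recordChildren = bn.getChildren(Normalizable.ChildType.RECORD);             // the attached records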


@@ -5,17 +5,25 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;

+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.Influencer;

+import java.io.IOException;
 import java.util.Objects;

 class InfluencerNormalizable extends AbstractLeafNormalizable {
     private final Influencer influencer;

-    public InfluencerNormalizable(Influencer influencer) {
+    public InfluencerNormalizable(Influencer influencer, String indexName) {
+        super(indexName);
         this.influencer = Objects.requireNonNull(influencer);
     }

+    @Override
+    public String getId() {
+        return influencer.getId();
+    }
+
     @Override
     public Level getLevel() {
         return Level.INFLUENCER;
@@ -67,12 +75,7 @@ class InfluencerNormalizable extends AbstractLeafNormalizable {
     }

     @Override
-    public void resetBigChangeFlag() {
-        influencer.resetBigNormalizedUpdateFlag();
-    }
-
-    @Override
-    public void raiseBigChangeFlag() {
-        influencer.raiseBigNormalizedUpdateFlag();
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return influencer.toXContent(builder, params);
     }
 }


@@ -5,9 +5,27 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;

-import java.util.List;
+import org.elasticsearch.common.xcontent.ToXContent;

+import java.util.List;
+import java.util.Objects;

+public abstract class Normalizable implements ToXContent {
+    public enum ChildType {BUCKET_INFLUENCER, RECORD, PARTITION_SCORE};
+
+    private final String indexName;
+    private boolean hadBigNormalizedUpdate;
+
+    public Normalizable(String indexName) {
+        this.indexName = Objects.requireNonNull(indexName);
+    }
+
+    /**
+     * The document ID of the underlying result.
+     * @return The document Id string
+     */
+    public abstract String getId();

-interface Normalizable {
     /**
      * A {@code Normalizable} may be the owner of scores or just a
      * container of other {@code Normalizable} objects. A container only
@@ -16,40 +34,40 @@ interface Normalizable {
      *
      * @return true if this {@code Normalizable} is only a container
      */
-    boolean isContainerOnly();
+    abstract boolean isContainerOnly();

-    Level getLevel();
+    abstract Level getLevel();

-    String getPartitionFieldName();
+    abstract String getPartitionFieldName();

-    String getPartitionFieldValue();
+    abstract String getPartitionFieldValue();

-    String getPersonFieldName();
+    abstract String getPersonFieldName();

-    String getFunctionName();
+    abstract String getFunctionName();

-    String getValueFieldName();
+    abstract String getValueFieldName();

-    double getProbability();
+    abstract double getProbability();

-    double getNormalizedScore();
+    abstract double getNormalizedScore();

-    void setNormalizedScore(double normalizedScore);
+    abstract void setNormalizedScore(double normalizedScore);

-    List<Integer> getChildrenTypes();
+    abstract List<ChildType> getChildrenTypes();

-    List<Normalizable> getChildren();
+    abstract List<Normalizable> getChildren();

-    List<Normalizable> getChildren(int type);
+    abstract List<Normalizable> getChildren(ChildType type);

     /**
      * Set the aggregate normalized score for a type of children
      *
-     * @param childrenType the integer that corresponds to a children type
+     * @param type the child type
      * @param maxScore the aggregate normalized score of the children
      * @return true if the score has changed or false otherwise
      */
-    boolean setMaxChildrenScore(int childrenType, double maxScore);
+    abstract boolean setMaxChildrenScore(ChildType type, double maxScore);

     /**
      * If this {@code Normalizable} holds the score of its parent,
@@ -57,9 +75,21 @@ interface Normalizable {
      *
      * @param parentScore the score of the parent {@code Normalizable}
      */
-    void setParentScore(double parentScore);
+    abstract void setParentScore(double parentScore);

-    void resetBigChangeFlag();
+    public boolean hadBigNormalizedUpdate() {
+        return hadBigNormalizedUpdate;
+    }

-    void raiseBigChangeFlag();
+    public void resetBigChangeFlag() {
+        hadBigNormalizedUpdate = false;
+    }
+
+    public void raiseBigChangeFlag() {
+        hadBigNormalizedUpdate = true;
+    }
+
+    public String getOriginatingIndex() {
+        return indexName;
+    }
 }
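
With the big-change flag owned by the base class, a caller can mark and later query changed results without per-class flag methods. An illustrative flow under assumed values (the score and threshold are placeholders, not taken from the normalizer):

    for (Normalizable result : asNormalizables) {
        double renormalized = 42.0;   // placeholder for a score produced by the normalizer process
        if (Math.abs(renormalized - result.getNormalizedScore()) > 0.01) {
            result.setNormalizedScore(renormalized);
            result.raiseBigChangeFlag();   // the flag now lives in Normalizable itself
        }
    }
    long changed = asNormalizables.stream().filter(Normalizable::hadBigNormalizedUpdate).count();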


@@ -170,7 +170,7 @@ public class Normalizer {
             }
         }

-        for (Integer childrenType : result.getChildrenTypes()) {
+        for (Normalizable.ChildType childrenType : result.getChildrenTypes()) {
             List<Normalizable> children = result.getChildren(childrenType);
             if (!children.isEmpty()) {
                 double maxChildrenScore = 0.0;


@@ -17,7 +17,7 @@ import java.io.IOException;
 import java.util.Objects;

 /**
- * Parse the output of the normaliser process, for example:
+ * Parse the output of the normalizer process, for example:
  *
  * {"probability":0.01,"normalized_score":2.2}
  */


@@ -5,18 +5,26 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;

+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.PartitionScore;

+import java.io.IOException;
 import java.util.Objects;

 public class PartitionScoreNormalizable extends AbstractLeafNormalizable {
     private final PartitionScore score;

-    public PartitionScoreNormalizable(PartitionScore score) {
+    public PartitionScoreNormalizable(PartitionScore score, String indexName) {
+        super(indexName);
         this.score = Objects.requireNonNull(score);
     }

+    @Override
+    public String getId() {
+        throw new IllegalStateException("PartitionScore has no ID as is should not be persisted outside of the owning bucket");
+    }
+
     @Override
     public Level getLevel() {
         return Level.PARTITION;
@@ -68,12 +76,7 @@ public class PartitionScoreNormalizable extends AbstractLeafNormalizable {
     }

     @Override
-    public void resetBigChangeFlag() {
-        score.resetBigNormalizedUpdateFlag();
-    }
-
-    @Override
-    public void raiseBigChangeFlag() {
-        score.raiseBigNormalizedUpdateFlag();
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return score.toXContent(builder, params);
     }
 }


@@ -5,18 +5,26 @@
  */
 package org.elasticsearch.xpack.ml.job.process.normalizer;

+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;

+import java.io.IOException;
 import java.util.Objects;

 class RecordNormalizable extends AbstractLeafNormalizable {
     private final AnomalyRecord record;

-    public RecordNormalizable(AnomalyRecord record) {
+    public RecordNormalizable(AnomalyRecord record, String indexName) {
+        super(indexName);
         this.record = Objects.requireNonNull(record);
     }

+    @Override
+    public String getId() {
+        return record.getId();
+    }
+
     @Override
     public Level getLevel() {
         return Level.LEAF;
@@ -69,12 +77,11 @@ class RecordNormalizable extends AbstractLeafNormalizable {
     }

     @Override
-    public void resetBigChangeFlag() {
-        record.resetBigNormalizedUpdateFlag();
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        return record.toXContent(builder, params);
     }

-    @Override
-    public void raiseBigChangeFlag() {
-        record.raiseBigNormalizedUpdateFlag();
+    public AnomalyRecord getRecord() {
+        return record;
     }
 }
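
Because every normalizable now implements ToXContent by delegating to the result it wraps, the renormalization persister can serialize any of them the same way. A minimal sketch of that serialization step:

    XContentBuilder builder = XContentFactory.jsonBuilder();
    normalizable.toXContent(builder, ToXContent.EMPTY_PARAMS);   // delegates to the wrapped bucket, record or influencer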


@@ -10,16 +10,19 @@ import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.xpack.ml.job.AnalysisConfig;
 import org.elasticsearch.xpack.ml.job.Job;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
+import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchBatchedResultsIterator;
 import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
 import org.elasticsearch.xpack.ml.job.results.Bucket;
 import org.elasticsearch.xpack.ml.job.results.Influencer;
+import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;

 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.stream.Collectors;

 /**
@@ -89,45 +92,49 @@ public class ScoresUpdater {
     public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs, boolean perPartitionNormalization) {
         Normalizer normalizer = normalizerFactory.create(job.getId());
         int[] counts = {0, 0};
-        updateBuckets(normalizer, quantilesState, endBucketEpochMs,
-                windowExtensionMs, counts, perPartitionNormalization);
-        updateInfluencers(normalizer, quantilesState, endBucketEpochMs,
-                windowExtensionMs, counts);
+        updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts,
+                perPartitionNormalization);
+        updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts);

         LOGGER.info("[{}] Normalization resulted in: {} updates, {} no-ops", job.getId(), counts[0], counts[1]);
     }

     private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
                                long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
-        BatchedDocumentsIterator<Bucket> bucketsIterator =
+        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> bucketsIterator =
                 jobProvider.newBatchedBucketsIterator(job.getId())
                         .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);

-        // Make a list of buckets with their records to be renormalized.
+        // Make a list of buckets to be renormalized.
         // This may be shorter than the original list of buckets for two
         // reasons:
         // 1) We don't bother with buckets that have raw score 0 and no
         // records
         // 2) We limit the total number of records to be not much more
         // than 100000
-        List<Bucket> bucketsToRenormalize = new ArrayList<>();
+        List<BucketNormalizable> bucketsToRenormalize = new ArrayList<>();
         int batchRecordCount = 0;
         int skipped = 0;

         while (bucketsIterator.hasNext()) {
             // Get a batch of buckets without their records to calculate
             // how many buckets can be sensibly retrieved
-            Deque<Bucket> buckets = bucketsIterator.next();
+            Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> buckets = bucketsIterator.next();
             if (buckets.isEmpty()) {
+                LOGGER.debug("[{}] No buckets to renormalize for job", job.getId());
                 break;
             }

             while (!buckets.isEmpty()) {
-                Bucket currentBucket = buckets.removeFirst();
+                ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket> current = buckets.removeFirst();
+                Bucket currentBucket = current.result;
                 if (currentBucket.isNormalizable()) {
-                    bucketsToRenormalize.add(currentBucket);
-                    batchRecordCount += jobProvider.expandBucket(job.getId(), false, currentBucket);
+                    BucketNormalizable bucketNormalizable = new BucketNormalizable(current.result, current.indexName);
+                    List<RecordNormalizable> recordNormalizables =
+                            bucketRecordsAsNormalizables(currentBucket.getTimestamp().getTime());
+                    batchRecordCount += recordNormalizables.size();
+                    bucketNormalizable.setRecords(recordNormalizables);
+                    bucketsToRenormalize.add(bucketNormalizable);
                 } else {
                     ++skipped;
                 }
@@ -143,78 +150,73 @@ public class ScoresUpdater {
             }
         }
         if (!bucketsToRenormalize.isEmpty()) {
-            normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState,
-                    batchRecordCount, skipped, counts, perPartitionNormalization);
+            normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, batchRecordCount, skipped, counts,
+                    perPartitionNormalization);
         }
     }

+    private List<RecordNormalizable> bucketRecordsAsNormalizables(long bucketTimeStamp) {
+        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>
+                recordsIterator = jobProvider.newBatchedRecordsIterator(job.getId())
+                        .timeRange(bucketTimeStamp, bucketTimeStamp + 1);
+
+        List<RecordNormalizable> recordNormalizables = new ArrayList<>();
+        while (recordsIterator.hasNext()) {
+            for (ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord> record : recordsIterator.next() ) {
+                recordNormalizables.add(new RecordNormalizable(record.result, record.indexName));
+            }
+        }
+        return recordNormalizables;
+    }
+
     private long calcNormalizationWindowStart(long endEpochMs, long windowExtensionMs) {
         return Math.max(0, endEpochMs - normalizationWindow - windowExtensionMs);
     }

-    private void normalizeBuckets(Normalizer normalizer, List<Bucket> buckets, String quantilesState,
-                                  int recordCount, int skipped, int[] counts, boolean perPartitionNormalization) {
+    private void normalizeBuckets(Normalizer normalizer, List<BucketNormalizable> normalizableBuckets,
+                                  String quantilesState, int recordCount, int skipped, int[] counts,
+                                  boolean perPartitionNormalization) {
         LOGGER.debug("[{}] Will renormalize a batch of {} buckets with {} records ({} empty buckets skipped)",
-                job.getId(), buckets.size(), recordCount, skipped);
+                job.getId(), normalizableBuckets.size(), recordCount, skipped);

-        List<Normalizable> asNormalizables = buckets.stream()
-                .map(bucket -> new BucketNormalizable(bucket)).collect(Collectors.toList());
+        List<Normalizable> asNormalizables = normalizableBuckets.stream()
+                .map(Function.identity()).collect(Collectors.toList());
         normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);

-        for (Bucket bucket : buckets) {
-            updateSingleBucket(bucket, counts, perPartitionNormalization);
+        for (BucketNormalizable bn : normalizableBuckets) {
+            updateSingleBucket(counts, bn);
         }
+
+        updatesPersister.executeRequest(job.getId());
     }

-    /**
-     * Update the anomaly score and unsual score fields on the bucket provided
-     * and all contained records
-     *
-     * @param counts Element 0 will be incremented if we update a document and
-     *               element 1 if we don't
-     */
-    private void updateSingleBucket(Bucket bucket, int[] counts, boolean perPartitionNormalization) {
-        updateBucketIfItHasBigChange(bucket, counts, perPartitionNormalization);
-        updateRecordsThatHaveBigChange(bucket, counts);
-    }
-
-    private void updateBucketIfItHasBigChange(Bucket bucket, int[] counts, boolean perPartitionNormalization) {
-        if (bucket.hadBigNormalizedUpdate()) {
+    private void updateSingleBucket(int[] counts, BucketNormalizable bucketNormalizable) {
+        if (bucketNormalizable.hadBigNormalizedUpdate()) {
             if (perPartitionNormalization) {
-                updatesPersister.updatePerPartitionMaxProbabilities(job.getId(), bucket.getRecords());
+                List<AnomalyRecord> anomalyRecords = bucketNormalizable.getRecords().stream()
+                        .map(RecordNormalizable::getRecord).collect(Collectors.toList());
+                PerPartitionMaxProbabilities ppProbs = new PerPartitionMaxProbabilities(anomalyRecords);
+                updatesPersister.updateResult(ppProbs.getId(), bucketNormalizable.getOriginatingIndex(), ppProbs);
             }
-            updatesPersister.updateBucket(bucket);
+            updatesPersister.updateBucket(bucketNormalizable);

             ++counts[0];
         } else {
             ++counts[1];
         }
-    }

-    private void updateRecordsThatHaveBigChange(Bucket bucket, int[] counts) {
-        List<AnomalyRecord> toUpdate = new ArrayList<>();
-
-        for (AnomalyRecord record : bucket.getRecords()) {
-            if (record.hadBigNormalizedUpdate()) {
-                toUpdate.add(record);
-                ++counts[0];
-            } else {
-                ++counts[1];
-            }
-        }
-
-        if (!toUpdate.isEmpty()) {
-            updatesPersister.updateRecords(job.getId(), toUpdate);
-        }
+        persistChanged(counts, bucketNormalizable.getRecords());
     }

     private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
                                    long windowExtensionMs, int[] counts) {
-        BatchedDocumentsIterator<Influencer> influencersIterator = jobProvider
-                .newBatchedInfluencersIterator(job.getId())
+        BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> influencersIterator =
                jobProvider.newBatchedInfluencersIterator(job.getId())
                         .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);

         while (influencersIterator.hasNext()) {
-            Deque<Influencer> influencers = influencersIterator.next();
+            Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> influencers = influencersIterator.next();
             if (influencers.isEmpty()) {
                 LOGGER.debug("[{}] No influencers to renormalize for job", job.getId());
                 break;
@@ -222,21 +224,24 @@ public class ScoresUpdater {
             LOGGER.debug("[{}] Will renormalize a batch of {} influencers", job.getId(), influencers.size());

             List<Normalizable> asNormalizables = influencers.stream()
-                    .map(bucket -> new InfluencerNormalizable(bucket)).collect(Collectors.toList());
+                    .map(influencerResultIndex ->
+                            new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.indexName))
+                    .collect(Collectors.toList());
             normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);

-            List<Influencer> toUpdate = new ArrayList<>();
-            for (Influencer influencer : influencers) {
-                if (influencer.hadBigNormalizedUpdate()) {
-                    toUpdate.add(influencer);
-                    ++counts[0];
-                } else {
-                    ++counts[1];
-                }
-            }
-            if (!toUpdate.isEmpty()) {
-                updatesPersister.updateInfluencer(job.getId(), toUpdate);
-            }
+            persistChanged(counts, asNormalizables);
         }
+
+        updatesPersister.executeRequest(job.getId());
+    }
+
+    private void persistChanged(int[] counts, List<? extends Normalizable> asNormalizables) {
+        List<Normalizable> toUpdate = asNormalizables.stream().filter(n -> n.hadBigNormalizedUpdate()).collect(Collectors.toList());
+
+        counts[0] += toUpdate.size();
+        counts[1] += asNormalizables.size() - toUpdate.size();
+
+        if (!toUpdate.isEmpty()) {
+            updatesPersister.updateResults(toUpdate);
+        }
     }
 }
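
Taken together, the updater's per-job flow is now: read results in batches together with their source indices, normalize them, flag the big changes, and flush one bulk request at the end. A condensed, hedged sketch of that loop using the collaborators shown above (variable names are illustrative):

    List<BucketNormalizable> bucketsToRenormalize = new ArrayList<>();
    while (bucketsIterator.hasNext()) {
        for (ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket> r : bucketsIterator.next()) {
            if (r.result.isNormalizable()) {
                bucketsToRenormalize.add(new BucketNormalizable(r.result, r.indexName));
            }
        }
    }
    List<Normalizable> asNormalizables = new ArrayList<>(bucketsToRenormalize);
    normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
    bucketsToRenormalize.stream().filter(Normalizable::hadBigNormalizedUpdate).forEach(updatesPersister::updateBucket);
    updatesPersister.executeRequest(jobId);   // single bulk flush per job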


@@ -146,8 +146,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
     private List<Influence> influencers;

-    private boolean hadBigNormalizedUpdate;
-
     public AnomalyRecord(String jobId, Date timestamp, long bucketSpan, int sequenceNum) {
         this.jobId = jobId;
         this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName());
@@ -487,9 +485,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
     @Override
     public int hashCode() {
-        // hadBigNormalizedUpdate is deliberately excluded from the hash
-
         return Objects.hash(jobId, detectorIndex, sequenceNum, bucketSpan, probability, anomalyScore,
                 normalizedProbability, initialNormalizedProbability, typical, actual,
                 function, functionDescription, fieldName, byFieldName, byFieldValue, correlatedByFieldValue,
@@ -510,7 +505,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
         AnomalyRecord that = (AnomalyRecord) other;

-        // hadBigNormalizedUpdate is deliberately excluded from the test
         return Objects.equals(this.jobId, that.jobId)
                 && this.detectorIndex == that.detectorIndex
                 && this.sequenceNum == that.sequenceNum
@@ -536,16 +530,4 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
                 && Objects.equals(this.causes, that.causes)
                 && Objects.equals(this.influencers, that.influencers);
     }
-
-    public boolean hadBigNormalizedUpdate() {
-        return this.hadBigNormalizedUpdate;
-    }
-
-    public void resetBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = false;
-    }
-
-    public void raiseBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = true;
-    }
 }


@@ -95,7 +95,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
     private List<AnomalyRecord> records = new ArrayList<>();
     private long eventCount;
     private boolean isInterim;
-    private boolean hadBigNormalizedUpdate;
     private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
     private long processingTimeMs;
     private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap();
@@ -118,7 +117,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
         this.records = new ArrayList<>(other.records);
         this.eventCount = other.eventCount;
         this.isInterim = other.isInterim;
-        this.hadBigNormalizedUpdate = other.hadBigNormalizedUpdate;
         this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
         this.processingTimeMs = other.processingTimeMs;
         this.perPartitionMaxProbability = other.perPartitionMaxProbability;
@@ -330,7 +328,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
     @Override
     public int hashCode() {
-        // hadBigNormalizedUpdate is deliberately excluded from the hash
         return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, maxNormalizedProbability, recordCount, records,
                 isInterim, bucketSpan, bucketInfluencers);
     }
@@ -350,7 +347,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
         Bucket that = (Bucket) other;

-        // hadBigNormalizedUpdate is deliberately excluded from the test
         return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.timestamp, that.timestamp)
                 && (this.eventCount == that.eventCount) && (this.bucketSpan == that.bucketSpan)
                 && (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
@@ -359,38 +355,15 @@ public class Bucket extends ToXContentToBytes implements Writeable {
                 && Objects.equals(this.bucketInfluencers, that.bucketInfluencers);
     }

-    public boolean hadBigNormalizedUpdate() {
-        return hadBigNormalizedUpdate;
-    }
-
-    public void resetBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = false;
-    }
-
-    public void raiseBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = true;
-    }
-
     /**
      * This method encapsulated the logic for whether a bucket should be
-     * normalized. The decision depends on two factors.
-     *
-     * The first is whether the bucket has bucket influencers. Since bucket
-     * influencers were introduced, every bucket must have at least one bucket
-     * influencer. If it does not, it means it is a bucket persisted with an
-     * older version and should not be normalized.
-     *
-     * The second factor has to do with minimising the number of buckets that
-     * are sent for normalization. Buckets that have no records and a score of
+     * normalized. Buckets that have no records and a score of
      * zero should not be normalized as their score will not change and they
      * will just add overhead.
      *
      * @return true if the bucket should be normalized or false otherwise
      */
     public boolean isNormalizable() {
-        if (bucketInfluencers.isEmpty()) {
-            return false;
-        }
         return anomalyScore > 0.0 || recordCount > 0;
     }
 }


@@ -78,7 +78,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
     private double probability;
     private double initialAnomalyScore;
     private double anomalyScore;
-    private boolean hadBigNormalizedUpdate;
     private boolean isInterim;

     public Influencer(String jobId, String fieldName, String fieldValue, Date timestamp, long bucketSpan, int sequenceNum) {
@@ -187,23 +186,8 @@ public class Influencer extends ToXContentToBytes implements Writeable {
         isInterim = value;
     }

-    public boolean hadBigNormalizedUpdate() {
-        return this.hadBigNormalizedUpdate;
-    }
-
-    public void resetBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = false;
-    }
-
-    public void raiseBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = true;
-    }
-
     @Override
     public int hashCode() {
-        // hadBigNormalizedUpdate is deliberately excluded from the hash
         return Objects.hash(jobId, timestamp, influenceField, influenceValue, initialAnomalyScore, anomalyScore, probability, isInterim,
                 bucketSpan, sequenceNum);
     }
@@ -223,8 +207,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
         }
         Influencer other = (Influencer) obj;

-        // hadBigNormalizedUpdate is deliberately excluded from the test
-
         return Objects.equals(jobId, other.jobId) && Objects.equals(timestamp, other.timestamp)
                 && Objects.equals(influenceField, other.influenceField)
                 && Objects.equals(influenceValue, other.influenceValue)


@@ -24,7 +24,6 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
     private final double initialAnomalyScore;
     private double anomalyScore;
    private double probability;
-    private boolean hadBigNormalizedUpdate;

     public static final ConstructingObjectParser<PartitionScore, Void> PARSER = new ConstructingObjectParser<>(
             PARTITION_SCORE.getPreferredName(), a -> new PartitionScore((String) a[0], (String) a[1], (Double) a[2], (Double) a[3],
@@ -39,7 +38,6 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
     }

     public PartitionScore(String fieldName, String fieldValue, double initialAnomalyScore, double anomalyScore, double probability) {
-        hadBigNormalizedUpdate = false;
         partitionFieldName = fieldName;
         partitionFieldValue = fieldValue;
         this.initialAnomalyScore = initialAnomalyScore;
@@ -121,22 +119,9 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
         PartitionScore that = (PartitionScore) other;

-        // hadBigNormalizedUpdate is deliberately excluded from the test
-        // as is id, which is generated by the datastore
+        // id is excluded from the test as it is generated by the datastore
         return Objects.equals(this.partitionFieldValue, that.partitionFieldValue)
                 && Objects.equals(this.partitionFieldName, that.partitionFieldName) && (this.probability == that.probability)
                 && (this.initialAnomalyScore == that.initialAnomalyScore) && (this.anomalyScore == that.anomalyScore);
     }
-
-    public boolean hadBigNormalizedUpdate() {
-        return hadBigNormalizedUpdate;
-    }
-
-    public void resetBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = false;
-    }
-
-    public void raiseBigNormalizedUpdateFlag() {
-        hadBigNormalizedUpdate = true;
-    }
 }

View File

@ -638,6 +638,7 @@ public class JobProviderTests extends ESTestCase {
QueryPage<AnomalyRecord> recordPage = holder[0]; QueryPage<AnomalyRecord> recordPage = holder[0];
assertEquals(2L, recordPage.count()); assertEquals(2L, recordPage.count());
List<AnomalyRecord> records = recordPage.results(); List<AnomalyRecord> records = recordPage.results();
assertEquals(22.4, records.get(0).getTypical().get(0), 0.000001); assertEquals(22.4, records.get(0).getTypical().get(0), 0.000001);
assertEquals(33.3, records.get(0).getActual().get(0), 0.000001); assertEquals(33.3, records.get(0).getActual().get(0), 0.000001);
assertEquals("irritable", records.get(0).getFunction()); assertEquals("irritable", records.get(0).getFunction());
@ -700,9 +701,9 @@ public class JobProviderTests extends ESTestCase {
Integer[] holder = new Integer[1]; Integer[] holder = new Integer[1];
provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new); provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new);
int records = holder[0]; int records = holder[0];
// This is not realistic, but is an artifact of the fact that the mock // This is not realistic, but is an artifact of the fact that the mock
// query // query returns all the records, not a subset
// returns all the records, not a subset
assertEquals(1200L, records); assertEquals(1200L, records);
} }

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.process.normalizer.BucketNormalizable;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import java.util.Date;
public class JobRenormalizedResultsPersisterTests extends ESTestCase {
public void testUpdateBucket() {
Date now = new Date();
Bucket bucket = new Bucket("foo", now, 1);
int sequenceNum = 0;
bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++));
bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++));
BucketNormalizable bn = new BucketNormalizable(bucket, "foo-index");
JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister();
persister.updateBucket(bn);
assertEquals(3, persister.getBulkRequest().numberOfActions());
assertEquals("foo-index", persister.getBulkRequest().requests().get(0).index());
}
private JobRenormalizedResultsPersister createJobRenormalizedResultsPersister() {
Client client = new MockClientBuilder("cluster").build();
return new JobRenormalizedResultsPersister(Settings.EMPTY, client);
}
}
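The test above pins down the call pattern this commit expects during renormalization: each updateBucket(...) adds one bulk action for the bucket plus one per bucket influencer, all targeted at the index the result came from. A rough usage sketch, assuming only the methods exercised here and in ScoresUpdaterTests below (updateBucket, getBulkRequest, executeRequest) and a Client built elsewhere:

// Sketch only: accumulate renormalized buckets into the persister's bulk request,
// then flush once per batch. The executeRequest argument is assumed to be the job id,
// based on the verify(...executeRequest(anyString())) calls in ScoresUpdaterTests.
JobRenormalizedResultsPersister persister = new JobRenormalizedResultsPersister(Settings.EMPTY, client);
for (BucketNormalizable bn : updatedBuckets) {
    persister.updateBucket(bn);
}
if (persister.getBulkRequest().numberOfActions() > 0) {
    persister.executeRequest("foo");
}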

View File

@ -12,34 +12,20 @@ import java.util.NoSuchElementException;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class MockBatchedDocumentsIterator<T> implements BatchedDocumentsIterator<T> { public class MockBatchedDocumentsIterator<T> implements BatchedDocumentsIterator<T> {
private final Long startEpochMs;
private final Long endEpochMs;
private final List<Deque<T>> batches; private final List<Deque<T>> batches;
private int index; private int index;
private boolean wasTimeRangeCalled; private boolean wasTimeRangeCalled;
private String interimFieldName; private String interimFieldName;
public MockBatchedDocumentsIterator(long startEpochMs, long endEpochMs, List<Deque<T>> batches) {
this((Long) startEpochMs, (Long) endEpochMs, batches);
}
public MockBatchedDocumentsIterator(List<Deque<T>> batches) { public MockBatchedDocumentsIterator(List<Deque<T>> batches) {
this(null, null, batches);
}
private MockBatchedDocumentsIterator(Long startEpochMs, Long endEpochMs, List<Deque<T>> batches) {
this.batches = batches; this.batches = batches;
index = 0; index = 0;
wasTimeRangeCalled = false; wasTimeRangeCalled = false;
interimFieldName = ""; interimFieldName = "";
this.startEpochMs = startEpochMs;
this.endEpochMs = endEpochMs;
} }
@Override @Override
public BatchedDocumentsIterator<T> timeRange(long startEpochMs, long endEpochMs) { public BatchedDocumentsIterator<T> timeRange(long startEpochMs, long endEpochMs) {
assertEquals(this.startEpochMs.longValue(), startEpochMs);
assertEquals(this.endEpochMs.longValue(), endEpochMs);
wasTimeRangeCalled = true; wasTimeRangeCalled = true;
return this; return this;
} }
@ -52,7 +38,7 @@ public class MockBatchedDocumentsIterator<T> implements BatchedDocumentsIterator
@Override @Override
public Deque<T> next() { public Deque<T> next() {
if ((startEpochMs != null && !wasTimeRangeCalled) || !hasNext()) { if ((!wasTimeRangeCalled) || !hasNext()) {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
return batches.get(index++); return batches.get(index++);

View File

@ -14,6 +14,7 @@ import java.util.Date;
public class BucketInfluencerNormalizableTests extends ESTestCase { public class BucketInfluencerNormalizableTests extends ESTestCase {
private static final double EPSILON = 0.0001; private static final double EPSILON = 0.0001;
private static final String INDEX_NAME = "foo-index";
private BucketInfluencer bucketInfluencer; private BucketInfluencer bucketInfluencer;
@Before @Before
@ -27,43 +28,43 @@ public class BucketInfluencerNormalizableTests extends ESTestCase {
} }
public void testIsContainerOnly() { public void testIsContainerOnly() {
assertFalse(new BucketInfluencerNormalizable(bucketInfluencer).isContainerOnly()); assertFalse(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).isContainerOnly());
} }
public void testGetLevel() { public void testGetLevel() {
assertEquals(Level.BUCKET_INFLUENCER, new BucketInfluencerNormalizable(bucketInfluencer).getLevel()); assertEquals(Level.BUCKET_INFLUENCER, new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getLevel());
BucketInfluencer timeInfluencer = new BucketInfluencer("foo", new Date(), 600, 1); BucketInfluencer timeInfluencer = new BucketInfluencer("foo", new Date(), 600, 1);
timeInfluencer.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME); timeInfluencer.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME);
assertEquals(Level.ROOT, new BucketInfluencerNormalizable(timeInfluencer).getLevel()); assertEquals(Level.ROOT, new BucketInfluencerNormalizable(timeInfluencer, INDEX_NAME).getLevel());
} }
public void testGetPartitionFieldName() { public void testGetPartitionFieldName() {
assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getPartitionFieldName()); assertNull(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getPartitionFieldName());
} }
public void testGetPersonFieldName() { public void testGetPersonFieldName() {
assertEquals("airline", new BucketInfluencerNormalizable(bucketInfluencer).getPersonFieldName()); assertEquals("airline", new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getPersonFieldName());
} }
public void testGetFunctionName() { public void testGetFunctionName() {
assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getFunctionName()); assertNull(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getFunctionName());
} }
public void testGetValueFieldName() { public void testGetValueFieldName() {
assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getValueFieldName()); assertNull(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getValueFieldName());
} }
public void testGetProbability() { public void testGetProbability() {
assertEquals(0.05, new BucketInfluencerNormalizable(bucketInfluencer).getProbability(), EPSILON); assertEquals(0.05, new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getProbability(), EPSILON);
} }
public void testGetNormalizedScore() { public void testGetNormalizedScore() {
assertEquals(1.0, new BucketInfluencerNormalizable(bucketInfluencer).getNormalizedScore(), EPSILON); assertEquals(1.0, new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getNormalizedScore(), EPSILON);
} }
public void testSetNormalizedScore() { public void testSetNormalizedScore() {
BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer); BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME);
normalizable.setNormalizedScore(99.0); normalizable.setNormalizedScore(99.0);
@ -72,23 +73,26 @@ public class BucketInfluencerNormalizableTests extends ESTestCase {
} }
public void testGetChildrenTypes() { public void testGetChildrenTypes() {
assertTrue(new BucketInfluencerNormalizable(bucketInfluencer).getChildrenTypes().isEmpty()); assertTrue(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getChildrenTypes().isEmpty());
} }
public void testGetChildren_ByType() { public void testGetChildren_ByType() {
expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer).getChildren(0)); expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
.getChildren(Normalizable.ChildType.BUCKET_INFLUENCER));
} }
public void testGetChildren() { public void testGetChildren() {
assertTrue(new BucketInfluencerNormalizable(bucketInfluencer).getChildren().isEmpty()); assertTrue(new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).getChildren().isEmpty());
} }
public void testSetMaxChildrenScore() { public void testSetMaxChildrenScore() {
expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer).setMaxChildrenScore(0, 42.0)); expectThrows(IllegalStateException.class,
() -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0));
} }
public void testSetParentScore() { public void testSetParentScore() {
new BucketInfluencerNormalizable(bucketInfluencer).setParentScore(42.0); new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME).setParentScore(42.0);
assertEquals("airline", bucketInfluencer.getInfluencerFieldName()); assertEquals("airline", bucketInfluencer.getInfluencerFieldName());
assertEquals(1.0, bucketInfluencer.getAnomalyScore(), EPSILON); assertEquals(1.0, bucketInfluencer.getAnomalyScore(), EPSILON);
@ -98,10 +102,14 @@ public class BucketInfluencerNormalizableTests extends ESTestCase {
} }
public void testResetBigChangeFlag() { public void testResetBigChangeFlag() {
new BucketInfluencerNormalizable(bucketInfluencer).resetBigChangeFlag(); BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME);
normalizable.resetBigChangeFlag();
assertFalse(normalizable.hadBigNormalizedUpdate());
} }
public void testRaiseBigChangeFlag() { public void testRaiseBigChangeFlag() {
new BucketInfluencerNormalizable(bucketInfluencer).raiseBigChangeFlag(); BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME);
normalizable.raiseBigChangeFlag();
assertTrue(normalizable.hadBigNormalizedUpdate());
} }
} }

View File

@ -9,6 +9,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
@ -19,6 +20,7 @@ import org.junit.Before;
public class BucketNormalizableTests extends ESTestCase { public class BucketNormalizableTests extends ESTestCase {
private static final String INDEX_NAME = "foo-index";
private static final double EPSILON = 0.0001; private static final double EPSILON = 0.0001;
private Bucket bucket; private Bucket bucket;
@ -54,43 +56,43 @@ public class BucketNormalizableTests extends ESTestCase {
} }
public void testIsContainerOnly() { public void testIsContainerOnly() {
assertTrue(new BucketNormalizable(bucket).isContainerOnly()); assertTrue(new BucketNormalizable(bucket, INDEX_NAME).isContainerOnly());
} }
public void testGetLevel() { public void testGetLevel() {
assertEquals(Level.ROOT, new BucketNormalizable(bucket).getLevel()); assertEquals(Level.ROOT, new BucketNormalizable(bucket, INDEX_NAME).getLevel());
} }
public void testGetPartitionFieldName() { public void testGetPartitionFieldName() {
assertNull(new BucketNormalizable(bucket).getPartitionFieldName()); assertNull(new BucketNormalizable(bucket, INDEX_NAME).getPartitionFieldName());
} }
public void testGetPartitionFieldValue() { public void testGetPartitionFieldValue() {
assertNull(new BucketNormalizable(bucket).getPartitionFieldValue()); assertNull(new BucketNormalizable(bucket, INDEX_NAME).getPartitionFieldValue());
} }
public void testGetPersonFieldName() { public void testGetPersonFieldName() {
assertNull(new BucketNormalizable(bucket).getPersonFieldName()); assertNull(new BucketNormalizable(bucket, INDEX_NAME).getPersonFieldName());
} }
public void testGetFunctionName() { public void testGetFunctionName() {
assertNull(new BucketNormalizable(bucket).getFunctionName()); assertNull(new BucketNormalizable(bucket, INDEX_NAME).getFunctionName());
} }
public void testGetValueFieldName() { public void testGetValueFieldName() {
assertNull(new BucketNormalizable(bucket).getValueFieldName()); assertNull(new BucketNormalizable(bucket, INDEX_NAME).getValueFieldName());
} }
public void testGetProbability() { public void testGetProbability() {
expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket).getProbability()); expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).getProbability());
} }
public void testGetNormalizedScore() { public void testGetNormalizedScore() {
assertEquals(88.0, new BucketNormalizable(bucket).getNormalizedScore(), EPSILON); assertEquals(88.0, new BucketNormalizable(bucket, INDEX_NAME).getNormalizedScore(), EPSILON);
} }
public void testSetNormalizedScore() { public void testSetNormalizedScore() {
BucketNormalizable normalizable = new BucketNormalizable(bucket); BucketNormalizable normalizable = new BucketNormalizable(bucket, INDEX_NAME);
normalizable.setNormalizedScore(99.0); normalizable.setNormalizedScore(99.0);
@ -99,8 +101,11 @@ public class BucketNormalizableTests extends ESTestCase {
} }
public void testGetChildren() { public void testGetChildren() {
List<Normalizable> children = new BucketNormalizable(bucket).getChildren(); BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME))
.collect(Collectors.toList()));
List<Normalizable> children = bn.getChildren();
assertEquals(6, children.size()); assertEquals(6, children.size());
assertTrue(children.get(0) instanceof BucketInfluencerNormalizable); assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON); assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON);
@ -117,7 +122,8 @@ public class BucketNormalizableTests extends ESTestCase {
} }
public void testGetChildren_GivenTypeBucketInfluencer() { public void testGetChildren_GivenTypeBucketInfluencer() {
List<Normalizable> children = new BucketNormalizable(bucket).getChildren(0); BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
List<Normalizable> children = bn.getChildren(Normalizable.ChildType.BUCKET_INFLUENCER);
assertEquals(2, children.size()); assertEquals(2, children.size());
assertTrue(children.get(0) instanceof BucketInfluencerNormalizable); assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
@ -127,7 +133,10 @@ public class BucketNormalizableTests extends ESTestCase {
} }
public void testGetChildren_GivenTypeRecord() { public void testGetChildren_GivenTypeRecord() {
List<Normalizable> children = new BucketNormalizable(bucket).getChildren(1); BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME))
.collect(Collectors.toList()));
List<Normalizable> children = bn.getChildren(Normalizable.ChildType.RECORD);
assertEquals(2, children.size()); assertEquals(2, children.size());
assertTrue(children.get(0) instanceof RecordNormalizable); assertTrue(children.get(0) instanceof RecordNormalizable);
@ -136,53 +145,45 @@ public class BucketNormalizableTests extends ESTestCase {
assertEquals(2.0, children.get(1).getNormalizedScore(), EPSILON); assertEquals(2.0, children.get(1).getNormalizedScore(), EPSILON);
} }
public void testGetChildren_GivenInvalidType() {
expectThrows(IllegalArgumentException.class, () -> new BucketNormalizable(bucket).getChildren(3));
}
public void testSetMaxChildrenScore_GivenDifferentScores() { public void testSetMaxChildrenScore_GivenDifferentScores() {
BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket); BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
assertTrue(bucketNormalizable.setMaxChildrenScore(0, 95.0)); assertTrue(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 95.0));
assertTrue(bucketNormalizable.setMaxChildrenScore(1, 42.0)); assertTrue(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 42.0));
assertEquals(95.0, bucket.getAnomalyScore(), EPSILON); assertEquals(95.0, bucket.getAnomalyScore(), EPSILON);
assertEquals(42.0, bucket.getMaxNormalizedProbability(), EPSILON); assertEquals(42.0, bucket.getMaxNormalizedProbability(), EPSILON);
} }
public void testSetMaxChildrenScore_GivenSameScores() { public void testSetMaxChildrenScore_GivenSameScores() {
BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket); BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
assertFalse(bucketNormalizable.setMaxChildrenScore(0, 88.0)); assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 88.0));
assertFalse(bucketNormalizable.setMaxChildrenScore(1, 2.0)); assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 2.0));
assertEquals(88.0, bucket.getAnomalyScore(), EPSILON); assertEquals(88.0, bucket.getAnomalyScore(), EPSILON);
assertEquals(2.0, bucket.getMaxNormalizedProbability(), EPSILON); assertEquals(2.0, bucket.getMaxNormalizedProbability(), EPSILON);
} }
public void testSetMaxChildrenScore_GivenInvalidType() {
expectThrows(IllegalArgumentException.class, () -> new BucketNormalizable(bucket).setMaxChildrenScore(3, 95.0));
}
public void testSetParentScore() { public void testSetParentScore() {
expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket).setParentScore(42.0)); expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).setParentScore(42.0));
} }
public void testResetBigChangeFlag() { public void testResetBigChangeFlag() {
BucketNormalizable normalizable = new BucketNormalizable(bucket); BucketNormalizable normalizable = new BucketNormalizable(bucket, INDEX_NAME);
normalizable.raiseBigChangeFlag(); normalizable.raiseBigChangeFlag();
normalizable.resetBigChangeFlag(); normalizable.resetBigChangeFlag();
assertFalse(bucket.hadBigNormalizedUpdate()); assertFalse(normalizable.hadBigNormalizedUpdate());
} }
public void testRaiseBigChangeFlag() { public void testRaiseBigChangeFlag() {
BucketNormalizable normalizable = new BucketNormalizable(bucket); BucketNormalizable normalizable = new BucketNormalizable(bucket, INDEX_NAME);
normalizable.resetBigChangeFlag(); normalizable.resetBigChangeFlag();
normalizable.raiseBigChangeFlag(); normalizable.raiseBigChangeFlag();
assertTrue(bucket.hadBigNormalizedUpdate()); assertTrue(normalizable.hadBigNormalizedUpdate());
} }
} }

View File

@ -12,6 +12,7 @@ import org.junit.Before;
import java.util.Date; import java.util.Date;
public class InfluencerNormalizableTests extends ESTestCase { public class InfluencerNormalizableTests extends ESTestCase {
private static final String INDEX_NAME = "foo-index";
private static final double EPSILON = 0.0001; private static final double EPSILON = 0.0001;
private Influencer influencer; private Influencer influencer;
@ -24,43 +25,43 @@ public class InfluencerNormalizableTests extends ESTestCase {
} }
public void testIsContainerOnly() { public void testIsContainerOnly() {
assertFalse(new InfluencerNormalizable(influencer).isContainerOnly()); assertFalse(new InfluencerNormalizable(influencer, INDEX_NAME).isContainerOnly());
} }
public void testGetLevel() { public void testGetLevel() {
assertEquals(Level.INFLUENCER, new InfluencerNormalizable(influencer).getLevel()); assertEquals(Level.INFLUENCER, new InfluencerNormalizable(influencer, INDEX_NAME).getLevel());
} }
public void testGetPartitionFieldName() { public void testGetPartitionFieldName() {
assertNull(new InfluencerNormalizable(influencer).getPartitionFieldName()); assertNull(new InfluencerNormalizable(influencer, INDEX_NAME).getPartitionFieldName());
} }
public void testGetPartitionFieldValue() { public void testGetPartitionFieldValue() {
assertNull(new InfluencerNormalizable(influencer).getPartitionFieldValue()); assertNull(new InfluencerNormalizable(influencer, INDEX_NAME).getPartitionFieldValue());
} }
public void testGetPersonFieldName() { public void testGetPersonFieldName() {
assertEquals("airline", new InfluencerNormalizable(influencer).getPersonFieldName()); assertEquals("airline", new InfluencerNormalizable(influencer, INDEX_NAME).getPersonFieldName());
} }
public void testGetFunctionName() { public void testGetFunctionName() {
assertNull(new InfluencerNormalizable(influencer).getFunctionName()); assertNull(new InfluencerNormalizable(influencer, INDEX_NAME).getFunctionName());
} }
public void testGetValueFieldName() { public void testGetValueFieldName() {
assertNull(new InfluencerNormalizable(influencer).getValueFieldName()); assertNull(new InfluencerNormalizable(influencer, INDEX_NAME).getValueFieldName());
} }
public void testGetProbability() { public void testGetProbability() {
assertEquals(0.05, new InfluencerNormalizable(influencer).getProbability(), EPSILON); assertEquals(0.05, new InfluencerNormalizable(influencer, INDEX_NAME).getProbability(), EPSILON);
} }
public void testGetNormalizedScore() { public void testGetNormalizedScore() {
assertEquals(1.0, new InfluencerNormalizable(influencer).getNormalizedScore(), EPSILON); assertEquals(1.0, new InfluencerNormalizable(influencer, INDEX_NAME).getNormalizedScore(), EPSILON);
} }
public void testSetNormalizedScore() { public void testSetNormalizedScore() {
InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer); InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer, INDEX_NAME);
normalizable.setNormalizedScore(99.0); normalizable.setNormalizedScore(99.0);
@ -69,40 +70,38 @@ public class InfluencerNormalizableTests extends ESTestCase {
} }
public void testGetChildrenTypes() { public void testGetChildrenTypes() {
assertTrue(new InfluencerNormalizable(influencer).getChildrenTypes().isEmpty()); assertTrue(new InfluencerNormalizable(influencer, INDEX_NAME).getChildrenTypes().isEmpty());
} }
public void testGetChildren_ByType() { public void testGetChildren_ByType() {
expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).getChildren(0)); expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
.getChildren(Normalizable.ChildType.BUCKET_INFLUENCER));
} }
public void testGetChildren() { public void testGetChildren() {
assertTrue(new InfluencerNormalizable(influencer).getChildren().isEmpty()); assertTrue(new InfluencerNormalizable(influencer, INDEX_NAME).getChildren().isEmpty());
} }
public void testSetMaxChildrenScore() { public void testSetMaxChildrenScore() {
expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).setMaxChildrenScore(0, 42.0)); expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0));
} }
public void testSetParentScore() { public void testSetParentScore() {
expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).setParentScore(42.0)); expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME).setParentScore(42.0));
} }
public void testResetBigChangeFlag() { public void testResetBigChangeFlag() {
InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer); InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer, INDEX_NAME);
normalizable.raiseBigChangeFlag(); normalizable.raiseBigChangeFlag();
normalizable.resetBigChangeFlag(); normalizable.resetBigChangeFlag();
assertFalse(normalizable.hadBigNormalizedUpdate());
assertFalse(influencer.hadBigNormalizedUpdate());
} }
public void testRaiseBigChangeFlag() { public void testRaiseBigChangeFlag() {
InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer); InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer, INDEX_NAME);
normalizable.resetBigChangeFlag(); normalizable.resetBigChangeFlag();
normalizable.raiseBigChangeFlag(); normalizable.raiseBigChangeFlag();
assertTrue(normalizable.hadBigNormalizedUpdate());
assertTrue(influencer.hadBigNormalizedUpdate());
} }
} }

View File

@ -11,10 +11,7 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.*;
import java.util.Date;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -28,6 +25,7 @@ import static org.mockito.Mockito.when;
public class NormalizerTests extends ESTestCase { public class NormalizerTests extends ESTestCase {
private static final String JOB_ID = "foo"; private static final String JOB_ID = "foo";
private static final String INDEX_NAME = "foo-index";
private static final String QUANTILES_STATE = "someState"; private static final String QUANTILES_STATE = "someState";
private static final int BUCKET_SPAN = 600; private static final int BUCKET_SPAN = 600;
private static final double INITIAL_SCORE = 2.0; private static final double INITIAL_SCORE = 2.0;
@ -57,12 +55,8 @@ public class NormalizerTests extends ESTestCase {
Bucket bucket = generateBucket(new Date(0)); Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(0.0); bucket.setAnomalyScore(0.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.07, INITIAL_SCORE)); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.07, INITIAL_SCORE));
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
List<Normalizable> asNormalizables = buckets.stream()
.map(b -> new BucketNormalizable(b)).collect(Collectors.toList());
List<Normalizable> asNormalizables = Arrays.asList(new BucketNormalizable(bucket, INDEX_NAME));
normalizer.normalize(BUCKET_SPAN, false, asNormalizables, QUANTILES_STATE); normalizer.normalize(BUCKET_SPAN, false, asNormalizables, QUANTILES_STATE);
assertEquals(1, asNormalizables.size()); assertEquals(1, asNormalizables.size());

View File

@ -5,18 +5,9 @@
*/ */
package org.elasticsearch.xpack.ml.job.process.normalizer; package org.elasticsearch.xpack.ml.job.process.normalizer;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.Deque; import java.util.Deque;
@ -27,6 +18,7 @@ import org.elasticsearch.xpack.ml.job.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.Detector; import org.elasticsearch.xpack.ml.job.Detector;
import org.elasticsearch.xpack.ml.job.Job; import org.elasticsearch.xpack.ml.job.Job;
import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator; import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchBatchedResultsIterator;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.MockBatchedDocumentsIterator; import org.elasticsearch.xpack.ml.job.persistence.MockBatchedDocumentsIterator;
@ -36,13 +28,28 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.junit.Before; import org.junit.Before;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.never;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.mock;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
public class ScoresUpdaterTests extends ESTestCase { public class ScoresUpdaterTests extends ESTestCase {
private static final String JOB_ID = "foo"; private static final String JOB_ID = "foo";
private static final String QUANTILES_STATE = "someState"; private static final String QUANTILES_STATE = "someState";
private static final long DEFAULT_BUCKET_SPAN = 3600; private static final long DEFAULT_BUCKET_SPAN = 3600;
private static final long DEFAULT_START_TIME = 0; private static final long DEFAULT_START_TIME = 0;
private static final long DEFAULT_END_TIME = 3600;
private JobProvider jobProvider = mock(JobProvider.class); private JobProvider jobProvider = mock(JobProvider.class);
private JobRenormalizedResultsPersister jobRenormalizedResultsPersister = mock(JobRenormalizedResultsPersister.class); private JobRenormalizedResultsPersister jobRenormalizedResultsPersister = mock(JobRenormalizedResultsPersister.class);
@ -57,6 +64,7 @@ public class ScoresUpdaterTests extends ESTestCase {
} }
@Before @Before
@SuppressWarnings("unchecked")
public void setUpMocks() throws IOException { public void setUpMocks() throws IOException {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
@ -72,9 +80,10 @@ public class ScoresUpdaterTests extends ESTestCase {
scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory); scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);
givenProviderReturnsNoBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME); givenProviderReturnsNoBuckets();
givenProviderReturnsNoInfluencers(DEFAULT_START_TIME, DEFAULT_END_TIME); givenProviderReturnsNoInfluencers();
givenNormalizerFactoryReturnsMock(); givenNormalizerFactoryReturnsMock();
givenNormalizerRaisesBigChangeFlag();
} }
public void testUpdate_GivenBucketWithZeroScoreAndNoRecords() throws IOException { public void testUpdate_GivenBucketWithZeroScoreAndNoRecords() throws IOException {
@ -83,136 +92,70 @@ public class ScoresUpdaterTests extends ESTestCase {
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.7, 0.0)); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.7, 0.0));
Deque<Bucket> buckets = new ArrayDeque<>(); Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket); buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); givenProviderReturnsBuckets(buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(0); verifyNormalizerWasInvoked(0);
verifyBucketWasNotUpdated(bucket); verifyNothingWasUpdated();
verifyBucketRecordsWereNotUpdated(bucket.getId());
} }
public void testUpdate_GivenBucketWithNonZeroScoreButNoBucketInfluencers() throws IOException { public void testUpdate_GivenTwoBucketsOnlyOneUpdated() throws IOException {
Bucket bucket = generateBucket(new Date(0)); Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(30.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0));
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
bucket = generateBucket(new Date(1000));
bucket.setAnomalyScore(0.0); bucket.setAnomalyScore(0.0);
bucket.setBucketInfluencers(new ArrayList<>());
Deque<Bucket> buckets = new ArrayDeque<>(); givenProviderReturnsBuckets(buckets);
buckets.add(bucket); givenProviderReturnsRecords(new ArrayDeque<>());
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(0); verifyNormalizerWasInvoked(1);
verifyBucketWasNotUpdated(bucket); verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any());
verifyBucketRecordsWereNotUpdated(bucket.getId());
} }
public void testUpdate_GivenSingleBucketWithoutBigChangeAndNoRecords() throws IOException { public void testUpdate_GivenSingleBucketWithAnomalyScoreAndNoRecords() throws IOException {
Bucket bucket = generateBucket(new Date(0)); Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(buckets);
givenProviderReturnsRecords(new ArrayDeque<>());
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(1);
}
public void testUpdate_GivenSingleBucketAndRecords() throws IOException {
Bucket bucket = generateBucket(new Date(DEFAULT_START_TIME));
bucket.setAnomalyScore(30.0); bucket.setAnomalyScore(30.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0)); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0));
Deque<Bucket> buckets = new ArrayDeque<>(); Deque<AnomalyRecord> records = new ArrayDeque<>();
buckets.add(bucket); AnomalyRecord record1 = createRecord();
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); AnomalyRecord record2 = createRecord();
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasNotUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
public void testUpdate_GivenSingleBucketWithoutBigChangeAndRecordsWithoutBigChange() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(30.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0));
List<AnomalyRecord> records = new ArrayList<>();
AnomalyRecord record1 = createRecordWithoutBigChange();
AnomalyRecord record2 = createRecordWithoutBigChange();
records.add(record1); records.add(record1);
records.add(record2); records.add(record2);
bucket.setRecords(records);
bucket.setRecordCount(2);
Deque<Bucket> buckets = new ArrayDeque<>(); Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket); buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); givenProviderReturnsBuckets(buckets);
givenProviderReturnsRecords(records);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1); verifyNormalizerWasInvoked(1);
verifyBucketWasNotUpdated(bucket); verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any());
verifyBucketRecordsWereNotUpdated(bucket.getId()); verify(jobRenormalizedResultsPersister, times(1)).updateResults(any());
} verify(jobRenormalizedResultsPersister, times(2)).executeRequest(anyString());
public void testUpdate_GivenSingleBucketWithBigChangeAndNoRecords() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
public void testUpdate_GivenSingleBucketWithoutBigChangeAndSomeRecordsWithBigChange() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
List<AnomalyRecord> records = new ArrayList<>();
AnomalyRecord record1 = createRecordWithBigChange();
AnomalyRecord record2 = createRecordWithoutBigChange();
AnomalyRecord record3 = createRecordWithBigChange();
records.add(record1);
records.add(record2);
records.add(record3);
bucket.setRecords(records);
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasNotUpdated(bucket);
verifyRecordsWereUpdated(bucket.getJobId(), Arrays.asList(record1, record3));
}
public void testUpdate_GivenSingleBucketWithBigChangeAndSomeRecordsWithBigChange() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
List<AnomalyRecord> records = new ArrayList<>();
AnomalyRecord record1 = createRecordWithBigChange();
AnomalyRecord record2 = createRecordWithoutBigChange();
AnomalyRecord record3 = createRecordWithBigChange();
records.add(record1);
records.add(record2);
records.add(record3);
bucket.setRecords(records);
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket);
verifyRecordsWereUpdated(bucket.getJobId(), Arrays.asList(record1, record3));
} }
public void testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() throws IOException { public void testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() throws IOException {
@ -222,7 +165,6 @@ public class ScoresUpdaterTests extends ESTestCase {
bucket.setAnomalyScore(42.0); bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0); bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
batch1.add(bucket); batch1.add(bucket);
} }
@ -230,11 +172,11 @@ public class ScoresUpdaterTests extends ESTestCase {
secondBatchBucket.addBucketInfluencer(createTimeBucketInfluencer(secondBatchBucket.getTimestamp(), 0.04, 42.0)); secondBatchBucket.addBucketInfluencer(createTimeBucketInfluencer(secondBatchBucket.getTimestamp(), 0.04, 42.0));
secondBatchBucket.setAnomalyScore(42.0); secondBatchBucket.setAnomalyScore(42.0);
secondBatchBucket.setMaxNormalizedProbability(50.0); secondBatchBucket.setMaxNormalizedProbability(50.0);
secondBatchBucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> batch2 = new ArrayDeque<>(); Deque<Bucket> batch2 = new ArrayDeque<>();
batch2.add(secondBatchBucket); batch2.add(secondBatchBucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, batch1, batch2); givenProviderReturnsBuckets(batch1, batch2);
givenProviderReturnsRecords(new ArrayDeque<>());
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
@ -242,13 +184,7 @@ public class ScoresUpdaterTests extends ESTestCase {
// Batch 1 - Just verify first and last were updated as Mockito
// is prohibitively slow when trying to verify all 10000
verifyBucketWasUpdated(batch1.getFirst()); verifyBucketWasUpdated(10001);
verifyBucketRecordsWereNotUpdated(batch1.getFirst().getId());
verifyBucketWasUpdated(batch1.getLast());
verifyBucketRecordsWereNotUpdated(batch1.getLast().getId());
verifyBucketWasUpdated(secondBatchBucket);
verifyBucketRecordsWereNotUpdated(secondBatchBucket.getId());
} }
public void testUpdate_GivenTwoBucketsWithFirstHavingEnoughRecordsToForceSecondNormalization() throws IOException { public void testUpdate_GivenTwoBucketsWithFirstHavingEnoughRecordsToForceSecondNormalization() throws IOException {
@ -256,109 +192,101 @@ public class ScoresUpdaterTests extends ESTestCase {
bucket1.setAnomalyScore(42.0); bucket1.setAnomalyScore(42.0);
bucket1.addBucketInfluencer(createTimeBucketInfluencer(bucket1.getTimestamp(), 0.04, 42.0)); bucket1.addBucketInfluencer(createTimeBucketInfluencer(bucket1.getTimestamp(), 0.04, 42.0));
bucket1.setMaxNormalizedProbability(50.0); bucket1.setMaxNormalizedProbability(50.0);
bucket1.raiseBigNormalizedUpdateFlag(); List<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> records = new ArrayList<>();
when(jobProvider.expandBucket(JOB_ID, false, bucket1)).thenReturn(100000); Date date = new Date();
for (int i=0; i<100000; i++) {
records.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", new AnomalyRecord("foo", date, 1, i)));
}
Bucket bucket2 = generateBucket(new Date(10000 * 1000)); Bucket bucket2 = generateBucket(new Date(10000 * 1000));
bucket2.addBucketInfluencer(createTimeBucketInfluencer(bucket2.getTimestamp(), 0.04, 42.0)); bucket2.addBucketInfluencer(createTimeBucketInfluencer(bucket2.getTimestamp(), 0.04, 42.0));
bucket2.setAnomalyScore(42.0); bucket2.setAnomalyScore(42.0);
bucket2.setMaxNormalizedProbability(50.0); bucket2.setMaxNormalizedProbability(50.0);
bucket2.raiseBigNormalizedUpdateFlag();
Deque<Bucket> batch = new ArrayDeque<>(); Deque<Bucket> batch = new ArrayDeque<>();
batch.add(bucket1); batch.add(bucket1);
batch.add(bucket2); batch.add(bucket2);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, batch); givenProviderReturnsBuckets(batch);
List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> recordBatches = new ArrayList<>();
recordBatches.add(new ArrayDeque<>(records));
BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter =
new MockBatchedDocumentsIterator<>(recordBatches);
when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(2); verifyNormalizerWasInvoked(2);
verifyBucketWasUpdated(bucket1);
verifyBucketRecordsWereNotUpdated(bucket1.getId());
verifyBucketWasUpdated(bucket2);
verifyBucketRecordsWereNotUpdated(bucket2.getId());
} }
public void testUpdate_GivenInfluencerWithBigChange() throws IOException { public void testUpdate_GivenInfluencerWithBigChange() throws IOException {
Influencer influencer = new Influencer(JOB_ID, "n", "v", new Date(DEFAULT_START_TIME), 600, 1); Influencer influencer = new Influencer(JOB_ID, "n", "v", new Date(DEFAULT_START_TIME), 600, 1);
influencer.raiseBigNormalizedUpdateFlag();
Deque<Influencer> influencers = new ArrayDeque<>(); Deque<Influencer> influencers = new ArrayDeque<>();
influencers.add(influencer); influencers.add(influencer);
givenProviderReturnsInfluencers(DEFAULT_START_TIME, DEFAULT_END_TIME, influencers); givenProviderReturnsInfluencers(influencers);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1); verifyNormalizerWasInvoked(1);
verifyInfluencerWasUpdated(influencer); verify(jobRenormalizedResultsPersister, times(1)).updateResults(any());
verify(jobRenormalizedResultsPersister, times(1)).executeRequest(anyString());
} }
public void testDefaultRenormalizationWindowBasedOnTime() throws IOException { public void testDefaultRenormalizationWindowBasedOnTime() throws IOException {
Bucket bucket = generateBucket(new Date(0)); Bucket bucket = generateBucket(new Date(2509200000L));
bucket.setAnomalyScore(42.0); bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0); bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> buckets = new ArrayDeque<>(); Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket); buckets.add(bucket);
givenProviderReturnsBuckets(2509200000L, 2595600000L, buckets); givenProviderReturnsBuckets(buckets);
givenProviderReturnsNoInfluencers(2509200000L, 2595600000L); givenProviderReturnsRecords(new ArrayDeque<>());
givenProviderReturnsNoInfluencers();
scoresUpdater.update(QUANTILES_STATE, 2595600000L, 0, false); scoresUpdater.update(QUANTILES_STATE, 2595600000L, 0, false);
verifyNormalizerWasInvoked(1); verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket); verifyBucketWasUpdated(1);
verifyBucketRecordsWereNotUpdated(bucket.getId());
} }
public void testManualRenormalizationWindow() throws IOException { public void testManualRenormalizationWindow() throws IOException {
Bucket bucket = generateBucket(new Date(3600000));
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0); bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0); bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> buckets = new ArrayDeque<>(); Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket); buckets.add(bucket);
givenProviderReturnsBuckets(3600000, 90000000L, buckets); givenProviderReturnsBuckets(buckets);
givenProviderReturnsNoInfluencers(3600000, 90000000L); givenProviderReturnsRecords(new ArrayDeque<>());
givenProviderReturnsNoInfluencers();
scoresUpdater.update(QUANTILES_STATE, 90000000L, 0, false); scoresUpdater.update(QUANTILES_STATE, 90000000L, 0, false);
verifyNormalizerWasInvoked(1); verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket); verifyBucketWasUpdated(1);
verifyBucketRecordsWereNotUpdated(bucket.getId());
} }
public void testManualRenormalizationWindow_GivenExtension() throws IOException { public void testManualRenormalizationWindow_GivenExtension() throws IOException {
Bucket bucket = generateBucket(new Date(0)); Bucket bucket = generateBucket(new Date(2700000));
bucket.setAnomalyScore(42.0); bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0)); bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0); bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> buckets = new ArrayDeque<>(); Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket); buckets.add(bucket);
givenProviderReturnsBuckets(2700000, 90000000L, buckets); givenProviderReturnsBuckets(buckets);
givenProviderReturnsNoInfluencers(2700000, 90000000L); givenProviderReturnsRecords(new ArrayDeque<>());
givenProviderReturnsNoInfluencers();
scoresUpdater.update(QUANTILES_STATE, 90000000L, 900000, false); scoresUpdater.update(QUANTILES_STATE, 90000000L, 900000, false);
verifyNormalizerWasInvoked(1); verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket); verifyBucketWasUpdated(1);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
private void verifyNormalizerWasInvoked(int times) throws IOException {
int bucketSpan = job.getAnalysisConfig() == null ? 0
: job.getAnalysisConfig().getBucketSpan().intValue();
verify(normalizer, times(times)).normalize(
eq(bucketSpan), eq(false), anyListOf(Normalizable.class),
eq(QUANTILES_STATE));
} }
private BucketInfluencer createTimeBucketInfluencer(Date timestamp, double probability, double anomalyScore) { private BucketInfluencer createTimeBucketInfluencer(Date timestamp, double probability, double anomalyScore) {
@ -369,80 +297,106 @@ public class ScoresUpdaterTests extends ESTestCase {
return influencer; return influencer;
} }
private void givenNormalizerFactoryReturnsMock() { private static AnomalyRecord createRecord() {
when(normalizerFactory.create(JOB_ID)).thenReturn(normalizer);
}
private void givenProviderReturnsNoBuckets(long startTime, long endTime) {
givenBuckets(startTime, endTime, Collections.emptyList());
}
private void givenProviderReturnsBuckets(long startTime, long endTime, Deque<Bucket> batch1, Deque<Bucket> batch2) {
List<Deque<Bucket>> batches = new ArrayList<>();
batches.add(new ArrayDeque<>(batch1));
batches.add(new ArrayDeque<>(batch2));
givenBuckets(startTime, endTime, batches);
}
private void givenProviderReturnsBuckets(long startTime, long endTime, Deque<Bucket> buckets) {
List<Deque<Bucket>> batches = new ArrayList<>();
batches.add(new ArrayDeque<>(buckets));
givenBuckets(startTime, endTime, batches);
}
private void givenBuckets(long startTime, long endTime, List<Deque<Bucket>> batches) {
BatchedDocumentsIterator<Bucket> iterator = new MockBatchedDocumentsIterator<>(startTime,
endTime, batches);
when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(iterator);
}
private void givenProviderReturnsNoInfluencers(long startTime, long endTime) {
givenProviderReturnsInfluencers(startTime, endTime, new ArrayDeque<>());
}
private void givenProviderReturnsInfluencers(long startTime, long endTime,
Deque<Influencer> influencers) {
List<Deque<Influencer>> batches = new ArrayList<>();
batches.add(new ArrayDeque<>(influencers));
BatchedDocumentsIterator<Influencer> iterator = new MockBatchedDocumentsIterator<>(
startTime, endTime, batches);
when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator);
}
private void verifyBucketWasUpdated(Bucket bucket) {
verify(jobRenormalizedResultsPersister).updateBucket(bucket);
}
private void verifyRecordsWereUpdated(String bucketId, List<AnomalyRecord> records) {
verify(jobRenormalizedResultsPersister).updateRecords(bucketId, records);
}
private void verifyBucketWasNotUpdated(Bucket bucket) {
verify(jobRenormalizedResultsPersister, never()).updateBucket(bucket);
}
private void verifyBucketRecordsWereNotUpdated(String bucketId) {
verify(jobRenormalizedResultsPersister, never()).updateRecords(eq(bucketId),
anyListOf(AnomalyRecord.class));
}
private static AnomalyRecord createRecordWithoutBigChange() {
return createRecord(false);
}
private static AnomalyRecord createRecordWithBigChange() {
return createRecord(true);
}
private static AnomalyRecord createRecord(boolean hadBigChange) {
AnomalyRecord anomalyRecord = mock(AnomalyRecord.class); AnomalyRecord anomalyRecord = mock(AnomalyRecord.class);
when(anomalyRecord.hadBigNormalizedUpdate()).thenReturn(hadBigChange);
when(anomalyRecord.getId()).thenReturn("someId"); when(anomalyRecord.getId()).thenReturn("someId");
return anomalyRecord; return anomalyRecord;
} }
private void verifyInfluencerWasUpdated(Influencer influencer) { private void givenNormalizerFactoryReturnsMock() {
List<Influencer> list = new ArrayList<>(); when(normalizerFactory.create(JOB_ID)).thenReturn(normalizer);
list.add(influencer); }
verify(jobRenormalizedResultsPersister).updateInfluencer(eq(JOB_ID), eq(list)); private void givenProviderReturnsNoBuckets() {
givenBuckets(Collections.emptyList());
}
@SuppressWarnings("unchecked")
private void givenNormalizerRaisesBigChangeFlag() {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
List<Normalizable> normalizables = (List<Normalizable>) invocationOnMock.getArguments()[2];
for (Normalizable normalizable : normalizables) {
normalizable.raiseBigChangeFlag();
for (Normalizable child : normalizable.getChildren()) {
child.raiseBigChangeFlag();
}
}
return null;
}
}).when(normalizer).normalize(anyInt(), anyBoolean(), anyList(), anyString());
}
private void givenProviderReturnsBuckets(Deque<Bucket> batch1, Deque<Bucket> batch2) {
List<Deque<Bucket>> batches = new ArrayList<>();
batches.add(new ArrayDeque<>(batch1));
batches.add(new ArrayDeque<>(batch2));
givenBuckets(batches);
}
private void givenProviderReturnsBuckets(Deque<Bucket> buckets) {
List<Deque<Bucket>> batches = new ArrayList<>();
batches.add(new ArrayDeque<>(buckets));
givenBuckets(batches);
}
private void givenBuckets(List<Deque<Bucket>> batches) {
List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>>> batchesWithIndex = new ArrayList<>();
for (Deque<Bucket> deque : batches) {
Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> queueWithIndex = new ArrayDeque<>();
for (Bucket bucket : deque) {
queueWithIndex.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", bucket));
}
batchesWithIndex.add(queueWithIndex);
}
BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Bucket>> bucketIter =
new MockBatchedDocumentsIterator<>(batchesWithIndex);
when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter);
}
private void givenProviderReturnsRecords(Deque<AnomalyRecord> records) {
Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> batch = new ArrayDeque<>();
List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> batches = new ArrayList<>();
for (AnomalyRecord record : records) {
batch.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", record));
}
batches.add(batch);
BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter =
new MockBatchedDocumentsIterator<>(batches);
when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
}
private void givenProviderReturnsNoInfluencers() {
givenProviderReturnsInfluencers(new ArrayDeque<>());
}
private void givenProviderReturnsInfluencers(Deque<Influencer> influencers) {
List<Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>>> batches = new ArrayList<>();
Deque<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> queue = new ArrayDeque<>();
for (Influencer inf : influencers) {
queue.add(new ElasticsearchBatchedResultsIterator.ResultWithIndex<>("foo", inf));
}
batches.add(queue);
BatchedDocumentsIterator<ElasticsearchBatchedResultsIterator.ResultWithIndex<Influencer>> iterator =
new MockBatchedDocumentsIterator<>(batches);
when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator);
}
private void verifyNormalizerWasInvoked(int times) throws IOException {
int bucketSpan = job.getAnalysisConfig() == null ? 0 : job.getAnalysisConfig().getBucketSpan().intValue();
verify(normalizer, times(times)).normalize(
eq(bucketSpan), eq(false), anyListOf(Normalizable.class),
eq(QUANTILES_STATE));
}
private void verifyNothingWasUpdated() {
verify(jobRenormalizedResultsPersister, never()).updateBucket(any());
verify(jobRenormalizedResultsPersister, never()).updateResults(any());
}
private void verifyBucketWasUpdated(int bucketCount) {
verify(jobRenormalizedResultsPersister, times(bucketCount)).updateBucket(any());
} }
} }

View File

@ -251,14 +251,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
assertEquals(bucket1.hashCode(), bucket2.hashCode()); assertEquals(bucket1.hashCode(), bucket2.hashCode());
} }
public void testIsNormalizable_GivenEmptyBucketInfluencers() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setBucketInfluencers(Collections.emptyList());
bucket.setAnomalyScore(90.0);
assertFalse(bucket.isNormalizable());
}
public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() { public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123); Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1)); bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));