Make the per partition max anomaly scores a result type (elastic/elasticsearch#411)

Original commit: elastic/x-pack-elasticsearch@002a1d7623
David Kyle 2016-11-29 11:52:37 +00:00 committed by GitHub
parent 78cd60048c
commit 576a591d3b
13 changed files with 516 additions and 210 deletions

View File

@ -56,12 +56,11 @@ import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.job.results.Result;
import org.elasticsearch.xpack.prelert.job.usage.Usage;
import org.elasticsearch.xpack.prelert.lists.ListDocument;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.time.TimeUtils;
import java.io.IOException;
import java.io.OutputStream;
@ -69,7 +68,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
@ -191,7 +189,6 @@ public class ElasticsearchJobProvider implements JobProvider {
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
XContentBuilder modelSizeStatsMapping = ElasticsearchMappings.modelSizeStatsMapping();
XContentBuilder modelDebugMapping = ElasticsearchMappings.modelDebugOutputMapping(termFields);
XContentBuilder partitionScoreMapping = ElasticsearchMappings.bucketPartitionMaxNormalizedScores();
XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
String jobId = job.getId();
@ -206,7 +203,6 @@ public class ElasticsearchJobProvider implements JobProvider {
createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
createIndexRequest.mapping(ModelSizeStats.TYPE.getPreferredName(), modelSizeStatsMapping);
createIndexRequest.mapping(ModelDebugOutput.TYPE.getPreferredName(), modelDebugMapping);
createIndexRequest.mapping(ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE, partitionScoreMapping);
createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@ -295,10 +291,10 @@ public class ElasticsearchJobProvider implements JobProvider {
}
}
} else {
List<ScoreTimestamp> scores =
partitionScores(jobId, query.getEpochStart(), query.getEpochEnd(), query.getPartitionValue());
List<PerPartitionMaxProbabilities> scores =
partitionMaxNormalisedProbabilities(jobId, query.getEpochStart(), query.getEpochEnd(), query.getPartitionValue());
mergePartitionScoresIntoBucket(scores, buckets.results());
mergePartitionScoresIntoBucket(scores, buckets.results(), query.getPartitionValue());
for (Bucket b : buckets.results()) {
if (query.isExpand() && b.getRecordCount() > 0) {
@ -312,16 +308,16 @@ public class ElasticsearchJobProvider implements JobProvider {
return buckets;
}
void mergePartitionScoresIntoBucket(List<ScoreTimestamp> scores, List<Bucket> buckets) {
Iterator<ScoreTimestamp> itr = scores.iterator();
ScoreTimestamp score = itr.hasNext() ? itr.next() : null;
void mergePartitionScoresIntoBucket(List<PerPartitionMaxProbabilities> partitionProbs, List<Bucket> buckets, String partitionValue) {
Iterator<PerPartitionMaxProbabilities> itr = partitionProbs.iterator();
PerPartitionMaxProbabilities partitionProb = itr.hasNext() ? itr.next() : null;
for (Bucket b : buckets) {
if (score == null) {
if (partitionProb == null) {
b.setMaxNormalizedProbability(0.0);
} else {
if (score.timestamp.equals(b.getTimestamp())) {
b.setMaxNormalizedProbability(score.score);
score = itr.hasNext() ? itr.next() : null;
if (partitionProb.getTimestamp().equals(b.getTimestamp())) {
b.setMaxNormalizedProbability(partitionProb.getMaxProbabilityForPartition(partitionValue));
partitionProb = itr.hasNext() ? itr.next() : null;
} else {
b.setMaxNormalizedProbability(0.0);
}
@ -360,7 +356,7 @@ public class ElasticsearchJobProvider implements JobProvider {
try {
parser = XContentFactory.xContent(source).createParser(source);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parser bucket", e);
throw new ElasticsearchParseException("failed to parse bucket", e);
}
Bucket bucket = Bucket.PARSER.apply(parser, () -> parseFieldMatcher);
@ -408,7 +404,7 @@ public class ElasticsearchJobProvider implements JobProvider {
try {
parser = XContentFactory.xContent(source).createParser(source);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parser bucket", e);
throw new ElasticsearchParseException("failed to parse bucket", e);
}
Bucket bucket = Bucket.PARSER.apply(parser, () -> parseFieldMatcher);
@ -422,13 +418,17 @@ public class ElasticsearchJobProvider implements JobProvider {
expandBucket(jobId, query.isIncludeInterim(), bucket);
}
} else {
List<ScoreTimestamp> scores =
partitionScores(jobId,
query.getTimestamp(), query.getTimestamp() + 1,
query.getPartitionValue());
List<PerPartitionMaxProbabilities> partitionProbs =
partitionMaxNormalisedProbabilities(jobId, query.getTimestamp(), query.getTimestamp() + 1, query.getPartitionValue());
if (partitionProbs.size() > 1) {
LOGGER.error("Found more than one PerPartitionMaxProbabilities with timestamp [" + query.getTimestamp() + "]" +
" from index " + indexName);
}
if (partitionProbs.size() > 0) {
bucket.setMaxNormalizedProbability(partitionProbs.get(0).getMaxProbabilityForPartition(query.getPartitionValue()));
}
bucket.setMaxNormalizedProbability(scores.isEmpty() == false ?
scores.get(0).score : 0.0d);
if (query.isExpand() && bucket.getRecordCount() > 0) {
this.expandBucketForPartitionValue(jobId, query.isIncludeInterim(),
bucket, query.getPartitionValue());
@ -440,31 +440,27 @@ public class ElasticsearchJobProvider implements JobProvider {
return new QueryPage<>(Collections.singletonList(bucket), 1, Bucket.RESULTS_FIELD);
}
final class ScoreTimestamp {
double score;
Date timestamp;
public ScoreTimestamp(Date timestamp, double score) {
this.score = score;
this.timestamp = timestamp;
}
}
private List<ScoreTimestamp> partitionScores(String jobId, Object epochStart,
Object epochEnd, String partitionFieldValue)
throws ResourceNotFoundException {
QueryBuilder qb = new ResultsFilterBuilder()
private List<PerPartitionMaxProbabilities> partitionMaxNormalisedProbabilities(String jobId, Object epochStart, Object epochEnd,
String partitionFieldValue)
throws ResourceNotFoundException
{
QueryBuilder timeRangeQuery = new ResultsFilterBuilder()
.timeRange(ElasticsearchMappings.ES_TIMESTAMP, epochStart, epochEnd)
.build();
FieldSortBuilder sb = new FieldSortBuilder(ElasticsearchMappings.ES_TIMESTAMP)
.order(SortOrder.ASC);
QueryBuilder boolQuery = new BoolQueryBuilder()
.filter(timeRangeQuery)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), PerPartitionMaxProbabilities.RESULT_TYPE_VALUE))
.filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue));
FieldSortBuilder sb = new FieldSortBuilder(ElasticsearchMappings.ES_TIMESTAMP).order(SortOrder.ASC);
String indexName = JobResultsPersister.getJobIndexName(jobId);
SearchRequestBuilder searchBuilder = client
.prepareSearch(indexName)
.setQuery(qb)
.setQuery(boolQuery)
.addSort(sb)
.setTypes(ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE);
.setTypes(Result.TYPE.getPreferredName());
SearchResponse searchResponse;
try {
@ -473,23 +469,20 @@ public class ElasticsearchJobProvider implements JobProvider {
throw ExceptionsHelper.missingJobException(jobId);
}
List<ScoreTimestamp> results = new ArrayList<>();
List<PerPartitionMaxProbabilities> results = new ArrayList<>();
// expect 1 document per bucket
if (searchResponse.getHits().totalHits() > 0) {
Map<String, Object> m = searchResponse.getHits().getAt(0).getSource();
@SuppressWarnings("unchecked")
List<Map<String, Object>> probs = (List<Map<String, Object>>)
m.get(ReservedFieldNames.PARTITION_NORMALIZED_PROBS);
for (Map<String, Object> prob : probs) {
if (partitionFieldValue.equals(prob.get(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName()))) {
Date ts = new Date(TimeUtils.dateStringToEpoch((String) m.get(ElasticsearchMappings.ES_TIMESTAMP)));
results.add(new ScoreTimestamp(ts,
(Double) prob.get(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName())));
}
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(source);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse PerPartitionMaxProbabilities", e);
}
PerPartitionMaxProbabilities perPartitionMaxProbabilities =
PerPartitionMaxProbabilities.PARSER.apply(parser, () -> parseFieldMatcher);
perPartitionMaxProbabilities.setId(hit.getId());
results.add(perPartitionMaxProbabilities);
}
return results;
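
For illustration only (not part of this commit): the loop above turns each stored per-partition result document back into a PerPartitionMaxProbabilities object via its PARSER. A minimal sketch of that round trip against a hand-written source document follows; apart from "timestamp" and "perPartitionMaxProbabilities", which come from the ParseFields added in this change, the JSON field names ("jobId", "partitionFieldValue", "maxNormalizedProbability") are assumptions based on the camelCase style used elsewhere in this codebase.

// Sketch: parse one hypothetical stored document of the new result type.
static PerPartitionMaxProbabilities parseExample() throws IOException {
    String source = "{"
            + "\"jobId\":\"foo\","
            + "\"timestamp\":1480412400000,"
            + "\"perPartitionMaxProbabilities\":["
            + "{\"partitionFieldValue\":\"engine-1\",\"maxNormalizedProbability\":90.0}"
            + "]}";
    XContentParser parser = XContentFactory.xContent(source).createParser(source);
    // ParseFieldMatcher.STRICT stands in for the provider's parseFieldMatcher field
    return PerPartitionMaxProbabilities.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT);
}

On the returned object, getMaxProbabilityForPartition("engine-1") would give 90.0, and 0.0 for any partition value that is not present.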

View File

@ -23,7 +23,7 @@ import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influence;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.job.results.Result;
import org.elasticsearch.xpack.prelert.job.usage.Usage;
@ -196,6 +196,19 @@ public class ElasticsearchMappings {
// influencer mapping
.startObject(Influencer.INFLUENCER_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
// per-partition max probabilities mapping
.startObject(PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject();
addAnomalyRecordFieldsToMapping(builder);
@ -382,47 +395,6 @@ public class ElasticsearchMappings {
.endObject();
}
/**
* Partition normalized scores. There is one per bucket
* so the timestamp is sufficient to uniquely identify
* the document per bucket per job
* <p>
* Partition field values and scores are nested objects.
*/
public static XContentBuilder bucketPartitionMaxNormalizedScores() throws IOException {
return jsonBuilder()
.startObject()
.startObject(ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE)
.startObject(ALL)
.field(ENABLED, false)
// analyzer must be specified even though _all is disabled
// because all types in the same index must have the same
// analyzer for a given field
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE)
.endObject()
.startObject(ReservedFieldNames.PARTITION_NORMALIZED_PROBS)
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()
.endObject()
.endObject()
.endObject();
}
public static XContentBuilder categorizerStateMapping() throws IOException {
return jsonBuilder()
.startObject()

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.job.results.Result;
import java.io.IOException;
@ -65,7 +66,6 @@ public class JobRenormaliser extends AbstractComponent {
logger.error(new ParameterizedMessage("[{}] Error updating standalone bucket influencer state", new Object[]{jobId}, e));
return;
}
jobResultsPersister.persistPerPartitionMaxProbabilities(bucket);
}
@ -107,6 +107,15 @@ public class JobRenormaliser extends AbstractComponent {
}
}
public void updatePerPartitionMaxProbabilities(String jobId, List<AnomalyRecord> records) {
PerPartitionMaxProbabilities ppMaxProbs =
new PerPartitionMaxProbabilities(records);
logger.trace("[{}] ES API CALL: update result type {} with ID {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, ppMaxProbs.getId());
jobResultsPersister.persistPerPartitionMaxProbabilities(ppMaxProbs);
}
/**
* Update the influencer for a particular job
*/

View File

@ -17,7 +17,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
@ -27,20 +26,19 @@ import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.job.results.Result;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* Saves result Buckets and Quantiles to Elasticsearch<br>
*
* <p>
* <b>Buckets</b> are written with the following structure:
* <h2>Bucket</h2> The results of each job are stored in buckets, this is the
* top level structure for the results. A bucket contains multiple anomaly
@ -81,8 +79,6 @@ public class JobResultsPersister extends AbstractComponent {
.execute().actionGet();
persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(),
bucket.isInterim());
persistPerPartitionMaxProbabilities(bucket);
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error persisting bucket", new Object[] {jobId}, e));
}
@ -90,6 +86,7 @@ public class JobResultsPersister extends AbstractComponent {
/**
* Persist a list of anomaly records
*
* @param records the records to persist
*/
public void persistRecords(List<AnomalyRecord> records) {
@ -122,6 +119,7 @@ public class JobResultsPersister extends AbstractComponent {
/**
* Persist a list of influencers
*
* @param influencers the influencers to persist
*/
public void persistInfluencers(List<Influencer> influencers) {
@ -151,8 +149,26 @@ public class JobResultsPersister extends AbstractComponent {
}
}
public void persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities) {
String jobId = partitionProbabilities.getJobId();
try {
XContentBuilder builder = toXContentBuilder(partitionProbabilities);
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp());
client.prepareIndex(indexName, Result.TYPE.getPreferredName())
.setSource(builder)
.execute().actionGet();
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error updating bucket per partition max normalized scores",
new Object[]{jobId}, e));
}
}
/**
* Persist the category definition
*
* @param category The category to be persisted
*/
public void persistCategoryDefinition(CategoryDefinition category) {
@ -291,7 +307,7 @@ public class JobResultsPersister extends AbstractComponent {
}
void persistBucketInfluencersStandalone(String jobId, String bucketId, List<BucketInfluencer> bucketInfluencers,
Date bucketTime, boolean isInterim) throws IOException {
Date bucketTime, boolean isInterim) throws IOException {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
BulkRequestBuilder addBucketInfluencersRequest = client.prepareBulk();
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
@ -323,38 +339,6 @@ public class JobResultsPersister extends AbstractComponent {
return builder;
}
void persistPerPartitionMaxProbabilities(Bucket bucket) {
String jobId = bucket.getJobId();
if (bucket.getPerPartitionMaxProbability().isEmpty()) {
return;
}
try {
XContentBuilder builder = jsonBuilder();
builder.startObject()
.field(ElasticsearchMappings.ES_TIMESTAMP, bucket.getTimestamp())
.field(Job.ID.getPreferredName(), bucket.getJobId());
builder.startArray(ReservedFieldNames.PARTITION_NORMALIZED_PROBS);
for (Map.Entry<String, Double> entry : bucket.getPerPartitionMaxProbability().entrySet()) {
builder.startObject()
.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), entry.getKey())
.field(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), entry.getValue())
.endObject();
}
builder.endArray().endObject();
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index type {} to index {} at epoch {}",
jobId, ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE, indexName, bucket.getEpoch());
client.prepareIndex(indexName, ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE)
.setSource(builder)
.setId(bucket.getId())
.execute().actionGet();
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error updating bucket per partition max normalized scores",
new Object[]{jobId}, e));
}
}
private static final String INDEX_PREFIX = "prelertresults-";
public static String getJobIndexName(String jobId) {

View File

@ -20,6 +20,7 @@ import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.utils.CloseableIterator;
import java.io.InputStream;
@ -102,14 +103,14 @@ public class AutoDetectResultProcessor {
persister.deleteInterimResults(context.jobId);
context.deleteInterimRequired = false;
}
if (context.isPerPartitionNormalization) {
bucket.calcMaxNormalizedProbabilityPerPartition();
}
persister.persistBucket(bucket);
}
List<AnomalyRecord> records = result.getRecords();
if (records != null && !records.isEmpty()) {
persister.persistRecords(records);
if (context.isPerPartitionNormalization) {
persister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records));
}
}
List<Influencer> influencers = result.getInfluencers();
if (influencers != null && !influencers.isEmpty()) {

View File

@ -13,7 +13,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser.Token;
@ -29,8 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collector;
import java.util.stream.Collectors;
/**
* Bucket Result POJO
@ -40,6 +37,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
* Field Names
*/
public static final ParseField JOB_ID = Job.ID;
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField ANOMALY_SCORE = new ParseField("anomalyScore");
public static final ParseField INITIAL_ANOMALY_SCORE = new ParseField("initialAnomalyScore");
@ -324,12 +322,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
}
}
public Map<String, Double> calcMaxNormalizedProbabilityPerPartition() {
perPartitionMaxProbability = records.stream().collect(Collectors.groupingBy(AnomalyRecord::getPartitionFieldValue, Collector
.of(DoubleMaxBox::new, (m, ar) -> m.accept(ar.getNormalizedProbability()), DoubleMaxBox::combine, DoubleMaxBox::value)));
return perPartitionMaxProbability;
}
public Map<String, Double> getPerPartitionMaxProbability() {
return perPartitionMaxProbability;

View File

@ -0,0 +1,274 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.results;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.utils.time.TimeUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collector;
import java.util.stream.Collectors;
/**
* When per-partition normalisation is enabled this class represents
* the maximum anomaly probability of each partition per bucket. These values
* are calculated from the bucket's anomaly records.
*/
public class PerPartitionMaxProbabilities extends ToXContentToBytes implements Writeable {
/**
* Result type
*/
public static final String RESULT_TYPE_VALUE = "partitionNormalizedProbs";
/*
* Field Names
*/
public static final ParseField PER_PARTITION_MAX_PROBABILITIES = new ParseField("perPartitionMaxProbabilities");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<PerPartitionMaxProbabilities, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a ->
new PerPartitionMaxProbabilities((String) a[0], (Date) a[1], (List<PartitionProbability>) a[2]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return new Date(p.longValue());
} else if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + Bucket.TIMESTAMP.getPreferredName() + "]");
}, Bucket.TIMESTAMP, ObjectParser.ValueType.VALUE);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), PartitionProbability.PARSER, PER_PARTITION_MAX_PROBABILITIES);
PARSER.declareString((p, s) -> {}, Result.RESULT_TYPE);
}
private final String jobId;
private final Date timestamp;
private final List<PartitionProbability> perPartitionMaxProbabilities;
private String id;
public PerPartitionMaxProbabilities(String jobId, Date timestamp, List<PartitionProbability> partitionProbabilities) {
this.jobId = jobId;
this.timestamp = timestamp;
this.perPartitionMaxProbabilities = partitionProbabilities;
}
public PerPartitionMaxProbabilities(List<AnomalyRecord> records) {
if (records.isEmpty()) {
throw new IllegalArgumentException("PerPartitionMaxProbabilities cannot be created from an empty list of records");
}
this.jobId = records.get(0).getJobId();
this.timestamp = records.get(0).getTimestamp();
this.perPartitionMaxProbabilities = calcMaxNormalizedProbabilityPerPartition(records);
}
@SuppressWarnings("unchecked")
public PerPartitionMaxProbabilities(StreamInput in) throws IOException {
jobId = in.readString();
timestamp = new Date(in.readLong());
perPartitionMaxProbabilities = in.readList(PartitionProbability::new);
id = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(timestamp.getTime());
out.writeList(perPartitionMaxProbabilities);
out.writeOptionalString(id);
}
public String getJobId() {
return jobId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Date getTimestamp() {
return timestamp;
}
public List<PartitionProbability> getPerPartitionMaxProbabilities() {
return perPartitionMaxProbabilities;
}
public double getMaxProbabilityForPartition(String partitionValue) {
Optional<PartitionProbability> first =
perPartitionMaxProbabilities.stream().filter(pp -> partitionValue.equals(pp.getPartitionValue())).findFirst();
return first.isPresent() ? first.get().getMaxNormalisedProbability() : 0.0;
}
/**
* Box class for the stream collector function below
*/
private final class DoubleMaxBox {
private double value = 0.0;
public DoubleMaxBox() {
}
public void accept(double d) {
if (d > value) {
value = d;
}
}
public DoubleMaxBox combine(DoubleMaxBox other) {
return (this.value > other.value) ? this : other;
}
public Double value() {
return this.value;
}
}
private List<PartitionProbability> calcMaxNormalizedProbabilityPerPartition(List<AnomalyRecord> anomalyRecords) {
Map<String, Double> maxValueByPartition = anomalyRecords.stream().collect(
Collectors.groupingBy(AnomalyRecord::getPartitionFieldValue,
Collector.of(DoubleMaxBox::new, (m, ar) -> m.accept(ar.getNormalizedProbability()),
DoubleMaxBox::combine, DoubleMaxBox::value)));
List<PartitionProbability> pProbs = new ArrayList<>();
for (Map.Entry<String, Double> entry : maxValueByPartition.entrySet()) {
pProbs.add(new PartitionProbability(entry.getKey(), entry.getValue()));
}
return pProbs;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(Bucket.TIMESTAMP.getPreferredName(), timestamp.getTime());
builder.field(PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), perPartitionMaxProbabilities);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, perPartitionMaxProbabilities, id);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof PerPartitionMaxProbabilities == false) {
return false;
}
PerPartitionMaxProbabilities that = (PerPartitionMaxProbabilities) other;
return Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.timestamp, that.timestamp)
&& Objects.equals(this.id, that.id)
&& Objects.equals(this.perPartitionMaxProbabilities, that.perPartitionMaxProbabilities);
}
/**
* Class for partitionValue, maxNormalisedProb pairs
*/
public static class PartitionProbability extends ToXContentToBytes implements Writeable {
public static final ConstructingObjectParser<PartitionProbability, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(PER_PARTITION_MAX_PROBABILITIES.getPreferredName(),
a -> new PartitionProbability((String) a[0], (double) a[1]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_VALUE);
PARSER.declareDouble(ConstructingObjectParser.constructorArg(), Bucket.MAX_NORMALIZED_PROBABILITY);
}
private final String partitionValue;
private final double maxNormalisedProbability;
PartitionProbability(String partitionName, double maxNormalisedProbability) {
this.partitionValue = partitionName;
this.maxNormalisedProbability = maxNormalisedProbability;
}
public PartitionProbability(StreamInput in) throws IOException {
partitionValue = in.readString();
maxNormalisedProbability = in.readDouble();
}
public String getPartitionValue() {
return partitionValue;
}
public double getMaxNormalisedProbability() {
return maxNormalisedProbability;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(partitionValue);
out.writeDouble(maxNormalisedProbability);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionValue)
.field(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalisedProbability)
.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(partitionValue, maxNormalisedProbability);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof PartitionProbability == false) {
return false;
}
PartitionProbability that = (PartitionProbability) other;
return Objects.equals(this.partitionValue, that.partitionValue) && this.maxNormalisedProbability == that.maxNormalisedProbability;
}
}
}
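
As a usage illustration (not part of the commit), the new result type can be built straight from a bucket's anomaly records and queried per partition. The sketch below uses the AnomalyRecord setters exercised in the tests later in this change; the job id "foo" and the partition values are hypothetical.

// Minimal sketch, assuming the AnomalyRecord setters shown in the tests below.
AnomalyRecord r1 = new AnomalyRecord("foo");
r1.setPartitionFieldValue("engine-1");
r1.setNormalizedProbability(20.0);
r1.setTimestamp(new Date(1480412400000L));
AnomalyRecord r2 = new AnomalyRecord("foo");
r2.setPartitionFieldValue("engine-2");
r2.setNormalizedProbability(75.0);
r2.setTimestamp(new Date(1480412400000L));
PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities(Arrays.asList(r1, r2));
ppMax.getMaxProbabilityForPartition("engine-1"); // 20.0
ppMax.getMaxProbabilityForPartition("engine-2"); // 75.0
ppMax.getMaxProbabilityForPartition("other");    // 0.0 for partition values not present

The resulting object is what JobResultsPersister.persistPerPartitionMaxProbabilities and JobRenormaliser.updatePerPartitionMaxProbabilities write to the results index.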

View File

@ -34,9 +34,6 @@ public final class ReservedFieldNames {
*/
private static final String ES_TIMESTAMP = "timestamp";
public static final String PARTITION_NORMALIZED_PROB_TYPE = "partitionNormalizedProb";
public static final String PARTITION_NORMALIZED_PROBS = "partitionNormalizedProbs";
/**
* This array should be updated to contain all the field names that appear
* in any documents we store in our results index. (The reason it's any
@ -95,9 +92,6 @@ public final class ReservedFieldNames {
BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.ANOMALY_SCORE.getPreferredName(),
BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.PROBABILITY.getPreferredName(),
PARTITION_NORMALIZED_PROBS,
PARTITION_NORMALIZED_PROB_TYPE,
CategoryDefinition.CATEGORY_ID.getPreferredName(),
CategoryDefinition.TERMS.getPreferredName(),
CategoryDefinition.REGEX.getPreferredName(),
@ -146,6 +140,8 @@ public final class ReservedFieldNames {
ModelSnapshot.LATEST_RECORD_TIME.getPreferredName(),
ModelSnapshot.LATEST_RESULT_TIME.getPreferredName(),
PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName(),
Quantiles.QUANTILE_STATE.getPreferredName(),
Result.RESULT_TYPE.getPreferredName(),

View File

@ -30,6 +30,7 @@ import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.job.results.Result;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
@ -922,10 +923,22 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
ElasticsearchJobProvider provider = createProvider(clientBuilder.build());
List<ElasticsearchJobProvider.ScoreTimestamp> scores = new ArrayList<>();
scores.add(provider.new ScoreTimestamp(new Date(2), 1.0));
scores.add(provider.new ScoreTimestamp(new Date(3), 2.0));
scores.add(provider.new ScoreTimestamp(new Date(5), 3.0));
List<PerPartitionMaxProbabilities> partitionMaxProbs = new ArrayList<>();
List<AnomalyRecord> records = new ArrayList<>();
records.add(createAnomalyRecord("partitionValue1", new Date(2), 1.0));
records.add(createAnomalyRecord("partitionValue2", new Date(2), 4.0));
partitionMaxProbs.add(new PerPartitionMaxProbabilities(records));
records.clear();
records.add(createAnomalyRecord("partitionValue1", new Date(3), 2.0));
records.add(createAnomalyRecord("partitionValue2", new Date(3), 1.0));
partitionMaxProbs.add(new PerPartitionMaxProbabilities(records));
records.clear();
records.add(createAnomalyRecord("partitionValue1", new Date(5), 3.0));
records.add(createAnomalyRecord("partitionValue2", new Date(5), 2.0));
partitionMaxProbs.add(new PerPartitionMaxProbabilities(records));
List<Bucket> buckets = new ArrayList<>();
buckets.add(createBucketAtEpochTime(1));
@ -935,13 +948,29 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
buckets.add(createBucketAtEpochTime(5));
buckets.add(createBucketAtEpochTime(6));
provider.mergePartitionScoresIntoBucket(scores, buckets);
provider.mergePartitionScoresIntoBucket(partitionMaxProbs, buckets, "partitionValue1");
assertEquals(0.0, buckets.get(0).getMaxNormalizedProbability(), 0.001);
assertEquals(1.0, buckets.get(1).getMaxNormalizedProbability(), 0.001);
assertEquals(2.0, buckets.get(2).getMaxNormalizedProbability(), 0.001);
assertEquals(0.0, buckets.get(3).getMaxNormalizedProbability(), 0.001);
assertEquals(3.0, buckets.get(4).getMaxNormalizedProbability(), 0.001);
assertEquals(0.0, buckets.get(5).getMaxNormalizedProbability(), 0.001);
provider.mergePartitionScoresIntoBucket(partitionMaxProbs, buckets, "partitionValue2");
assertEquals(0.0, buckets.get(0).getMaxNormalizedProbability(), 0.001);
assertEquals(4.0, buckets.get(1).getMaxNormalizedProbability(), 0.001);
assertEquals(1.0, buckets.get(2).getMaxNormalizedProbability(), 0.001);
assertEquals(0.0, buckets.get(3).getMaxNormalizedProbability(), 0.001);
assertEquals(2.0, buckets.get(4).getMaxNormalizedProbability(), 0.001);
assertEquals(0.0, buckets.get(5).getMaxNormalizedProbability(), 0.001);
}
private AnomalyRecord createAnomalyRecord(String partitionFieldValue, Date timestamp, double normalizedProbability) {
AnomalyRecord record = new AnomalyRecord("foo");
record.setPartitionFieldValue(partitionFieldValue);
record.setNormalizedProbability(normalizedProbability);
record.setTimestamp(timestamp);
return record;
}
public void testMergePartitionScoresIntoBucket_WithEmptyScoresList() throws InterruptedException, ExecutionException {
@ -950,7 +979,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
ElasticsearchJobProvider provider = createProvider(clientBuilder.build());
List<ElasticsearchJobProvider.ScoreTimestamp> scores = new ArrayList<>();
List<PerPartitionMaxProbabilities> scores = new ArrayList<>();
List<Bucket> buckets = new ArrayList<>();
buckets.add(createBucketAtEpochTime(1));
@ -958,7 +987,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
buckets.add(createBucketAtEpochTime(3));
buckets.add(createBucketAtEpochTime(4));
provider.mergePartitionScoresIntoBucket(scores, buckets);
provider.mergePartitionScoresIntoBucket(scores, buckets, "partitionValue");
assertEquals(0.0, buckets.get(0).getMaxNormalizedProbability(), 0.001);
assertEquals(0.0, buckets.get(1).getMaxNormalizedProbability(), 0.001);
assertEquals(0.0, buckets.get(2).getMaxNormalizedProbability(), 0.001);

View File

@ -128,11 +128,6 @@ public class ElasticsearchMappingsTests extends ESTestCase {
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.bucketPartitionMaxNormalizedScores();
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.categorizerStateMapping();
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.utils.CloseableIterator;
import org.mockito.InOrder;
@ -66,25 +67,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(persister, times(1)).persistBucket(bucket);
verify(persister, never()).deleteInterimResults("_id");
verify(bucket, never()).calcMaxNormalizedProbabilityPerPartition();
verifyNoMoreInteractions(persister);
}
public void testProcessResult_bucket_isPerPartitionNormalization() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", true);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket);
processor.processResult(context, result);
verify(persister, times(1)).persistBucket(bucket);
verify(persister, never()).deleteInterimResults("_id");
verify(bucket, times(1)).calcMaxNormalizedProbabilityPerPartition();
verifyNoMoreInteractions(persister);
}
@ -101,7 +83,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(persister, times(1)).persistBucket(bucket);
verify(persister, times(1)).deleteInterimResults("_id");
verify(bucket, never()).calcMaxNormalizedProbabilityPerPartition();
verifyNoMoreInteractions(persister);
assertFalse(context.deleteInterimRequired);
}
@ -124,6 +105,27 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verifyNoMoreInteractions(persister);
}
public void testProcessResult_records_isPerPartitionNormalization() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo");
record1.setPartitionFieldValue("pValue");
AnomalyRecord record2 = new AnomalyRecord("foo");
record2.setPartitionFieldValue("pValue");
List<AnomalyRecord> records = Arrays.asList(record1, record2);
when(result.getRecords()).thenReturn(records);
processor.processResult(context, result);
verify(persister, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class));
verify(persister, times(1)).persistRecords(records);
verifyNoMoreInteractions(persister);
}
public void testProcessResult_influencers() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);

View File

@ -308,29 +308,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
assertTrue(bucket.isNormalisable());
}
public void testSetMaxNormalizedProbabilityPerPartition() {
List<AnomalyRecord> records = new ArrayList<>();
records.add(createAnomalyRecord("A", 20.0));
records.add(createAnomalyRecord("A", 40.0));
records.add(createAnomalyRecord("B", 90.0));
records.add(createAnomalyRecord("B", 15.0));
records.add(createAnomalyRecord("B", 45.0));
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setRecords(records);
Map<String, Double> ppProb = bucket.calcMaxNormalizedProbabilityPerPartition();
assertEquals(40.0, ppProb.get("A"), 0.0001);
assertEquals(90.0, ppProb.get("B"), 0.0001);
}
private AnomalyRecord createAnomalyRecord(String partitionFieldValue, double normalizedProbability) {
AnomalyRecord record = new AnomalyRecord("foo");
record.setPartitionFieldValue(partitionFieldValue);
record.setNormalizedProbability(normalizedProbability);
return record;
}
public void testPartitionAnomalyScore() {
List<PartitionScore> pScore = new ArrayList<>();
pScore.add(new PartitionScore("pf", "pv1", 10, 0.1));
@ -350,5 +327,4 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
anomalyScore = bucket.partitionAnomalyScore("pv4");
assertEquals(60.0, anomalyScore, 0.001);
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.results;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.List;
public class PerPartitionMaxProbabilitiesTest extends AbstractSerializingTestCase<PerPartitionMaxProbabilities> {
@Override
protected PerPartitionMaxProbabilities createTestInstance() {
int num = randomInt(10);
List<PerPartitionMaxProbabilities.PartitionProbability> pps = new ArrayList<>();
for (int i=0; i<num; i++) {
pps.add(new PerPartitionMaxProbabilities.PartitionProbability(randomAsciiOfLength(12), randomDouble()));
}
return new PerPartitionMaxProbabilities(randomAsciiOfLength(20), new DateTime(randomDateTimeZone()).toDate(), pps);
}
@Override
protected Writeable.Reader<PerPartitionMaxProbabilities> instanceReader() {
return PerPartitionMaxProbabilities::new;
}
@Override
protected PerPartitionMaxProbabilities parseInstance(XContentParser parser, ParseFieldMatcher matcher) {
return PerPartitionMaxProbabilities.PARSER.apply(parser, () -> matcher);
}
public void testCreateFromAListOfRecords() {
List<AnomalyRecord> records = new ArrayList<>();
records.add(createAnomalyRecord("A", 20.0));
records.add(createAnomalyRecord("A", 40.0));
records.add(createAnomalyRecord("B", 90.0));
records.add(createAnomalyRecord("B", 15.0));
records.add(createAnomalyRecord("B", 45.0));
PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities(records);
List<PerPartitionMaxProbabilities.PartitionProbability> pProbs = ppMax.getPerPartitionMaxProbabilities();
assertEquals(2, pProbs.size());
for (PerPartitionMaxProbabilities.PartitionProbability pProb : pProbs) {
if (pProb.getPartitionValue().equals("A")) {
assertEquals(40.0, pProb.getMaxNormalisedProbability(), 0.0001);
}
else {
assertEquals(90.0, pProb.getMaxNormalisedProbability(), 0.0001);
}
}
}
public void testMaxProbabilityForPartition() {
List<AnomalyRecord> records = new ArrayList<>();
records.add(createAnomalyRecord("A", 20.0));
records.add(createAnomalyRecord("A", 40.0));
records.add(createAnomalyRecord("B", 90.0));
records.add(createAnomalyRecord("B", 15.0));
records.add(createAnomalyRecord("B", 45.0));
PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities(records);
assertEquals(40.0, ppMax.getMaxProbabilityForPartition("A"), 0.0001);
assertEquals(90.0, ppMax.getMaxProbabilityForPartition("B"), 0.0001);
}
private AnomalyRecord createAnomalyRecord(String partitionFieldValue, double normalizedProbability) {
AnomalyRecord record = new AnomalyRecord("foo");
record.setPartitionFieldValue(partitionFieldValue);
record.setNormalizedProbability(normalizedProbability);
return record;
}
}