Use well-defined IDs for records and influencers (elastic/elasticsearch#510)

* Use well-defined IDs for records and influencers

Removes the reliance on ES autogenerated UUIDs for all types that will
be renormalized

* Address some review comments

Original commit: elastic/x-pack-elasticsearch@85fde8b957
This commit is contained in:
David Roberts 2016-12-08 18:39:03 +00:00 committed by GitHub
parent 256ab7f3e2
commit 65f03a8888
29 changed files with 317 additions and 357 deletions

View File

@ -33,8 +33,6 @@ class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedResult
throw new ElasticsearchParseException("failed to parser influencer", e);
}
Influencer influencer = Influencer.PARSER.apply(parser, () -> parseFieldMatcher);
influencer.setId(hit.getId());
return influencer;
return Influencer.PARSER.apply(parser, () -> parseFieldMatcher);
}
}

View File

@ -120,10 +120,10 @@ public class ElasticsearchMappings {
.endObject()
.startObject(PROPERTIES)
.startObject(Result.RESULT_TYPE.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE)
@ -156,7 +156,7 @@ public class ElasticsearchMappings {
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
@ -174,7 +174,7 @@ public class ElasticsearchMappings {
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(BucketInfluencer.INFLUENCER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
@ -182,10 +182,9 @@ public class ElasticsearchMappings {
.endObject()
.endObject()
.startObject(BucketInfluencer.INFLUENCER_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
// per-partition max probabilities mapping
.startObject(PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName())
.field(TYPE, NESTED)
@ -248,6 +247,9 @@ public class ElasticsearchMappings {
builder.startObject(AnomalyRecord.DETECTOR_INDEX.getPreferredName())
.field(TYPE, INTEGER).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.SEQUENCE_NUM.getPreferredName())
.field(TYPE, INTEGER).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(AnomalyRecord.ACTUAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()

View File

@ -491,10 +491,8 @@ public class JobProvider {
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse PerPartitionMaxProbabilities", e);
}
PerPartitionMaxProbabilities perPartitionMaxProbabilities =
PerPartitionMaxProbabilities.PARSER.apply(parser, () -> parseFieldMatcher);
perPartitionMaxProbabilities.setId(hit.getId());
results.add(perPartitionMaxProbabilities);
results.add(PerPartitionMaxProbabilities.PARSER.apply(parser, () -> parseFieldMatcher));
}
return results;
@ -739,11 +737,8 @@ public class JobProvider {
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse records", e);
}
AnomalyRecord record = AnomalyRecord.PARSER.apply(parser, () -> parseFieldMatcher);
// set the ID
record.setId(hit.getId());
results.add(record);
results.add(AnomalyRecord.PARSER.apply(parser, () -> parseFieldMatcher));
}
return new QueryPage<>(results, searchResponse.getHits().getTotalHits(), AnomalyRecord.RESULTS_FIELD);
@ -808,10 +803,8 @@ public class JobProvider {
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse influencer", e);
}
Influencer influencer = Influencer.PARSER.apply(parser, () -> parseFieldMatcher);
influencer.setId(hit.getId());
influencers.add(influencer);
influencers.add(Influencer.PARSER.apply(parser, () -> parseFieldMatcher));
}
return new QueryPage<>(influencers, response.getHits().getTotalHits(), Influencer.RESULTS_FIELD);

View File

@ -51,7 +51,7 @@ public class JobRenormaliser extends AbstractComponent {
* @param records The updated records
*/
public void updateRecords(String jobId, List<AnomalyRecord> records) {
jobResultsPersister.bulkPersisterBuilder(jobId).persistRecords(records, false).executeRequest();
jobResultsPersister.bulkPersisterBuilder(jobId).persistRecords(records).executeRequest();
}
/**
@ -64,8 +64,7 @@ public class JobRenormaliser extends AbstractComponent {
*/
public void updatePerPartitionMaxProbabilities(String jobId, String documentId, List<AnomalyRecord> records) {
PerPartitionMaxProbabilities ppMaxProbs = new PerPartitionMaxProbabilities(records);
ppMaxProbs.setId(documentId);
jobResultsPersister.bulkPersisterBuilder(jobId).persistPerPartitionMaxProbabilities(ppMaxProbs, false).executeRequest();
jobResultsPersister.bulkPersisterBuilder(jobId).persistPerPartitionMaxProbabilities(ppMaxProbs).executeRequest();
}
/**
@ -76,7 +75,7 @@ public class JobRenormaliser extends AbstractComponent {
* @param influencers The updated influencers
*/
public void updateInfluencer(String jobId, List<Influencer> influencers) {
jobResultsPersister.bulkPersisterBuilder(jobId).persistInfluencers(influencers, false).executeRequest();
jobResultsPersister.bulkPersisterBuilder(jobId).persistInfluencers(influencers).executeRequest();
}
}

View File

@ -91,8 +91,7 @@ public class JobResultsPersister extends AbstractComponent {
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId()).setSource(content));
persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(),
bucket.isInterim());
persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}, e));
}
@ -100,13 +99,13 @@ public class JobResultsPersister extends AbstractComponent {
return this;
}
private void persistBucketInfluencersStandalone(String jobId, String bucketId, List<BucketInfluencer> bucketInfluencers,
Date bucketTime, boolean isInterim) throws IOException {
private void persistBucketInfluencersStandalone(String jobId, String bucketId, List<BucketInfluencer> bucketInfluencers)
throws IOException {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer, bucketTime, isInterim);
XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer);
// Need consistent IDs to ensure overwriting on renormalisation
String id = bucketId + bucketInfluencer.getInfluencerFieldName();
String id = bucketInfluencer.getId();
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id);
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), id).setSource(content));
@ -118,27 +117,17 @@ public class JobResultsPersister extends AbstractComponent {
* Persist a list of anomaly records
*
* @param records the records to persist
* @param autoGenerateId If true then persist the influencer with an auto generated ID
* else use {@link AnomalyRecord#getId()}
* @return this
*/
public Builder persistRecords(List<AnomalyRecord> records, boolean autoGenerateId) {
public Builder persistRecords(List<AnomalyRecord> records) {
try {
for (AnomalyRecord record : records) {
XContentBuilder content = toXContentBuilder(record);
if (autoGenerateId) {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName);
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content));
}
else {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId());
bulkRequest.add(
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content));
}
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId());
bulkRequest.add(
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content));
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}, e));
@ -152,25 +141,16 @@ public class JobResultsPersister extends AbstractComponent {
* an auto generated ID
*
* @param influencers the influencers to persist
* @param autoGenerateId If true then persist the influencer with an auto generated ID
* else use {@link Influencer#getId()}
* @return this
*/
public Builder persistInfluencers(List<Influencer> influencers, boolean autoGenerateId) {
public Builder persistInfluencers(List<Influencer> influencers) {
try {
for (Influencer influencer : influencers) {
XContentBuilder content = toXContentBuilder(influencer);
if (autoGenerateId) {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID",
jobId, Influencer.RESULT_TYPE_VALUE, indexName);
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content));
}
else {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId());
bulkRequest.add(
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content));
}
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId());
bulkRequest.add(
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content));
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}, e));
@ -183,26 +163,16 @@ public class JobResultsPersister extends AbstractComponent {
* Persist {@link PerPartitionMaxProbabilities}
*
* @param partitionProbabilities The probabilities to persist
* @param autoGenerateId If true then persist the PerPartitionMaxProbabilities with an auto generated ID
* else use {@link PerPartitionMaxProbabilities#getId()}
* @return this
*/
public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities, boolean autoGenerateId) {
public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities) {
try {
XContentBuilder builder = toXContentBuilder(partitionProbabilities);
if (autoGenerateId) {
logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp());
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(builder));
}
else {
logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(),
partitionProbabilities.getId());
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId())
.setSource(builder));
}
logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(),
partitionProbabilities.getId());
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId())
.setSource(builder));
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores",
new Object[]{jobId}, e));
@ -223,7 +193,6 @@ public class JobResultsPersister extends AbstractComponent {
}
}
/**
* Persist the category definition
*
@ -335,13 +304,9 @@ public class JobResultsPersister extends AbstractComponent {
return builder;
}
private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer,
Date bucketTime, boolean isInterim) throws IOException {
BucketInfluencer influencer = new BucketInfluencer(bucketInfluencer);
influencer.setIsInterim(isInterim);
influencer.setTimestamp(bucketTime);
private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer) throws IOException {
XContentBuilder builder = jsonBuilder();
influencer.toXContent(builder, ToXContent.EMPTY_PARAMS);
bucketInfluencer.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;
}

View File

@ -88,7 +88,7 @@ public class AutoDetectResultProcessor {
LOGGER.info("[{}] {} buckets parsed from autodetect output - about to refresh indexes", jobId, bucketCount);
LOGGER.info("[{}] Parse results Complete", jobId);
} catch (Exception e) {
LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}, e));
LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}), e);
} finally {
completionLatch.countDown();
flushListener.clear();
@ -116,14 +116,14 @@ public class AutoDetectResultProcessor {
}
List<AnomalyRecord> records = result.getRecords();
if (records != null && !records.isEmpty()) {
context.bulkResultsPersister.persistRecords(records, true);
context.bulkResultsPersister.persistRecords(records);
if (context.isPerPartitionNormalization) {
context.bulkResultsPersister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records), true);
context.bulkResultsPersister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records));
}
}
List<Influencer> influencers = result.getInfluencers();
if (influencers != null && !influencers.isEmpty()) {
context.bulkResultsPersister.persistInfluencers(influencers, true);
context.bulkResultsPersister.persistInfluencers(influencers);
}
CategoryDefinition categoryDefinition = result.getCategoryDefinition();
if (categoryDefinition != null) {

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.time.TimeUtils;
import java.io.IOException;
@ -39,6 +40,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
* Result fields (all detector types)
*/
public static final ParseField DETECTOR_INDEX = new ParseField("detector_index");
public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num");
public static final ParseField PROBABILITY = new ParseField("probability");
public static final ParseField BY_FIELD_NAME = new ParseField("by_field_name");
public static final ParseField BY_FIELD_VALUE = new ParseField("by_field_value");
@ -77,19 +79,11 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
public static final ParseField INITIAL_NORMALIZED_PROBABILITY = new ParseField("initial_normalized_probability");
public static final ConstructingObjectParser<AnomalyRecord, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new AnomalyRecord((String) a[0]));
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new AnomalyRecord((String) a[0], (Date) a[1], (long) a[2], (int) a[3]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareString((anomalyRecord, s) -> {}, Result.RESULT_TYPE);
PARSER.declareDouble(AnomalyRecord::setProbability, PROBABILITY);
PARSER.declareDouble(AnomalyRecord::setAnomalyScore, ANOMALY_SCORE);
PARSER.declareDouble(AnomalyRecord::setNormalizedProbability, NORMALIZED_PROBABILITY);
PARSER.declareDouble(AnomalyRecord::setInitialNormalizedProbability, INITIAL_NORMALIZED_PROBABILITY);
PARSER.declareLong(AnomalyRecord::setBucketSpan, BUCKET_SPAN);
PARSER.declareInt(AnomalyRecord::setDetectorIndex, DETECTOR_INDEX);
PARSER.declareBoolean(AnomalyRecord::setInterim, IS_INTERIM);
PARSER.declareField(AnomalyRecord::setTimestamp, p -> {
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == Token.VALUE_NUMBER) {
return new Date(p.longValue());
} else if (p.currentToken() == Token.VALUE_STRING) {
@ -97,6 +91,15 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM);
PARSER.declareString((anomalyRecord, s) -> {}, Result.RESULT_TYPE);
PARSER.declareDouble(AnomalyRecord::setProbability, PROBABILITY);
PARSER.declareDouble(AnomalyRecord::setAnomalyScore, ANOMALY_SCORE);
PARSER.declareDouble(AnomalyRecord::setNormalizedProbability, NORMALIZED_PROBABILITY);
PARSER.declareDouble(AnomalyRecord::setInitialNormalizedProbability, INITIAL_NORMALIZED_PROBABILITY);
PARSER.declareInt(AnomalyRecord::setDetectorIndex, DETECTOR_INDEX);
PARSER.declareBoolean(AnomalyRecord::setInterim, IS_INTERIM);
PARSER.declareString(AnomalyRecord::setByFieldName, BY_FIELD_NAME);
PARSER.declareString(AnomalyRecord::setByFieldValue, BY_FIELD_VALUE);
PARSER.declareString(AnomalyRecord::setCorrelatedByFieldValue, CORRELATED_BY_FIELD_VALUE);
@ -114,7 +117,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
}
private final String jobId;
private String id;
private final int sequenceNum;
private int detectorIndex;
private double probability;
private String byFieldName;
@ -139,21 +142,24 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
private double initialNormalizedProbability;
private Date timestamp;
private long bucketSpan;
private final Date timestamp;
private final long bucketSpan;
private List<Influence> influencers;
private boolean hadBigNormalisedUpdate;
public AnomalyRecord(String jobId) {
public AnomalyRecord(String jobId, Date timestamp, long bucketSpan, int sequenceNum) {
this.jobId = jobId;
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName());
this.bucketSpan = bucketSpan;
this.sequenceNum = sequenceNum;
}
@SuppressWarnings("unchecked")
public AnomalyRecord(StreamInput in) throws IOException {
jobId = in.readString();
id = in.readOptionalString();
sequenceNum = in.readInt();
detectorIndex = in.readInt();
probability = in.readDouble();
byFieldName = in.readOptionalString();
@ -179,21 +185,18 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
anomalyScore = in.readDouble();
normalizedProbability = in.readDouble();
initialNormalizedProbability = in.readDouble();
if (in.readBoolean()) {
timestamp = new Date(in.readLong());
}
timestamp = new Date(in.readLong());
bucketSpan = in.readLong();
if (in.readBoolean()) {
influencers = in.readList(Influence::new);
}
hadBigNormalisedUpdate = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeOptionalString(id);
out.writeInt(sequenceNum);
out.writeInt(detectorIndex);
out.writeDouble(probability);
out.writeOptionalString(byFieldName);
@ -225,11 +228,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
out.writeDouble(anomalyScore);
out.writeDouble(normalizedProbability);
out.writeDouble(initialNormalizedProbability);
boolean hasTimestamp = timestamp != null;
out.writeBoolean(hasTimestamp);
if (hasTimestamp) {
out.writeLong(timestamp.getTime());
}
out.writeLong(timestamp.getTime());
out.writeLong(bucketSpan);
boolean hasInfluencers = influencers != null;
out.writeBoolean(hasInfluencers);
@ -250,10 +249,9 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
builder.field(INITIAL_NORMALIZED_PROBABILITY.getPreferredName(), initialNormalizedProbability);
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(DETECTOR_INDEX.getPreferredName(), detectorIndex);
builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum);
builder.field(IS_INTERIM.getPreferredName(), isInterim);
if (timestamp != null) {
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
}
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
if (byFieldName != null) {
builder.field(BY_FIELD_NAME.getPreferredName(), byFieldName);
}
@ -305,15 +303,10 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
}
/**
* Data store ID of this record. May be null for records that have not been
* read from the data store.
* Data store ID of this record.
*/
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
return jobId + "_" + timestamp.getTime() + "_" + bucketSpan + "_" + sequenceNum;
}
public int getDetectorIndex() {
@ -352,10 +345,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
/**
* Bucketspan expressed in seconds
*/
@ -363,13 +352,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
return bucketSpan;
}
/**
* Bucketspan expressed in seconds
*/
public void setBucketSpan(long bucketSpan) {
this.bucketSpan = bucketSpan;
}
public double getProbability() {
return probability;
}
@ -378,7 +360,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
probability = value;
}
public String getByFieldName() {
return byFieldName;
}
@ -509,14 +490,11 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
// ID is NOT included in the hash, so that a record from the data store
// will hash the same as a record representing the same anomaly that did
// not come from the data store
// hadBigNormalisedUpdate is also deliberately excluded from the hash
// hadBigNormalisedUpdate is deliberately excluded from the hash
return Objects.hash(detectorIndex, probability, anomalyScore, initialNormalizedProbability,
normalizedProbability, typical, actual,
return Objects.hash(jobId, detectorIndex, sequenceNum, bucketSpan, probability, anomalyScore,
normalizedProbability, initialNormalizedProbability, typical, actual,
function, functionDescription, fieldName, byFieldName, byFieldValue, correlatedByFieldValue,
partitionFieldName, partitionFieldValue, overFieldName, overFieldValue,
timestamp, isInterim, causes, influencers, jobId);
@ -535,13 +513,11 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
AnomalyRecord that = (AnomalyRecord) other;
// ID is NOT compared, so that a record from the data store will compare
// equal to a record representing the same anomaly that did not come
// from the data store
// hadBigNormalisedUpdate is also deliberately excluded from the test
// hadBigNormalisedUpdate is deliberately excluded from the test
return Objects.equals(this.jobId, that.jobId)
&& this.detectorIndex == that.detectorIndex
&& this.sequenceNum == that.sequenceNum
&& this.bucketSpan == that.bucketSpan
&& this.probability == that.probability
&& this.anomalyScore == that.anomalyScore
&& this.normalizedProbability == that.normalizedProbability

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.time.TimeUtils;
import java.io.IOException;
@ -39,20 +40,16 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
public static final ParseField PROBABILITY = new ParseField("probability");
public static final ParseField IS_INTERIM = new ParseField("is_interim");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num");
public static final ConstructingObjectParser<BucketInfluencer, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_FIELD.getPreferredName(), a -> new BucketInfluencer((String) a[0]));
new ConstructingObjectParser<>(RESULT_TYPE_FIELD.getPreferredName(), a -> new BucketInfluencer((String) a[0],
(Date) a[1], (long) a[2], (int) a[3]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareString((bucketInfluencer, s) -> {}, Result.RESULT_TYPE);
PARSER.declareString(BucketInfluencer::setInfluencerFieldName, INFLUENCER_FIELD_NAME);
PARSER.declareDouble(BucketInfluencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE);
PARSER.declareDouble(BucketInfluencer::setAnomalyScore, ANOMALY_SCORE);
PARSER.declareDouble(BucketInfluencer::setRawAnomalyScore, RAW_ANOMALY_SCORE);
PARSER.declareDouble(BucketInfluencer::setProbability, PROBABILITY);
PARSER.declareBoolean(BucketInfluencer::setIsInterim, IS_INTERIM);
PARSER.declareField(BucketInfluencer::setTimestamp, p -> {
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == Token.VALUE_NUMBER) {
return new Date(p.longValue());
} else if (p.currentToken() == Token.VALUE_STRING) {
@ -60,6 +57,15 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM);
PARSER.declareString((bucketInfluencer, s) -> {}, Result.RESULT_TYPE);
PARSER.declareString(BucketInfluencer::setInfluencerFieldName, INFLUENCER_FIELD_NAME);
PARSER.declareDouble(BucketInfluencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE);
PARSER.declareDouble(BucketInfluencer::setAnomalyScore, ANOMALY_SCORE);
PARSER.declareDouble(BucketInfluencer::setRawAnomalyScore, RAW_ANOMALY_SCORE);
PARSER.declareDouble(BucketInfluencer::setProbability, PROBABILITY);
PARSER.declareBoolean(BucketInfluencer::setIsInterim, IS_INTERIM);
}
private final String jobId;
@ -69,21 +75,15 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
private double rawAnomalyScore;
private double probability;
private boolean isInterim;
private Date timestamp;
private final Date timestamp;
private final long bucketSpan;
private final int sequenceNum;
public BucketInfluencer(String jobId) {
public BucketInfluencer(String jobId, Date timestamp, long bucketSpan, int sequenceNum) {
this.jobId = jobId;
}
public BucketInfluencer(BucketInfluencer prototype) {
jobId = prototype.jobId;
influenceField = prototype.influenceField;
initialAnomalyScore = prototype.initialAnomalyScore;
anomalyScore = prototype.anomalyScore;
rawAnomalyScore = prototype.rawAnomalyScore;
probability = prototype.probability;
isInterim = prototype.isInterim;
timestamp = prototype.timestamp;
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName());
this.bucketSpan = bucketSpan;
this.sequenceNum = sequenceNum;
}
public BucketInfluencer(StreamInput in) throws IOException {
@ -94,9 +94,9 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
rawAnomalyScore = in.readDouble();
probability = in.readDouble();
isInterim = in.readBoolean();
if (in.readBoolean()) {
timestamp = new Date(in.readLong());
}
timestamp = new Date(in.readLong());
bucketSpan = in.readLong();
sequenceNum = in.readInt();
}
@Override
@ -108,11 +108,9 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
out.writeDouble(rawAnomalyScore);
out.writeDouble(probability);
out.writeBoolean(isInterim);
boolean hasTimestamp = timestamp != null;
out.writeBoolean(hasTimestamp);
if (hasTimestamp) {
out.writeLong(timestamp.getTime());
}
out.writeLong(timestamp.getTime());
out.writeLong(bucketSpan);
out.writeInt(sequenceNum);
}
@Override
@ -127,14 +125,20 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore);
builder.field(RAW_ANOMALY_SCORE.getPreferredName(), rawAnomalyScore);
builder.field(PROBABILITY.getPreferredName(), probability);
if (timestamp != null) {
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
}
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum);
builder.field(IS_INTERIM.getPreferredName(), isInterim);
builder.endObject();
return builder;
}
/**
* Data store ID of this bucket influencer.
*/
public String getId() {
return jobId + "_" + timestamp.getTime() + "_" + bucketSpan + "_" + sequenceNum;
}
public String getJobId() {
return jobId;
@ -188,17 +192,14 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
return isInterim;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public Date getTimestamp() {
return timestamp;
}
@Override
public int hashCode() {
return Objects.hash(influenceField, initialAnomalyScore, anomalyScore, rawAnomalyScore, probability, isInterim, timestamp, jobId);
return Objects.hash(influenceField, initialAnomalyScore, anomalyScore, rawAnomalyScore, probability, isInterim, timestamp, jobId,
bucketSpan, sequenceNum);
}
@Override
@ -220,6 +221,7 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
return Objects.equals(influenceField, other.influenceField) && Double.compare(initialAnomalyScore, other.initialAnomalyScore) == 0
&& Double.compare(anomalyScore, other.anomalyScore) == 0 && Double.compare(rawAnomalyScore, other.rawAnomalyScore) == 0
&& Double.compare(probability, other.probability) == 0 && Objects.equals(isInterim, other.isInterim)
&& Objects.equals(timestamp, other.timestamp) && Objects.equals(jobId, other.jobId);
&& Objects.equals(timestamp, other.timestamp) && Objects.equals(jobId, other.jobId) && bucketSpan == other.bucketSpan
&& sequenceNum == other.sequenceNum;
}
}

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.time.TimeUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -33,7 +34,9 @@ public class Influencer extends ToXContentToBytes implements Writeable {
* Field names
*/
public static final ParseField PROBABILITY = new ParseField("probability");
public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField INFLUENCER_FIELD_NAME = new ParseField("influencer_field_name");
public static final ParseField INFLUENCER_FIELD_VALUE = new ParseField("influencer_field_value");
public static final ParseField INITIAL_ANOMALY_SCORE = new ParseField("initial_anomaly_score");
@ -43,18 +46,14 @@ public class Influencer extends ToXContentToBytes implements Writeable {
public static final ParseField RESULTS_FIELD = new ParseField("influencers");
public static final ConstructingObjectParser<Influencer, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
RESULT_TYPE_FIELD.getPreferredName(), a -> new Influencer((String) a[0], (String) a[1], (String) a[2]));
RESULT_TYPE_FIELD.getPreferredName(), a -> new Influencer((String) a[0], (String) a[1], (String) a[2],
(Date) a[3], (long) a[4], (int) a[5]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), INFLUENCER_FIELD_NAME);
PARSER.declareString(ConstructingObjectParser.constructorArg(), INFLUENCER_FIELD_VALUE);
PARSER.declareString((influencer, s) -> {}, Result.RESULT_TYPE);
PARSER.declareDouble(Influencer::setProbability, PROBABILITY);
PARSER.declareDouble(Influencer::setAnomalyScore, ANOMALY_SCORE);
PARSER.declareDouble(Influencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE);
PARSER.declareBoolean(Influencer::setInterim, Bucket.IS_INTERIM);
PARSER.declareField(Influencer::setTimestamp, p -> {
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == Token.VALUE_NUMBER) {
return new Date(p.longValue());
} else if (p.currentToken() == Token.VALUE_STRING) {
@ -62,11 +61,19 @@ public class Influencer extends ToXContentToBytes implements Writeable {
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM);
PARSER.declareString((influencer, s) -> {}, Result.RESULT_TYPE);
PARSER.declareDouble(Influencer::setProbability, PROBABILITY);
PARSER.declareDouble(Influencer::setAnomalyScore, ANOMALY_SCORE);
PARSER.declareDouble(Influencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE);
PARSER.declareBoolean(Influencer::setInterim, Bucket.IS_INTERIM);
}
private String jobId;
private String id;
private Date timestamp;
private final String jobId;
private final Date timestamp;
private final long bucketSpan;
private final int sequenceNum;
private String influenceField;
private String influenceValue;
private double probability;
@ -75,18 +82,18 @@ public class Influencer extends ToXContentToBytes implements Writeable {
private boolean hadBigNormalisedUpdate;
private boolean isInterim;
public Influencer(String jobId, String fieldName, String fieldValue) {
public Influencer(String jobId, String fieldName, String fieldValue, Date timestamp, long bucketSpan, int sequenceNum) {
this.jobId = jobId;
influenceField = fieldName;
influenceValue = fieldValue;
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName());
this.bucketSpan = bucketSpan;
this.sequenceNum = sequenceNum;
}
public Influencer(StreamInput in) throws IOException {
jobId = in.readString();
id = in.readOptionalString();
if (in.readBoolean()) {
timestamp = new Date(in.readLong());
}
timestamp = new Date(in.readLong());
influenceField = in.readString();
influenceValue = in.readString();
probability = in.readDouble();
@ -94,17 +101,14 @@ public class Influencer extends ToXContentToBytes implements Writeable {
anomalyScore = in.readDouble();
hadBigNormalisedUpdate = in.readBoolean();
isInterim = in.readBoolean();
bucketSpan = in.readLong();
sequenceNum = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeOptionalString(id);
boolean hasTimestamp = timestamp != null;
out.writeBoolean(hasTimestamp);
if (hasTimestamp) {
out.writeLong(timestamp.getTime());
}
out.writeLong(timestamp.getTime());
out.writeString(influenceField);
out.writeString(influenceValue);
out.writeDouble(probability);
@ -112,6 +116,8 @@ public class Influencer extends ToXContentToBytes implements Writeable {
out.writeDouble(anomalyScore);
out.writeBoolean(hadBigNormalisedUpdate);
out.writeBoolean(isInterim);
out.writeLong(bucketSpan);
out.writeInt(sequenceNum);
}
@Override
@ -124,10 +130,10 @@ public class Influencer extends ToXContentToBytes implements Writeable {
builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore);
builder.field(INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore);
builder.field(PROBABILITY.getPreferredName(), probability);
builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum);
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(Bucket.IS_INTERIM.getPreferredName(), isInterim);
if (timestamp != null) {
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
}
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
builder.endObject();
return builder;
}
@ -136,16 +142,8 @@ public class Influencer extends ToXContentToBytes implements Writeable {
return jobId;
}
/**
* Data store ID of this record. May be null for records that have not been
* read from the data store.
*/
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
return jobId + "_" + timestamp.getTime() + "_" + bucketSpan + "_" + sequenceNum;
}
public double getProbability() {
@ -160,10 +158,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
return timestamp;
}
public void setTimestamp(Date date) {
timestamp = date;
}
public String getInfluencerFieldName() {
return influenceField;
}
@ -210,13 +204,11 @@ public class Influencer extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
// ID is NOT included in the hash, so that a record from the data store
// will hash the same as a record representing the same anomaly that did
// not come from the data store
// hadBigNormalisedUpdate is also deliberately excluded from the hash
// hadBigNormalisedUpdate is deliberately excluded from the hash
return Objects.hash(jobId, timestamp, influenceField, influenceValue, initialAnomalyScore, anomalyScore, probability, isInterim);
return Objects.hash(jobId, timestamp, influenceField, influenceValue, initialAnomalyScore, anomalyScore, probability, isInterim,
bucketSpan, sequenceNum);
}
@Override
@ -235,16 +227,12 @@ public class Influencer extends ToXContentToBytes implements Writeable {
Influencer other = (Influencer) obj;
// ID is NOT compared, so that a record from the data store will compare
// equal to a record representing the same anomaly that did not come
// from the data store
// hadBigNormalisedUpdate is also deliberately excluded from the test
// hadBigNormalisedUpdate is deliberately excluded from the test
return Objects.equals(jobId, other.jobId) && Objects.equals(timestamp, other.timestamp)
&& Objects.equals(influenceField, other.influenceField)
&& Objects.equals(influenceValue, other.influenceValue)
&& Double.compare(initialAnomalyScore, other.initialAnomalyScore) == 0
&& Double.compare(anomalyScore, other.anomalyScore) == 0 && Double.compare(probability, other.probability) == 0
&& (isInterim == other.isInterim);
&& (isInterim == other.isInterim) && (bucketSpan == other.bucketSpan) && (sequenceNum == other.sequenceNum);
}
}

View File

@ -48,7 +48,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<PerPartitionMaxProbabilities, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a ->
new PerPartitionMaxProbabilities((String) a[0], (Date) a[1], (List<PartitionProbability>) a[2]));
new PerPartitionMaxProbabilities((String) a[0], (Date) a[1], (long) a[2], (List<PartitionProbability>) a[3]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
@ -61,18 +61,21 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + Bucket.TIMESTAMP.getPreferredName() + "]");
}, Bucket.TIMESTAMP, ObjectParser.ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), Bucket.BUCKET_SPAN);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), PartitionProbability.PARSER, PER_PARTITION_MAX_PROBABILITIES);
PARSER.declareString((p, s) -> {}, Result.RESULT_TYPE);
}
private final String jobId;
private final Date timestamp;
private final long bucketSpan;
private final List<PartitionProbability> perPartitionMaxProbabilities;
private String id;
public PerPartitionMaxProbabilities(String jobId, Date timestamp, List<PartitionProbability> partitionProbabilities) {
public PerPartitionMaxProbabilities(String jobId, Date timestamp, long bucketSpan,
List<PartitionProbability> partitionProbabilities) {
this.jobId = jobId;
this.timestamp = timestamp;
this.bucketSpan = bucketSpan;
this.perPartitionMaxProbabilities = partitionProbabilities;
}
@ -82,22 +85,23 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
}
this.jobId = records.get(0).getJobId();
this.timestamp = records.get(0).getTimestamp();
this.bucketSpan = records.get(0).getBucketSpan();
this.perPartitionMaxProbabilities = calcMaxNormalizedProbabilityPerPartition(records);
}
public PerPartitionMaxProbabilities(StreamInput in) throws IOException {
jobId = in.readString();
timestamp = new Date(in.readLong());
bucketSpan = in.readLong();
perPartitionMaxProbabilities = in.readList(PartitionProbability::new);
id = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(timestamp.getTime());
out.writeLong(bucketSpan);
out.writeList(perPartitionMaxProbabilities);
out.writeOptionalString(id);
}
public String getJobId() {
@ -105,11 +109,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
return jobId + "_" + timestamp.getTime() + "_" + bucketSpan + "_" + RESULT_TYPE_VALUE;
}
public Date getTimestamp() {
@ -170,6 +170,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(Bucket.TIMESTAMP.getPreferredName(), timestamp.getTime());
builder.field(Bucket.BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), perPartitionMaxProbabilities);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.endObject();
@ -178,7 +179,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, perPartitionMaxProbabilities, id);
return Objects.hash(jobId, timestamp, perPartitionMaxProbabilities, bucketSpan);
}
@Override
@ -195,7 +196,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
return Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.timestamp, that.timestamp)
&& Objects.equals(this.id, that.id)
&& this.bucketSpan == that.bucketSpan
&& Objects.equals(this.perPartitionMaxProbabilities, that.perPartitionMaxProbabilities);
}

View File

@ -79,6 +79,7 @@ public final class ReservedFieldNames {
AnomalyRecord.NORMALIZED_PROBABILITY.getPreferredName(),
AnomalyRecord.INITIAL_NORMALIZED_PROBABILITY.getPreferredName(),
AnomalyRecord.BUCKET_SPAN.getPreferredName(),
AnomalyRecord.SEQUENCE_NUM.getPreferredName(),
Bucket.ANOMALY_SCORE.getPreferredName(),
Bucket.BUCKET_INFLUENCERS.getPreferredName(),
@ -119,6 +120,8 @@ public final class ReservedFieldNames {
Influencer.INFLUENCER_FIELD_VALUE.getPreferredName(),
Influencer.INITIAL_ANOMALY_SCORE.getPreferredName(),
Influencer.ANOMALY_SCORE.getPreferredName(),
Influencer.BUCKET_SPAN.getPreferredName(),
Influencer.SEQUENCE_NUM.getPreferredName(),
ModelDebugOutput.PARTITION_FIELD_NAME.getPreferredName(), ModelDebugOutput.PARTITION_FIELD_VALUE.getPreferredName(),
ModelDebugOutput.OVER_FIELD_NAME.getPreferredName(), ModelDebugOutput.OVER_FIELD_VALUE.getPreferredName(),

View File

@ -24,6 +24,7 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase<Get
@Override
protected Response createTestInstance() {
int sequenceNum = 0;
int listSize = randomInt(10);
List<Bucket> hits = new ArrayList<>(listSize);
@ -37,7 +38,8 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase<Get
int size = randomInt(10);
List<BucketInfluencer> bucketInfluencers = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BucketInfluencer bucketInfluencer = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer = new BucketInfluencer("foo", bucket.getTimestamp(), bucket.getBucketSpan(),
sequenceNum++);
bucketInfluencer.setAnomalyScore(randomDouble());
bucketInfluencer.setInfluencerFieldName(randomAsciiOfLengthBetween(1, 20));
bucketInfluencer.setInitialAnomalyScore(randomDouble());
@ -86,14 +88,12 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase<Get
int size = randomInt(10);
List<AnomalyRecord> records = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
AnomalyRecord anomalyRecord = new AnomalyRecord(jobId);
AnomalyRecord anomalyRecord = new AnomalyRecord(jobId, new Date(randomLong()), randomPositiveLong(), sequenceNum++);
anomalyRecord.setAnomalyScore(randomDouble());
anomalyRecord.setActual(Collections.singletonList(randomDouble()));
anomalyRecord.setTypical(Collections.singletonList(randomDouble()));
anomalyRecord.setProbability(randomDouble());
anomalyRecord.setId(randomAsciiOfLengthBetween(1, 20));
anomalyRecord.setInterim(randomBoolean());
anomalyRecord.setTimestamp(new Date(randomLong()));
records.add(anomalyRecord);
}
bucket.setRecords(records);

View File

@ -22,13 +22,11 @@ public class GetInfluencersActionResponseTests extends AbstractStreamableTestCas
List<Influencer> hits = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
Influencer influencer = new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20),
randomAsciiOfLengthBetween(1, 20));
randomAsciiOfLengthBetween(1, 20), new Date(randomPositiveLong()), randomPositiveLong(), j + 1);
influencer.setAnomalyScore(randomDouble());
influencer.setInitialAnomalyScore(randomDouble());
influencer.setProbability(randomDouble());
influencer.setId(randomAsciiOfLengthBetween(1, 20));
influencer.setInterim(randomBoolean());
influencer.setTimestamp(new Date(randomLong()));
hits.add(influencer);
}
QueryPage<Influencer> buckets = new QueryPage<>(hits, listSize, Influencer.RESULTS_FIELD);

View File

@ -11,6 +11,7 @@ import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class GetRecordsActionResponseTests extends AbstractStreamableTestCase<GetRecordsAction.Response> {
@ -21,8 +22,7 @@ public class GetRecordsActionResponseTests extends AbstractStreamableTestCase<Ge
List<AnomalyRecord> hits = new ArrayList<>(listSize);
String jobId = randomAsciiOfLengthBetween(1, 20);
for (int j = 0; j < listSize; j++) {
AnomalyRecord record = new AnomalyRecord(jobId);
record.setId(randomAsciiOfLengthBetween(1, 20));
AnomalyRecord record = new AnomalyRecord(jobId, new Date(), 600, j + 1);
hits.add(record);
}
QueryPage<AnomalyRecord> snapshots = new QueryPage<>(hits, listSize, AnomalyRecord.RESULTS_FIELD);

View File

@ -226,9 +226,9 @@ public class PrelertJobIT extends ESRestTestCase {
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
assertThat(e.getMessage(), containsString("No known job with id '1'"));
addRecordResult("1", "1234");
addRecordResult("1", "1235");
addRecordResult("1", "1236");
addRecordResult("1", "1234", 1, 1);
addRecordResult("1", "1235", 1, 2);
addRecordResult("1", "1236", 1, 3);
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "results/1/records", params);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
@ -259,7 +259,7 @@ public class PrelertJobIT extends ESRestTestCase {
Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult));
}
private Response addRecordResult(String jobId, String timestamp) throws Exception {
private Response addRecordResult(String jobId, String timestamp, long bucketSpan, int sequenceNum) throws Exception {
try {
client().performRequest("put", "prelertresults-" + jobId, Collections.emptyMap(), new StringEntity(RESULT_MAPPING));
} catch (ResponseException e) {
@ -268,10 +268,12 @@ public class PrelertJobIT extends ESRestTestCase {
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
}
String bucketResult =
String.format(Locale.ROOT, "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"record\"}", jobId, timestamp);
String recordResult =
String.format(Locale.ROOT,
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}",
jobId, timestamp, bucketSpan, sequenceNum);
return client().performRequest("put", "prelertresults-" + jobId + "/result/" + timestamp,
Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult));
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult));
}
private static String responseEntityToString(Response response) throws Exception {

View File

@ -13,7 +13,6 @@ import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.ModelState;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
import org.elasticsearch.xpack.prelert.job.audit.AuditActivity;
import org.elasticsearch.xpack.prelert.job.audit.AuditMessage;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;

View File

@ -408,6 +408,7 @@ public class JobProviderTests extends ESTestCase {
recordMap1.put("timestamp", now.getTime());
recordMap1.put("function", "irritable");
recordMap1.put("bucket_span", 22);
recordMap1.put("sequence_num", 1);
Map<String, Object> recordMap2 = new HashMap<>();
recordMap2.put("job_id", "foo");
recordMap2.put("typical", 1122.4);
@ -415,6 +416,7 @@ public class JobProviderTests extends ESTestCase {
recordMap2.put("timestamp", now.getTime());
recordMap2.put("function", "irrascible");
recordMap2.put("bucket_span", 22);
recordMap2.put("sequence_num", 2);
source.add(recordMap1);
source.add(recordMap2);
@ -458,6 +460,7 @@ public class JobProviderTests extends ESTestCase {
recordMap1.put("timestamp", now.getTime());
recordMap1.put("function", "irritable");
recordMap1.put("bucket_span", 22);
recordMap1.put("sequence_num", 1);
Map<String, Object> recordMap2 = new HashMap<>();
recordMap2.put("job_id", "foo");
recordMap2.put("typical", 1122.4);
@ -465,6 +468,7 @@ public class JobProviderTests extends ESTestCase {
recordMap2.put("timestamp", now.getTime());
recordMap2.put("function", "irrascible");
recordMap2.put("bucket_span", 22);
recordMap2.put("sequence_num", 2);
source.add(recordMap1);
source.add(recordMap2);
@ -515,6 +519,7 @@ public class JobProviderTests extends ESTestCase {
recordMap1.put("timestamp", now.getTime());
recordMap1.put("function", "irritable");
recordMap1.put("bucket_span", 22);
recordMap1.put("sequence_num", 1);
Map<String, Object> recordMap2 = new HashMap<>();
recordMap2.put("job_id", "foo");
recordMap2.put("typical", 1122.4);
@ -522,6 +527,7 @@ public class JobProviderTests extends ESTestCase {
recordMap2.put("timestamp", now.getTime());
recordMap2.put("function", "irrascible");
recordMap2.put("bucket_span", 22);
recordMap2.put("sequence_num", 2);
source.add(recordMap1);
source.add(recordMap2);
@ -563,6 +569,7 @@ public class JobProviderTests extends ESTestCase {
recordMap.put("timestamp", now.getTime());
recordMap.put("function", "irritable");
recordMap.put("bucket_span", 22);
recordMap.put("sequence_num", i + 1);
source.add(recordMap);
}
@ -594,6 +601,7 @@ public class JobProviderTests extends ESTestCase {
recordMap.put("timestamp", now.getTime());
recordMap.put("function", "irritable");
recordMap.put("bucket_span", 22);
recordMap.put("sequence_num", i + 1);
source.add(recordMap);
}
@ -679,6 +687,8 @@ public class JobProviderTests extends ESTestCase {
recordMap1.put("influencer_field_value", "Bob");
recordMap1.put("initial_anomaly_score", 22.2);
recordMap1.put("anomaly_score", 22.6);
recordMap1.put("bucket_span", 123);
recordMap1.put("sequence_num", 1);
Map<String, Object> recordMap2 = new HashMap<>();
recordMap2.put("job_id", "foo");
recordMap2.put("probability", 0.99);
@ -687,6 +697,8 @@ public class JobProviderTests extends ESTestCase {
recordMap2.put("influencer_field_value", "James");
recordMap2.put("initial_anomaly_score", 5.0);
recordMap2.put("anomaly_score", 5.0);
recordMap2.put("bucket_span", 123);
recordMap2.put("sequence_num", 2);
source.add(recordMap1);
source.add(recordMap2);
@ -740,6 +752,8 @@ public class JobProviderTests extends ESTestCase {
recordMap1.put("influencer_field_value", "Bob");
recordMap1.put("initial_anomaly_score", 22.2);
recordMap1.put("anomaly_score", 22.6);
recordMap1.put("bucket_span", 123);
recordMap1.put("sequence_num", 1);
Map<String, Object> recordMap2 = new HashMap<>();
recordMap2.put("job_id", "foo");
recordMap2.put("probability", 0.99);
@ -748,6 +762,8 @@ public class JobProviderTests extends ESTestCase {
recordMap2.put("influencer_field_value", "James");
recordMap2.put("initial_anomaly_score", 5.0);
recordMap2.put("anomaly_score", 5.0);
recordMap2.put("bucket_span", 123);
recordMap2.put("sequence_num", 2);
source.add(recordMap1);
source.add(recordMap2);
@ -967,10 +983,9 @@ public class JobProviderTests extends ESTestCase {
}
private AnomalyRecord createAnomalyRecord(String partitionFieldValue, Date timestamp, double normalizedProbability) {
AnomalyRecord record = new AnomalyRecord("foo");
AnomalyRecord record = new AnomalyRecord("foo", timestamp, 600, 42);
record.setPartitionFieldValue(partitionFieldValue);
record.setNormalizedProbability(normalizedProbability);
record.setTimestamp(timestamp);
return record;
}

View File

@ -50,7 +50,7 @@ public class JobResultsPersisterTests extends ESTestCase {
bucket.setProcessingTimeMs(8888);
bucket.setRecordCount(1);
BucketInfluencer bi = new BucketInfluencer(JOB_ID);
BucketInfluencer bi = new BucketInfluencer(JOB_ID, new Date(), 600, 1);
bi.setAnomalyScore(14.15);
bi.setInfluencerFieldName("biOne");
bi.setInitialAnomalyScore(18.12);
@ -59,7 +59,7 @@ public class JobResultsPersisterTests extends ESTestCase {
bucket.addBucketInfluencer(bi);
// We are adding a record but it shouldn't be persisted as part of the bucket
AnomalyRecord record = new AnomalyRecord(JOB_ID);
AnomalyRecord record = new AnomalyRecord(JOB_ID, new Date(), 600, 2);
record.setAnomalyScore(99.8);
bucket.setRecords(Arrays.asList(record));
@ -94,14 +94,13 @@ public class JobResultsPersisterTests extends ESTestCase {
Client client = clientBuilder.build();
List<AnomalyRecord> records = new ArrayList<>();
AnomalyRecord r1 = new AnomalyRecord(JOB_ID);
AnomalyRecord r1 = new AnomalyRecord(JOB_ID, new Date(), 42, 1);
records.add(r1);
List<Double> actuals = new ArrayList<>();
actuals.add(5.0);
actuals.add(5.1);
r1.setActual(actuals);
r1.setAnomalyScore(99.8);
r1.setBucketSpan(42);
r1.setByFieldName("byName");
r1.setByFieldValue("byValue");
r1.setCorrelatedByFieldValue("testCorrelations");
@ -122,7 +121,7 @@ public class JobResultsPersisterTests extends ESTestCase {
r1.setTypical(typicals);
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.bulkPersisterBuilder(JOB_ID).persistRecords(records, true).executeRequest();
persister.bulkPersisterBuilder(JOB_ID).persistRecords(records).executeRequest();
List<XContentBuilder> captured = captor.getAllValues();
assertEquals(1, captured.size());
@ -156,15 +155,14 @@ public class JobResultsPersisterTests extends ESTestCase {
Client client = clientBuilder.build();
List<Influencer> influencers = new ArrayList<>();
Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1");
Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1", new Date(), 600, 1);
inf.setAnomalyScore(16);
inf.setId("infID");
inf.setInitialAnomalyScore(55.5);
inf.setProbability(0.4);
influencers.add(inf);
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers, true).executeRequest();
persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers).executeRequest();
List<XContentBuilder> captured = captor.getAllValues();
assertEquals(1, captured.size());

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.support.AbstractWireSerializingTestCase;
import java.util.ArrayList;
import java.util.Date;
public class QueryPageTests extends AbstractWireSerializingTestCase<QueryPage<Influencer>> {
@ -19,7 +20,7 @@ public class QueryPageTests extends AbstractWireSerializingTestCase<QueryPage<In
ArrayList<Influencer> hits = new ArrayList<>();
for (int i = 0; i < hitCount; i++) {
hits.add(new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20),
randomAsciiOfLengthBetween(1, 20)));
randomAsciiOfLengthBetween(1, 20), new Date(), randomPositiveLong(), i + 1));
}
return new QueryPage<>(hits, hitCount, new ParseField("test"));
}

View File

@ -22,12 +22,12 @@ import org.mockito.InOrder;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -114,13 +114,13 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo");
AnomalyRecord record2 = new AnomalyRecord("foo");
AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123, 1);
AnomalyRecord record2 = new AnomalyRecord("foo", new Date(123), 123, 2);
List<AnomalyRecord> records = Arrays.asList(record1, record2);
when(result.getRecords()).thenReturn(records);
processor.processResult(context, result);
verify(bulkBuilder, times(1)).persistRecords(records, true);
verify(bulkBuilder, times(1)).persistRecords(records);
verifyNoMoreInteractions(persister);
}
@ -134,16 +134,16 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo");
AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123, 1);
record1.setPartitionFieldValue("pValue");
AnomalyRecord record2 = new AnomalyRecord("foo");
AnomalyRecord record2 = new AnomalyRecord("foo", new Date(123), 123, 2);
record2.setPartitionFieldValue("pValue");
List<AnomalyRecord> records = Arrays.asList(record1, record2);
when(result.getRecords()).thenReturn(records);
processor.processResult(context, result);
verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class), eq(true));
verify(bulkBuilder, times(1)).persistRecords(records, true);
verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class));
verify(bulkBuilder, times(1)).persistRecords(records);
verifyNoMoreInteractions(persister);
}
@ -157,13 +157,13 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue");
Influencer influencer2 = new Influencer(JOB_ID, "infField2", "infValue2");
Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123, 1);
Influencer influencer2 = new Influencer(JOB_ID, "infField2", "infValue2", new Date(123), 123, 1);
List<Influencer> influencers = Arrays.asList(influencer1, influencer2);
when(result.getInfluencers()).thenReturn(influencers);
processor.processResult(context, result);
verify(bulkBuilder, times(1)).persistInfluencers(influencers, true);
verify(bulkBuilder, times(1)).persistInfluencers(influencers);
verifyNoMoreInteractions(persister);
}

View File

@ -32,33 +32,38 @@ public class AutodetectResultsParserTests extends ESTestCase {
public static final String METRIC_OUTPUT_SAMPLE = "[{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359450000000,"
+ "\"bucket_span\":22, \"records\":[],"
+ "\"max_normalized_probability\":0, \"anomaly_score\":0,\"record_count\":0,\"event_count\":806,\"bucket_influencers\":["
+ "{\"job_id\":\"foo\",\"anomaly_score\":0, \"probability\":0.0, \"influencer_field_name\":\"bucket_time\","
+ "{\"sequence_num\":1,\"timestamp\":1359450000000,\"bucket_span\":22,\"job_id\":\"foo\",\"anomaly_score\":0,"
+ "\"probability\":0.0, \"influencer_field_name\":\"bucket_time\","
+ "\"initial_anomaly_score\":0.0}]}},{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normaliser 1.1, normaliser 2" +
".1]\"}}"
+ ",{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"bucket_span\":22,"
+ "\"records\":[{\"job_id\":\"foo\",\"probability\":0.0637541,"
+ ",{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"bucket_span\":22,\"records\":"
+ "[{\"timestamp\":1359453600000,\"bucket_span\":22,\"sequence_num\":1,\"job_id\":\"foo\",\"probability\":0.0637541,"
+ "\"by_field_name\":\"airline\",\"by_field_value\":\"JZA\", \"typical\":[1020.08],\"actual\":[1042.14],"
+ "\"field_name\":\"responsetime\",\"function\":\"max\",\"partition_field_name\":\"\",\"partition_field_value\":\"\"},"
+ "{\"job_id\":\"foo\",\"probability\":0.00748292,\"by_field_name\":\"airline\",\"by_field_value\":\"AMX\", "
+ "\"typical\":[20.2137],\"actual\":[22.8855],\"field_name\":\"responsetime\",\"function\":\"max\"," +
"\"partition_field_name\":\"\","
+ " \"partition_field_value\":\"\"},{\"job_id\":\"foo\",\"probability\":0.023494,\"by_field_name\":\"airline\","
+ "{\"timestamp\":1359453600000,\"bucket_span\":22,\"sequence_num\":2,\"job_id\":\"foo\",\"probability\":0.00748292,"
+ "\"by_field_name\":\"airline\",\"by_field_value\":\"AMX\", "
+ "\"typical\":[20.2137],\"actual\":[22.8855],\"field_name\":\"responsetime\",\"function\":\"max\","
+ "\"partition_field_name\":\"\",\"partition_field_value\":\"\"},{\"timestamp\":1359453600000,\"bucket_span\":22,"
+ "\"sequence_num\":3,\"job_id\":\"foo\",\"probability\":0.023494,\"by_field_name\":\"airline\","
+ "\"by_field_value\":\"DAL\", \"typical\":[382.177],\"actual\":[358.934],\"field_name\":\"responsetime\",\"function\":\"min\","
+ "\"partition_field_name\":\"\", \"partition_field_value\":\"\"},{\"job_id\":\"foo\",\"probability\":0.0473552,"
+ "\"by_field_name\":\"airline\",\"by_field_value\":\"SWA\", \"typical\":[152.148],\"actual\":[96.6425],"
+ "\"field_name\":\"responsetime\",\"function\":\"min\",\"partition_field_name\":\"\",\"partition_field_value\":\"\"}],"
+ "\"partition_field_name\":\"\", \"partition_field_value\":\"\"},{\"timestamp\":1359453600000,\"bucket_span\":22,"
+ "\"sequence_num\":4,\"job_id\":\"foo\","
+ "\"probability\":0.0473552,\"by_field_name\":\"airline\",\"by_field_value\":\"SWA\", \"typical\":[152.148],"
+ "\"actual\":[96.6425],\"field_name\":\"responsetime\",\"function\":\"min\",\"partition_field_name\":\"\","
+ "\"partition_field_value\":\"\"}],"
+ "\"initial_anomaly_score\":0.0140005, \"anomaly_score\":20.22688, \"max_normalized_probability\":10.5688, \"record_count\":4,"
+ "\"event_count\":820,\"bucket_influencers\":[{\"job_id\":\"foo\", \"raw_anomaly_score\":"
+ "0.0140005, \"probability\":0.01,\"influencer_field_name\":\"bucket_time\",\"initial_anomaly_score\":20.22688"
+ ",\"anomaly_score\":20.22688} ,{\"job_id\":\"foo\",\"raw_anomaly_score\":0.005, \"probability\":0.03,"
+ "\"influencer_field_name\":\"foo\",\"initial_anomaly_score\":10.5,\"anomaly_score\":10.5}]}},{\"quantiles\": " +
"{\"job_id\":\"foo\","
+ "\"event_count\":820,\"bucket_influencers\":[{\"timestamp\":1359453600000,\"bucket_span\":22,\"sequence_num\":5,"
+ "\"job_id\":\"foo\", \"raw_anomaly_score\":0.0140005, \"probability\":0.01,\"influencer_field_name\":\"bucket_time\","
+ "\"initial_anomaly_score\":20.22688,\"anomaly_score\":20.22688} ,{\"timestamp\":1359453600000,\"bucket_span\":22,"
+ "\"sequence_num\":6,\"job_id\":\"foo\",\"raw_anomaly_score\":0.005, \"probability\":0.03,"
+ "\"influencer_field_name\":\"foo\",\"initial_anomaly_score\":10.5,\"anomaly_score\":10.5}]}},{\"quantiles\": "
+ "{\"job_id\":\"foo\","
+ "\"quantile_state\":\"[normaliser 1.2, normaliser 2.2]\"}} ,{\"flush\": {\"id\":\"testing1\"}} ,"
+ "{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normaliser 1.3, normaliser 2.3]\"}} ]";
public static final String POPULATION_OUTPUT_SAMPLE = "[{\"timestamp\":1379590200,\"records\":[{\"probability\":1.38951e-08,"
+ "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"mail.google.com\"," +
"\"function\":\"max\","
+ "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"mail.google.com\","
+ "\"sequence_num\":1,\"function\":\"max\","
+ "\"causes\":[{\"probability\":1.38951e-08,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\","
+ "\"over_field_value\":\"mail.google.com\",\"function\":\"max\",\"typical\":[101534],\"actual\":[9.19027e+07]}],"
+ "\"normalized_probability\":100,\"anomaly_score\":44.7324},{\"probability\":3.86587e-07,\"field_name\":\"sum_cs_bytes_\","

View File

@ -46,7 +46,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
int size = randomInt(10);
records = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
AnomalyRecord record = new AnomalyRecord(jobId);
AnomalyRecord record = new AnomalyRecord(jobId, new Date(randomLong()), randomPositiveLong(), i + 1);
record.setProbability(randomDoubleBetween(0.0, 1.0, true));
records.add(record);
}
@ -56,7 +56,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
int size = randomInt(10);
influencers = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Influencer influencer = new Influencer(jobId, randomAsciiOfLength(10), randomAsciiOfLength(10));
Influencer influencer = new Influencer(jobId, randomAsciiOfLength(10), randomAsciiOfLength(10),
new Date(randomLong()), randomPositiveLong(), i + 1);
influencer.setProbability(randomDoubleBetween(0.0, 1.0, true));
influencers.add(influencer);
}

View File

@ -16,7 +16,8 @@ public class BucketInfluencerTests extends AbstractSerializingTestCase<BucketInf
@Override
protected BucketInfluencer createTestInstance() {
BucketInfluencer bucketInfluencer = new BucketInfluencer(randomAsciiOfLengthBetween(1, 20));
BucketInfluencer bucketInfluencer = new BucketInfluencer(randomAsciiOfLengthBetween(1, 20), new Date(randomPositiveLong()),
randomPositiveLong(), randomIntBetween(1, 1000));
if (randomBoolean()) {
bucketInfluencer.setAnomalyScore(randomDouble());
}
@ -35,9 +36,6 @@ public class BucketInfluencerTests extends AbstractSerializingTestCase<BucketInf
if (randomBoolean()) {
bucketInfluencer.setIsInterim(randomBoolean());
}
if (randomBoolean()) {
bucketInfluencer.setTimestamp(new Date(randomLong()));
}
return bucketInfluencer;
}
@ -52,22 +50,22 @@ public class BucketInfluencerTests extends AbstractSerializingTestCase<BucketInf
}
public void testEquals_GivenNull() {
assertFalse(new BucketInfluencer(randomAsciiOfLengthBetween(1, 20)).equals(null));
assertFalse(new BucketInfluencer(randomAsciiOfLengthBetween(1, 20), new Date(), 600, 1).equals(null));
}
public void testEquals_GivenDifferentClass() {
assertFalse(new BucketInfluencer(randomAsciiOfLengthBetween(1, 20)).equals("a string"));
assertFalse(new BucketInfluencer(randomAsciiOfLengthBetween(1, 20), new Date(), 600, 1).equals("a string"));
}
public void testEquals_GivenEqualInfluencers() {
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer1.setAnomalyScore(42.0);
bucketInfluencer1.setInfluencerFieldName("foo");
bucketInfluencer1.setInitialAnomalyScore(67.3);
bucketInfluencer1.setProbability(0.0003);
bucketInfluencer1.setRawAnomalyScore(3.14);
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer2.setAnomalyScore(42.0);
bucketInfluencer2.setInfluencerFieldName("foo");
bucketInfluencer2.setInitialAnomalyScore(67.3);
@ -80,10 +78,10 @@ public class BucketInfluencerTests extends AbstractSerializingTestCase<BucketInf
}
public void testEquals_GivenDifferentAnomalyScore() {
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer1.setAnomalyScore(42.0);
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer2.setAnomalyScore(42.1);
assertFalse(bucketInfluencer1.equals(bucketInfluencer2));
@ -91,10 +89,10 @@ public class BucketInfluencerTests extends AbstractSerializingTestCase<BucketInf
}
public void testEquals_GivenDifferentFieldName() {
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer1.setInfluencerFieldName("foo");
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer2.setInfluencerFieldName("bar");
assertFalse(bucketInfluencer1.equals(bucketInfluencer2));
@ -102,10 +100,10 @@ public class BucketInfluencerTests extends AbstractSerializingTestCase<BucketInf
}
public void testEquals_GivenDifferentInitialAnomalyScore() {
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer1.setInitialAnomalyScore(42.0);
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer2.setInitialAnomalyScore(42.1);
assertFalse(bucketInfluencer1.equals(bucketInfluencer2));
@ -113,10 +111,10 @@ public class BucketInfluencerTests extends AbstractSerializingTestCase<BucketInf
}
public void testEquals_GivenRawAnomalyScore() {
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer1.setRawAnomalyScore(42.0);
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer2.setRawAnomalyScore(42.1);
assertFalse(bucketInfluencer1.equals(bucketInfluencer2));
@ -124,10 +122,10 @@ public class BucketInfluencerTests extends AbstractSerializingTestCase<BucketInf
}
public void testEquals_GivenDifferentProbability() {
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer1.setProbability(0.001);
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo");
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo", new Date(123), 600, 1);
bucketInfluencer2.setProbability(0.002);
assertFalse(bucketInfluencer1.equals(bucketInfluencer2));

View File

@ -22,6 +22,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
@Override
protected Bucket createTestInstance() {
int sequenceNum = 1;
String jobId = "foo";
Bucket bucket = new Bucket(jobId, new Date(randomLong()), randomPositiveLong());
@ -32,7 +33,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
int size = randomInt(10);
List<BucketInfluencer> bucketInfluencers = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BucketInfluencer bucketInfluencer = new BucketInfluencer(jobId);
BucketInfluencer bucketInfluencer = new BucketInfluencer(jobId, new Date(), 600, i + 1);
bucketInfluencer.setAnomalyScore(randomDouble());
bucketInfluencer.setInfluencerFieldName(randomAsciiOfLengthBetween(1, 20));
bucketInfluencer.setInitialAnomalyScore(randomDouble());
@ -81,14 +82,12 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
int size = randomInt(10);
List<AnomalyRecord> records = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
AnomalyRecord anomalyRecord = new AnomalyRecord(jobId);
AnomalyRecord anomalyRecord = new AnomalyRecord(jobId, bucket.getTimestamp(), bucket.getBucketSpan(), sequenceNum++);
anomalyRecord.setAnomalyScore(randomDouble());
anomalyRecord.setActual(Collections.singletonList(randomDouble()));
anomalyRecord.setTypical(Collections.singletonList(randomDouble()));
anomalyRecord.setProbability(randomDouble());
anomalyRecord.setId(randomAsciiOfLengthBetween(1, 20));
anomalyRecord.setInterim(randomBoolean());
anomalyRecord.setTimestamp(new Date(randomLong()));
records.add(anomalyRecord);
}
bucket.setRecords(records);
@ -167,7 +166,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
public void testEquals_GivenOneHasRecordsAndTheOtherDoesNot() {
Bucket bucket1 = new Bucket("foo", new Date(123), 123);
bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo")));
bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo", new Date(123), 123, 1)));
Bucket bucket2 = new Bucket("foo", new Date(123), 123);
bucket2.setRecords(null);
@ -177,18 +176,19 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
public void testEquals_GivenDifferentNumberOfRecords() {
Bucket bucket1 = new Bucket("foo", new Date(123), 123);
bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo")));
bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo", new Date(123), 123, 1)));
Bucket bucket2 = new Bucket("foo", new Date(123), 123);
bucket2.setRecords(Arrays.asList(new AnomalyRecord("foo"), new AnomalyRecord("foo")));
bucket2.setRecords(Arrays.asList(new AnomalyRecord("foo", new Date(123), 123, 1),
new AnomalyRecord("foo", new Date(123), 123, 2)));
assertFalse(bucket1.equals(bucket2));
assertFalse(bucket2.equals(bucket1));
}
public void testEquals_GivenSameNumberOfRecordsButDifferent() {
AnomalyRecord anomalyRecord1 = new AnomalyRecord("foo");
AnomalyRecord anomalyRecord1 = new AnomalyRecord("foo", new Date(123), 123, 1);
anomalyRecord1.setAnomalyScore(1.0);
AnomalyRecord anomalyRecord2 = new AnomalyRecord("foo");
AnomalyRecord anomalyRecord2 = new AnomalyRecord("foo", new Date(123), 123, 2);
anomalyRecord1.setAnomalyScore(2.0);
Bucket bucket1 = new Bucket("foo", new Date(123), 123);
@ -212,13 +212,12 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
public void testEquals_GivenDifferentBucketInfluencers() {
Bucket bucket1 = new Bucket("foo", new Date(123), 123);
BucketInfluencer influencer1 = new BucketInfluencer("foo");
BucketInfluencer influencer1 = new BucketInfluencer("foo", new Date(123), 123, 1);
influencer1.setInfluencerFieldName("foo");
bucket1.addBucketInfluencer(influencer1);
;
Bucket bucket2 = new Bucket("foo", new Date(123), 123);
BucketInfluencer influencer2 = new BucketInfluencer("foo");
BucketInfluencer influencer2 = new BucketInfluencer("foo", new Date(123), 123, 2);
influencer2.setInfluencerFieldName("bar");
bucket2.addBucketInfluencer(influencer2);
@ -227,8 +226,8 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
}
public void testEquals_GivenEqualBuckets() {
AnomalyRecord record = new AnomalyRecord("job_id");
BucketInfluencer bucketInfluencer = new BucketInfluencer("foo");
AnomalyRecord record = new AnomalyRecord("job_id", new Date(123), 123, 1);
BucketInfluencer bucketInfluencer = new BucketInfluencer("foo", new Date(123), 123, 1);
Date date = new Date();
Bucket bucket1 = new Bucket("foo", date, 123);
@ -274,7 +273,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
public void testIsNormalisable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo"));
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(0.0);
bucket.setRecordCount(0);
@ -283,7 +282,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
public void testIsNormalisable_GivenAnomalyScoreIsZeroAndRecordCountIsNonZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo"));
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(0.0);
bucket.setRecordCount(1);
@ -292,7 +291,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
public void testIsNormalisable_GivenAnomalyScoreIsNonZeroAndRecordCountIsZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo"));
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(1.0);
bucket.setRecordCount(0);
@ -301,7 +300,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
public void testIsNormalisable_GivenAnomalyScoreIsNonZeroAndRecordCountIsNonZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo"));
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(1.0);
bucket.setRecordCount(1);

View File

@ -10,11 +10,14 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
import java.util.Date;
public class InfluencerTests extends AbstractSerializingTestCase<Influencer> {
@Override
protected Influencer createTestInstance() {
return new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20));
return new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20),
new Date(), randomPositiveLong(), randomIntBetween(1, 1000));
}
@Override

View File

@ -12,6 +12,7 @@ import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCase<PerPartitionMaxProbabilities> {
@ -24,7 +25,8 @@ public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCa
pps.add(new PerPartitionMaxProbabilities.PartitionProbability(randomAsciiOfLength(12), randomDouble()));
}
return new PerPartitionMaxProbabilities(randomAsciiOfLength(20), new DateTime(randomDateTimeZone()).toDate(), pps);
return new PerPartitionMaxProbabilities(randomAsciiOfLength(20), new DateTime(randomDateTimeZone()).toDate(),
randomPositiveLong(), pps);
}
@Override
@ -74,7 +76,7 @@ public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCa
}
private AnomalyRecord createAnomalyRecord(String partitionFieldValue, double normalizedProbability) {
AnomalyRecord record = new AnomalyRecord("foo");
AnomalyRecord record = new AnomalyRecord("foo", new Date(), 600, 1);
record.setPartitionFieldValue(partitionFieldValue);
record.setNormalizedProbability(normalizedProbability);
return record;

View File

@ -18,7 +18,7 @@ setup:
index:
index: prelertresults-farequote
type: result
id: 1
id: farequote_1464739200000_1_1
body:
{
"job_id": "farequote",
@ -26,14 +26,16 @@ setup:
"influencer_field_name": "foo",
"influencer_field_value": "bar",
"anomaly_score": 80.0,
"result_type" : "influencer"
"result_type" : "influencer",
"bucket_span" : 1,
"sequence_num" : 1
}
- do:
index:
index: prelertresults-farequote
type: result
id: 2
id: farequote_1464825600000_1_2
body:
{
"job_id": "farequote",
@ -41,7 +43,9 @@ setup:
"influencer_field_name": "foo",
"influencer_field_value": "zoo",
"anomaly_score": 50.0,
"result_type" : "influencer"
"result_type" : "influencer",
"bucket_span" : 1,
"sequence_num" : 2
}
- do:
indices.refresh:

View File

@ -31,26 +31,30 @@ setup:
index:
index: prelertresults-farequote
type: result
id: 2
id: farequote_1464739200000_1_1
body:
{
"job_id": "farequote",
"result_type": "record",
"timestamp": "2016-06-01T00:00:00Z",
"anomaly_score": 60.0
"anomaly_score": 60.0,
"bucket_span": 1,
"sequence_num": 1
}
- do:
index:
index: prelertresults-farequote
type: result
id: 3
id: farequote_1464825600000_1_2
body:
{
"job_id": "farequote",
"result_type": "record",
"timestamp": "2016-06-02T00:00:00Z",
"anomaly_score": 80.0
"anomaly_score": 80.0,
"bucket_span": 1,
"sequence_num": 2
}
- do:

View File

@ -81,35 +81,37 @@ setup:
index:
index: prelertresults-foo
type: result
id: "foo_1464825600000_1_record"
body: { "job_id": "foo", "result_type": "record", "timestamp": "2016-06-02T00:00:00Z" }
id: "foo_1464825600000_1_1"
body: { "job_id": "foo", "result_type": "record", "timestamp": "2016-06-02T00:00:00Z", "bucket_span":1, "sequence_num":1 }
- do:
index:
index: prelertresults-foo
type: result
id: "foo_1462060800000_1_record"
body: { "job_id": "foo", "result_type": "record", "timestamp": "2016-05-01T00:00:00Z" }
id: "foo_1462060800000_1_2"
body: { "job_id": "foo", "result_type": "record", "timestamp": "2016-05-01T00:00:00Z", "bucket_span":1, "sequence_num":2 }
- do:
index:
index: prelertresults-foo
type: result
id: "foo_1464825600000_1_influencer"
id: "foo_1464825600000_1_3"
body: {
"job_id": "foo",
"result_type": "influencer",
"timestamp": "2016-06-02T00:00:00Z",
"influencer_field_name": "foo",
"influencer_field_value": "zoo",
"anomaly_score": 50.0
"anomaly_score": 50.0,
"bucket_span": 1,
"sequence_num": 3
}
- do:
index:
index: prelertresults-foo
type: result
id: "foo_1462060800000_1_influencer"
id: "foo_1462060800000_1_4"
body:
{
"job_id": "foo",
@ -117,7 +119,9 @@ setup:
"timestamp": "2016-05-01T00:00:00Z",
"influencer_field_name": "foo",
"influencer_field_value": "zoo",
"anomaly_score": 50.0
"anomaly_score": 50.0,
"bucket_span": 1,
"sequence_num": 4
}
- do: