From 65f03a888887baa820cbb40b77e6ef3f81c6bb11 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 8 Dec 2016 18:39:03 +0000 Subject: [PATCH] Use well-defined IDs for records and influencers (elastic/elasticsearch#510) * Use well-defined IDs for records and influencers Removes the reliance on ES autogenerated UUIDs for all types that will be renormalized * Address some review comments Original commit: elastic/x-pack-elasticsearch@85fde8b95701a1f831dfc845cb2c3b6a88de826c --- ...asticsearchBatchedInfluencersIterator.java | 4 +- .../persistence/ElasticsearchMappings.java | 14 +-- .../prelert/job/persistence/JobProvider.java | 15 +-- .../job/persistence/JobRenormaliser.java | 7 +- .../job/persistence/JobResultsPersister.java | 81 +++++----------- .../output/AutoDetectResultProcessor.java | 8 +- .../prelert/job/results/AnomalyRecord.java | 92 +++++++------------ .../prelert/job/results/BucketInfluencer.java | 80 ++++++++-------- .../xpack/prelert/job/results/Influencer.java | 84 ++++++++--------- .../results/PerPartitionMaxProbabilities.java | 25 ++--- .../job/results/ReservedFieldNames.java | 3 + .../action/GetBucketActionResponseTests.java | 8 +- .../GetInfluencersActionResponseTests.java | 4 +- .../action/GetRecordsActionResponseTests.java | 4 +- .../prelert/integration/PrelertJobIT.java | 16 ++-- .../ElasticsearchMappingsTests.java | 1 - .../job/persistence/JobProviderTests.java | 19 +++- .../persistence/JobResultsPersisterTests.java | 14 ++- .../job/persistence/QueryPageTests.java | 3 +- .../AutoDetectResultProcessorTests.java | 24 ++--- .../output/AutodetectResultsParserTests.java | 39 ++++---- .../job/results/AutodetectResultTests.java | 5 +- .../job/results/BucketInfluencerTests.java | 34 ++++--- .../prelert/job/results/BucketTests.java | 35 ++++--- .../prelert/job/results/InfluencerTests.java | 5 +- .../PerPartitionMaxProbabilitiesTests.java | 6 +- .../test/jobs_get_result_influencers.yaml | 12 ++- .../test/jobs_get_result_records.yaml | 12 ++- .../test/revert_model_snapshot.yaml | 20 ++-- 29 files changed, 317 insertions(+), 357 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedInfluencersIterator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedInfluencersIterator.java index 087a5ebdea2..53b8c5fd29e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedInfluencersIterator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBatchedInfluencersIterator.java @@ -33,8 +33,6 @@ class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedResult throw new ElasticsearchParseException("failed to parser influencer", e); } - Influencer influencer = Influencer.PARSER.apply(parser, () -> parseFieldMatcher); - influencer.setId(hit.getId()); - return influencer; + return Influencer.PARSER.apply(parser, () -> parseFieldMatcher); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappings.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappings.java index 737fba6dd65..38ad76d20bb 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappings.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappings.java @@ -120,10 +120,10 @@ public class ElasticsearchMappings { .endObject() .startObject(PROPERTIES) .startObject(Result.RESULT_TYPE.getPreferredName()) - .field(TYPE, KEYWORD) + .field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false) .endObject() .startObject(Job.ID.getPreferredName()) - .field(TYPE, KEYWORD) + .field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false) .endObject() .startObject(ES_TIMESTAMP) .field(TYPE, DATE) @@ -156,7 +156,7 @@ public class ElasticsearchMappings { .field(TYPE, NESTED) .startObject(PROPERTIES) .startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName()) - .field(TYPE, KEYWORD) + .field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false) .endObject() .startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName()) .field(TYPE, KEYWORD) @@ -174,7 +174,7 @@ public class ElasticsearchMappings { .field(TYPE, NESTED) .startObject(PROPERTIES) .startObject(BucketInfluencer.INFLUENCER_FIELD_NAME.getPreferredName()) - .field(TYPE, KEYWORD) + .field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false) .endObject() .startObject(BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName()) .field(TYPE, DOUBLE) @@ -182,10 +182,9 @@ public class ElasticsearchMappings { .endObject() .endObject() .startObject(BucketInfluencer.INFLUENCER_FIELD_NAME.getPreferredName()) - .field(TYPE, KEYWORD) + .field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false) .endObject() - // per-partition max probabilities mapping .startObject(PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName()) .field(TYPE, NESTED) @@ -248,6 +247,9 @@ public class ElasticsearchMappings { builder.startObject(AnomalyRecord.DETECTOR_INDEX.getPreferredName()) .field(TYPE, INTEGER).field(INCLUDE_IN_ALL, false) .endObject() + .startObject(AnomalyRecord.SEQUENCE_NUM.getPreferredName()) + .field(TYPE, INTEGER).field(INCLUDE_IN_ALL, false) + .endObject() .startObject(AnomalyRecord.ACTUAL.getPreferredName()) .field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false) .endObject() diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java index c3198040fda..ce613d1f787 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java @@ -491,10 +491,8 @@ public class JobProvider { } catch (IOException e) { throw new ElasticsearchParseException("failed to parse PerPartitionMaxProbabilities", e); } - PerPartitionMaxProbabilities perPartitionMaxProbabilities = - PerPartitionMaxProbabilities.PARSER.apply(parser, () -> parseFieldMatcher); - perPartitionMaxProbabilities.setId(hit.getId()); - results.add(perPartitionMaxProbabilities); + + results.add(PerPartitionMaxProbabilities.PARSER.apply(parser, () -> parseFieldMatcher)); } return results; @@ -739,11 +737,8 @@ public class JobProvider { } catch (IOException e) { throw new ElasticsearchParseException("failed to parse records", e); } - AnomalyRecord record = AnomalyRecord.PARSER.apply(parser, () -> parseFieldMatcher); - // set the ID - record.setId(hit.getId()); - results.add(record); + results.add(AnomalyRecord.PARSER.apply(parser, () -> parseFieldMatcher)); } return new QueryPage<>(results, searchResponse.getHits().getTotalHits(), AnomalyRecord.RESULTS_FIELD); @@ -808,10 +803,8 @@ public class JobProvider { } catch (IOException e) { throw new ElasticsearchParseException("failed to parse influencer", e); } - Influencer influencer = Influencer.PARSER.apply(parser, () -> parseFieldMatcher); - influencer.setId(hit.getId()); - influencers.add(influencer); + influencers.add(Influencer.PARSER.apply(parser, () -> parseFieldMatcher)); } return new QueryPage<>(influencers, response.getHits().getTotalHits(), Influencer.RESULTS_FIELD); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java index dba4c6d87a8..b85008258ac 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java @@ -51,7 +51,7 @@ public class JobRenormaliser extends AbstractComponent { * @param records The updated records */ public void updateRecords(String jobId, List records) { - jobResultsPersister.bulkPersisterBuilder(jobId).persistRecords(records, false).executeRequest(); + jobResultsPersister.bulkPersisterBuilder(jobId).persistRecords(records).executeRequest(); } /** @@ -64,8 +64,7 @@ public class JobRenormaliser extends AbstractComponent { */ public void updatePerPartitionMaxProbabilities(String jobId, String documentId, List records) { PerPartitionMaxProbabilities ppMaxProbs = new PerPartitionMaxProbabilities(records); - ppMaxProbs.setId(documentId); - jobResultsPersister.bulkPersisterBuilder(jobId).persistPerPartitionMaxProbabilities(ppMaxProbs, false).executeRequest(); + jobResultsPersister.bulkPersisterBuilder(jobId).persistPerPartitionMaxProbabilities(ppMaxProbs).executeRequest(); } /** @@ -76,7 +75,7 @@ public class JobRenormaliser extends AbstractComponent { * @param influencers The updated influencers */ public void updateInfluencer(String jobId, List influencers) { - jobResultsPersister.bulkPersisterBuilder(jobId).persistInfluencers(influencers, false).executeRequest(); + jobResultsPersister.bulkPersisterBuilder(jobId).persistInfluencers(influencers).executeRequest(); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java index b433924c83b..ba4a40d0256 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java @@ -91,8 +91,7 @@ public class JobResultsPersister extends AbstractComponent { bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId()).setSource(content)); - persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(), - bucket.isInterim()); + persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers()); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}, e)); } @@ -100,13 +99,13 @@ public class JobResultsPersister extends AbstractComponent { return this; } - private void persistBucketInfluencersStandalone(String jobId, String bucketId, List bucketInfluencers, - Date bucketTime, boolean isInterim) throws IOException { + private void persistBucketInfluencersStandalone(String jobId, String bucketId, List bucketInfluencers) + throws IOException { if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) { for (BucketInfluencer bucketInfluencer : bucketInfluencers) { - XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer, bucketTime, isInterim); + XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer); // Need consistent IDs to ensure overwriting on renormalisation - String id = bucketId + bucketInfluencer.getInfluencerFieldName(); + String id = bucketInfluencer.getId(); logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id); bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), id).setSource(content)); @@ -118,27 +117,17 @@ public class JobResultsPersister extends AbstractComponent { * Persist a list of anomaly records * * @param records the records to persist - * @param autoGenerateId If true then persist the influencer with an auto generated ID - * else use {@link AnomalyRecord#getId()} * @return this */ - public Builder persistRecords(List records, boolean autoGenerateId) { + public Builder persistRecords(List records) { try { for (AnomalyRecord record : records) { XContentBuilder content = toXContentBuilder(record); - - if (autoGenerateId) { - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID", - jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName); - bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content)); - } - else { - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", - jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId()); - bulkRequest.add( - client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content)); - } + logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", + jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId()); + bulkRequest.add( + client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content)); } } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}, e)); @@ -152,25 +141,16 @@ public class JobResultsPersister extends AbstractComponent { * an auto generated ID * * @param influencers the influencers to persist - * @param autoGenerateId If true then persist the influencer with an auto generated ID - * else use {@link Influencer#getId()} * @return this */ - public Builder persistInfluencers(List influencers, boolean autoGenerateId) { + public Builder persistInfluencers(List influencers) { try { for (Influencer influencer : influencers) { XContentBuilder content = toXContentBuilder(influencer); - if (autoGenerateId) { - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID", - jobId, Influencer.RESULT_TYPE_VALUE, indexName); - bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content)); - } - else { - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", - jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId()); - bulkRequest.add( - client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content)); - } + logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", + jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId()); + bulkRequest.add( + client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content)); } } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}, e)); @@ -183,26 +163,16 @@ public class JobResultsPersister extends AbstractComponent { * Persist {@link PerPartitionMaxProbabilities} * * @param partitionProbabilities The probabilities to persist - * @param autoGenerateId If true then persist the PerPartitionMaxProbabilities with an auto generated ID - * else use {@link PerPartitionMaxProbabilities#getId()} * @return this */ - public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities, boolean autoGenerateId) { + public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities) { try { XContentBuilder builder = toXContentBuilder(partitionProbabilities); - - if (autoGenerateId) { - logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {}", - jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp()); - bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(builder)); - } - else { - logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}", - jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(), - partitionProbabilities.getId()); - bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId()) - .setSource(builder)); - } + logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}", + jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(), + partitionProbabilities.getId()); + bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId()) + .setSource(builder)); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores", new Object[]{jobId}, e)); @@ -223,7 +193,6 @@ public class JobResultsPersister extends AbstractComponent { } } - /** * Persist the category definition * @@ -335,13 +304,9 @@ public class JobResultsPersister extends AbstractComponent { return builder; } - private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer, - Date bucketTime, boolean isInterim) throws IOException { - BucketInfluencer influencer = new BucketInfluencer(bucketInfluencer); - influencer.setIsInterim(isInterim); - influencer.setTimestamp(bucketTime); + private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer) throws IOException { XContentBuilder builder = jsonBuilder(); - influencer.toXContent(builder, ToXContent.EMPTY_PARAMS); + bucketInfluencer.toXContent(builder, ToXContent.EMPTY_PARAMS); return builder; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java index f1e0ba28bc6..49e4405611d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -88,7 +88,7 @@ public class AutoDetectResultProcessor { LOGGER.info("[{}] {} buckets parsed from autodetect output - about to refresh indexes", jobId, bucketCount); LOGGER.info("[{}] Parse results Complete", jobId); } catch (Exception e) { - LOGGER.error((Supplier) () -> new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}, e)); + LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}), e); } finally { completionLatch.countDown(); flushListener.clear(); @@ -116,14 +116,14 @@ public class AutoDetectResultProcessor { } List records = result.getRecords(); if (records != null && !records.isEmpty()) { - context.bulkResultsPersister.persistRecords(records, true); + context.bulkResultsPersister.persistRecords(records); if (context.isPerPartitionNormalization) { - context.bulkResultsPersister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records), true); + context.bulkResultsPersister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records)); } } List influencers = result.getInfluencers(); if (influencers != null && !influencers.isEmpty()) { - context.bulkResultsPersister.persistInfluencers(influencers, true); + context.bulkResultsPersister.persistInfluencers(influencers); } CategoryDefinition categoryDefinition = result.getCategoryDefinition(); if (categoryDefinition != null) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecord.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecord.java index a3bac523efa..b49bfd0e9b6 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecord.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecord.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import org.elasticsearch.xpack.prelert.utils.time.TimeUtils; import java.io.IOException; @@ -39,6 +40,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { * Result fields (all detector types) */ public static final ParseField DETECTOR_INDEX = new ParseField("detector_index"); + public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num"); public static final ParseField PROBABILITY = new ParseField("probability"); public static final ParseField BY_FIELD_NAME = new ParseField("by_field_name"); public static final ParseField BY_FIELD_VALUE = new ParseField("by_field_value"); @@ -77,19 +79,11 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { public static final ParseField INITIAL_NORMALIZED_PROBABILITY = new ParseField("initial_normalized_probability"); public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new AnomalyRecord((String) a[0])); + new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new AnomalyRecord((String) a[0], (Date) a[1], (long) a[2], (int) a[3])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); - PARSER.declareString((anomalyRecord, s) -> {}, Result.RESULT_TYPE); - PARSER.declareDouble(AnomalyRecord::setProbability, PROBABILITY); - PARSER.declareDouble(AnomalyRecord::setAnomalyScore, ANOMALY_SCORE); - PARSER.declareDouble(AnomalyRecord::setNormalizedProbability, NORMALIZED_PROBABILITY); - PARSER.declareDouble(AnomalyRecord::setInitialNormalizedProbability, INITIAL_NORMALIZED_PROBABILITY); - PARSER.declareLong(AnomalyRecord::setBucketSpan, BUCKET_SPAN); - PARSER.declareInt(AnomalyRecord::setDetectorIndex, DETECTOR_INDEX); - PARSER.declareBoolean(AnomalyRecord::setInterim, IS_INTERIM); - PARSER.declareField(AnomalyRecord::setTimestamp, p -> { + PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { if (p.currentToken() == Token.VALUE_NUMBER) { return new Date(p.longValue()); } else if (p.currentToken() == Token.VALUE_STRING) { @@ -97,6 +91,15 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { } throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); }, TIMESTAMP, ValueType.VALUE); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM); + PARSER.declareString((anomalyRecord, s) -> {}, Result.RESULT_TYPE); + PARSER.declareDouble(AnomalyRecord::setProbability, PROBABILITY); + PARSER.declareDouble(AnomalyRecord::setAnomalyScore, ANOMALY_SCORE); + PARSER.declareDouble(AnomalyRecord::setNormalizedProbability, NORMALIZED_PROBABILITY); + PARSER.declareDouble(AnomalyRecord::setInitialNormalizedProbability, INITIAL_NORMALIZED_PROBABILITY); + PARSER.declareInt(AnomalyRecord::setDetectorIndex, DETECTOR_INDEX); + PARSER.declareBoolean(AnomalyRecord::setInterim, IS_INTERIM); PARSER.declareString(AnomalyRecord::setByFieldName, BY_FIELD_NAME); PARSER.declareString(AnomalyRecord::setByFieldValue, BY_FIELD_VALUE); PARSER.declareString(AnomalyRecord::setCorrelatedByFieldValue, CORRELATED_BY_FIELD_VALUE); @@ -114,7 +117,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { } private final String jobId; - private String id; + private final int sequenceNum; private int detectorIndex; private double probability; private String byFieldName; @@ -139,21 +142,24 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { private double initialNormalizedProbability; - private Date timestamp; - private long bucketSpan; + private final Date timestamp; + private final long bucketSpan; private List influencers; private boolean hadBigNormalisedUpdate; - public AnomalyRecord(String jobId) { + public AnomalyRecord(String jobId, Date timestamp, long bucketSpan, int sequenceNum) { this.jobId = jobId; + this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName()); + this.bucketSpan = bucketSpan; + this.sequenceNum = sequenceNum; } @SuppressWarnings("unchecked") public AnomalyRecord(StreamInput in) throws IOException { jobId = in.readString(); - id = in.readOptionalString(); + sequenceNum = in.readInt(); detectorIndex = in.readInt(); probability = in.readDouble(); byFieldName = in.readOptionalString(); @@ -179,21 +185,18 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { anomalyScore = in.readDouble(); normalizedProbability = in.readDouble(); initialNormalizedProbability = in.readDouble(); - if (in.readBoolean()) { - timestamp = new Date(in.readLong()); - } + timestamp = new Date(in.readLong()); bucketSpan = in.readLong(); if (in.readBoolean()) { influencers = in.readList(Influence::new); } hadBigNormalisedUpdate = in.readBoolean(); - } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); - out.writeOptionalString(id); + out.writeInt(sequenceNum); out.writeInt(detectorIndex); out.writeDouble(probability); out.writeOptionalString(byFieldName); @@ -225,11 +228,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { out.writeDouble(anomalyScore); out.writeDouble(normalizedProbability); out.writeDouble(initialNormalizedProbability); - boolean hasTimestamp = timestamp != null; - out.writeBoolean(hasTimestamp); - if (hasTimestamp) { - out.writeLong(timestamp.getTime()); - } + out.writeLong(timestamp.getTime()); out.writeLong(bucketSpan); boolean hasInfluencers = influencers != null; out.writeBoolean(hasInfluencers); @@ -250,10 +249,9 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { builder.field(INITIAL_NORMALIZED_PROBABILITY.getPreferredName(), initialNormalizedProbability); builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan); builder.field(DETECTOR_INDEX.getPreferredName(), detectorIndex); + builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum); builder.field(IS_INTERIM.getPreferredName(), isInterim); - if (timestamp != null) { - builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); - } + builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); if (byFieldName != null) { builder.field(BY_FIELD_NAME.getPreferredName(), byFieldName); } @@ -305,15 +303,10 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { } /** - * Data store ID of this record. May be null for records that have not been - * read from the data store. + * Data store ID of this record. */ public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; + return jobId + "_" + timestamp.getTime() + "_" + bucketSpan + "_" + sequenceNum; } public int getDetectorIndex() { @@ -352,10 +345,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { return timestamp; } - public void setTimestamp(Date timestamp) { - this.timestamp = timestamp; - } - /** * Bucketspan expressed in seconds */ @@ -363,13 +352,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { return bucketSpan; } - /** - * Bucketspan expressed in seconds - */ - public void setBucketSpan(long bucketSpan) { - this.bucketSpan = bucketSpan; - } - public double getProbability() { return probability; } @@ -378,7 +360,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { probability = value; } - public String getByFieldName() { return byFieldName; } @@ -509,14 +490,11 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { @Override public int hashCode() { - // ID is NOT included in the hash, so that a record from the data store - // will hash the same as a record representing the same anomaly that did - // not come from the data store - // hadBigNormalisedUpdate is also deliberately excluded from the hash + // hadBigNormalisedUpdate is deliberately excluded from the hash - return Objects.hash(detectorIndex, probability, anomalyScore, initialNormalizedProbability, - normalizedProbability, typical, actual, + return Objects.hash(jobId, detectorIndex, sequenceNum, bucketSpan, probability, anomalyScore, + normalizedProbability, initialNormalizedProbability, typical, actual, function, functionDescription, fieldName, byFieldName, byFieldValue, correlatedByFieldValue, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, timestamp, isInterim, causes, influencers, jobId); @@ -535,13 +513,11 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { AnomalyRecord that = (AnomalyRecord) other; - // ID is NOT compared, so that a record from the data store will compare - // equal to a record representing the same anomaly that did not come - // from the data store - - // hadBigNormalisedUpdate is also deliberately excluded from the test + // hadBigNormalisedUpdate is deliberately excluded from the test return Objects.equals(this.jobId, that.jobId) && this.detectorIndex == that.detectorIndex + && this.sequenceNum == that.sequenceNum + && this.bucketSpan == that.bucketSpan && this.probability == that.probability && this.anomalyScore == that.anomalyScore && this.normalizedProbability == that.normalizedProbability diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencer.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencer.java index 9c0cda059a9..879761d51bc 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencer.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencer.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import org.elasticsearch.xpack.prelert.utils.time.TimeUtils; import java.io.IOException; @@ -39,20 +40,16 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { public static final ParseField PROBABILITY = new ParseField("probability"); public static final ParseField IS_INTERIM = new ParseField("is_interim"); public static final ParseField TIMESTAMP = new ParseField("timestamp"); + public static final ParseField BUCKET_SPAN = new ParseField("bucket_span"); + public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num"); public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(RESULT_TYPE_FIELD.getPreferredName(), a -> new BucketInfluencer((String) a[0])); + new ConstructingObjectParser<>(RESULT_TYPE_FIELD.getPreferredName(), a -> new BucketInfluencer((String) a[0], + (Date) a[1], (long) a[2], (int) a[3])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); - PARSER.declareString((bucketInfluencer, s) -> {}, Result.RESULT_TYPE); - PARSER.declareString(BucketInfluencer::setInfluencerFieldName, INFLUENCER_FIELD_NAME); - PARSER.declareDouble(BucketInfluencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE); - PARSER.declareDouble(BucketInfluencer::setAnomalyScore, ANOMALY_SCORE); - PARSER.declareDouble(BucketInfluencer::setRawAnomalyScore, RAW_ANOMALY_SCORE); - PARSER.declareDouble(BucketInfluencer::setProbability, PROBABILITY); - PARSER.declareBoolean(BucketInfluencer::setIsInterim, IS_INTERIM); - PARSER.declareField(BucketInfluencer::setTimestamp, p -> { + PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { if (p.currentToken() == Token.VALUE_NUMBER) { return new Date(p.longValue()); } else if (p.currentToken() == Token.VALUE_STRING) { @@ -60,6 +57,15 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { } throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); }, TIMESTAMP, ValueType.VALUE); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM); + PARSER.declareString((bucketInfluencer, s) -> {}, Result.RESULT_TYPE); + PARSER.declareString(BucketInfluencer::setInfluencerFieldName, INFLUENCER_FIELD_NAME); + PARSER.declareDouble(BucketInfluencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE); + PARSER.declareDouble(BucketInfluencer::setAnomalyScore, ANOMALY_SCORE); + PARSER.declareDouble(BucketInfluencer::setRawAnomalyScore, RAW_ANOMALY_SCORE); + PARSER.declareDouble(BucketInfluencer::setProbability, PROBABILITY); + PARSER.declareBoolean(BucketInfluencer::setIsInterim, IS_INTERIM); } private final String jobId; @@ -69,21 +75,15 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { private double rawAnomalyScore; private double probability; private boolean isInterim; - private Date timestamp; + private final Date timestamp; + private final long bucketSpan; + private final int sequenceNum; - public BucketInfluencer(String jobId) { + public BucketInfluencer(String jobId, Date timestamp, long bucketSpan, int sequenceNum) { this.jobId = jobId; - } - - public BucketInfluencer(BucketInfluencer prototype) { - jobId = prototype.jobId; - influenceField = prototype.influenceField; - initialAnomalyScore = prototype.initialAnomalyScore; - anomalyScore = prototype.anomalyScore; - rawAnomalyScore = prototype.rawAnomalyScore; - probability = prototype.probability; - isInterim = prototype.isInterim; - timestamp = prototype.timestamp; + this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName()); + this.bucketSpan = bucketSpan; + this.sequenceNum = sequenceNum; } public BucketInfluencer(StreamInput in) throws IOException { @@ -94,9 +94,9 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { rawAnomalyScore = in.readDouble(); probability = in.readDouble(); isInterim = in.readBoolean(); - if (in.readBoolean()) { - timestamp = new Date(in.readLong()); - } + timestamp = new Date(in.readLong()); + bucketSpan = in.readLong(); + sequenceNum = in.readInt(); } @Override @@ -108,11 +108,9 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { out.writeDouble(rawAnomalyScore); out.writeDouble(probability); out.writeBoolean(isInterim); - boolean hasTimestamp = timestamp != null; - out.writeBoolean(hasTimestamp); - if (hasTimestamp) { - out.writeLong(timestamp.getTime()); - } + out.writeLong(timestamp.getTime()); + out.writeLong(bucketSpan); + out.writeInt(sequenceNum); } @Override @@ -127,14 +125,20 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore); builder.field(RAW_ANOMALY_SCORE.getPreferredName(), rawAnomalyScore); builder.field(PROBABILITY.getPreferredName(), probability); - if (timestamp != null) { - builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); - } + builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); + builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan); + builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum); builder.field(IS_INTERIM.getPreferredName(), isInterim); builder.endObject(); return builder; } + /** + * Data store ID of this bucket influencer. + */ + public String getId() { + return jobId + "_" + timestamp.getTime() + "_" + bucketSpan + "_" + sequenceNum; + } public String getJobId() { return jobId; @@ -188,17 +192,14 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { return isInterim; } - public void setTimestamp(Date timestamp) { - this.timestamp = timestamp; - } - public Date getTimestamp() { return timestamp; } @Override public int hashCode() { - return Objects.hash(influenceField, initialAnomalyScore, anomalyScore, rawAnomalyScore, probability, isInterim, timestamp, jobId); + return Objects.hash(influenceField, initialAnomalyScore, anomalyScore, rawAnomalyScore, probability, isInterim, timestamp, jobId, + bucketSpan, sequenceNum); } @Override @@ -220,6 +221,7 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { return Objects.equals(influenceField, other.influenceField) && Double.compare(initialAnomalyScore, other.initialAnomalyScore) == 0 && Double.compare(anomalyScore, other.anomalyScore) == 0 && Double.compare(rawAnomalyScore, other.rawAnomalyScore) == 0 && Double.compare(probability, other.probability) == 0 && Objects.equals(isInterim, other.isInterim) - && Objects.equals(timestamp, other.timestamp) && Objects.equals(jobId, other.jobId); + && Objects.equals(timestamp, other.timestamp) && Objects.equals(jobId, other.jobId) && bucketSpan == other.bucketSpan + && sequenceNum == other.sequenceNum; } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Influencer.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Influencer.java index 52d982c25a2..642ca983c3d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Influencer.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Influencer.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import org.elasticsearch.xpack.prelert.utils.time.TimeUtils; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -33,7 +34,9 @@ public class Influencer extends ToXContentToBytes implements Writeable { * Field names */ public static final ParseField PROBABILITY = new ParseField("probability"); + public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num"); public static final ParseField TIMESTAMP = new ParseField("timestamp"); + public static final ParseField BUCKET_SPAN = new ParseField("bucket_span"); public static final ParseField INFLUENCER_FIELD_NAME = new ParseField("influencer_field_name"); public static final ParseField INFLUENCER_FIELD_VALUE = new ParseField("influencer_field_value"); public static final ParseField INITIAL_ANOMALY_SCORE = new ParseField("initial_anomaly_score"); @@ -43,18 +46,14 @@ public class Influencer extends ToXContentToBytes implements Writeable { public static final ParseField RESULTS_FIELD = new ParseField("influencers"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - RESULT_TYPE_FIELD.getPreferredName(), a -> new Influencer((String) a[0], (String) a[1], (String) a[2])); + RESULT_TYPE_FIELD.getPreferredName(), a -> new Influencer((String) a[0], (String) a[1], (String) a[2], + (Date) a[3], (long) a[4], (int) a[5])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); PARSER.declareString(ConstructingObjectParser.constructorArg(), INFLUENCER_FIELD_NAME); PARSER.declareString(ConstructingObjectParser.constructorArg(), INFLUENCER_FIELD_VALUE); - PARSER.declareString((influencer, s) -> {}, Result.RESULT_TYPE); - PARSER.declareDouble(Influencer::setProbability, PROBABILITY); - PARSER.declareDouble(Influencer::setAnomalyScore, ANOMALY_SCORE); - PARSER.declareDouble(Influencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE); - PARSER.declareBoolean(Influencer::setInterim, Bucket.IS_INTERIM); - PARSER.declareField(Influencer::setTimestamp, p -> { + PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { if (p.currentToken() == Token.VALUE_NUMBER) { return new Date(p.longValue()); } else if (p.currentToken() == Token.VALUE_STRING) { @@ -62,11 +61,19 @@ public class Influencer extends ToXContentToBytes implements Writeable { } throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); }, TIMESTAMP, ValueType.VALUE); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM); + PARSER.declareString((influencer, s) -> {}, Result.RESULT_TYPE); + PARSER.declareDouble(Influencer::setProbability, PROBABILITY); + PARSER.declareDouble(Influencer::setAnomalyScore, ANOMALY_SCORE); + PARSER.declareDouble(Influencer::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE); + PARSER.declareBoolean(Influencer::setInterim, Bucket.IS_INTERIM); } - private String jobId; - private String id; - private Date timestamp; + private final String jobId; + private final Date timestamp; + private final long bucketSpan; + private final int sequenceNum; private String influenceField; private String influenceValue; private double probability; @@ -75,18 +82,18 @@ public class Influencer extends ToXContentToBytes implements Writeable { private boolean hadBigNormalisedUpdate; private boolean isInterim; - public Influencer(String jobId, String fieldName, String fieldValue) { + public Influencer(String jobId, String fieldName, String fieldValue, Date timestamp, long bucketSpan, int sequenceNum) { this.jobId = jobId; influenceField = fieldName; influenceValue = fieldValue; + this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName()); + this.bucketSpan = bucketSpan; + this.sequenceNum = sequenceNum; } public Influencer(StreamInput in) throws IOException { jobId = in.readString(); - id = in.readOptionalString(); - if (in.readBoolean()) { - timestamp = new Date(in.readLong()); - } + timestamp = new Date(in.readLong()); influenceField = in.readString(); influenceValue = in.readString(); probability = in.readDouble(); @@ -94,17 +101,14 @@ public class Influencer extends ToXContentToBytes implements Writeable { anomalyScore = in.readDouble(); hadBigNormalisedUpdate = in.readBoolean(); isInterim = in.readBoolean(); + bucketSpan = in.readLong(); + sequenceNum = in.readInt(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); - out.writeOptionalString(id); - boolean hasTimestamp = timestamp != null; - out.writeBoolean(hasTimestamp); - if (hasTimestamp) { - out.writeLong(timestamp.getTime()); - } + out.writeLong(timestamp.getTime()); out.writeString(influenceField); out.writeString(influenceValue); out.writeDouble(probability); @@ -112,6 +116,8 @@ public class Influencer extends ToXContentToBytes implements Writeable { out.writeDouble(anomalyScore); out.writeBoolean(hadBigNormalisedUpdate); out.writeBoolean(isInterim); + out.writeLong(bucketSpan); + out.writeInt(sequenceNum); } @Override @@ -124,10 +130,10 @@ public class Influencer extends ToXContentToBytes implements Writeable { builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore); builder.field(INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore); builder.field(PROBABILITY.getPreferredName(), probability); + builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum); + builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan); builder.field(Bucket.IS_INTERIM.getPreferredName(), isInterim); - if (timestamp != null) { - builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); - } + builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); builder.endObject(); return builder; } @@ -136,16 +142,8 @@ public class Influencer extends ToXContentToBytes implements Writeable { return jobId; } - /** - * Data store ID of this record. May be null for records that have not been - * read from the data store. - */ public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; + return jobId + "_" + timestamp.getTime() + "_" + bucketSpan + "_" + sequenceNum; } public double getProbability() { @@ -160,10 +158,6 @@ public class Influencer extends ToXContentToBytes implements Writeable { return timestamp; } - public void setTimestamp(Date date) { - timestamp = date; - } - public String getInfluencerFieldName() { return influenceField; } @@ -210,13 +204,11 @@ public class Influencer extends ToXContentToBytes implements Writeable { @Override public int hashCode() { - // ID is NOT included in the hash, so that a record from the data store - // will hash the same as a record representing the same anomaly that did - // not come from the data store - // hadBigNormalisedUpdate is also deliberately excluded from the hash + // hadBigNormalisedUpdate is deliberately excluded from the hash - return Objects.hash(jobId, timestamp, influenceField, influenceValue, initialAnomalyScore, anomalyScore, probability, isInterim); + return Objects.hash(jobId, timestamp, influenceField, influenceValue, initialAnomalyScore, anomalyScore, probability, isInterim, + bucketSpan, sequenceNum); } @Override @@ -235,16 +227,12 @@ public class Influencer extends ToXContentToBytes implements Writeable { Influencer other = (Influencer) obj; - // ID is NOT compared, so that a record from the data store will compare - // equal to a record representing the same anomaly that did not come - // from the data store - - // hadBigNormalisedUpdate is also deliberately excluded from the test + // hadBigNormalisedUpdate is deliberately excluded from the test return Objects.equals(jobId, other.jobId) && Objects.equals(timestamp, other.timestamp) && Objects.equals(influenceField, other.influenceField) && Objects.equals(influenceValue, other.influenceValue) && Double.compare(initialAnomalyScore, other.initialAnomalyScore) == 0 && Double.compare(anomalyScore, other.anomalyScore) == 0 && Double.compare(probability, other.probability) == 0 - && (isInterim == other.isInterim); + && (isInterim == other.isInterim) && (bucketSpan == other.bucketSpan) && (sequenceNum == other.sequenceNum); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilities.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilities.java index 6ebd0707422..e9e0894742a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilities.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilities.java @@ -48,7 +48,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> - new PerPartitionMaxProbabilities((String) a[0], (Date) a[1], (List) a[2])); + new PerPartitionMaxProbabilities((String) a[0], (Date) a[1], (long) a[2], (List) a[3])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); @@ -61,18 +61,21 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W throw new IllegalArgumentException( "unexpected token [" + p.currentToken() + "] for [" + Bucket.TIMESTAMP.getPreferredName() + "]"); }, Bucket.TIMESTAMP, ObjectParser.ValueType.VALUE); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), Bucket.BUCKET_SPAN); PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), PartitionProbability.PARSER, PER_PARTITION_MAX_PROBABILITIES); PARSER.declareString((p, s) -> {}, Result.RESULT_TYPE); } private final String jobId; private final Date timestamp; + private final long bucketSpan; private final List perPartitionMaxProbabilities; - private String id; - public PerPartitionMaxProbabilities(String jobId, Date timestamp, List partitionProbabilities) { + public PerPartitionMaxProbabilities(String jobId, Date timestamp, long bucketSpan, + List partitionProbabilities) { this.jobId = jobId; this.timestamp = timestamp; + this.bucketSpan = bucketSpan; this.perPartitionMaxProbabilities = partitionProbabilities; } @@ -82,22 +85,23 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W } this.jobId = records.get(0).getJobId(); this.timestamp = records.get(0).getTimestamp(); + this.bucketSpan = records.get(0).getBucketSpan(); this.perPartitionMaxProbabilities = calcMaxNormalizedProbabilityPerPartition(records); } public PerPartitionMaxProbabilities(StreamInput in) throws IOException { jobId = in.readString(); timestamp = new Date(in.readLong()); + bucketSpan = in.readLong(); perPartitionMaxProbabilities = in.readList(PartitionProbability::new); - id = in.readOptionalString(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); out.writeLong(timestamp.getTime()); + out.writeLong(bucketSpan); out.writeList(perPartitionMaxProbabilities); - out.writeOptionalString(id); } public String getJobId() { @@ -105,11 +109,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W } public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; + return jobId + "_" + timestamp.getTime() + "_" + bucketSpan + "_" + RESULT_TYPE_VALUE; } public Date getTimestamp() { @@ -170,6 +170,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W builder.startObject(); builder.field(Job.ID.getPreferredName(), jobId); builder.field(Bucket.TIMESTAMP.getPreferredName(), timestamp.getTime()); + builder.field(Bucket.BUCKET_SPAN.getPreferredName(), bucketSpan); builder.field(PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), perPartitionMaxProbabilities); builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); builder.endObject(); @@ -178,7 +179,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W @Override public int hashCode() { - return Objects.hash(jobId, timestamp, perPartitionMaxProbabilities, id); + return Objects.hash(jobId, timestamp, perPartitionMaxProbabilities, bucketSpan); } @Override @@ -195,7 +196,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.timestamp, that.timestamp) - && Objects.equals(this.id, that.id) + && this.bucketSpan == that.bucketSpan && Objects.equals(this.perPartitionMaxProbabilities, that.perPartitionMaxProbabilities); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java index 099a7c1263d..b25d603dd9e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java @@ -79,6 +79,7 @@ public final class ReservedFieldNames { AnomalyRecord.NORMALIZED_PROBABILITY.getPreferredName(), AnomalyRecord.INITIAL_NORMALIZED_PROBABILITY.getPreferredName(), AnomalyRecord.BUCKET_SPAN.getPreferredName(), + AnomalyRecord.SEQUENCE_NUM.getPreferredName(), Bucket.ANOMALY_SCORE.getPreferredName(), Bucket.BUCKET_INFLUENCERS.getPreferredName(), @@ -119,6 +120,8 @@ public final class ReservedFieldNames { Influencer.INFLUENCER_FIELD_VALUE.getPreferredName(), Influencer.INITIAL_ANOMALY_SCORE.getPreferredName(), Influencer.ANOMALY_SCORE.getPreferredName(), + Influencer.BUCKET_SPAN.getPreferredName(), + Influencer.SEQUENCE_NUM.getPreferredName(), ModelDebugOutput.PARTITION_FIELD_NAME.getPreferredName(), ModelDebugOutput.PARTITION_FIELD_VALUE.getPreferredName(), ModelDebugOutput.OVER_FIELD_NAME.getPreferredName(), ModelDebugOutput.OVER_FIELD_VALUE.getPreferredName(), diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetBucketActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetBucketActionResponseTests.java index 9a0a4722d59..2f526637755 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetBucketActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetBucketActionResponseTests.java @@ -24,6 +24,7 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase hits = new ArrayList<>(listSize); @@ -37,7 +38,8 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase bucketInfluencers = new ArrayList<>(size); for (int i = 0; i < size; i++) { - BucketInfluencer bucketInfluencer = new BucketInfluencer("foo"); + BucketInfluencer bucketInfluencer = new BucketInfluencer("foo", bucket.getTimestamp(), bucket.getBucketSpan(), + sequenceNum++); bucketInfluencer.setAnomalyScore(randomDouble()); bucketInfluencer.setInfluencerFieldName(randomAsciiOfLengthBetween(1, 20)); bucketInfluencer.setInitialAnomalyScore(randomDouble()); @@ -86,14 +88,12 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase records = new ArrayList<>(size); for (int i = 0; i < size; i++) { - AnomalyRecord anomalyRecord = new AnomalyRecord(jobId); + AnomalyRecord anomalyRecord = new AnomalyRecord(jobId, new Date(randomLong()), randomPositiveLong(), sequenceNum++); anomalyRecord.setAnomalyScore(randomDouble()); anomalyRecord.setActual(Collections.singletonList(randomDouble())); anomalyRecord.setTypical(Collections.singletonList(randomDouble())); anomalyRecord.setProbability(randomDouble()); - anomalyRecord.setId(randomAsciiOfLengthBetween(1, 20)); anomalyRecord.setInterim(randomBoolean()); - anomalyRecord.setTimestamp(new Date(randomLong())); records.add(anomalyRecord); } bucket.setRecords(records); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetInfluencersActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetInfluencersActionResponseTests.java index 7fe7b9b89d8..71cda32ab1f 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetInfluencersActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetInfluencersActionResponseTests.java @@ -22,13 +22,11 @@ public class GetInfluencersActionResponseTests extends AbstractStreamableTestCas List hits = new ArrayList<>(listSize); for (int j = 0; j < listSize; j++) { Influencer influencer = new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), - randomAsciiOfLengthBetween(1, 20)); + randomAsciiOfLengthBetween(1, 20), new Date(randomPositiveLong()), randomPositiveLong(), j + 1); influencer.setAnomalyScore(randomDouble()); influencer.setInitialAnomalyScore(randomDouble()); influencer.setProbability(randomDouble()); - influencer.setId(randomAsciiOfLengthBetween(1, 20)); influencer.setInterim(randomBoolean()); - influencer.setTimestamp(new Date(randomLong())); hits.add(influencer); } QueryPage buckets = new QueryPage<>(hits, listSize, Influencer.RESULTS_FIELD); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetRecordsActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetRecordsActionResponseTests.java index a863c777ad7..98dafa9c1c7 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetRecordsActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetRecordsActionResponseTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; import java.util.ArrayList; +import java.util.Date; import java.util.List; public class GetRecordsActionResponseTests extends AbstractStreamableTestCase { @@ -21,8 +22,7 @@ public class GetRecordsActionResponseTests extends AbstractStreamableTestCase hits = new ArrayList<>(listSize); String jobId = randomAsciiOfLengthBetween(1, 20); for (int j = 0; j < listSize; j++) { - AnomalyRecord record = new AnomalyRecord(jobId); - record.setId(randomAsciiOfLengthBetween(1, 20)); + AnomalyRecord record = new AnomalyRecord(jobId, new Date(), 600, j + 1); hits.add(record); } QueryPage snapshots = new QueryPage<>(hits, listSize, AnomalyRecord.RESULTS_FIELD); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java index 194c08b891b..d9c9716bf9b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/PrelertJobIT.java @@ -226,9 +226,9 @@ public class PrelertJobIT extends ESRestTestCase { assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); assertThat(e.getMessage(), containsString("No known job with id '1'")); - addRecordResult("1", "1234"); - addRecordResult("1", "1235"); - addRecordResult("1", "1236"); + addRecordResult("1", "1234", 1, 1); + addRecordResult("1", "1235", 1, 2); + addRecordResult("1", "1236", 1, 3); Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "results/1/records", params); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); String responseAsString = responseEntityToString(response); @@ -259,7 +259,7 @@ public class PrelertJobIT extends ESRestTestCase { Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult)); } - private Response addRecordResult(String jobId, String timestamp) throws Exception { + private Response addRecordResult(String jobId, String timestamp, long bucketSpan, int sequenceNum) throws Exception { try { client().performRequest("put", "prelertresults-" + jobId, Collections.emptyMap(), new StringEntity(RESULT_MAPPING)); } catch (ResponseException e) { @@ -268,10 +268,12 @@ public class PrelertJobIT extends ESRestTestCase { assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); } - String bucketResult = - String.format(Locale.ROOT, "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"record\"}", jobId, timestamp); + String recordResult = + String.format(Locale.ROOT, + "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}", + jobId, timestamp, bucketSpan, sequenceNum); return client().performRequest("put", "prelertresults-" + jobId + "/result/" + timestamp, - Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult)); + Collections.singletonMap("refresh", "true"), new StringEntity(recordResult)); } private static String responseEntityToString(Response response) throws Exception { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappingsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappingsTests.java index 78797d9c9a5..ebf198b9d87 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappingsTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappingsTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; import org.elasticsearch.xpack.prelert.job.ModelSnapshot; import org.elasticsearch.xpack.prelert.job.ModelState; -import org.elasticsearch.xpack.prelert.job.SchedulerConfig; import org.elasticsearch.xpack.prelert.job.audit.AuditActivity; import org.elasticsearch.xpack.prelert.job.audit.AuditMessage; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java index 4b7c8f530b2..f8f25ee69d3 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobProviderTests.java @@ -408,6 +408,7 @@ public class JobProviderTests extends ESTestCase { recordMap1.put("timestamp", now.getTime()); recordMap1.put("function", "irritable"); recordMap1.put("bucket_span", 22); + recordMap1.put("sequence_num", 1); Map recordMap2 = new HashMap<>(); recordMap2.put("job_id", "foo"); recordMap2.put("typical", 1122.4); @@ -415,6 +416,7 @@ public class JobProviderTests extends ESTestCase { recordMap2.put("timestamp", now.getTime()); recordMap2.put("function", "irrascible"); recordMap2.put("bucket_span", 22); + recordMap2.put("sequence_num", 2); source.add(recordMap1); source.add(recordMap2); @@ -458,6 +460,7 @@ public class JobProviderTests extends ESTestCase { recordMap1.put("timestamp", now.getTime()); recordMap1.put("function", "irritable"); recordMap1.put("bucket_span", 22); + recordMap1.put("sequence_num", 1); Map recordMap2 = new HashMap<>(); recordMap2.put("job_id", "foo"); recordMap2.put("typical", 1122.4); @@ -465,6 +468,7 @@ public class JobProviderTests extends ESTestCase { recordMap2.put("timestamp", now.getTime()); recordMap2.put("function", "irrascible"); recordMap2.put("bucket_span", 22); + recordMap2.put("sequence_num", 2); source.add(recordMap1); source.add(recordMap2); @@ -515,6 +519,7 @@ public class JobProviderTests extends ESTestCase { recordMap1.put("timestamp", now.getTime()); recordMap1.put("function", "irritable"); recordMap1.put("bucket_span", 22); + recordMap1.put("sequence_num", 1); Map recordMap2 = new HashMap<>(); recordMap2.put("job_id", "foo"); recordMap2.put("typical", 1122.4); @@ -522,6 +527,7 @@ public class JobProviderTests extends ESTestCase { recordMap2.put("timestamp", now.getTime()); recordMap2.put("function", "irrascible"); recordMap2.put("bucket_span", 22); + recordMap2.put("sequence_num", 2); source.add(recordMap1); source.add(recordMap2); @@ -563,6 +569,7 @@ public class JobProviderTests extends ESTestCase { recordMap.put("timestamp", now.getTime()); recordMap.put("function", "irritable"); recordMap.put("bucket_span", 22); + recordMap.put("sequence_num", i + 1); source.add(recordMap); } @@ -594,6 +601,7 @@ public class JobProviderTests extends ESTestCase { recordMap.put("timestamp", now.getTime()); recordMap.put("function", "irritable"); recordMap.put("bucket_span", 22); + recordMap.put("sequence_num", i + 1); source.add(recordMap); } @@ -679,6 +687,8 @@ public class JobProviderTests extends ESTestCase { recordMap1.put("influencer_field_value", "Bob"); recordMap1.put("initial_anomaly_score", 22.2); recordMap1.put("anomaly_score", 22.6); + recordMap1.put("bucket_span", 123); + recordMap1.put("sequence_num", 1); Map recordMap2 = new HashMap<>(); recordMap2.put("job_id", "foo"); recordMap2.put("probability", 0.99); @@ -687,6 +697,8 @@ public class JobProviderTests extends ESTestCase { recordMap2.put("influencer_field_value", "James"); recordMap2.put("initial_anomaly_score", 5.0); recordMap2.put("anomaly_score", 5.0); + recordMap2.put("bucket_span", 123); + recordMap2.put("sequence_num", 2); source.add(recordMap1); source.add(recordMap2); @@ -740,6 +752,8 @@ public class JobProviderTests extends ESTestCase { recordMap1.put("influencer_field_value", "Bob"); recordMap1.put("initial_anomaly_score", 22.2); recordMap1.put("anomaly_score", 22.6); + recordMap1.put("bucket_span", 123); + recordMap1.put("sequence_num", 1); Map recordMap2 = new HashMap<>(); recordMap2.put("job_id", "foo"); recordMap2.put("probability", 0.99); @@ -748,6 +762,8 @@ public class JobProviderTests extends ESTestCase { recordMap2.put("influencer_field_value", "James"); recordMap2.put("initial_anomaly_score", 5.0); recordMap2.put("anomaly_score", 5.0); + recordMap2.put("bucket_span", 123); + recordMap2.put("sequence_num", 2); source.add(recordMap1); source.add(recordMap2); @@ -967,10 +983,9 @@ public class JobProviderTests extends ESTestCase { } private AnomalyRecord createAnomalyRecord(String partitionFieldValue, Date timestamp, double normalizedProbability) { - AnomalyRecord record = new AnomalyRecord("foo"); + AnomalyRecord record = new AnomalyRecord("foo", timestamp, 600, 42); record.setPartitionFieldValue(partitionFieldValue); record.setNormalizedProbability(normalizedProbability); - record.setTimestamp(timestamp); return record; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java index ca1df02e4b7..7f3c7af7252 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java @@ -50,7 +50,7 @@ public class JobResultsPersisterTests extends ESTestCase { bucket.setProcessingTimeMs(8888); bucket.setRecordCount(1); - BucketInfluencer bi = new BucketInfluencer(JOB_ID); + BucketInfluencer bi = new BucketInfluencer(JOB_ID, new Date(), 600, 1); bi.setAnomalyScore(14.15); bi.setInfluencerFieldName("biOne"); bi.setInitialAnomalyScore(18.12); @@ -59,7 +59,7 @@ public class JobResultsPersisterTests extends ESTestCase { bucket.addBucketInfluencer(bi); // We are adding a record but it shouldn't be persisted as part of the bucket - AnomalyRecord record = new AnomalyRecord(JOB_ID); + AnomalyRecord record = new AnomalyRecord(JOB_ID, new Date(), 600, 2); record.setAnomalyScore(99.8); bucket.setRecords(Arrays.asList(record)); @@ -94,14 +94,13 @@ public class JobResultsPersisterTests extends ESTestCase { Client client = clientBuilder.build(); List records = new ArrayList<>(); - AnomalyRecord r1 = new AnomalyRecord(JOB_ID); + AnomalyRecord r1 = new AnomalyRecord(JOB_ID, new Date(), 42, 1); records.add(r1); List actuals = new ArrayList<>(); actuals.add(5.0); actuals.add(5.1); r1.setActual(actuals); r1.setAnomalyScore(99.8); - r1.setBucketSpan(42); r1.setByFieldName("byName"); r1.setByFieldValue("byValue"); r1.setCorrelatedByFieldValue("testCorrelations"); @@ -122,7 +121,7 @@ public class JobResultsPersisterTests extends ESTestCase { r1.setTypical(typicals); JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); - persister.bulkPersisterBuilder(JOB_ID).persistRecords(records, true).executeRequest(); + persister.bulkPersisterBuilder(JOB_ID).persistRecords(records).executeRequest(); List captured = captor.getAllValues(); assertEquals(1, captured.size()); @@ -156,15 +155,14 @@ public class JobResultsPersisterTests extends ESTestCase { Client client = clientBuilder.build(); List influencers = new ArrayList<>(); - Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1"); + Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1", new Date(), 600, 1); inf.setAnomalyScore(16); - inf.setId("infID"); inf.setInitialAnomalyScore(55.5); inf.setProbability(0.4); influencers.add(inf); JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); - persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers, true).executeRequest(); + persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers).executeRequest(); List captured = captor.getAllValues(); assertEquals(1, captured.size()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/QueryPageTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/QueryPageTests.java index 6b65afed6b4..8f20147872a 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/QueryPageTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/QueryPageTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.support.AbstractWireSerializingTestCase; import java.util.ArrayList; +import java.util.Date; public class QueryPageTests extends AbstractWireSerializingTestCase> { @@ -19,7 +20,7 @@ public class QueryPageTests extends AbstractWireSerializingTestCase hits = new ArrayList<>(); for (int i = 0; i < hitCount; i++) { hits.add(new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), - randomAsciiOfLengthBetween(1, 20))); + randomAsciiOfLengthBetween(1, 20), new Date(), randomPositiveLong(), i + 1)); } return new QueryPage<>(hits, hitCount, new ParseField("test")); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java index ccd6e55f86f..5b18d50cebf 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -22,12 +22,12 @@ import org.mockito.InOrder; import java.io.InputStream; import java.util.Arrays; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.stream.Stream; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -114,13 +114,13 @@ public class AutoDetectResultProcessorTests extends ESTestCase { AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); - AnomalyRecord record1 = new AnomalyRecord("foo"); - AnomalyRecord record2 = new AnomalyRecord("foo"); + AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123, 1); + AnomalyRecord record2 = new AnomalyRecord("foo", new Date(123), 123, 2); List records = Arrays.asList(record1, record2); when(result.getRecords()).thenReturn(records); processor.processResult(context, result); - verify(bulkBuilder, times(1)).persistRecords(records, true); + verify(bulkBuilder, times(1)).persistRecords(records); verifyNoMoreInteractions(persister); } @@ -134,16 +134,16 @@ public class AutoDetectResultProcessorTests extends ESTestCase { AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); - AnomalyRecord record1 = new AnomalyRecord("foo"); + AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123, 1); record1.setPartitionFieldValue("pValue"); - AnomalyRecord record2 = new AnomalyRecord("foo"); + AnomalyRecord record2 = new AnomalyRecord("foo", new Date(123), 123, 2); record2.setPartitionFieldValue("pValue"); List records = Arrays.asList(record1, record2); when(result.getRecords()).thenReturn(records); processor.processResult(context, result); - verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class), eq(true)); - verify(bulkBuilder, times(1)).persistRecords(records, true); + verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class)); + verify(bulkBuilder, times(1)).persistRecords(records); verifyNoMoreInteractions(persister); } @@ -157,13 +157,13 @@ public class AutoDetectResultProcessorTests extends ESTestCase { AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); - Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue"); - Influencer influencer2 = new Influencer(JOB_ID, "infField2", "infValue2"); + Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123, 1); + Influencer influencer2 = new Influencer(JOB_ID, "infField2", "infValue2", new Date(123), 123, 1); List influencers = Arrays.asList(influencer1, influencer2); when(result.getInfluencers()).thenReturn(influencers); processor.processResult(context, result); - verify(bulkBuilder, times(1)).persistInfluencers(influencers, true); + verify(bulkBuilder, times(1)).persistInfluencers(influencers); verifyNoMoreInteractions(persister); } @@ -321,4 +321,4 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verifyNoMoreInteractions(renormaliser); } -} \ No newline at end of file +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java index 923a4501853..b3b3b33ca52 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java @@ -32,33 +32,38 @@ public class AutodetectResultsParserTests extends ESTestCase { public static final String METRIC_OUTPUT_SAMPLE = "[{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359450000000," + "\"bucket_span\":22, \"records\":[]," + "\"max_normalized_probability\":0, \"anomaly_score\":0,\"record_count\":0,\"event_count\":806,\"bucket_influencers\":[" - + "{\"job_id\":\"foo\",\"anomaly_score\":0, \"probability\":0.0, \"influencer_field_name\":\"bucket_time\"," + + "{\"sequence_num\":1,\"timestamp\":1359450000000,\"bucket_span\":22,\"job_id\":\"foo\",\"anomaly_score\":0," + + "\"probability\":0.0, \"influencer_field_name\":\"bucket_time\"," + "\"initial_anomaly_score\":0.0}]}},{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normaliser 1.1, normaliser 2" + ".1]\"}}" - + ",{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"bucket_span\":22," - + "\"records\":[{\"job_id\":\"foo\",\"probability\":0.0637541," + + ",{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"bucket_span\":22,\"records\":" + + "[{\"timestamp\":1359453600000,\"bucket_span\":22,\"sequence_num\":1,\"job_id\":\"foo\",\"probability\":0.0637541," + "\"by_field_name\":\"airline\",\"by_field_value\":\"JZA\", \"typical\":[1020.08],\"actual\":[1042.14]," + "\"field_name\":\"responsetime\",\"function\":\"max\",\"partition_field_name\":\"\",\"partition_field_value\":\"\"}," - + "{\"job_id\":\"foo\",\"probability\":0.00748292,\"by_field_name\":\"airline\",\"by_field_value\":\"AMX\", " - + "\"typical\":[20.2137],\"actual\":[22.8855],\"field_name\":\"responsetime\",\"function\":\"max\"," + - "\"partition_field_name\":\"\"," - + " \"partition_field_value\":\"\"},{\"job_id\":\"foo\",\"probability\":0.023494,\"by_field_name\":\"airline\"," + + "{\"timestamp\":1359453600000,\"bucket_span\":22,\"sequence_num\":2,\"job_id\":\"foo\",\"probability\":0.00748292," + + "\"by_field_name\":\"airline\",\"by_field_value\":\"AMX\", " + + "\"typical\":[20.2137],\"actual\":[22.8855],\"field_name\":\"responsetime\",\"function\":\"max\"," + + "\"partition_field_name\":\"\",\"partition_field_value\":\"\"},{\"timestamp\":1359453600000,\"bucket_span\":22," + + "\"sequence_num\":3,\"job_id\":\"foo\",\"probability\":0.023494,\"by_field_name\":\"airline\"," + "\"by_field_value\":\"DAL\", \"typical\":[382.177],\"actual\":[358.934],\"field_name\":\"responsetime\",\"function\":\"min\"," - + "\"partition_field_name\":\"\", \"partition_field_value\":\"\"},{\"job_id\":\"foo\",\"probability\":0.0473552," - + "\"by_field_name\":\"airline\",\"by_field_value\":\"SWA\", \"typical\":[152.148],\"actual\":[96.6425]," - + "\"field_name\":\"responsetime\",\"function\":\"min\",\"partition_field_name\":\"\",\"partition_field_value\":\"\"}]," + + "\"partition_field_name\":\"\", \"partition_field_value\":\"\"},{\"timestamp\":1359453600000,\"bucket_span\":22," + + "\"sequence_num\":4,\"job_id\":\"foo\"," + + "\"probability\":0.0473552,\"by_field_name\":\"airline\",\"by_field_value\":\"SWA\", \"typical\":[152.148]," + + "\"actual\":[96.6425],\"field_name\":\"responsetime\",\"function\":\"min\",\"partition_field_name\":\"\"," + + "\"partition_field_value\":\"\"}]," + "\"initial_anomaly_score\":0.0140005, \"anomaly_score\":20.22688, \"max_normalized_probability\":10.5688, \"record_count\":4," - + "\"event_count\":820,\"bucket_influencers\":[{\"job_id\":\"foo\", \"raw_anomaly_score\":" - + "0.0140005, \"probability\":0.01,\"influencer_field_name\":\"bucket_time\",\"initial_anomaly_score\":20.22688" - + ",\"anomaly_score\":20.22688} ,{\"job_id\":\"foo\",\"raw_anomaly_score\":0.005, \"probability\":0.03," - + "\"influencer_field_name\":\"foo\",\"initial_anomaly_score\":10.5,\"anomaly_score\":10.5}]}},{\"quantiles\": " + - "{\"job_id\":\"foo\"," + + "\"event_count\":820,\"bucket_influencers\":[{\"timestamp\":1359453600000,\"bucket_span\":22,\"sequence_num\":5," + + "\"job_id\":\"foo\", \"raw_anomaly_score\":0.0140005, \"probability\":0.01,\"influencer_field_name\":\"bucket_time\"," + + "\"initial_anomaly_score\":20.22688,\"anomaly_score\":20.22688} ,{\"timestamp\":1359453600000,\"bucket_span\":22," + + "\"sequence_num\":6,\"job_id\":\"foo\",\"raw_anomaly_score\":0.005, \"probability\":0.03," + + "\"influencer_field_name\":\"foo\",\"initial_anomaly_score\":10.5,\"anomaly_score\":10.5}]}},{\"quantiles\": " + + "{\"job_id\":\"foo\"," + "\"quantile_state\":\"[normaliser 1.2, normaliser 2.2]\"}} ,{\"flush\": {\"id\":\"testing1\"}} ," + "{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normaliser 1.3, normaliser 2.3]\"}} ]"; public static final String POPULATION_OUTPUT_SAMPLE = "[{\"timestamp\":1379590200,\"records\":[{\"probability\":1.38951e-08," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"mail.google.com\"," + - "\"function\":\"max\"," + + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"mail.google.com\"," + + "\"sequence_num\":1,\"function\":\"max\"," + "\"causes\":[{\"probability\":1.38951e-08,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," + "\"over_field_value\":\"mail.google.com\",\"function\":\"max\",\"typical\":[101534],\"actual\":[9.19027e+07]}]," + "\"normalized_probability\":100,\"anomaly_score\":44.7324},{\"probability\":3.86587e-07,\"field_name\":\"sum_cs_bytes_\"," diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResultTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResultTests.java index 058ad9a9c1f..89bb4d75041 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResultTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResultTests.java @@ -46,7 +46,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase(size); for (int i = 0; i < size; i++) { - AnomalyRecord record = new AnomalyRecord(jobId); + AnomalyRecord record = new AnomalyRecord(jobId, new Date(randomLong()), randomPositiveLong(), i + 1); record.setProbability(randomDoubleBetween(0.0, 1.0, true)); records.add(record); } @@ -56,7 +56,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase(size); for (int i = 0; i < size; i++) { - Influencer influencer = new Influencer(jobId, randomAsciiOfLength(10), randomAsciiOfLength(10)); + Influencer influencer = new Influencer(jobId, randomAsciiOfLength(10), randomAsciiOfLength(10), + new Date(randomLong()), randomPositiveLong(), i + 1); influencer.setProbability(randomDoubleBetween(0.0, 1.0, true)); influencers.add(influencer); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencerTests.java index 34ef778489f..26dd6a59937 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencerTests.java @@ -16,7 +16,8 @@ public class BucketInfluencerTests extends AbstractSerializingTestCase { @Override protected Bucket createTestInstance() { + int sequenceNum = 1; String jobId = "foo"; Bucket bucket = new Bucket(jobId, new Date(randomLong()), randomPositiveLong()); @@ -32,7 +33,7 @@ public class BucketTests extends AbstractSerializingTestCase { int size = randomInt(10); List bucketInfluencers = new ArrayList<>(size); for (int i = 0; i < size; i++) { - BucketInfluencer bucketInfluencer = new BucketInfluencer(jobId); + BucketInfluencer bucketInfluencer = new BucketInfluencer(jobId, new Date(), 600, i + 1); bucketInfluencer.setAnomalyScore(randomDouble()); bucketInfluencer.setInfluencerFieldName(randomAsciiOfLengthBetween(1, 20)); bucketInfluencer.setInitialAnomalyScore(randomDouble()); @@ -81,14 +82,12 @@ public class BucketTests extends AbstractSerializingTestCase { int size = randomInt(10); List records = new ArrayList<>(size); for (int i = 0; i < size; i++) { - AnomalyRecord anomalyRecord = new AnomalyRecord(jobId); + AnomalyRecord anomalyRecord = new AnomalyRecord(jobId, bucket.getTimestamp(), bucket.getBucketSpan(), sequenceNum++); anomalyRecord.setAnomalyScore(randomDouble()); anomalyRecord.setActual(Collections.singletonList(randomDouble())); anomalyRecord.setTypical(Collections.singletonList(randomDouble())); anomalyRecord.setProbability(randomDouble()); - anomalyRecord.setId(randomAsciiOfLengthBetween(1, 20)); anomalyRecord.setInterim(randomBoolean()); - anomalyRecord.setTimestamp(new Date(randomLong())); records.add(anomalyRecord); } bucket.setRecords(records); @@ -167,7 +166,7 @@ public class BucketTests extends AbstractSerializingTestCase { public void testEquals_GivenOneHasRecordsAndTheOtherDoesNot() { Bucket bucket1 = new Bucket("foo", new Date(123), 123); - bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo"))); + bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo", new Date(123), 123, 1))); Bucket bucket2 = new Bucket("foo", new Date(123), 123); bucket2.setRecords(null); @@ -177,18 +176,19 @@ public class BucketTests extends AbstractSerializingTestCase { public void testEquals_GivenDifferentNumberOfRecords() { Bucket bucket1 = new Bucket("foo", new Date(123), 123); - bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo"))); + bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo", new Date(123), 123, 1))); Bucket bucket2 = new Bucket("foo", new Date(123), 123); - bucket2.setRecords(Arrays.asList(new AnomalyRecord("foo"), new AnomalyRecord("foo"))); + bucket2.setRecords(Arrays.asList(new AnomalyRecord("foo", new Date(123), 123, 1), + new AnomalyRecord("foo", new Date(123), 123, 2))); assertFalse(bucket1.equals(bucket2)); assertFalse(bucket2.equals(bucket1)); } public void testEquals_GivenSameNumberOfRecordsButDifferent() { - AnomalyRecord anomalyRecord1 = new AnomalyRecord("foo"); + AnomalyRecord anomalyRecord1 = new AnomalyRecord("foo", new Date(123), 123, 1); anomalyRecord1.setAnomalyScore(1.0); - AnomalyRecord anomalyRecord2 = new AnomalyRecord("foo"); + AnomalyRecord anomalyRecord2 = new AnomalyRecord("foo", new Date(123), 123, 2); anomalyRecord1.setAnomalyScore(2.0); Bucket bucket1 = new Bucket("foo", new Date(123), 123); @@ -212,13 +212,12 @@ public class BucketTests extends AbstractSerializingTestCase { public void testEquals_GivenDifferentBucketInfluencers() { Bucket bucket1 = new Bucket("foo", new Date(123), 123); - BucketInfluencer influencer1 = new BucketInfluencer("foo"); + BucketInfluencer influencer1 = new BucketInfluencer("foo", new Date(123), 123, 1); influencer1.setInfluencerFieldName("foo"); bucket1.addBucketInfluencer(influencer1); - ; Bucket bucket2 = new Bucket("foo", new Date(123), 123); - BucketInfluencer influencer2 = new BucketInfluencer("foo"); + BucketInfluencer influencer2 = new BucketInfluencer("foo", new Date(123), 123, 2); influencer2.setInfluencerFieldName("bar"); bucket2.addBucketInfluencer(influencer2); @@ -227,8 +226,8 @@ public class BucketTests extends AbstractSerializingTestCase { } public void testEquals_GivenEqualBuckets() { - AnomalyRecord record = new AnomalyRecord("job_id"); - BucketInfluencer bucketInfluencer = new BucketInfluencer("foo"); + AnomalyRecord record = new AnomalyRecord("job_id", new Date(123), 123, 1); + BucketInfluencer bucketInfluencer = new BucketInfluencer("foo", new Date(123), 123, 1); Date date = new Date(); Bucket bucket1 = new Bucket("foo", date, 123); @@ -274,7 +273,7 @@ public class BucketTests extends AbstractSerializingTestCase { public void testIsNormalisable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() { Bucket bucket = new Bucket("foo", new Date(123), 123); - bucket.addBucketInfluencer(new BucketInfluencer("foo")); + bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1)); bucket.setAnomalyScore(0.0); bucket.setRecordCount(0); @@ -283,7 +282,7 @@ public class BucketTests extends AbstractSerializingTestCase { public void testIsNormalisable_GivenAnomalyScoreIsZeroAndRecordCountIsNonZero() { Bucket bucket = new Bucket("foo", new Date(123), 123); - bucket.addBucketInfluencer(new BucketInfluencer("foo")); + bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1)); bucket.setAnomalyScore(0.0); bucket.setRecordCount(1); @@ -292,7 +291,7 @@ public class BucketTests extends AbstractSerializingTestCase { public void testIsNormalisable_GivenAnomalyScoreIsNonZeroAndRecordCountIsZero() { Bucket bucket = new Bucket("foo", new Date(123), 123); - bucket.addBucketInfluencer(new BucketInfluencer("foo")); + bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1)); bucket.setAnomalyScore(1.0); bucket.setRecordCount(0); @@ -301,7 +300,7 @@ public class BucketTests extends AbstractSerializingTestCase { public void testIsNormalisable_GivenAnomalyScoreIsNonZeroAndRecordCountIsNonZero() { Bucket bucket = new Bucket("foo", new Date(123), 123); - bucket.addBucketInfluencer(new BucketInfluencer("foo")); + bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1)); bucket.setAnomalyScore(1.0); bucket.setRecordCount(1); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/InfluencerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/InfluencerTests.java index a60eaed2df3..47fd87012c2 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/InfluencerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/InfluencerTests.java @@ -10,11 +10,14 @@ import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase; +import java.util.Date; + public class InfluencerTests extends AbstractSerializingTestCase { @Override protected Influencer createTestInstance() { - return new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20)); + return new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), + new Date(), randomPositiveLong(), randomIntBetween(1, 1000)); } @Override diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilitiesTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilitiesTests.java index 2440439911d..7135d52ffd3 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilitiesTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilitiesTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase; import org.joda.time.DateTime; import java.util.ArrayList; +import java.util.Date; import java.util.List; public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCase { @@ -24,7 +25,8 @@ public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCa pps.add(new PerPartitionMaxProbabilities.PartitionProbability(randomAsciiOfLength(12), randomDouble())); } - return new PerPartitionMaxProbabilities(randomAsciiOfLength(20), new DateTime(randomDateTimeZone()).toDate(), pps); + return new PerPartitionMaxProbabilities(randomAsciiOfLength(20), new DateTime(randomDateTimeZone()).toDate(), + randomPositiveLong(), pps); } @Override @@ -74,7 +76,7 @@ public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCa } private AnomalyRecord createAnomalyRecord(String partitionFieldValue, double normalizedProbability) { - AnomalyRecord record = new AnomalyRecord("foo"); + AnomalyRecord record = new AnomalyRecord("foo", new Date(), 600, 1); record.setPartitionFieldValue(partitionFieldValue); record.setNormalizedProbability(normalizedProbability); return record; diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_influencers.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_influencers.yaml index 3e20a41d14d..275ab37b6c0 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_influencers.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_influencers.yaml @@ -18,7 +18,7 @@ setup: index: index: prelertresults-farequote type: result - id: 1 + id: farequote_1464739200000_1_1 body: { "job_id": "farequote", @@ -26,14 +26,16 @@ setup: "influencer_field_name": "foo", "influencer_field_value": "bar", "anomaly_score": 80.0, - "result_type" : "influencer" + "result_type" : "influencer", + "bucket_span" : 1, + "sequence_num" : 1 } - do: index: index: prelertresults-farequote type: result - id: 2 + id: farequote_1464825600000_1_2 body: { "job_id": "farequote", @@ -41,7 +43,9 @@ setup: "influencer_field_name": "foo", "influencer_field_value": "zoo", "anomaly_score": 50.0, - "result_type" : "influencer" + "result_type" : "influencer", + "bucket_span" : 1, + "sequence_num" : 2 } - do: indices.refresh: diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_records.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_records.yaml index 5e509b8894b..095f3481282 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_records.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_records.yaml @@ -31,26 +31,30 @@ setup: index: index: prelertresults-farequote type: result - id: 2 + id: farequote_1464739200000_1_1 body: { "job_id": "farequote", "result_type": "record", "timestamp": "2016-06-01T00:00:00Z", - "anomaly_score": 60.0 + "anomaly_score": 60.0, + "bucket_span": 1, + "sequence_num": 1 } - do: index: index: prelertresults-farequote type: result - id: 3 + id: farequote_1464825600000_1_2 body: { "job_id": "farequote", "result_type": "record", "timestamp": "2016-06-02T00:00:00Z", - "anomaly_score": 80.0 + "anomaly_score": 80.0, + "bucket_span": 1, + "sequence_num": 2 } - do: diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml index 6b2dee8b7b8..33e38d9d40c 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml @@ -81,35 +81,37 @@ setup: index: index: prelertresults-foo type: result - id: "foo_1464825600000_1_record" - body: { "job_id": "foo", "result_type": "record", "timestamp": "2016-06-02T00:00:00Z" } + id: "foo_1464825600000_1_1" + body: { "job_id": "foo", "result_type": "record", "timestamp": "2016-06-02T00:00:00Z", "bucket_span":1, "sequence_num":1 } - do: index: index: prelertresults-foo type: result - id: "foo_1462060800000_1_record" - body: { "job_id": "foo", "result_type": "record", "timestamp": "2016-05-01T00:00:00Z" } + id: "foo_1462060800000_1_2" + body: { "job_id": "foo", "result_type": "record", "timestamp": "2016-05-01T00:00:00Z", "bucket_span":1, "sequence_num":2 } - do: index: index: prelertresults-foo type: result - id: "foo_1464825600000_1_influencer" + id: "foo_1464825600000_1_3" body: { "job_id": "foo", "result_type": "influencer", "timestamp": "2016-06-02T00:00:00Z", "influencer_field_name": "foo", "influencer_field_value": "zoo", - "anomaly_score": 50.0 + "anomaly_score": 50.0, + "bucket_span": 1, + "sequence_num": 3 } - do: index: index: prelertresults-foo type: result - id: "foo_1462060800000_1_influencer" + id: "foo_1462060800000_1_4" body: { "job_id": "foo", @@ -117,7 +119,9 @@ setup: "timestamp": "2016-05-01T00:00:00Z", "influencer_field_name": "foo", "influencer_field_value": "zoo", - "anomaly_score": 50.0 + "anomaly_score": 50.0, + "bucket_span": 1, + "sequence_num": 4 } - do: