From 37cd03ad4d03aa313c40cf0514aa0ff82bcceaac Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 25 Nov 2016 17:54:24 +0000 Subject: [PATCH] Split records and influencers from bucket (elastic/elasticsearch#389) In c++ the results are built all together under a bucket hierarchy. This buckets was written out and java would read it and split the bucket into its parts: the bucket itself, its records and its influencers. During the migration, the bucket started being persisted as a whole, including its records and influencers. This commit is changing this by modifying the way results are written in c++. This way, the java and c++ results writing/reading are in sync. To achieve this, the change involved writing records and influencers as top level results from c++. In addition, they are written as an array object in order to allow the java side to persist them in a bulk request. * Fix bucket counting in results processor Original commit: elastic/x-pack-elasticsearch@feadf3f887654063e3ab511ce38b36cb73f764cb --- .../job/persistence/JobRenormaliser.java | 4 +- .../job/persistence/JobResultsPersister.java | 118 ++++++++----- .../BlackHoleAutodetectProcess.java | 2 +- .../output/AutoDetectResultProcessor.java | 17 +- .../prelert/job/results/AutodetectResult.java | 143 +++++++++------- .../xpack/prelert/job/results/Bucket.java | 34 ++-- .../job/results/ReservedFieldNames.java | 1 - .../action/GetBucketActionResponseTests.java | 17 -- .../persistence/JobResultsPersisterTests.java | 162 +++++++++--------- .../AutoDetectResultProcessorTests.java | 40 +++++ .../job/results/AutodetectResultTests.java | 27 ++- .../prelert/job/results/BucketTests.java | 31 ---- 12 files changed, 336 insertions(+), 260 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java index 6722920fddb..05976b6a4fe 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java @@ -48,7 +48,7 @@ public class JobRenormaliser extends AbstractComponent { String indexName = JobResultsPersister.getJobIndexName(jobId); logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, Bucket.TYPE, indexName, bucket.getId()); client.prepareIndex(indexName, Bucket.TYPE.getPreferredName(), bucket.getId()) - .setSource(jobResultsPersister.serialiseWithJobId(Bucket.TYPE.getPreferredName(), bucket)).execute().actionGet(); + .setSource(jobResultsPersister.toXContentBuilder(bucket)).execute().actionGet(); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Error updating bucket state", new Object[]{jobId}, e)); return; @@ -88,7 +88,7 @@ public class JobRenormaliser extends AbstractComponent { bulkRequest.add( client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName(), recordId) - .setSource(jobResultsPersister.serialiseWithJobId(AnomalyRecord.TYPE.getPreferredName(), record))); + .setSource(jobResultsPersister.toXContentBuilder(record))); addedAny = true; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java index 4341bb63704..7a35cb72dc1 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java @@ -69,13 +69,9 @@ public class JobResultsPersister extends AbstractComponent { * Persist the result bucket */ public void persistBucket(Bucket bucket) { - if (bucket.getRecords() == null) { - return; - } - String jobId = bucket.getJobId(); try { - XContentBuilder content = serialiseWithJobId(Bucket.TYPE.getPreferredName(), bucket); + XContentBuilder content = toXContentBuilder(bucket); String indexName = getJobIndexName(jobId); logger.trace("[{}] ES API CALL: index type {} to index {} at epoch {}", jobId, Bucket.TYPE, indexName, bucket.getEpoch()); IndexResponse response = client.prepareIndex(indexName, Bucket.TYPE.getPreferredName()) @@ -84,43 +80,71 @@ public class JobResultsPersister extends AbstractComponent { bucket.setId(response.getId()); persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(), bucket.isInterim()); - if (bucket.getInfluencers() != null && bucket.getInfluencers().isEmpty() == false) { - BulkRequestBuilder addInfluencersRequest = client.prepareBulk(); - for (Influencer influencer : bucket.getInfluencers()) { - influencer.setTimestamp(bucket.getTimestamp()); - influencer.setInterim(bucket.isInterim()); - content = serialiseWithJobId(Influencer.TYPE.getPreferredName(), influencer); - logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID", - jobId, Influencer.TYPE, indexName); - addInfluencersRequest.add(client.prepareIndex(indexName, Influencer.TYPE.getPreferredName()) - .setSource(content)); - } - logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addInfluencersRequest.numberOfActions()); - BulkResponse addInfluencersResponse = addInfluencersRequest.execute().actionGet(); - if (addInfluencersResponse.hasFailures()) { - logger.error("[{}] Bulk index of Influencers has errors: {}", jobId, addInfluencersResponse.buildFailureMessage()); - } - } - if (bucket.getRecords().isEmpty() == false) { - BulkRequestBuilder addRecordsRequest = client.prepareBulk(); - for (AnomalyRecord record : bucket.getRecords()) { - record.setTimestamp(bucket.getTimestamp()); - content = serialiseWithJobId(AnomalyRecord.TYPE.getPreferredName(), record); - logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID, for bucket {}", - jobId, AnomalyRecord.TYPE, indexName, bucket.getId()); - addRecordsRequest.add(client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName()) - .setSource(content)); - } - - logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addRecordsRequest.numberOfActions()); - BulkResponse addRecordsResponse = addRecordsRequest.execute().actionGet(); - if (addRecordsResponse.hasFailures()) { - logger.error("[{}] Bulk index of AnomalyRecord has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); - } - } persistPerPartitionMaxProbabilities(bucket); } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error writing bucket state", new Object[] {jobId}, e)); + logger.error(new ParameterizedMessage("[{}] Error persisting bucket", new Object[] {jobId}, e)); + } + } + + /** + * Persist a list of anomaly records + * @param records the records to persist + */ + public void persistRecords(List records) { + if (records.isEmpty()) { + return; + } + String jobId = records.get(0).getJobId(); + String indexName = getJobIndexName(jobId); + BulkRequestBuilder addRecordsRequest = client.prepareBulk(); + XContentBuilder content = null; + try { + for (AnomalyRecord record : records) { + content = toXContentBuilder(record); + + logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID", jobId, AnomalyRecord.TYPE, indexName); + addRecordsRequest.add(client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName()).setSource(content)); + } + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] Error persisting records", new Object[] {jobId}, e)); + return; + } + + logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addRecordsRequest.numberOfActions()); + BulkResponse addRecordsResponse = addRecordsRequest.execute().actionGet(); + if (addRecordsResponse.hasFailures()) { + logger.error("[{}] Bulk index of AnomalyRecord has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); + } + } + + /** + * Persist a list of influencers + * @param influencers the influencers to persist + */ + public void persistInfluencers(List influencers) { + if (influencers.isEmpty()) { + return; + } + String jobId = influencers.get(0).getJobId(); + String indexName = getJobIndexName(jobId); + BulkRequestBuilder addInfluencersRequest = client.prepareBulk(); + XContentBuilder content = null; + try { + for (Influencer influencer : influencers) { + content = toXContentBuilder(influencer); + logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID", + jobId, Influencer.TYPE, indexName); + addInfluencersRequest.add(client.prepareIndex(indexName, Influencer.TYPE.getPreferredName()).setSource(content)); + } + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] Error persisting influencers", new Object[] {jobId}, e)); + return; + } + + logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addInfluencersRequest.numberOfActions()); + BulkResponse addInfluencersResponse = addInfluencersRequest.execute().actionGet(); + if (addInfluencersResponse.hasFailures()) { + logger.error("[{}] Bulk index of Influencers has errors: {}", jobId, addInfluencersResponse.buildFailureMessage()); } } @@ -141,7 +165,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistQuantiles(Quantiles quantiles) { Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE::getPreferredName, - () -> Quantiles.QUANTILES_ID, () -> serialiseWithJobId(Quantiles.TYPE.getPreferredName(), quantiles)); + () -> Quantiles.QUANTILES_ID, () -> toXContentBuilder(quantiles)); if (persistable.persist()) { // Refresh the index when persisting quantiles so that previously // persisted results will be available for searching. Do this using the @@ -157,7 +181,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistModelSnapshot(ModelSnapshot modelSnapshot) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE::getPreferredName, - modelSnapshot::getSnapshotId, () -> serialiseWithJobId(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot)); + modelSnapshot::getSnapshotId, () -> toXContentBuilder(modelSnapshot)); persistable.persist(); } @@ -168,10 +192,10 @@ public class JobResultsPersister extends AbstractComponent { String jobId = modelSizeStats.getJobId(); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, ModelSizeStats.TYPE::getPreferredName, - () -> jobId, () -> serialiseWithJobId(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats)); + () -> jobId, () -> toXContentBuilder(modelSizeStats)); persistable.persist(); persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, ModelSizeStats.TYPE::getPreferredName, - () -> null, () -> serialiseWithJobId(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats)); + () -> null, () -> toXContentBuilder(modelSizeStats)); persistable.persist(); // Don't commit as we expect masses of these updates and they're only // for information at the API level @@ -182,7 +206,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput) { Persistable persistable = new Persistable(modelDebugOutput.getJobId(), modelDebugOutput, ModelDebugOutput.TYPE::getPreferredName, - () -> null, () -> serialiseWithJobId(ModelDebugOutput.TYPE.getPreferredName(), modelDebugOutput)); + () -> null, () -> toXContentBuilder(modelDebugOutput)); persistable.persist(); // Don't commit as we expect masses of these updates and they're not // read again by this process @@ -193,7 +217,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistInfluencer(Influencer influencer) { Persistable persistable = new Persistable(influencer.getJobId(), influencer, Influencer.TYPE::getPreferredName, - influencer::getId, () -> serialiseWithJobId(Influencer.TYPE.getPreferredName(), influencer)); + influencer::getId, () -> toXContentBuilder(influencer)); persistable.persist(); // Don't commit as we expect masses of these updates and they're not // read again by this process @@ -251,7 +275,7 @@ public class JobResultsPersister extends AbstractComponent { } - XContentBuilder serialiseWithJobId(String objField, ToXContent obj) throws IOException { + XContentBuilder toXContentBuilder(ToXContent obj) throws IOException { XContentBuilder builder = jsonBuilder(); obj.toXContent(builder, ToXContent.EMPTY_PARAMS); return builder; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/BlackHoleAutodetectProcess.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/BlackHoleAutodetectProcess.java index 84c7d6de765..6cf34099b02 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -78,7 +78,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { @Override public String flushJob(InterimResultsParams params) throws IOException { FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID); - AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, flushAcknowledgement); + AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement); XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent()); builder.value(result); pipedProcessOutStream.write(builder.string().getBytes(StandardCharsets.UTF_8)); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java index 98cfd3b47a4..adb6748fbbf 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -14,14 +14,17 @@ import org.elasticsearch.xpack.prelert.job.ModelSnapshot; import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser; import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles; +import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; import org.elasticsearch.xpack.prelert.job.results.AutodetectResult; import org.elasticsearch.xpack.prelert.job.results.Bucket; import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition; +import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; import org.elasticsearch.xpack.prelert.utils.CloseableIterator; import java.io.InputStream; import java.time.Duration; +import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -69,8 +72,10 @@ public class AutoDetectResultProcessor { while (iterator.hasNext()) { AutodetectResult result = iterator.next(); processResult(context, result); - bucketCount++; - LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); + if (result.getBucket() != null) { + bucketCount++; + LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); + } } LOGGER.info("[{}] {} buckets parsed from autodetect output - about to refresh indexes", jobId, bucketCount); LOGGER.info("[{}] Parse results Complete", jobId); @@ -102,6 +107,14 @@ public class AutoDetectResultProcessor { } persister.persistBucket(bucket); } + List records = result.getRecords(); + if (records != null && !records.isEmpty()) { + persister.persistRecords(records); + } + List influencers = result.getInfluencers(); + if (influencers != null && !influencers.isEmpty()) { + persister.persistInfluencers(influencers); + } CategoryDefinition categoryDefinition = result.getCategoryDefinition(); if (categoryDefinition != null) { persister.persistCategoryDefinition(categoryDefinition); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResult.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResult.java index f81b70bc659..94b4534dab8 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResult.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResult.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; import org.elasticsearch.xpack.prelert.job.ModelSnapshot; @@ -19,19 +20,25 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.output.FlushAcknow import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles; import java.io.IOException; +import java.util.List; import java.util.Objects; public class AutodetectResult extends ToXContentToBytes implements Writeable { public static final ParseField TYPE = new ParseField("autodetect_result"); + public static final ParseField RECORDS = new ParseField("records"); + public static final ParseField INFLUENCERS = new ParseField("influencers"); + @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - TYPE.getPreferredName(), a -> new AutodetectResult((Bucket) a[0], (Quantiles) a[1], (ModelSnapshot) a[2], - a[3] == null ? null : ((ModelSizeStats.Builder) a[3]).build(), (ModelDebugOutput) a[4], (CategoryDefinition) a[5], - (FlushAcknowledgement) a[6])); + TYPE.getPreferredName(), a -> new AutodetectResult((Bucket) a[0], (List) a[1], (List) a[2], + (Quantiles) a[3], (ModelSnapshot) a[4], a[5] == null ? null : ((ModelSizeStats.Builder) a[5]).build(), + (ModelDebugOutput) a[6], (CategoryDefinition) a[7], (FlushAcknowledgement) a[8])); static { PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Bucket.PARSER, Bucket.TYPE); + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), AnomalyRecord.PARSER, RECORDS); + PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Influencer.PARSER, INFLUENCERS); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Quantiles.PARSER, Quantiles.TYPE); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSnapshot.PARSER, ModelSnapshot.TYPE); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.PARSER, ModelSizeStats.TYPE); @@ -41,6 +48,8 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable { } private final Bucket bucket; + private final List records; + private final List influencers; private final Quantiles quantiles; private final ModelSnapshot modelSnapshot; private final ModelSizeStats modelSizeStats; @@ -48,9 +57,12 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable { private final CategoryDefinition categoryDefinition; private final FlushAcknowledgement flushAcknowledgement; - public AutodetectResult(Bucket bucket, Quantiles quantiles, ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, - ModelDebugOutput modelDebugOutput, CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) { + public AutodetectResult(Bucket bucket, List records, List influencers, Quantiles quantiles, + ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, ModelDebugOutput modelDebugOutput, + CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) { this.bucket = bucket; + this.records = records; + this.influencers = influencers; this.quantiles = quantiles; this.modelSnapshot = modelSnapshot; this.modelSizeStats = modelSizeStats; @@ -65,6 +77,16 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable { } else { this.bucket = null; } + if (in.readBoolean()) { + this.records = in.readList(AnomalyRecord::new); + } else { + this.records = null; + } + if (in.readBoolean()) { + this.influencers = in.readList(Influencer::new); + } else { + this.influencers = null; + } if (in.readBoolean()) { this.quantiles = new Quantiles(in); } else { @@ -99,75 +121,73 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable { @Override public void writeTo(StreamOutput out) throws IOException { - boolean hasBucket = bucket != null; - out.writeBoolean(hasBucket); - if (hasBucket) { - bucket.writeTo(out); + writeNullable(bucket, out); + writeNullable(records, out); + writeNullable(influencers, out); + writeNullable(quantiles, out); + writeNullable(modelSnapshot, out); + writeNullable(modelSizeStats, out); + writeNullable(modelDebugOutput, out); + writeNullable(categoryDefinition, out); + writeNullable(flushAcknowledgement, out); + } + + private static void writeNullable(Writeable writeable, StreamOutput out) throws IOException { + boolean isPresent = writeable != null; + out.writeBoolean(isPresent); + if (isPresent) { + writeable.writeTo(out); } - boolean hasQuantiles = quantiles != null; - out.writeBoolean(hasQuantiles); - if (hasQuantiles) { - quantiles.writeTo(out); - } - boolean hasModelSnapshot = modelSnapshot != null; - out.writeBoolean(hasModelSnapshot); - if (hasModelSnapshot) { - modelSnapshot.writeTo(out); - } - boolean hasModelSizeStats = modelSizeStats != null; - out.writeBoolean(hasModelSizeStats); - if (hasModelSizeStats) { - modelSizeStats.writeTo(out); - } - boolean hasModelDebugOutput = modelDebugOutput != null; - out.writeBoolean(hasModelDebugOutput); - if (hasModelDebugOutput) { - modelDebugOutput.writeTo(out); - } - boolean hasCategoryDefinition = categoryDefinition != null; - out.writeBoolean(hasCategoryDefinition); - if (hasCategoryDefinition) { - categoryDefinition.writeTo(out); - } - boolean hasFlushAcknowledgement = flushAcknowledgement != null; - out.writeBoolean(hasFlushAcknowledgement); - if (hasFlushAcknowledgement) { - flushAcknowledgement.writeTo(out); + } + + private static void writeNullable(List writeables, StreamOutput out) throws IOException { + boolean isPresent = writeables != null; + out.writeBoolean(isPresent); + if (isPresent) { + out.writeList(writeables); } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - if (bucket != null) { - builder.field(Bucket.TYPE.getPreferredName(), bucket); - } - if (quantiles != null) { - builder.field(Quantiles.TYPE.getPreferredName(), quantiles); - } - if (modelSnapshot != null) { - builder.field(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot); - } - if (modelSizeStats != null) { - builder.field(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats); - } - if (modelDebugOutput != null) { - builder.field(ModelDebugOutput.TYPE.getPreferredName(), modelDebugOutput); - } - if (categoryDefinition != null) { - builder.field(CategoryDefinition.TYPE.getPreferredName(), categoryDefinition); - } - if (flushAcknowledgement != null) { - builder.field(FlushAcknowledgement.TYPE.getPreferredName(), flushAcknowledgement); - } + addNullableField(Bucket.TYPE, bucket, builder); + addNullableField(RECORDS, records, builder); + addNullableField(INFLUENCERS, influencers, builder); + addNullableField(Quantiles.TYPE, quantiles, builder); + addNullableField(ModelSnapshot.TYPE, modelSnapshot, builder); + addNullableField(ModelSizeStats.TYPE, modelSizeStats, builder); + addNullableField(ModelDebugOutput.TYPE, modelDebugOutput, builder); + addNullableField(CategoryDefinition.TYPE, categoryDefinition, builder); + addNullableField(FlushAcknowledgement.TYPE, flushAcknowledgement, builder); builder.endObject(); return builder; } + private static void addNullableField(ParseField field, ToXContent value, XContentBuilder builder) throws IOException { + if (value != null) { + builder.field(field.getPreferredName(), value); + } + } + + private static void addNullableField(ParseField field, List values, XContentBuilder builder) throws IOException { + if (values != null) { + builder.field(field.getPreferredName(), values); + } + } + public Bucket getBucket() { return bucket; } + public List getRecords() { + return records; + } + + public List getInfluencers() { + return influencers; + } + public Quantiles getQuantiles() { return quantiles; } @@ -194,7 +214,8 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable { @Override public int hashCode() { - return Objects.hash(bucket, categoryDefinition, flushAcknowledgement, modelDebugOutput, modelSizeStats, modelSnapshot, quantiles); + return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelDebugOutput, modelSizeStats, + modelSnapshot, quantiles); } @Override @@ -207,6 +228,8 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable { } AutodetectResult other = (AutodetectResult) obj; return Objects.equals(bucket, other.bucket) && + Objects.equals(records, other.records) && + Objects.equals(influencers, other.influencers) && Objects.equals(categoryDefinition, other.categoryDefinition) && Objects.equals(flushAcknowledgement, other.flushAcknowledgement) && Objects.equals(modelDebugOutput, other.modelDebugOutput) && diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java index 341d1d5a116..ff9f0265db7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java @@ -6,14 +6,15 @@ package org.elasticsearch.xpack.prelert.job.results; import org.elasticsearch.action.support.ToXContentToBytes; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseFieldMatcherSupplier; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.ObjectParser.ValueType; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.xpack.prelert.utils.time.TimeUtils; @@ -45,7 +46,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { public static final ParseField EVENT_COUNT = new ParseField("eventCount"); public static final ParseField RECORDS = new ParseField("records"); public static final ParseField BUCKET_INFLUENCERS = new ParseField("bucketInfluencers"); - public static final ParseField INFLUENCERS = new ParseField("influencers"); public static final ParseField BUCKET_SPAN = new ParseField("bucketSpan"); public static final ParseField PROCESSING_TIME_MS = new ParseField("processingTimeMs"); public static final ParseField PARTITION_SCORES = new ParseField("partitionScores"); @@ -79,7 +79,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { PARSER.declareLong(Bucket::setEventCount, EVENT_COUNT); PARSER.declareObjectArray(Bucket::setRecords, AnomalyRecord.PARSER, RECORDS); PARSER.declareObjectArray(Bucket::setBucketInfluencers, BucketInfluencer.PARSER, BUCKET_INFLUENCERS); - PARSER.declareObjectArray(Bucket::setInfluencers, Influencer.PARSER, INFLUENCERS); PARSER.declareLong(Bucket::setBucketSpan, BUCKET_SPAN); PARSER.declareLong(Bucket::setProcessingTimeMs, PROCESSING_TIME_MS); PARSER.declareObjectArray(Bucket::setPartitionScores, PartitionScore.PARSER, PARTITION_SCORES); @@ -100,7 +99,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { private boolean isInterim; private boolean hadBigNormalisedUpdate; private List bucketInfluencers = new ArrayList<>(); - private List influencers = Collections.emptyList(); private long processingTimeMs; private Map perPartitionMaxProbability = Collections.emptyMap(); private List partitionScores = Collections.emptyList(); @@ -126,7 +124,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { isInterim = in.readBoolean(); hadBigNormalisedUpdate = in.readBoolean(); bucketInfluencers = in.readList(BucketInfluencer::new); - influencers = in.readList(Influencer::new); processingTimeMs = in.readLong(); perPartitionMaxProbability = (Map) in.readGenericValue(); partitionScores = in.readList(PartitionScore::new); @@ -151,7 +148,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { out.writeBoolean(isInterim); out.writeBoolean(hadBigNormalisedUpdate); out.writeList(bucketInfluencers); - out.writeList(influencers); out.writeLong(processingTimeMs); out.writeGenericValue(perPartitionMaxProbability); out.writeList(partitionScores); @@ -169,11 +165,12 @@ public class Bucket extends ToXContentToBytes implements Writeable { builder.field(INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore); builder.field(MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalizedProbability); builder.field(RECORD_COUNT.getPreferredName(), recordCount); - builder.field(RECORDS.getPreferredName(), records); + if (records != null && !records.isEmpty()) { + builder.field(RECORDS.getPreferredName(), records); + } builder.field(EVENT_COUNT.getPreferredName(), eventCount); builder.field(IS_INTERIM.getPreferredName(), isInterim); builder.field(BUCKET_INFLUENCERS.getPreferredName(), bucketInfluencers); - builder.field(INFLUENCERS.getPreferredName(), influencers); builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTimeMs); builder.field(PARTITION_SCORES.getPreferredName(), partitionScores); builder.endObject(); @@ -256,10 +253,15 @@ public class Bucket extends ToXContentToBytes implements Writeable { } /** - * Get all the anomaly records associated with this bucket + * Get all the anomaly records associated with this bucket. + * The records are not part of the bucket document. They will + * only be present when the bucket was retrieved and expanded + * to contain the associated records. * - * @return All the anomaly records + * @return null or the anomaly records for the bucket + * if the bucket was expanded. */ + @Nullable public List getRecords() { return records; } @@ -295,14 +297,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { processingTimeMs = timeMs; } - public List getInfluencers() { - return influencers; - } - - public void setInfluencers(List influences) { - this.influencers = influences; - } - public List getBucketInfluencers() { return bucketInfluencers; } @@ -377,7 +371,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { // hadBigNormalisedUpdate is deliberately excluded from the hash // as is id, which is generated by the datastore return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, maxNormalizedProbability, recordCount, records, - isInterim, bucketSpan, bucketInfluencers, influencers); + isInterim, bucketSpan, bucketInfluencers); } /** @@ -402,7 +396,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { && (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore) && (this.maxNormalizedProbability == that.maxNormalizedProbability) && (this.recordCount == that.recordCount) && Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim) - && Objects.equals(this.bucketInfluencers, that.bucketInfluencers) && Objects.equals(this.influencers, that.influencers); + && Objects.equals(this.bucketInfluencers, that.bucketInfluencers); } public boolean hadBigNormalisedUpdate() { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java index e3dc116276f..9e11819eded 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java @@ -93,7 +93,6 @@ public final class ReservedFieldNames { Bucket.EVENT_COUNT.getPreferredName(), Bucket.RECORDS.getPreferredName(), Bucket.BUCKET_INFLUENCERS.getPreferredName(), - Bucket.INFLUENCERS.getPreferredName(), Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(), Bucket.PROCESSING_TIME_MS.getPreferredName(), Bucket.PARTITION_SCORES.getPreferredName(), diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetBucketActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetBucketActionResponseTests.java index 7b8f926c197..560cb92a2f2 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetBucketActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetBucketActionResponseTests.java @@ -10,7 +10,6 @@ import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; import org.elasticsearch.xpack.prelert.job.results.Bucket; import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer; -import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.PartitionScore; import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase; @@ -57,22 +56,6 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase influencers = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - Influencer influencer = new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), - randomAsciiOfLengthBetween(1, 20)); - influencer.setAnomalyScore(randomDouble()); - influencer.setInitialAnomalyScore(randomDouble()); - influencer.setProbability(randomDouble()); - influencer.setId(randomAsciiOfLengthBetween(1, 20)); - influencer.setInterim(randomBoolean()); - influencer.setTimestamp(new Date(randomLong())); - influencers.add(influencer); - } - bucket.setInfluencers(influencers); - } if (randomBoolean()) { bucket.setInitialAnomalyScore(randomDouble()); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java index 54c94011165..82bfb421c91 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java @@ -5,27 +5,24 @@ */ package org.elasticsearch.xpack.prelert.job.persistence; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; - import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; -import org.mockito.ArgumentCaptor; - import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; import org.elasticsearch.xpack.prelert.job.results.Bucket; import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer; import org.elasticsearch.xpack.prelert.job.results.Influencer; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import static org.mockito.Mockito.mock; public class JobResultsPersisterTests extends ESTestCase { @@ -33,28 +30,19 @@ public class JobResultsPersisterTests extends ESTestCase { private static final String CLUSTER_NAME = "myCluster"; private static final String JOB_ID = "foo"; - public void testPersistBucket_NoRecords() { - Client client = mock(Client.class); - Bucket bucket = mock(Bucket.class); - when(bucket.getRecords()).thenReturn(null); - JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); - persister.persistBucket(bucket); - verifyNoMoreInteractions(client); - } - public void testPersistBucket_OneRecord() throws IOException { ArgumentCaptor captor = ArgumentCaptor.forClass(XContentBuilder.class); BulkResponse response = mock(BulkResponse.class); String responseId = "abcXZY54321"; MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME) .prepareIndex("prelertresults-" + JOB_ID, Bucket.TYPE.getPreferredName(), responseId, captor) - .prepareIndex("prelertresults-" + JOB_ID, AnomalyRecord.TYPE.getPreferredName(), "", captor) .prepareIndex("prelertresults-" + JOB_ID, BucketInfluencer.TYPE.getPreferredName(), "", captor) - .prepareIndex("prelertresults-" + JOB_ID, Influencer.TYPE.getPreferredName(), "", captor) .prepareBulk(response); Client client = clientBuilder.build(); - Bucket bucket = getBucket(1); + Bucket bucket = new Bucket("foo"); + bucket.setId("1"); + bucket.setTimestamp(new Date()); bucket.setId(responseId); bucket.setAnomalyScore(99.9); bucket.setBucketSpan(123456); @@ -72,44 +60,15 @@ public class JobResultsPersisterTests extends ESTestCase { bi.setRawAnomalyScore(19.19); bucket.addBucketInfluencer(bi); - Influencer inf = new Influencer("jobname", "infName1", "infValue1"); - inf.setAnomalyScore(16); - inf.setId("infID"); - inf.setInitialAnomalyScore(55.5); - inf.setProbability(0.4); - inf.setTimestamp(bucket.getTimestamp()); - bucket.setInfluencers(Collections.singletonList(inf)); - - AnomalyRecord record = bucket.getRecords().get(0); - List actuals = new ArrayList<>(); - actuals.add(5.0); - actuals.add(5.1); - record.setActual(actuals); + // We are adding a record but it shouldn't be persisted as part of the bucket + AnomalyRecord record = new AnomalyRecord(JOB_ID); record.setAnomalyScore(99.8); - record.setBucketSpan(42); - record.setByFieldName("byName"); - record.setByFieldValue("byValue"); - record.setCorrelatedByFieldValue("testCorrelations"); - record.setDetectorIndex(3); - record.setFieldName("testFieldName"); - record.setFunction("testFunction"); - record.setFunctionDescription("testDescription"); - record.setInitialNormalizedProbability(23.4); - record.setNormalizedProbability(0.005); - record.setOverFieldName("overName"); - record.setOverFieldValue("overValue"); - record.setPartitionFieldName("partName"); - record.setPartitionFieldValue("partValue"); - record.setProbability(0.1); - List typicals = new ArrayList<>(); - typicals.add(0.44); - typicals.add(998765.3); - record.setTypical(typicals); + bucket.setRecords(Arrays.asList(record)); JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); persister.persistBucket(bucket); List list = captor.getAllValues(); - assertEquals(4, list.size()); + assertEquals(2, list.size()); String s = list.get(0).string(); assertTrue(s.matches(".*anomalyScore.:99\\.9.*")); @@ -126,15 +85,50 @@ public class JobResultsPersisterTests extends ESTestCase { assertTrue(s.matches(".*initialAnomalyScore.:18\\.12.*")); assertTrue(s.matches(".*anomalyScore.:14\\.15.*")); assertTrue(s.matches(".*rawAnomalyScore.:19\\.19.*")); + } - s = list.get(2).string(); - assertTrue(s.matches(".*probability.:0\\.4.*")); - assertTrue(s.matches(".*influencerFieldName.:.infName1.*")); - assertTrue(s.matches(".*influencerFieldValue.:.infValue1.*")); - assertTrue(s.matches(".*initialAnomalyScore.:55\\.5.*")); - assertTrue(s.matches(".*anomalyScore.:16\\.0.*")); + public void testPersistRecords() throws IOException { + ArgumentCaptor captor = ArgumentCaptor.forClass(XContentBuilder.class); + BulkResponse response = mock(BulkResponse.class); + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME) + .prepareIndex("prelertresults-" + JOB_ID, AnomalyRecord.TYPE.getPreferredName(), "", captor) + .prepareBulk(response); + Client client = clientBuilder.build(); - s = list.get(3).string(); + List records = new ArrayList<>(); + AnomalyRecord r1 = new AnomalyRecord(JOB_ID); + records.add(r1); + List actuals = new ArrayList<>(); + actuals.add(5.0); + actuals.add(5.1); + r1.setActual(actuals); + r1.setAnomalyScore(99.8); + r1.setBucketSpan(42); + r1.setByFieldName("byName"); + r1.setByFieldValue("byValue"); + r1.setCorrelatedByFieldValue("testCorrelations"); + r1.setDetectorIndex(3); + r1.setFieldName("testFieldName"); + r1.setFunction("testFunction"); + r1.setFunctionDescription("testDescription"); + r1.setInitialNormalizedProbability(23.4); + r1.setNormalizedProbability(0.005); + r1.setOverFieldName("overName"); + r1.setOverFieldValue("overValue"); + r1.setPartitionFieldName("partName"); + r1.setPartitionFieldValue("partValue"); + r1.setProbability(0.1); + List typicals = new ArrayList<>(); + typicals.add(0.44); + typicals.add(998765.3); + r1.setTypical(typicals); + + JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); + persister.persistRecords(records); + List captured = captor.getAllValues(); + assertEquals(1, captured.size()); + + String s = captured.get(0).string(); assertTrue(s.matches(".*detectorIndex.:3.*")); assertTrue(s.matches(".*\"probability\":0\\.1.*")); assertTrue(s.matches(".*\"anomalyScore\":99\\.8.*")); @@ -155,18 +149,32 @@ public class JobResultsPersisterTests extends ESTestCase { assertTrue(s.matches(".*overFieldValue.:.overValue.*")); } - private Bucket getBucket(int numRecords) { - Bucket b = new Bucket(JOB_ID); - b.setId("1"); - b.setTimestamp(new Date()); - List records = new ArrayList<>(); - for (int i = 0; i < numRecords; ++i) { - AnomalyRecord r = new AnomalyRecord("foo"); - records.add(r); - } - b.setRecords(records); - return b; + public void testPersistInfluencers() throws IOException { + ArgumentCaptor captor = ArgumentCaptor.forClass(XContentBuilder.class); + BulkResponse response = mock(BulkResponse.class); + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME) + .prepareIndex("prelertresults-" + JOB_ID, Influencer.TYPE.getPreferredName(), "", captor) + .prepareBulk(response); + Client client = clientBuilder.build(); + + List influencers = new ArrayList<>(); + Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1"); + inf.setAnomalyScore(16); + inf.setId("infID"); + inf.setInitialAnomalyScore(55.5); + inf.setProbability(0.4); + influencers.add(inf); + + JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); + persister.persistInfluencers(influencers); + List captured = captor.getAllValues(); + assertEquals(1, captured.size()); + + String s = captured.get(0).string(); + assertTrue(s.matches(".*probability.:0\\.4.*")); + assertTrue(s.matches(".*influencerFieldName.:.infName1.*")); + assertTrue(s.matches(".*influencerFieldValue.:.infValue1.*")); + assertTrue(s.matches(".*initialAnomalyScore.:55\\.5.*")); + assertTrue(s.matches(".*anomalyScore.:16\\.0.*")); } - - } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java index f939912d4b4..479c4934631 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -11,14 +11,18 @@ import org.elasticsearch.xpack.prelert.job.ModelSnapshot; import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser; import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles; +import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; import org.elasticsearch.xpack.prelert.job.results.AutodetectResult; import org.elasticsearch.xpack.prelert.job.results.Bucket; import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition; +import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; import org.elasticsearch.xpack.prelert.utils.CloseableIterator; import org.mockito.InOrder; import java.io.InputStream; +import java.util.Arrays; +import java.util.List; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; @@ -102,6 +106,42 @@ public class AutoDetectResultProcessorTests extends ESTestCase { assertFalse(context.deleteInterimRequired); } + public void testProcessResult_records() { + Renormaliser renormaliser = mock(Renormaliser.class); + JobResultsPersister persister = mock(JobResultsPersister.class); + AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null); + + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false); + context.deleteInterimRequired = false; + AutodetectResult result = mock(AutodetectResult.class); + AnomalyRecord record1 = new AnomalyRecord("foo"); + AnomalyRecord record2 = new AnomalyRecord("foo"); + List records = Arrays.asList(record1, record2); + when(result.getRecords()).thenReturn(records); + processor.processResult(context, result); + + verify(persister, times(1)).persistRecords(records); + verifyNoMoreInteractions(persister); + } + + public void testProcessResult_influencers() { + Renormaliser renormaliser = mock(Renormaliser.class); + JobResultsPersister persister = mock(JobResultsPersister.class); + AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null); + + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false); + context.deleteInterimRequired = false; + AutodetectResult result = mock(AutodetectResult.class); + Influencer influencer1 = new Influencer("foo", "infField", "infValue"); + Influencer influencer2 = new Influencer("foo", "infField2", "infValue2"); + List influencers = Arrays.asList(influencer1, influencer2); + when(result.getInfluencers()).thenReturn(influencers); + processor.processResult(context, result); + + verify(persister, times(1)).persistInfluencers(influencers); + verifyNoMoreInteractions(persister); + } + public void testProcessResult_categoryDefinition() { Renormaliser renormaliser = mock(Renormaliser.class); JobResultsPersister persister = mock(JobResultsPersister.class); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResultTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResultTests.java index d57f4146b47..abecbfee66e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResultTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AutodetectResultTests.java @@ -14,7 +14,9 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.output.FlushAcknow import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles; import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase; +import java.util.ArrayList; import java.util.Date; +import java.util.List; public class AutodetectResultTests extends AbstractSerializingTestCase { @@ -26,6 +28,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase records = null; + List influencers = null; Quantiles quantiles; ModelSnapshot modelSnapshot; ModelSizeStats.Builder modelSizeStats; @@ -39,6 +43,25 @@ public class AutodetectResultTests extends AbstractSerializingTestCase(size); + for (int i = 0; i < size; i++) { + AnomalyRecord record = new AnomalyRecord(jobId); + record.setProbability(randomDoubleBetween(0.0, 1.0, true)); + records.add(record); + } + + } + if (randomBoolean()) { + int size = randomInt(10); + influencers = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + Influencer influencer = new Influencer(jobId, randomAsciiOfLength(10), randomAsciiOfLength(10)); + influencer.setProbability(randomDoubleBetween(0.0, 1.0, true)); + influencers.add(influencer); + } + } if (randomBoolean()) { quantiles = new Quantiles(jobId, new Date(randomLong()), randomAsciiOfLengthBetween(1, 20)); } else { @@ -73,8 +96,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase { if (randomBoolean()) { bucket.setId(randomAsciiOfLengthBetween(1, 20)); } - if (randomBoolean()) { - int size = randomInt(10); - List influencers = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - Influencer influencer = new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), - randomAsciiOfLengthBetween(1, 20)); - influencer.setAnomalyScore(randomDouble()); - influencer.setInitialAnomalyScore(randomDouble()); - influencer.setProbability(randomDouble()); - influencer.setId(randomAsciiOfLengthBetween(1, 20)); - influencer.setInterim(randomBoolean()); - influencer.setTimestamp(new Date(randomLong())); - influencers.add(influencer); - } - bucket.setInfluencers(influencers); - } if (randomBoolean()) { bucket.setInitialAnomalyScore(randomDouble()); } @@ -238,16 +222,6 @@ public class BucketTests extends AbstractSerializingTestCase { assertFalse(bucket2.equals(bucket1)); } - public void testEquals_GivenDifferentInfluencers() { - Bucket bucket1 = new Bucket("foo"); - Influencer influencer = new Influencer("foo", "inf_field", "inf_value"); - Bucket bucket2 = new Bucket("foo"); - bucket2.setInfluencers(Arrays.asList(influencer)); - - assertFalse(bucket1.equals(bucket2)); - assertFalse(bucket2.equals(bucket1)); - } - public void testEquals_GivenDifferentBucketInfluencers() { Bucket bucket1 = new Bucket("foo"); BucketInfluencer influencer1 = new BucketInfluencer("foo"); @@ -266,10 +240,7 @@ public class BucketTests extends AbstractSerializingTestCase { public void testEquals_GivenEqualBuckets() { AnomalyRecord record = new AnomalyRecord("jobId"); - Influencer influencer = new Influencer("jobId", "testField", "testValue"); BucketInfluencer bucketInfluencer = new BucketInfluencer("foo"); - influencer.setProbability(0.1); - influencer.setInitialAnomalyScore(10.0); Date date = new Date(); Bucket bucket1 = new Bucket("foo"); @@ -282,7 +253,6 @@ public class BucketTests extends AbstractSerializingTestCase { bucket1.setRecordCount(4); bucket1.setRecords(Arrays.asList(record)); bucket1.addBucketInfluencer(bucketInfluencer); - bucket1.setInfluencers(Arrays.asList(influencer)); bucket1.setTimestamp(date); Bucket bucket2 = new Bucket("foo"); @@ -295,7 +265,6 @@ public class BucketTests extends AbstractSerializingTestCase { bucket2.setRecordCount(4); bucket2.setRecords(Arrays.asList(record)); bucket2.addBucketInfluencer(bucketInfluencer); - bucket2.setInfluencers(Arrays.asList(influencer)); bucket2.setTimestamp(date); assertTrue(bucket1.equals(bucket2));