children = result.getChildren(childrenType);
+ if (!children.isEmpty()) {
+ double maxChildrenScore = 0.0;
+ for (Normalizable child : children) {
+ maxChildrenScore = Math.max(
+ mergeRecursively(scoresIter, result, hasBigChange, child),
+ maxChildrenScore);
+ }
+ hasBigChange |= result.setMaxChildrenScore(childrenType, maxChildrenScore);
+ }
+ }
+ return result.getNormalizedScore();
+ }
+
+ /**
+ * Encapsulate the logic for deciding whether a change to a normalized score
+ * is "big".
+ *
+ * Current logic is that a big change is a change of at least 1 or more than
+ * 50% of the higher of the two values.
+ *
+ * @param oldVal The old value of the normalized score
+ * @param newVal The new value of the normalized score
+ * @return true if the update is considered "big"
+ */
+ private static boolean isBigUpdate(double oldVal, double newVal) {
+ if (Math.abs(oldVal - newVal) >= 1.0) {
+ return true;
+ }
+ if (oldVal > newVal) {
+ if (oldVal * 0.5 > newVal) {
+ return true;
+ }
+ } else {
+ if (newVal * 0.5 > oldVal) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
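For illustration, here is a minimal standalone restatement of the rule implemented by isBigUpdate above, with a few worked inputs; this is a sketch for clarity, not part of the patch:

    // A change is "big" if it is at least 1.0 in absolute terms, or if the
    // smaller of the two scores is less than half of the larger one.
    static boolean isBig(double oldVal, double newVal) {
        if (Math.abs(oldVal - newVal) >= 1.0) {
            return true;
        }
        return Math.max(oldVal, newVal) * 0.5 > Math.min(oldVal, newVal);
    }

    // isBig(2.0, 2.5) -> false (difference 0.5, and 2.5 * 0.5 = 1.25 is not > 2.0)
    // isBig(0.4, 0.9) -> true  (0.9 * 0.5 = 0.45 > 0.4)
    // isBig(1.0, 2.1) -> true  (difference 1.1 >= 1.0)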
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerFactory.java
new file mode 100644
index 00000000000..754eb275b73
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerFactory.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+
+/**
+ * Factory interface for creating implementations of {@link Normalizer}
+ */
+public interface NormalizerFactory {
+ /**
+ * Create an implementation of {@link Normalizer}
+ *
+ * @param jobId The job ID
+ * @return The normalizer
+ */
+ Normalizer create(String jobId);
+}
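Since NormalizerFactory is a single-method interface, a test can satisfy it with a Mockito stub, in the same style as the tests further down this diff; the job ID here is hypothetical:

    // Hypothetical test wiring, mirroring the mock(...)/when(...) style used in
    // AutoDetectResultProcessorTests below.
    NormalizerFactory factory = mock(NormalizerFactory.class);
    when(factory.create("unit-test-job")).thenReturn(mock(Normalizer.class));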
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerProcess.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerProcess.java
new file mode 100644
index 00000000000..3812328353f
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerProcess.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import org.elasticsearch.xpack.prelert.job.process.normalizer.output.NormalizerResultHandler;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Interface representing the native C++ normalizer process
+ */
+public interface NormalizerProcess extends Closeable {
+
+ /**
+ * Write the record to the normalizer. The record parameter should not be encoded
+ * (i.e. length encoded); the implementation will apply the correct encoding.
+ *
+ * @param record Plain array of strings; implementors of this class should
+ * encode the record appropriately
+ * @throws IOException If the write failed
+ */
+ void writeRecord(String[] record) throws IOException;
+
+ /**
+ * Create a result handler for this process's results.
+ * @return results handler
+ */
+ NormalizerResultHandler createNormalizedResultsHandler();
+
+ /**
+ * Returns true if the process is still running.
+ * @return True if the process is still running
+ */
+ boolean isProcessAlive();
+
+ /**
+ * Read any content in the error output buffer.
+ * @return An error message or empty String if no error.
+ */
+ String readError();
+}
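The interface leaves the record encoding to implementors. As a sketch of what writeRecord might do, assuming the conventional length-encoded format for these native processes (a 4-byte big-endian field count, then each field as a 4-byte length followed by its UTF-8 bytes; this format is an assumption, not confirmed by the diff):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    class LengthEncodingSketch {
        private final DataOutputStream out;   // wraps the process's stdin

        LengthEncodingSketch(OutputStream processStdin) {
            out = new DataOutputStream(processStdin);
        }

        // DataOutputStream.writeInt is big-endian, matching the assumed format.
        void writeRecord(String[] record) throws IOException {
            out.writeInt(record.length);                 // number of fields
            for (String field : record) {
                byte[] bytes = field.getBytes(StandardCharsets.UTF_8);
                out.writeInt(bytes.length);              // field length in bytes
                out.write(bytes);                        // field payload
            }
        }
    }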
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerProcessFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerProcessFactory.java
new file mode 100644
index 00000000000..1cc31cf00b4
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerProcessFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Factory interface for creating implementations of {@link NormalizerProcess}
+ */
+public interface NormalizerProcessFactory {
+ /**
+ * Create an implementation of {@link NormalizerProcess}
+ *
+ * @param jobId The job ID
+ * @param quantilesState The initial quantiles state
+ * @param bucketSpan The job's bucket span
+ * @param perPartitionNormalization Whether per-partition normalization is enabled
+ * @param executorService Executor service used to start the asynchronous tasks the process needs
+ * @return The process
+ */
+ NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan, boolean perPartitionNormalization,
+ ExecutorService executorService);
+}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerResult.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerResult.java
new file mode 100644
index 00000000000..b1a63e8ff0a
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerResult.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import org.elasticsearch.action.support.ToXContentToBytes;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.ParseFieldMatcherSupplier;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Parse the output of the normalizer process, for example:
+ *
+ * {"probability":0.01,"normalized_score":2.2}
+ */
+public class NormalizerResult extends ToXContentToBytes implements Writeable {
+ static final ParseField LEVEL_FIELD = new ParseField("level");
+ static final ParseField PARTITION_FIELD_NAME_FIELD = new ParseField("partition_field_name");
+ static final ParseField PARTITION_FIELD_VALUE_FIELD = new ParseField("partition_field_value");
+ static final ParseField PERSON_FIELD_NAME_FIELD = new ParseField("person_field_name");
+ static final ParseField FUNCTION_NAME_FIELD = new ParseField("function_name");
+ static final ParseField VALUE_FIELD_NAME_FIELD = new ParseField("value_field_name");
+ static final ParseField PROBABILITY_FIELD = new ParseField("probability");
+ static final ParseField NORMALIZED_SCORE_FIELD = new ParseField("normalized_score");
+
+ public static final ObjectParser<NormalizerResult, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>(
+ LEVEL_FIELD.getPreferredName(), NormalizerResult::new);
+
+ static {
+ PARSER.declareString(NormalizerResult::setLevel, LEVEL_FIELD);
+ PARSER.declareString(NormalizerResult::setPartitionFieldName, PARTITION_FIELD_NAME_FIELD);
+ PARSER.declareString(NormalizerResult::setPartitionFieldValue, PARTITION_FIELD_VALUE_FIELD);
+ PARSER.declareString(NormalizerResult::setPersonFieldName, PERSON_FIELD_NAME_FIELD);
+ PARSER.declareString(NormalizerResult::setFunctionName, FUNCTION_NAME_FIELD);
+ PARSER.declareString(NormalizerResult::setValueFieldName, VALUE_FIELD_NAME_FIELD);
+ PARSER.declareDouble(NormalizerResult::setProbability, PROBABILITY_FIELD);
+ PARSER.declareDouble(NormalizerResult::setNormalizedScore, NORMALIZED_SCORE_FIELD);
+ }
+
+ private String level;
+ private String partitionFieldName;
+ private String partitionFieldValue;
+ private String personFieldName;
+ private String functionName;
+ private String valueFieldName;
+ private double probability;
+ private double normalizedScore;
+
+ public NormalizerResult() {
+ }
+
+ public NormalizerResult(StreamInput in) throws IOException {
+ level = in.readOptionalString();
+ partitionFieldName = in.readOptionalString();
+ partitionFieldValue = in.readOptionalString();
+ personFieldName = in.readOptionalString();
+ functionName = in.readOptionalString();
+ valueFieldName = in.readOptionalString();
+ probability = in.readDouble();
+ normalizedScore = in.readDouble();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeOptionalString(level);
+ out.writeOptionalString(partitionFieldName);
+ out.writeOptionalString(partitionFieldValue);
+ out.writeOptionalString(personFieldName);
+ out.writeOptionalString(functionName);
+ out.writeOptionalString(valueFieldName);
+ out.writeDouble(probability);
+ out.writeDouble(normalizedScore);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.field(LEVEL_FIELD.getPreferredName(), level);
+ builder.field(PARTITION_FIELD_NAME_FIELD.getPreferredName(), partitionFieldName);
+ builder.field(PARTITION_FIELD_VALUE_FIELD.getPreferredName(), partitionFieldValue);
+ builder.field(PERSON_FIELD_NAME_FIELD.getPreferredName(), personFieldName);
+ builder.field(FUNCTION_NAME_FIELD.getPreferredName(), functionName);
+ builder.field(VALUE_FIELD_NAME_FIELD.getPreferredName(), valueFieldName);
+ builder.field(PROBABILITY_FIELD.getPreferredName(), probability);
+ builder.field(NORMALIZED_SCORE_FIELD.getPreferredName(), normalizedScore);
+ builder.endObject();
+ return builder;
+ }
+
+ public String getLevel() {
+ return level;
+ }
+
+ public void setLevel(String level) {
+ this.level = level;
+ }
+
+ public String getPartitionFieldName() {
+ return partitionFieldName;
+ }
+
+ public void setPartitionFieldName(String partitionFieldName) {
+ this.partitionFieldName = partitionFieldName;
+ }
+
+ public String getPartitionFieldValue() {
+ return partitionFieldValue;
+ }
+
+ public void setPartitionFieldValue(String partitionFieldValue) {
+ this.partitionFieldValue = partitionFieldValue;
+ }
+
+ public String getPersonFieldName() {
+ return personFieldName;
+ }
+
+ public void setPersonFieldName(String personFieldName) {
+ this.personFieldName = personFieldName;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ public void setFunctionName(String functionName) {
+ this.functionName = functionName;
+ }
+
+ public String getValueFieldName() {
+ return valueFieldName;
+ }
+
+ public void setValueFieldName(String valueFieldName) {
+ this.valueFieldName = valueFieldName;
+ }
+
+ public double getProbability() {
+ return probability;
+ }
+
+ public void setProbability(double probability) {
+ this.probability = probability;
+ }
+
+ public double getNormalizedScore() {
+ return normalizedScore;
+ }
+
+ public void setNormalizedScore(double normalizedScore) {
+ this.normalizedScore = normalizedScore;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(level, partitionFieldName, partitionFieldValue, personFieldName,
+ functionName, valueFieldName, probability, normalizedScore);
+ }
+
+ /**
+ * Compare all the fields.
+ */
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (!(other instanceof NormalizerResult)) {
+ return false;
+ }
+
+ NormalizerResult that = (NormalizerResult)other;
+
+ return Objects.equals(this.level, that.level)
+ && Objects.equals(this.partitionFieldName, that.partitionFieldName)
+ && Objects.equals(this.partitionFieldValue, that.partitionFieldValue)
+ && Objects.equals(this.personFieldName, that.personFieldName)
+ && Objects.equals(this.functionName, that.functionName)
+ && Objects.equals(this.valueFieldName, that.valueFieldName)
+ && this.probability == that.probability
+ && this.normalizedScore == that.normalizedScore;
+ }
+}
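Tying the class javadoc to the parser: a sketch of parsing the example line, using the same ES 5.x-era XContent calls (and imports) that NormalizerResultHandler uses later in this diff:

    // Parses the example from the class javadoc; the API usage mirrors
    // NormalizerResultHandler.parseResult below.
    BytesArray json = new BytesArray("{\"probability\":0.01,\"normalized_score\":2.2}");
    XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(json);
    NormalizerResult result = NormalizerResult.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT);
    // result.getProbability() == 0.01 and result.getNormalizedScore() == 2.2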
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/PartitionScoreNormalizable.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/PartitionScoreNormalizable.java
new file mode 100644
index 00000000000..9bafc460086
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/PartitionScoreNormalizable.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import org.elasticsearch.xpack.prelert.job.results.PartitionScore;
+
+import java.util.Objects;
+
+
+public class PartitionScoreNormalizable extends AbstractLeafNormalizable {
+ private final PartitionScore score;
+
+ public PartitionScoreNormalizable(PartitionScore score) {
+ this.score = Objects.requireNonNull(score);
+ }
+
+ @Override
+ public Level getLevel() {
+ return Level.PARTITION;
+ }
+
+ @Override
+ public String getPartitionFieldName() {
+ return score.getPartitionFieldName();
+ }
+
+ @Override
+ public String getPartitionFieldValue() {
+ return score.getPartitionFieldValue();
+ }
+
+ @Override
+ public String getPersonFieldName() {
+ return null;
+ }
+
+ @Override
+ public String getFunctionName() {
+ return null;
+ }
+
+ @Override
+ public String getValueFieldName() {
+ return null;
+ }
+
+ @Override
+ public double getProbability() {
+ return score.getProbability();
+ }
+
+ @Override
+ public double getNormalizedScore() {
+ return score.getAnomalyScore();
+ }
+
+ @Override
+ public void setNormalizedScore(double normalizedScore) {
+ score.setAnomalyScore(normalizedScore);
+ }
+
+ @Override
+ public void setParentScore(double parentScore) {
+ // Do nothing as a partition score does not hold the parent score.
+ }
+
+ @Override
+ public void resetBigChangeFlag() {
+ score.resetBigNormalizedUpdateFlag();
+ }
+
+ @Override
+ public void raiseBigChangeFlag() {
+ score.raiseBigNormalizedUpdateFlag();
+ }
+}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/RecordNormalizable.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/RecordNormalizable.java
new file mode 100644
index 00000000000..78264564d06
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/RecordNormalizable.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
+
+import java.util.Objects;
+
+
+class RecordNormalizable extends AbstractLeafNormalizable {
+ private final AnomalyRecord record;
+
+ public RecordNormalizable(AnomalyRecord record) {
+ this.record = Objects.requireNonNull(record);
+ }
+
+ @Override
+ public Level getLevel() {
+ return Level.LEAF;
+ }
+
+ @Override
+ public String getPartitionFieldName() {
+ return record.getPartitionFieldName();
+ }
+
+ @Override
+ public String getPartitionFieldValue() {
+ return record.getPartitionFieldValue();
+ }
+
+ @Override
+ public String getPersonFieldName() {
+ String over = record.getOverFieldName();
+ return over != null ? over : record.getByFieldName();
+ }
+
+ @Override
+ public String getFunctionName() {
+ return record.getFunction();
+ }
+
+ @Override
+ public String getValueFieldName() {
+ return record.getFieldName();
+ }
+
+ @Override
+ public double getProbability() {
+ return record.getProbability();
+ }
+
+ @Override
+ public double getNormalizedScore() {
+ return record.getNormalizedProbability();
+ }
+
+ @Override
+ public void setNormalizedScore(double normalizedScore) {
+ record.setNormalizedProbability(normalizedScore);
+ }
+
+ @Override
+ public void setParentScore(double parentScore) {
+ record.setAnomalyScore(parentScore);
+ }
+
+ @Override
+ public void resetBigChangeFlag() {
+ record.resetBigNormalizedUpdateFlag();
+ }
+
+ @Override
+ public void raiseBigChangeFlag() {
+ record.raiseBigNormalizedUpdateFlag();
+ }
+}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/Renormaliser.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/Renormalizer.java
similarity index 74%
rename from elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/Renormaliser.java
rename to elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/Renormalizer.java
index de806b2ebf6..220b117f317 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/Renormaliser.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/Renormalizer.java
@@ -7,28 +7,28 @@ package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
-public interface Renormaliser {
+public interface Renormalizer {
/**
* Update the anomaly score field on all previously persisted buckets
* and all contained records
*/
- void renormalise(Quantiles quantiles);
+ void renormalize(Quantiles quantiles);
/**
* Update the anomaly score field on all previously persisted buckets
* and all contained records and aggregate records to the partition
* level
*/
- void renormaliseWithPartition(Quantiles quantiles);
+ void renormalizeWithPartition(Quantiles quantiles);
/**
- * Blocks until the renormaliser is idle and no further normalisation tasks are pending.
+ * Blocks until the renormalizer is idle and no further normalization tasks are pending.
*/
void waitUntilIdle();
/**
- * Shut down the renormaliser
+ * Shut down the renormalizer
*/
boolean shutdown();
}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/RenormaliserFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/RenormalizerFactory.java
similarity index 79%
rename from elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/RenormaliserFactory.java
rename to elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/RenormalizerFactory.java
index 8bac84d5084..e710b41bc59 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/RenormaliserFactory.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/RenormalizerFactory.java
@@ -6,6 +6,6 @@
package org.elasticsearch.xpack.prelert.job.process.normalizer;
-public interface RenormaliserFactory {
- Renormaliser create(String jobId);
+public interface RenormalizerFactory {
+ Renormalizer create(String jobId);
}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdater.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdater.java
new file mode 100644
index 00000000000..b300ce773c8
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdater.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
+import org.elasticsearch.xpack.prelert.job.Job;
+import org.elasticsearch.xpack.prelert.job.persistence.BatchedDocumentsIterator;
+import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
+import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizer;
+import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
+import org.elasticsearch.xpack.prelert.job.results.Bucket;
+import org.elasticsearch.xpack.prelert.job.results.Influencer;
+import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Thread-safe class that updates the scores of all existing results
+ * with the renormalized scores
+ */
+class ScoresUpdater {
+ private static final Logger LOGGER = Loggers.getLogger(ScoresUpdater.class);
+
+ /**
+ * Target number of records to renormalize at a time
+ */
+ private static final int TARGET_RECORDS_TO_RENORMALIZE = 100000;
+
+ // 30 days
+ private static final long DEFAULT_RENORMALIZATION_WINDOW_MS = 2592000000L;
+
+ private static final int DEFAULT_BUCKETS_IN_RENORMALIZATION_WINDOW = 100;
+
+ private static final long SECONDS_IN_DAY = 86400;
+ private static final long MILLISECONDS_IN_SECOND = 1000;
+
+ private final Job job;
+ private final JobProvider jobProvider;
+ private final JobRenormalizer updatesPersister;
+ private final NormalizerFactory normalizerFactory;
+ private int bucketSpan;
+ private long normalizationWindow;
+ private boolean perPartitionNormalization;
+
+ public ScoresUpdater(Job job, JobProvider jobProvider, JobRenormalizer jobRenormalizer, NormalizerFactory normalizerFactory) {
+ this.job = job;
+ this.jobProvider = Objects.requireNonNull(jobProvider);
+ updatesPersister = Objects.requireNonNull(jobRenormalizer);
+ this.normalizerFactory = Objects.requireNonNull(normalizerFactory);
+ bucketSpan = getBucketSpanOrDefault(job.getAnalysisConfig());
+ normalizationWindow = getNormalizationWindowOrDefault(job);
+ perPartitionNormalization = getPerPartitionNormalizationOrDefault(job.getAnalysisConfig());
+ }
+
+ private static int getBucketSpanOrDefault(AnalysisConfig analysisConfig) {
+ if (analysisConfig != null && analysisConfig.getBucketSpan() != null) {
+ return analysisConfig.getBucketSpan().intValue();
+ }
+ // A bucketSpan value of 0 will result in the default
+ // bucketSpan value being used in the back-end.
+ return 0;
+ }
+
+ private long getNormalizationWindowOrDefault(Job job) {
+ if (job.getRenormalizationWindowDays() != null) {
+ return job.getRenormalizationWindowDays() * SECONDS_IN_DAY * MILLISECONDS_IN_SECOND;
+ }
+ return Math.max(DEFAULT_RENORMALIZATION_WINDOW_MS,
+ DEFAULT_BUCKETS_IN_RENORMALIZATION_WINDOW * bucketSpan * MILLISECONDS_IN_SECOND);
+ }
+
+ private static boolean getPerPartitionNormalizationOrDefault(AnalysisConfig analysisConfig) {
+ return (analysisConfig != null) && analysisConfig.getUsePerPartitionNormalization();
+ }
+
+ /**
+ * Update the anomaly score field on all previously persisted buckets
+ * and all contained records
+ */
+ public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs, boolean perPartitionNormalization) {
+ Normalizer normalizer = normalizerFactory.create(job.getId());
+ int[] counts = {0, 0};
+ updateBuckets(normalizer, quantilesState, endBucketEpochMs,
+ windowExtensionMs, counts, perPartitionNormalization);
+ updateInfluencers(normalizer, quantilesState, endBucketEpochMs,
+ windowExtensionMs, counts);
+
+ LOGGER.info("[{}] Normalization resulted in: {} updates, {} no-ops", job.getId(), counts[0], counts[1]);
+ }
+
+ private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
+ long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
+ BatchedDocumentsIterator<Bucket> bucketsIterator =
+ jobProvider.newBatchedBucketsIterator(job.getId())
+ .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);
+
+ // Make a list of buckets with their records to be renormalized.
+ // This may be shorter than the original list of buckets for two
+ // reasons:
+ // 1) We don't bother with buckets that have raw score 0 and no
+ // records
+ // 2) We limit the total number of records to not much more
+ // than 100000
+ List<Bucket> bucketsToRenormalize = new ArrayList<>();
+ int batchRecordCount = 0;
+ int skipped = 0;
+
+ while (bucketsIterator.hasNext()) {
+ // Get a batch of buckets without their records to calculate
+ // how many buckets can be sensibly retrieved
+ Deque<Bucket> buckets = bucketsIterator.next();
+ if (buckets.isEmpty()) {
+ LOGGER.debug("[{}] No buckets to renormalize for job", job.getId());
+ break;
+ }
+
+ while (!buckets.isEmpty()) {
+ Bucket currentBucket = buckets.removeFirst();
+ if (currentBucket.isNormalizable()) {
+ bucketsToRenormalize.add(currentBucket);
+ batchRecordCount += jobProvider.expandBucket(job.getId(), false, currentBucket);
+ } else {
+ ++skipped;
+ }
+
+ if (batchRecordCount >= TARGET_RECORDS_TO_RENORMALIZE) {
+ normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState,
+ batchRecordCount, skipped, counts, perPartitionNormalization);
+
+ bucketsToRenormalize = new ArrayList<>();
+ batchRecordCount = 0;
+ skipped = 0;
+ }
+ }
+ }
+ if (!bucketsToRenormalize.isEmpty()) {
+ normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState,
+ batchRecordCount, skipped, counts, perPartitionNormalization);
+ }
+ }
+
+ private long calcNormalizationWindowStart(long endEpochMs, long windowExtensionMs) {
+ return Math.max(0, endEpochMs - normalizationWindow - windowExtensionMs);
+ }
+
+ private void normalizeBuckets(Normalizer normalizer, List<Bucket> buckets, String quantilesState,
+ int recordCount, int skipped, int[] counts, boolean perPartitionNormalization) {
+ LOGGER.debug("[{}] Will renormalize a batch of {} buckets with {} records ({} empty buckets skipped)",
+ job.getId(), buckets.size(), recordCount, skipped);
+
+ List<Normalizable> asNormalizables = buckets.stream()
+ .map(bucket -> new BucketNormalizable(bucket)).collect(Collectors.toList());
+ normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
+
+ for (Bucket bucket : buckets) {
+ updateSingleBucket(bucket, counts, perPartitionNormalization);
+ }
+ }
+
+ /**
+ * Update the anomaly score and unusual score fields on the bucket provided
+ * and all contained records
+ *
+ * @param counts Element 0 will be incremented if we update a document and
+ * element 1 if we don't
+ */
+ private void updateSingleBucket(Bucket bucket, int[] counts, boolean perPartitionNormalization) {
+ updateBucketIfItHasBigChange(bucket, counts, perPartitionNormalization);
+ updateRecordsThatHaveBigChange(bucket, counts);
+ }
+
+ private void updateBucketIfItHasBigChange(Bucket bucket, int[] counts, boolean perPartitionNormalization) {
+ if (bucket.hadBigNormalizedUpdate()) {
+ if (perPartitionNormalization) {
+ updatesPersister.updatePerPartitionMaxProbabilities(bucket.getJobId(), bucket.getRecords());
+ }
+ updatesPersister.updateBucket(bucket);
+
+ ++counts[0];
+ } else {
+ ++counts[1];
+ }
+ }
+
+ private void updateRecordsThatHaveBigChange(Bucket bucket, int[] counts) {
+ List<AnomalyRecord> toUpdate = new ArrayList<>();
+
+ for (AnomalyRecord record : bucket.getRecords()) {
+ if (record.hadBigNormalizedUpdate()) {
+ toUpdate.add(record);
+ ++counts[0];
+ } else {
+ ++counts[1];
+ }
+ }
+
+ if (!toUpdate.isEmpty()) {
+ updatesPersister.updateRecords(bucket.getId(), toUpdate);
+ }
+ }
+
+ private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
+ long windowExtensionMs, int[] counts) {
+ BatchedDocumentsIterator<Influencer> influencersIterator = jobProvider
+ .newBatchedInfluencersIterator(job.getId())
+ .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);
+ while (influencersIterator.hasNext()) {
+ Deque<Influencer> influencers = influencersIterator.next();
+ if (influencers.isEmpty()) {
+ LOGGER.debug("[{}] No influencers to renormalize for job", job.getId());
+ break;
+ }
+
+ LOGGER.debug("[{}] Will renormalize a batch of {} influencers", job.getId(), influencers.size());
+ List<Normalizable> asNormalizables = influencers.stream()
+ .map(influencer -> new InfluencerNormalizable(influencer)).collect(Collectors.toList());
+ normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
+
+ List toUpdate = new ArrayList<>();
+ for (Influencer influencer : influencers) {
+ if (influencer.hadBigNormalizedUpdate()) {
+ toUpdate.add(influencer);
+ ++counts[0];
+ } else {
+ ++counts[1];
+ }
+ }
+ if (!toUpdate.isEmpty()) {
+ updatesPersister.updateInfluencer(job.getId(), toUpdate);
+ }
+ }
+ }
+}
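To make the window arithmetic in getNormalizationWindowOrDefault concrete, here are both cases for hypothetical jobs:

    // No explicit renormalizationWindowDays, bucketSpan = 3600s:
    //   100 buckets * 3600 s * 1000 ms = 360,000,000 ms (~4.2 days),
    //   so max(2,592,000,000, 360,000,000) = 2,592,000,000 ms: the 30-day default wins.
    long windowForHourlyBuckets = Math.max(2_592_000_000L, 100L * 3600 * 1000);

    // Explicit renormalizationWindowDays = 40:
    //   40 * 86,400 s * 1000 ms = 3,456,000,000 ms, used as-is.
    long windowForFortyDays = 40L * 86_400 * 1000;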
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/noop/NoOpRenormaliser.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/noop/NoOpRenormalizer.java
similarity index 63%
rename from elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/noop/NoOpRenormaliser.java
rename to elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/noop/NoOpRenormalizer.java
index 2a8f6124f4a..076a3f8dba1 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/noop/NoOpRenormaliser.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/noop/NoOpRenormalizer.java
@@ -5,22 +5,22 @@
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer.noop;
-import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;
+import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
/**
- * A {@link Renormaliser} implementation that does absolutely nothing
- * This should be removed when the normaliser code is ported
+ * A {@link Renormalizer} implementation that does absolutely nothing
+ * This should be removed when the normalizer code is ported
*/
-public class NoOpRenormaliser implements Renormaliser {
- // NORELEASE Remove once the normaliser code is ported
+public class NoOpRenormalizer implements Renormalizer {
+ // NORELEASE Remove once the normalizer code is ported
@Override
- public void renormalise(Quantiles quantiles) {
+ public void renormalize(Quantiles quantiles) {
}
@Override
- public void renormaliseWithPartition(Quantiles quantiles) {
+ public void renormalizeWithPartition(Quantiles quantiles) {
}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/output/NormalizerResultHandler.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/output/NormalizerResultHandler.java
new file mode 100644
index 00000000000..5b36d1c337a
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/normalizer/output/NormalizerResultHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer.output;
+
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.common.ParseFieldMatcher;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.CompositeBytesReference;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContent;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reads normalizer output.
+ */
+public class NormalizerResultHandler extends AbstractComponent {
+
+ private static final int READ_BUF_SIZE = 1024;
+
+ private final InputStream inputStream;
+ private final List<NormalizerResult> normalizedResults;
+
+ public NormalizerResultHandler(Settings settings, InputStream inputStream) {
+ super(settings);
+ this.inputStream = inputStream;
+ normalizedResults = new ArrayList<>();
+ }
+
+ public List<NormalizerResult> getNormalizedResults() {
+ return normalizedResults;
+ }
+
+ public void process() throws IOException {
+ XContent xContent = XContentFactory.xContent(XContentType.JSON);
+ BytesReference bytesRef = null;
+ byte[] readBuf = new byte[READ_BUF_SIZE];
+ for (int bytesRead = inputStream.read(readBuf); bytesRead != -1; bytesRead = inputStream.read(readBuf)) {
+ if (bytesRef == null) {
+ bytesRef = new BytesArray(readBuf, 0, bytesRead);
+ } else {
+ bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead));
+ }
+ bytesRef = parseResults(xContent, bytesRef);
+ readBuf = new byte[READ_BUF_SIZE];
+ }
+ }
+
+ private BytesReference parseResults(XContent xContent, BytesReference bytesRef) throws IOException {
+ byte marker = xContent.streamSeparator();
+ int from = 0;
+ while (true) {
+ int nextMarker = findNextMarker(marker, bytesRef, from);
+ if (nextMarker == -1) {
+ // No more markers in this block
+ break;
+ }
+ // Ignore blank lines
+ if (nextMarker > from) {
+ parseResult(xContent, bytesRef.slice(from, nextMarker - from));
+ }
+ from = nextMarker + 1;
+ }
+ if (from >= bytesRef.length()) {
+ return null;
+ }
+ return bytesRef.slice(from, bytesRef.length() - from);
+ }
+
+ private void parseResult(XContent xContent, BytesReference bytesRef) throws IOException {
+ XContentParser parser = xContent.createParser(bytesRef);
+ NormalizerResult result = NormalizerResult.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT);
+ normalizedResults.add(result);
+ }
+
+ private static int findNextMarker(byte marker, BytesReference bytesRef, int from) {
+ for (int i = from; i < bytesRef.length(); ++i) {
+ if (bytesRef.get(i) == marker) {
+ return i;
+ }
+ }
+ return -1;
+ }
+}
+
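A usage sketch for the handler, feeding two newline-delimited results through an in-memory stream (JSON's stream separator is the newline byte, so each line parses as one result; ByteArrayInputStream and Settings.EMPTY stand in for the real process output and settings):

    String ndjson = "{\"probability\":0.01,\"normalized_score\":2.2}\n"
            + "{\"probability\":0.05,\"normalized_score\":1.0}\n";
    InputStream in = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8));
    NormalizerResultHandler handler = new NormalizerResultHandler(Settings.EMPTY, in);
    handler.process();
    List<NormalizerResult> results = handler.getNormalizedResults();  // two results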
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecord.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecord.java
index b49bfd0e9b6..f8698a06bdc 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecord.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecord.java
@@ -72,7 +72,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
public static final ParseField CAUSES = new ParseField("causes");
/**
- * Normalisation
+ * Normalization
*/
public static final ParseField ANOMALY_SCORE = new ParseField("anomaly_score");
public static final ParseField NORMALIZED_PROBABILITY = new ParseField("normalized_probability");
@@ -147,7 +147,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
private List<Influence> influencers;
- private boolean hadBigNormalisedUpdate;
+ private boolean hadBigNormalizedUpdate;
public AnomalyRecord(String jobId, Date timestamp, long bucketSpan, int sequenceNum) {
this.jobId = jobId;
@@ -190,7 +190,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
if (in.readBoolean()) {
influencers = in.readList(Influence::new);
}
- hadBigNormalisedUpdate = in.readBoolean();
}
@Override
@@ -235,7 +234,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
if (hasInfluencers) {
out.writeList(influencers);
}
- out.writeBoolean(hadBigNormalisedUpdate);
}
@Override
@@ -491,7 +489,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
- // hadBigNormalisedUpdate is deliberately excluded from the hash
+ // hadBigNormalizedUpdate is deliberately excluded from the hash
return Objects.hash(jobId, detectorIndex, sequenceNum, bucketSpan, probability, anomalyScore,
normalizedProbability, initialNormalizedProbability, typical, actual,
@@ -513,7 +511,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
AnomalyRecord that = (AnomalyRecord) other;
- // hadBigNormalisedUpdate is deliberately excluded from the test
+ // hadBigNormalizedUpdate is deliberately excluded from the test
return Objects.equals(this.jobId, that.jobId)
&& this.detectorIndex == that.detectorIndex
&& this.sequenceNum == that.sequenceNum
@@ -540,15 +538,15 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
&& Objects.equals(this.influencers, that.influencers);
}
- public boolean hadBigNormalisedUpdate() {
- return this.hadBigNormalisedUpdate;
+ public boolean hadBigNormalizedUpdate() {
+ return this.hadBigNormalizedUpdate;
}
- public void resetBigNormalisedUpdateFlag() {
- hadBigNormalisedUpdate = false;
+ public void resetBigNormalizedUpdateFlag() {
+ hadBigNormalizedUpdate = false;
}
- public void raiseBigNormalisedUpdateFlag() {
- hadBigNormalisedUpdate = true;
+ public void raiseBigNormalizedUpdateFlag() {
+ hadBigNormalizedUpdate = true;
}
}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java
index bf2149f4267..d4b18dc4a89 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java
@@ -28,6 +28,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
/**
* Bucket Result POJO
@@ -97,7 +99,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
private List<AnomalyRecord> records = Collections.emptyList();
private long eventCount;
private boolean isInterim;
- private boolean hadBigNormalisedUpdate;
+ private boolean hadBigNormalizedUpdate;
private List bucketInfluencers = new ArrayList<>();
private long processingTimeMs;
private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap();
@@ -121,7 +123,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
records = in.readList(AnomalyRecord::new);
eventCount = in.readLong();
isInterim = in.readBoolean();
- hadBigNormalisedUpdate = in.readBoolean();
bucketInfluencers = in.readList(BucketInfluencer::new);
processingTimeMs = in.readLong();
perPartitionMaxProbability = (Map<String, Double>) in.readGenericValue();
@@ -140,7 +141,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
out.writeList(records);
out.writeLong(eventCount);
out.writeBoolean(isInterim);
- out.writeBoolean(hadBigNormalisedUpdate);
out.writeList(bucketInfluencers);
out.writeLong(processingTimeMs);
out.writeGenericValue(perPartitionMaxProbability);
@@ -315,7 +315,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
- // hadBigNormalisedUpdate is deliberately excluded from the hash
+ // hadBigNormalizedUpdate is deliberately excluded from the hash
// as is id, which is generated by the datastore
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, maxNormalizedProbability, recordCount, records,
isInterim, bucketSpan, bucketInfluencers);
@@ -336,7 +336,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
Bucket that = (Bucket) other;
- // hadBigNormalisedUpdate is deliberately excluded from the test
+ // hadBigNormalizedUpdate is deliberately excluded from the test
// as is id, which is generated by the datastore
return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.timestamp, that.timestamp)
&& (this.eventCount == that.eventCount) && (this.bucketSpan == that.bucketSpan)
@@ -346,35 +346,35 @@ public class Bucket extends ToXContentToBytes implements Writeable {
&& Objects.equals(this.bucketInfluencers, that.bucketInfluencers);
}
- public boolean hadBigNormalisedUpdate() {
- return hadBigNormalisedUpdate;
+ public boolean hadBigNormalizedUpdate() {
+ return hadBigNormalizedUpdate;
}
- public void resetBigNormalisedUpdateFlag() {
- hadBigNormalisedUpdate = false;
+ public void resetBigNormalizedUpdateFlag() {
+ hadBigNormalizedUpdate = false;
}
- public void raiseBigNormalisedUpdateFlag() {
- hadBigNormalisedUpdate = true;
+ public void raiseBigNormalizedUpdateFlag() {
+ hadBigNormalizedUpdate = true;
}
/**
* This method encapsulates the logic for whether a bucket should be
- * normalised. The decision depends on two factors.
+ * normalized. The decision depends on two factors.
*
* The first is whether the bucket has bucket influencers. Since bucket
* influencers were introduced, every bucket must have at least one bucket
* influencer. If it does not, it means it is a bucket persisted with an
- * older version and should not be normalised.
+ * older version and should not be normalized.
*
* The second factor has to do with minimising the number of buckets that
- * are sent for normalisation. Buckets that have no records and a score of
- * zero should not be normalised as their score will not change and they
+ * are sent for normalization. Buckets that have no records and a score of
+ * zero should not be normalized as their score will not change and they
* will just add overhead.
*
- * @return true if the bucket should be normalised or false otherwise
+ * @return true if the bucket should be normalized or false otherwise
*/
- public boolean isNormalisable() {
+ public boolean isNormalizable() {
if (bucketInfluencers == null || bucketInfluencers.isEmpty()) {
return false;
}
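The hunk above is truncated after the bucket-influencer check. Going by the javadoc's second factor (buckets with no records and a zero score are not worth normalizing), the remainder of the method plausibly reduces to the following sketch; this completes the rule described in the comment rather than quoting the exact upstream code:

    // Second factor from the javadoc: skip buckets whose score cannot change.
    return anomalyScore > 0.0 || recordCount > 0;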
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencer.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencer.java
index 879761d51bc..04196ecc2c0 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencer.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/BucketInfluencer.java
@@ -30,7 +30,7 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
public static final String RESULT_TYPE_VALUE = "bucketInfluencer";
public static final ParseField RESULT_TYPE_FIELD = new ParseField(RESULT_TYPE_VALUE);
- /*
+ /**
* Field names
*/
public static final ParseField INFLUENCER_FIELD_NAME = new ParseField("influencer_field_name");
@@ -43,6 +43,11 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num");
+ /**
+ * The influencer field name used for time influencers
+ */
+ public static final String BUCKET_TIME = "bucket_time";
+
public static final ConstructingObjectParser<BucketInfluencer, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_FIELD.getPreferredName(), a -> new BucketInfluencer((String) a[0],
(Date) a[1], (long) a[2], (int) a[3]));
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Influencer.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Influencer.java
index 642ca983c3d..b0c7fb28128 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Influencer.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Influencer.java
@@ -79,7 +79,7 @@ public class Influencer extends ToXContentToBytes implements Writeable {
private double probability;
private double initialAnomalyScore;
private double anomalyScore;
- private boolean hadBigNormalisedUpdate;
+ private boolean hadBigNormalizedUpdate;
private boolean isInterim;
public Influencer(String jobId, String fieldName, String fieldValue, Date timestamp, long bucketSpan, int sequenceNum) {
@@ -99,7 +99,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
probability = in.readDouble();
initialAnomalyScore = in.readDouble();
anomalyScore = in.readDouble();
- hadBigNormalisedUpdate = in.readBoolean();
isInterim = in.readBoolean();
bucketSpan = in.readLong();
sequenceNum = in.readInt();
@@ -114,7 +113,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
out.writeDouble(probability);
out.writeDouble(initialAnomalyScore);
out.writeDouble(anomalyScore);
- out.writeBoolean(hadBigNormalisedUpdate);
out.writeBoolean(isInterim);
out.writeLong(bucketSpan);
out.writeInt(sequenceNum);
@@ -190,22 +188,22 @@ public class Influencer extends ToXContentToBytes implements Writeable {
isInterim = value;
}
- public boolean hadBigNormalisedUpdate() {
- return this.hadBigNormalisedUpdate;
+ public boolean hadBigNormalizedUpdate() {
+ return this.hadBigNormalizedUpdate;
}
- public void resetBigNormalisedUpdateFlag() {
- hadBigNormalisedUpdate = false;
+ public void resetBigNormalizedUpdateFlag() {
+ hadBigNormalizedUpdate = false;
}
- public void raiseBigNormalisedUpdateFlag() {
- hadBigNormalisedUpdate = true;
+ public void raiseBigNormalizedUpdateFlag() {
+ hadBigNormalizedUpdate = true;
}
@Override
public int hashCode() {
- // hadBigNormalisedUpdate is deliberately excluded from the hash
+ // hadBigNormalizedUpdate is deliberately excluded from the hash
return Objects.hash(jobId, timestamp, influenceField, influenceValue, initialAnomalyScore, anomalyScore, probability, isInterim,
bucketSpan, sequenceNum);
@@ -227,7 +225,7 @@ public class Influencer extends ToXContentToBytes implements Writeable {
Influencer other = (Influencer) obj;
- // hadBigNormalisedUpdate is deliberately excluded from the test
+ // hadBigNormalizedUpdate is deliberately excluded from the test
return Objects.equals(jobId, other.jobId) && Objects.equals(timestamp, other.timestamp)
&& Objects.equals(influenceField, other.influenceField)
&& Objects.equals(influenceValue, other.influenceValue)
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PartitionScore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PartitionScore.java
index 738f3277af1..22cc0bfcaa9 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PartitionScore.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PartitionScore.java
@@ -23,7 +23,7 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
private String partitionFieldName;
private double anomalyScore;
private double probability;
- private boolean hadBigNormalisedUpdate;
+ private boolean hadBigNormalizedUpdate;
public static final ConstructingObjectParser<PartitionScore, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
PARTITION_SCORE.getPreferredName(), a -> new PartitionScore((String) a[0], (String) a[1], (Double) a[2], (Double) a[3]));
@@ -36,7 +36,7 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
}
public PartitionScore(String fieldName, String fieldValue, double anomalyScore, double probability) {
- hadBigNormalisedUpdate = false;
+ hadBigNormalizedUpdate = false;
partitionFieldName = fieldName;
partitionFieldValue = fieldValue;
this.anomalyScore = anomalyScore;
@@ -48,7 +48,6 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
partitionFieldValue = in.readString();
anomalyScore = in.readDouble();
probability = in.readDouble();
- hadBigNormalisedUpdate = in.readBoolean();
}
@Override
@@ -57,7 +56,6 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
out.writeString(partitionFieldValue);
out.writeDouble(anomalyScore);
out.writeDouble(probability);
- out.writeBoolean(hadBigNormalisedUpdate);
}
@Override
@@ -120,22 +118,22 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
PartitionScore that = (PartitionScore) other;
- // hadBigNormalisedUpdate is deliberately excluded from the test
+ // hadBigNormalizedUpdate is deliberately excluded from the test
// as is id, which is generated by the datastore
return Objects.equals(this.partitionFieldValue, that.partitionFieldValue)
&& Objects.equals(this.partitionFieldName, that.partitionFieldName) && (this.probability == that.probability)
&& (this.anomalyScore == that.anomalyScore);
}
- public boolean hadBigNormalisedUpdate() {
- return hadBigNormalisedUpdate;
+ public boolean hadBigNormalizedUpdate() {
+ return hadBigNormalizedUpdate;
}
- public void resetBigNormalisedUpdateFlag() {
- hadBigNormalisedUpdate = false;
+ public void resetBigNormalizedUpdateFlag() {
+ hadBigNormalizedUpdate = false;
}
- public void raiseBigNormalisedUpdateFlag() {
- hadBigNormalisedUpdate = true;
+ public void raiseBigNormalizedUpdateFlag() {
+ hadBigNormalizedUpdate = true;
}
}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilities.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilities.java
index e9e0894742a..fb1e3434648 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilities.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilities.java
@@ -29,7 +29,7 @@ import java.util.stream.Collector;
import java.util.stream.Collectors;
/**
- * When per-partition normalisation is enabled this class represents
+ * When per-partition normalization is enabled this class represents
* the max anomalous probabilities of each partition per bucket. These values
* are calculated from the bucket's anomaly records.
*/
@@ -124,7 +124,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
Optional<PartitionProbability> first =
perPartitionMaxProbabilities.stream().filter(pp -> partitionValue.equals(pp.getPartitionValue())).findFirst();
- return first.isPresent() ? first.get().getMaxNormalisedProbability() : 0.0;
+ return first.isPresent() ? first.get().getMaxNormalizedProbability() : 0.0;
}
/**
@@ -201,7 +201,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
}
/**
- * Class for partitionValue, maxNormalisedProb pairs
+ * Class for partitionValue, maxNormalizedProb pairs
*/
public static class PartitionProbability extends ToXContentToBytes implements Writeable {
@@ -215,44 +215,44 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
}
private final String partitionValue;
- private final double maxNormalisedProbability;
+ private final double maxNormalizedProbability;
- PartitionProbability(String partitionName, double maxNormalisedProbability) {
+ PartitionProbability(String partitionName, double maxNormalizedProbability) {
this.partitionValue = partitionName;
- this.maxNormalisedProbability = maxNormalisedProbability;
+ this.maxNormalizedProbability = maxNormalizedProbability;
}
public PartitionProbability(StreamInput in) throws IOException {
partitionValue = in.readString();
- maxNormalisedProbability = in.readDouble();
+ maxNormalizedProbability = in.readDouble();
}
public String getPartitionValue() {
return partitionValue;
}
- public double getMaxNormalisedProbability() {
- return maxNormalisedProbability;
+ public double getMaxNormalizedProbability() {
+ return maxNormalizedProbability;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(partitionValue);
- out.writeDouble(maxNormalisedProbability);
+ out.writeDouble(maxNormalizedProbability);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionValue)
- .field(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalisedProbability)
+ .field(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalizedProbability)
.endObject();
return builder;
}
@Override
public int hashCode() {
- return Objects.hash(partitionValue, maxNormalisedProbability);
+ return Objects.hash(partitionValue, maxNormalizedProbability);
}
@Override
@@ -268,7 +268,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
PartitionProbability that = (PartitionProbability) other;
return Objects.equals(this.partitionValue, that.partitionValue)
- && this.maxNormalisedProbability == that.maxNormalisedProbability;
+ && this.maxNormalizedProbability == that.maxNormalizedProbability;
}
}
}
diff --git a/elasticsearch/src/main/resources/org/elasticsearch/xpack/prelert/job/messages/prelert_messages.properties b/elasticsearch/src/main/resources/org/elasticsearch/xpack/prelert/job/messages/prelert_messages.properties
index 91ded5d436c..06fe0a129b9 100644
--- a/elasticsearch/src/main/resources/org/elasticsearch/xpack/prelert/job/messages/prelert_messages.properties
+++ b/elasticsearch/src/main/resources/org/elasticsearch/xpack/prelert/job/messages/prelert_messages.properties
@@ -93,8 +93,8 @@ job.config.overField.needs.another = over_field_name must be used in conjunction
job.config.overlapping.buckets.incompatible.function = Overlapping buckets cannot be used with function ''{0}''
job.config.multiple.bucketspans.require.bucket_span = Multiple bucket_spans require a bucket_span to be specified
job.config.multiple.bucketspans.must.be.multiple = Multiple bucket_span ''{0}'' must be a multiple of the main bucket_span ''{1}''
-job.config.per.partition.normalisation.requires.partition.field = If the job is configured with Per-Partition Normalization enabled a detector must have a partition field
-job.config.per.partition.normalisation.cannot.use.influencers = A job configured with Per-Partition Normalization cannot use influencers
+job.config.per.partition.normalization.requires.partition.field = If the job is configured with Per-Partition Normalization enabled, a detector must have a partition field
+job.config.per.partition.normalization.cannot.use.influencers = A job configured with Per-Partition Normalization cannot use influencers
job.config.update.analysis.limits.parse.error = JSON parse error reading the update value for analysis_limits
job.config.update.analysis.limits.cannot.be.null = Invalid update value for analysis_limits: null
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrlTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrlTests.java
index cd1abbe861b..4d784fd00c3 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrlTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/ProcessCtrlTests.java
@@ -130,12 +130,12 @@ public class ProcessCtrlTests extends ESTestCase {
assertTrue(command.contains("--ignoreDowntime"));
}
- public void testBuildNormaliserCommand() throws IOException {
+ public void testBuildNormalizerCommand() throws IOException {
Environment env = new Environment(
Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build());
String jobId = "unit-test-job";
- List<String> command = ProcessCtrl.buildNormaliserCommand(env, jobId, null, 300, true, pid);
+ List<String> command = ProcessCtrl.buildNormalizerCommand(env, jobId, null, 300, true, pid);
assertEquals(5, command.size());
assertTrue(command.contains(ProcessCtrl.NORMALIZE_PATH));
assertTrue(command.contains(ProcessCtrl.BUCKET_SPAN_ARG + "300"));
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java
index 5b18d50cebf..7c8832b909c 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java
@@ -9,7 +9,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
-import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;
+import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
@@ -52,21 +52,21 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
when(parser.parseResults(any())).thenReturn(stream);
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, parser);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, parser);
processor.process(JOB_ID, mock(InputStream.class), randomBoolean());
- verify(renormaliser, times(1)).shutdown();
+ verify(renormalizer, times(1)).shutdown();
assertEquals(0, processor.completionLatch.getCount());
}
public void testProcessResult_bucket() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -83,12 +83,12 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_bucket_deleteInterimRequired() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutodetectResult result = mock(AutodetectResult.class);
@@ -105,11 +105,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_records() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -125,11 +125,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_records_isPerPartitionNormalization() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder);
context.deleteInterimRequired = false;
@@ -148,11 +148,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_influencers() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -168,10 +168,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_categoryDefinition() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -185,11 +185,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_flushAcknowledgement() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
FlushListener flushListener = mock(FlushListener.class);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -206,11 +206,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
FlushListener flushListener = mock(FlushListener.class);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -232,10 +232,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_modelDebugOutput() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -249,10 +249,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_modelSizeStats() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -267,10 +267,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_modelSnapshot() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -284,10 +284,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_quantiles() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@@ -297,16 +297,16 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
- verify(renormaliser, times(1)).renormalise(quantiles);
+ verify(renormalizer, times(1)).renormalize(quantiles);
verifyNoMoreInteractions(persister);
- verifyNoMoreInteractions(renormaliser);
+ verifyNoMoreInteractions(renormalizer);
}
public void testProcessResult_quantiles_isPerPartitionNormalization() {
- Renormaliser renormaliser = mock(Renormaliser.class);
+ Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
- AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, true, bulkBuilder);
context.deleteInterimRequired = false;
@@ -316,9 +316,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
- verify(renormaliser, times(1)).renormaliseWithPartition(quantiles);
+ verify(renormalizer, times(1)).renormalizeWithPartition(quantiles);
verifyNoMoreInteractions(persister);
- verifyNoMoreInteractions(renormaliser);
+ verifyNoMoreInteractions(renormalizer);
}
}
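The two quantiles tests at the end of this class pin down how the processor dispatches a quantiles result: persist it, then hand it to the renormalizer, choosing the per-partition variant when the context was built with the per-partition flag set. A sketch of the branch they verify; the flag name on Context is an assumption taken from the test constructor's boolean argument:

    // Sketch of the dispatch verified above; persister and renormalizer are
    // the collaborators passed to the AutoDetectResultProcessor constructor.
    void processQuantiles(AutoDetectResultProcessor.Context context, Quantiles quantiles) {
        persister.persistQuantiles(quantiles);
        if (context.isPerPartitionNormalization) {
            renormalizer.renormalizeWithPartition(quantiles);
        } else {
            renormalizer.renormalize(quantiles);
        }
    }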
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java
index b3b3b33ca52..1a4d1ad7f6a 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutodetectResultsParserTests.java
@@ -34,7 +34,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
+ "\"max_normalized_probability\":0, \"anomaly_score\":0,\"record_count\":0,\"event_count\":806,\"bucket_influencers\":["
+ "{\"sequence_num\":1,\"timestamp\":1359450000000,\"bucket_span\":22,\"job_id\":\"foo\",\"anomaly_score\":0,"
+ "\"probability\":0.0, \"influencer_field_name\":\"bucket_time\","
- + "\"initial_anomaly_score\":0.0}]}},{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normaliser 1.1, normaliser 2" +
+ + "\"initial_anomaly_score\":0.0}]}},{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normalizer 1.1, normalizer 2" +
".1]\"}}"
+ ",{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"bucket_span\":22,\"records\":"
+ "[{\"timestamp\":1359453600000,\"bucket_span\":22,\"sequence_num\":1,\"job_id\":\"foo\",\"probability\":0.0637541,"
@@ -58,8 +58,8 @@ public class AutodetectResultsParserTests extends ESTestCase {
+ "\"sequence_num\":6,\"job_id\":\"foo\",\"raw_anomaly_score\":0.005, \"probability\":0.03,"
+ "\"influencer_field_name\":\"foo\",\"initial_anomaly_score\":10.5,\"anomaly_score\":10.5}]}},{\"quantiles\": "
+ "{\"job_id\":\"foo\","
- + "\"quantile_state\":\"[normaliser 1.2, normaliser 2.2]\"}} ,{\"flush\": {\"id\":\"testing1\"}} ,"
- + "{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normaliser 1.3, normaliser 2.3]\"}} ]";
+ + "\"quantile_state\":\"[normalizer 1.2, normalizer 2.2]\"}} ,{\"flush\": {\"id\":\"testing1\"}} ,"
+ + "{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normalizer 1.3, normalizer 2.3]\"}} ]";
public static final String POPULATION_OUTPUT_SAMPLE = "[{\"timestamp\":1379590200,\"records\":[{\"probability\":1.38951e-08,"
+ "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"mail.google.com\","
@@ -315,13 +315,13 @@ public class AutodetectResultsParserTests extends ESTestCase {
assertEquals(3, quantiles.size());
assertEquals("foo", quantiles.get(0).getJobId());
assertNull(quantiles.get(0).getTimestamp());
- assertEquals("[normaliser 1.1, normaliser 2.1]", quantiles.get(0).getQuantileState());
+ assertEquals("[normalizer 1.1, normalizer 2.1]", quantiles.get(0).getQuantileState());
assertEquals("foo", quantiles.get(1).getJobId());
assertNull(quantiles.get(1).getTimestamp());
- assertEquals("[normaliser 1.2, normaliser 2.2]", quantiles.get(1).getQuantileState());
+ assertEquals("[normalizer 1.2, normalizer 2.2]", quantiles.get(1).getQuantileState());
assertEquals("foo", quantiles.get(2).getJobId());
assertNull(quantiles.get(2).getTimestamp());
- assertEquals("[normaliser 1.3, normaliser 2.3]", quantiles.get(2).getQuantileState());
+ assertEquals("[normalizer 1.3, normalizer 2.3]", quantiles.get(2).getQuantileState());
}
@AwaitsFix(bugUrl = "rewrite this test so it doesn't use ~200 lines of json")
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/BucketInfluencerNormalizableTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/BucketInfluencerNormalizableTests.java
new file mode 100644
index 00000000000..ee584a64684
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/BucketInfluencerNormalizableTests.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
+import org.junit.Before;
+
+import java.util.Date;
+
+
+public class BucketInfluencerNormalizableTests extends ESTestCase {
+ private static final double EPSILON = 0.0001;
+ private BucketInfluencer bucketInfluencer;
+
+ @Before
+ public void setUpBucketInfluencer() {
+ bucketInfluencer = new BucketInfluencer("foo", new Date(), 600, 1);
+ bucketInfluencer.setInfluencerFieldName("airline");
+ bucketInfluencer.setProbability(0.05);
+ bucketInfluencer.setRawAnomalyScore(3.14);
+ bucketInfluencer.setInitialAnomalyScore(2.0);
+ bucketInfluencer.setAnomalyScore(1.0);
+ }
+
+ public void testIsContainerOnly() {
+ assertFalse(new BucketInfluencerNormalizable(bucketInfluencer).isContainerOnly());
+ }
+
+ public void testGetLevel() {
+ assertEquals(Level.BUCKET_INFLUENCER, new BucketInfluencerNormalizable(bucketInfluencer).getLevel());
+
+ BucketInfluencer timeInfluencer = new BucketInfluencer("foo", new Date(), 600, 1);
+ timeInfluencer.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME);
+ assertEquals(Level.ROOT, new BucketInfluencerNormalizable(timeInfluencer).getLevel());
+ }
+
+ public void testGetPartitionFieldName() {
+ assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getPartitionFieldName());
+ }
+
+ public void testGetPersonFieldName() {
+ assertEquals("airline", new BucketInfluencerNormalizable(bucketInfluencer).getPersonFieldName());
+ }
+
+ public void testGetFunctionName() {
+ assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getFunctionName());
+ }
+
+ public void testGetValueFieldName() {
+ assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getValueFieldName());
+ }
+
+ public void testGetProbability() {
+ assertEquals(0.05, new BucketInfluencerNormalizable(bucketInfluencer).getProbability(), EPSILON);
+ }
+
+ public void testGetNormalizedScore() {
+ assertEquals(1.0, new BucketInfluencerNormalizable(bucketInfluencer).getNormalizedScore(), EPSILON);
+ }
+
+ public void testSetNormalizedScore() {
+ BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer);
+
+ normalizable.setNormalizedScore(99.0);
+
+ assertEquals(99.0, normalizable.getNormalizedScore(), EPSILON);
+ assertEquals(99.0, bucketInfluencer.getAnomalyScore(), EPSILON);
+ }
+
+ public void testGetChildrenTypes() {
+ assertTrue(new BucketInfluencerNormalizable(bucketInfluencer).getChildrenTypes().isEmpty());
+ }
+
+ public void testGetChildren_ByType() {
+ expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer).getChildren(0));
+ }
+
+ public void testGetChildren() {
+ assertTrue(new BucketInfluencerNormalizable(bucketInfluencer).getChildren().isEmpty());
+ }
+
+ public void testSetMaxChildrenScore() {
+ expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer).setMaxChildrenScore(0, 42.0));
+ }
+
+ public void testSetParentScore() {
+ new BucketInfluencerNormalizable(bucketInfluencer).setParentScore(42.0);
+
+ assertEquals("airline", bucketInfluencer.getInfluencerFieldName());
+ assertEquals(1.0, bucketInfluencer.getAnomalyScore(), EPSILON);
+ assertEquals(3.14, bucketInfluencer.getRawAnomalyScore(), EPSILON);
+ assertEquals(2.0, bucketInfluencer.getInitialAnomalyScore(), EPSILON);
+ assertEquals(0.05, bucketInfluencer.getProbability(), EPSILON);
+ }
+
+ public void testResetBigChangeFlag() {
+ new BucketInfluencerNormalizable(bucketInfluencer).resetBigChangeFlag();
+ }
+
+ public void testRaiseBigChangeFlag() {
+ new BucketInfluencerNormalizable(bucketInfluencer).raiseBigChangeFlag();
+ }
+}
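Taken together these tests fix the contract of a leaf normalizable: it wraps a single result object, score updates write straight through to it, and child-related operations are illegal. A condensed sketch of the behaviour they specify (illustrative fragments, not the production class):

    // Leaf contract implied by the tests above.
    Level getLevel() {
        // The synthetic "bucket_time" influencer is normalized at root level;
        // any real influencer field is normalized at BUCKET_INFLUENCER level.
        return BucketInfluencer.BUCKET_TIME.equals(bucketInfluencer.getInfluencerFieldName())
                ? Level.ROOT : Level.BUCKET_INFLUENCER;
    }

    void setNormalizedScore(double score) {
        bucketInfluencer.setAnomalyScore(score); // writes through to the wrapped result
    }

    List<Normalizable> getChildren(int type) {
        throw new IllegalStateException("leaf normalizables have no children");
    }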
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/BucketNormalizableTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/BucketNormalizableTests.java
new file mode 100644
index 00000000000..d6eb7a69c47
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/BucketNormalizableTests.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
+import org.elasticsearch.xpack.prelert.job.results.Bucket;
+import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
+import org.elasticsearch.xpack.prelert.job.results.PartitionScore;
+import org.junit.Before;
+
+
+public class BucketNormalizableTests extends ESTestCase {
+ private static final double EPSILON = 0.0001;
+ private Bucket bucket;
+
+ @Before
+ public void setUpBucket() {
+ bucket = new Bucket("foo", new Date(), 600);
+
+ BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo", bucket.getTimestamp(), 600, 1);
+ bucketInfluencer1.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME);
+ bucketInfluencer1.setAnomalyScore(42.0);
+ bucketInfluencer1.setProbability(0.01);
+
+ BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo", bucket.getTimestamp(), 600, 2);
+ bucketInfluencer2.setInfluencerFieldName("foo");
+ bucketInfluencer2.setAnomalyScore(88.0);
+ bucketInfluencer2.setProbability(0.001);
+
+ bucket.setBucketInfluencers(Arrays.asList(bucketInfluencer1, bucketInfluencer2));
+
+ bucket.setAnomalyScore(88.0);
+ bucket.setMaxNormalizedProbability(2.0);
+
+ AnomalyRecord record1 = new AnomalyRecord("foo", bucket.getTimestamp(), 600, 3);
+ record1.setNormalizedProbability(1.0);
+ AnomalyRecord record2 = new AnomalyRecord("foo", bucket.getTimestamp(), 600, 4);
+ record2.setNormalizedProbability(2.0);
+ bucket.setRecords(Arrays.asList(record1, record2));
+
+ List<PartitionScore> partitionScores = new ArrayList<>();
+ partitionScores.add(new PartitionScore("pf1", "pv1", 0.2, 0.1));
+ partitionScores.add(new PartitionScore("pf1", "pv2", 0.4, 0.01));
+ bucket.setPartitionScores(partitionScores);
+ }
+
+ public void testIsContainerOnly() {
+ assertTrue(new BucketNormalizable(bucket).isContainerOnly());
+ }
+
+ public void testGetLevel() {
+ assertEquals(Level.ROOT, new BucketNormalizable(bucket).getLevel());
+ }
+
+ public void testGetPartitionFieldName() {
+ assertNull(new BucketNormalizable(bucket).getPartitionFieldName());
+ }
+
+ public void testGetPartitionFieldValue() {
+ assertNull(new BucketNormalizable(bucket).getPartitionFieldValue());
+ }
+
+ public void testGetPersonFieldName() {
+ assertNull(new BucketNormalizable(bucket).getPersonFieldName());
+ }
+
+ public void testGetFunctionName() {
+ assertNull(new BucketNormalizable(bucket).getFunctionName());
+ }
+
+ public void testGetValueFieldName() {
+ assertNull(new BucketNormalizable(bucket).getValueFieldName());
+ }
+
+ public void testGetProbability() {
+ expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket).getProbability());
+ }
+
+ public void testGetNormalizedScore() {
+ assertEquals(88.0, new BucketNormalizable(bucket).getNormalizedScore(), EPSILON);
+ }
+
+ public void testSetNormalizedScore() {
+ BucketNormalizable normalizable = new BucketNormalizable(bucket);
+
+ normalizable.setNormalizedScore(99.0);
+
+ assertEquals(99.0, normalizable.getNormalizedScore(), EPSILON);
+ assertEquals(99.0, bucket.getAnomalyScore(), EPSILON);
+ }
+
+ public void testGetChildren() {
+ List<Normalizable> children = new BucketNormalizable(bucket).getChildren();
+
+ assertEquals(6, children.size());
+ assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
+ assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON);
+ assertTrue(children.get(1) instanceof BucketInfluencerNormalizable);
+ assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
+ assertTrue(children.get(2) instanceof RecordNormalizable);
+ assertEquals(1.0, children.get(2).getNormalizedScore(), EPSILON);
+ assertTrue(children.get(3) instanceof RecordNormalizable);
+ assertEquals(2.0, children.get(3).getNormalizedScore(), EPSILON);
+ assertTrue(children.get(4) instanceof PartitionScoreNormalizable);
+ assertEquals(0.2, children.get(4).getNormalizedScore(), EPSILON);
+ assertTrue(children.get(5) instanceof PartitionScoreNormalizable);
+ assertEquals(0.4, children.get(5).getNormalizedScore(), EPSILON);
+ }
+
+ public void testGetChildren_GivenTypeBucketInfluencer() {
+ List<Normalizable> children = new BucketNormalizable(bucket).getChildren(0);
+
+ assertEquals(2, children.size());
+ assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
+ assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON);
+ assertTrue(children.get(1) instanceof BucketInfluencerNormalizable);
+ assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
+ }
+
+ public void testGetChildren_GivenTypeRecord() {
+ List<Normalizable> children = new BucketNormalizable(bucket).getChildren(1);
+
+ assertEquals(2, children.size());
+ assertTrue(children.get(0) instanceof RecordNormalizable);
+ assertEquals(1.0, children.get(0).getNormalizedScore(), EPSILON);
+ assertTrue(children.get(1) instanceof RecordNormalizable);
+ assertEquals(2.0, children.get(1).getNormalizedScore(), EPSILON);
+ }
+
+ public void testGetChildren_GivenInvalidType() {
+ expectThrows(IllegalArgumentException.class, () -> new BucketNormalizable(bucket).getChildren(3));
+ }
+
+ public void testSetMaxChildrenScore_GivenDifferentScores() {
+ BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket);
+
+ assertTrue(bucketNormalizable.setMaxChildrenScore(0, 95.0));
+ assertTrue(bucketNormalizable.setMaxChildrenScore(1, 42.0));
+
+ assertEquals(95.0, bucket.getAnomalyScore(), EPSILON);
+ assertEquals(42.0, bucket.getMaxNormalizedProbability(), EPSILON);
+ }
+
+ public void testSetMaxChildrenScore_GivenSameScores() {
+ BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket);
+
+ assertFalse(bucketNormalizable.setMaxChildrenScore(0, 88.0));
+ assertFalse(bucketNormalizable.setMaxChildrenScore(1, 2.0));
+
+ assertEquals(88.0, bucket.getAnomalyScore(), EPSILON);
+ assertEquals(2.0, bucket.getMaxNormalizedProbability(), EPSILON);
+ }
+
+ public void testSetMaxChildrenScore_GivenInvalidType() {
+ expectThrows(IllegalArgumentException.class, () -> new BucketNormalizable(bucket).setMaxChildrenScore(3, 95.0));
+ }
+
+ public void testSetParentScore() {
+ expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket).setParentScore(42.0));
+ }
+
+ public void testResetBigChangeFlag() {
+ BucketNormalizable normalizable = new BucketNormalizable(bucket);
+ normalizable.raiseBigChangeFlag();
+
+ normalizable.resetBigChangeFlag();
+
+ assertFalse(bucket.hadBigNormalizedUpdate());
+ }
+
+ public void testRaiseBigChangeFlag() {
+ BucketNormalizable normalizable = new BucketNormalizable(bucket);
+ normalizable.resetBigChangeFlag();
+
+ normalizable.raiseBigChangeFlag();
+
+ assertTrue(bucket.hadBigNormalizedUpdate());
+ }
+}
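The getChildren and setMaxChildrenScore tests imply a fixed integer encoding of a bucket's child types: 0 for bucket influencers, 1 for records, 2 for partition scores, anything else rejected. They also show which bucket field each child type feeds back into: the maximum bucket-influencer score becomes the bucket's anomaly score, and the maximum record score becomes its max normalized probability. A sketch of the dispatch this implies, assuming the type codes are plain literals:

    // Child-type dispatch implied by the tests above (illustrative).
    List<Normalizable> getChildren(int type) {
        switch (type) {
        case 0:
            return bucket.getBucketInfluencers().stream()
                    .map(BucketInfluencerNormalizable::new).collect(Collectors.toList());
        case 1:
            return bucket.getRecords().stream()
                    .map(RecordNormalizable::new).collect(Collectors.toList());
        case 2:
            return bucket.getPartitionScores().stream()
                    .map(PartitionScoreNormalizable::new).collect(Collectors.toList());
        default:
            throw new IllegalArgumentException("Invalid type: " + type);
        }
    }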
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/InfluencerNormalizableTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/InfluencerNormalizableTests.java
new file mode 100644
index 00000000000..c00c0911c15
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/InfluencerNormalizableTests.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.prelert.job.results.Influencer;
+import org.junit.Before;
+
+import java.util.Date;
+
+public class InfluencerNormalizableTests extends ESTestCase {
+ private static final double EPSILON = 0.0001;
+ private Influencer influencer;
+
+ @Before
+ public void setUpInfluencer() {
+ influencer = new Influencer("foo", "airline", "AAL", new Date(), 600, 1);
+ influencer.setAnomalyScore(1.0);
+ influencer.setInitialAnomalyScore(2.0);
+ influencer.setProbability(0.05);
+ }
+
+ public void testIsContainerOnly() {
+ assertFalse(new InfluencerNormalizable(influencer).isContainerOnly());
+ }
+
+ public void testGetLevel() {
+ assertEquals(Level.INFLUENCER, new InfluencerNormalizable(influencer).getLevel());
+ }
+
+ public void testGetPartitionFieldName() {
+ assertNull(new InfluencerNormalizable(influencer).getPartitionFieldName());
+ }
+
+ public void testGetPartitionFieldValue() {
+ assertNull(new InfluencerNormalizable(influencer).getPartitionFieldValue());
+ }
+
+ public void testGetPersonFieldName() {
+ assertEquals("airline", new InfluencerNormalizable(influencer).getPersonFieldName());
+ }
+
+ public void testGetFunctionName() {
+ assertNull(new InfluencerNormalizable(influencer).getFunctionName());
+ }
+
+ public void testGetValueFieldName() {
+ assertNull(new InfluencerNormalizable(influencer).getValueFieldName());
+ }
+
+ public void testGetProbability() {
+ assertEquals(0.05, new InfluencerNormalizable(influencer).getProbability(), EPSILON);
+ }
+
+ public void testGetNormalizedScore() {
+ assertEquals(1.0, new InfluencerNormalizable(influencer).getNormalizedScore(), EPSILON);
+ }
+
+ public void testSetNormalizedScore() {
+ InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer);
+
+ normalizable.setNormalizedScore(99.0);
+
+ assertEquals(99.0, normalizable.getNormalizedScore(), EPSILON);
+ assertEquals(99.0, influencer.getAnomalyScore(), EPSILON);
+ }
+
+ public void testGetChildrenTypes() {
+ assertTrue(new InfluencerNormalizable(influencer).getChildrenTypes().isEmpty());
+ }
+
+ public void testGetChildren_ByType() {
+ expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).getChildren(0));
+ }
+
+ public void testGetChildren() {
+ assertTrue(new InfluencerNormalizable(influencer).getChildren().isEmpty());
+ }
+
+ public void testSetMaxChildrenScore() {
+ expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).setMaxChildrenScore(0, 42.0));
+ }
+
+ public void testSetParentScore() {
+ expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).setParentScore(42.0));
+ }
+
+ public void testResetBigChangeFlag() {
+ InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer);
+ normalizable.raiseBigChangeFlag();
+
+ normalizable.resetBigChangeFlag();
+
+ assertFalse(influencer.hadBigNormalizedUpdate());
+ }
+
+ public void testRaiseBigChangeFlag() {
+ InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer);
+ normalizable.resetBigChangeFlag();
+
+ normalizable.raiseBigChangeFlag();
+
+ assertTrue(influencer.hadBigNormalizedUpdate());
+ }
+}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerResultTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerResultTests.java
new file mode 100644
index 00000000000..209a604d339
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerResultTests.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import org.elasticsearch.common.ParseFieldMatcher;
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
+
+public class NormalizerResultTests extends AbstractSerializingTestCase<NormalizerResult> {
+
+ private static final double EPSILON = 0.0000000001;
+
+ public void testDefaultConstructor() {
+ NormalizerResult msg = new NormalizerResult();
+ assertNull(msg.getLevel());
+ assertNull(msg.getPartitionFieldName());
+ assertNull(msg.getPartitionFieldValue());
+ assertNull(msg.getPersonFieldName());
+ assertNull(msg.getFunctionName());
+ assertNull(msg.getValueFieldName());
+ assertEquals(0.0, msg.getProbability(), EPSILON);
+ assertEquals(0.0, msg.getNormalizedScore(), EPSILON);
+ }
+
+ @Override
+ protected NormalizerResult createTestInstance() {
+ NormalizerResult msg = new NormalizerResult();
+ msg.setLevel("leaf");
+ msg.setPartitionFieldName("part");
+ msg.setPartitionFieldValue("something");
+ msg.setPersonFieldName("person");
+ msg.setFunctionName("mean");
+ msg.setValueFieldName("value");
+ msg.setProbability(0.005);
+ msg.setNormalizedScore(98.7);
+ return msg;
+ }
+
+ @Override
+ protected Reader<NormalizerResult> instanceReader() {
+ return NormalizerResult::new;
+ }
+
+ @Override
+ protected NormalizerResult parseInstance(XContentParser parser, ParseFieldMatcher matcher) {
+ return NormalizerResult.PARSER.apply(parser, () -> matcher);
+ }
+}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerTests.java
new file mode 100644
index 00000000000..c04ad8102b4
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/NormalizerTests.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.prelert.job.results.Bucket;
+import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Date;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class NormalizerTests extends ESTestCase {
+ private static final String JOB_ID = "foo";
+ private static final String QUANTILES_STATE = "someState";
+ private static final int BUCKET_SPAN = 600;
+ private static final double INITIAL_SCORE = 2.0;
+ private static final double FACTOR = 2.0;
+
+ private Bucket generateBucket(Date timestamp) throws IOException {
+ return new Bucket(JOB_ID, timestamp, BUCKET_SPAN);
+ }
+
+ private BucketInfluencer createTimeBucketInfluencer(Date timestamp, double probability, double anomalyScore) {
+ BucketInfluencer influencer = new BucketInfluencer(JOB_ID, timestamp, BUCKET_SPAN, 1);
+ influencer.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME);
+ influencer.setProbability(probability);
+ influencer.setInitialAnomalyScore(anomalyScore);
+ influencer.setAnomalyScore(anomalyScore);
+ return influencer;
+ }
+
+ public void testNormalize() throws IOException {
+ ExecutorService threadpool = Executors.newScheduledThreadPool(1);
+
+ NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class);
+ when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), eq(false),
+ any())).thenReturn(new MultiplyingNormalizerProcess(Settings.EMPTY, FACTOR));
+ Normalizer normalizer = new Normalizer(JOB_ID, processFactory, threadpool);
+
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(0.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.07, INITIAL_SCORE));
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+
+ List<Normalizable> asNormalizables = buckets.stream()
+ .map(b -> new BucketNormalizable(b)).collect(Collectors.toList());
+
+ normalizer.normalize(BUCKET_SPAN, false, asNormalizables, QUANTILES_STATE);
+
+ threadpool.shutdown();
+
+ assertEquals(1, asNormalizables.size());
+ assertEquals(FACTOR * INITIAL_SCORE, asNormalizables.get(0).getNormalizedScore(), 0.0001);
+ }
+}
\ No newline at end of file
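MultiplyingNormalizerProcess is a test double that stands in for the native normalizer process: rather than spawning the C++ binary it simply scales every score it is fed by a constant factor, which is why the final assertion expects FACTOR * INITIAL_SCORE, i.e. 2.0 * 2.0 = 4.0. The essential step it must perform looks roughly like this (field names are assumptions):

    // For each record written to the fake process, emit a result whose
    // normalized score is the input score scaled by the configured factor.
    NormalizerResult result = new NormalizerResult();
    result.setNormalizedScore(factor * inputScore);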
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java
new file mode 100644
index 00000000000..cb7aacc319b
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java
@@ -0,0 +1,448 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer;
+
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.List;
+
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
+import org.elasticsearch.xpack.prelert.job.Detector;
+import org.elasticsearch.xpack.prelert.job.Job;
+import org.elasticsearch.xpack.prelert.job.persistence.BatchedDocumentsIterator;
+import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
+import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizer;
+import org.elasticsearch.xpack.prelert.job.persistence.MockBatchedDocumentsIterator;
+import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
+import org.elasticsearch.xpack.prelert.job.results.Bucket;
+import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
+import org.elasticsearch.xpack.prelert.job.results.Influencer;
+import org.junit.Before;
+import org.mockito.MockitoAnnotations;
+
+public class ScoresUpdaterTests extends ESTestCase {
+ private static final String JOB_ID = "foo";
+ private static final String QUANTILES_STATE = "someState";
+ private static final long DEFAULT_BUCKET_SPAN = 3600;
+ private static final long DEFAULT_START_TIME = 0;
+ private static final long DEFAULT_END_TIME = 3600;
+
+ private JobProvider jobProvider = mock(JobProvider.class);
+ private JobRenormalizer jobRenormalizer = mock(JobRenormalizer.class);
+ private Normalizer normalizer = mock(Normalizer.class);
+ private NormalizerFactory normalizerFactory = mock(NormalizerFactory.class);
+
+ private Job job;
+ private ScoresUpdater scoresUpdater;
+
+ private Bucket generateBucket(Date timestamp) throws IOException {
+ return new Bucket(JOB_ID, timestamp, DEFAULT_BUCKET_SPAN);
+ }
+
+ @Before
+ public void setUpMocks() throws IOException {
+ MockitoAnnotations.initMocks(this);
+
+ Job.Builder jobBuilder = new Job.Builder(JOB_ID);
+ jobBuilder.setRenormalizationWindowDays(1L);
+ List<Detector> detectors = new ArrayList<>();
+ detectors.add(mock(Detector.class));
+ AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(detectors);
+ configBuilder.setBucketSpan(DEFAULT_BUCKET_SPAN);
+ jobBuilder.setAnalysisConfig(configBuilder);
+
+ job = jobBuilder.build();
+
+ scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizer, normalizerFactory);
+
+ givenProviderReturnsNoBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME);
+ givenProviderReturnsNoInfluencers(DEFAULT_START_TIME, DEFAULT_END_TIME);
+ givenNormalizerFactoryReturnsMock();
+ }
+
+ public void testUpdate_GivenBucketWithZeroScoreAndNoRecords() throws IOException {
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(0.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.7, 0.0));
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(0);
+ verifyBucketWasNotUpdated(bucket);
+ verifyBucketRecordsWereNotUpdated(bucket.getId());
+ }
+
+ public void testUpdate_GivenBucketWithNonZeroScoreButNoBucketInfluencers() throws IOException {
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(42.0);
+ bucket.setBucketInfluencers(null);
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(0);
+ verifyBucketWasNotUpdated(bucket);
+ verifyBucketRecordsWereNotUpdated(bucket.getId());
+ }
+
+ public void testUpdate_GivenSingleBucketWithoutBigChangeAndNoRecords() throws IOException {
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(30.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0));
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(1);
+ verifyBucketWasNotUpdated(bucket);
+ verifyBucketRecordsWereNotUpdated(bucket.getId());
+ }
+
+ public void testUpdate_GivenSingleBucketWithoutBigChangeAndRecordsWithoutBigChange() throws IOException {
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(30.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0));
+ List<AnomalyRecord> records = new ArrayList<>();
+ AnomalyRecord record1 = createRecordWithoutBigChange();
+ AnomalyRecord record2 = createRecordWithoutBigChange();
+ records.add(record1);
+ records.add(record2);
+ bucket.setRecords(records);
+ bucket.setRecordCount(2);
+
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(1);
+ verifyBucketWasNotUpdated(bucket);
+ verifyBucketRecordsWereNotUpdated(bucket.getId());
+ }
+
+ public void testUpdate_GivenSingleBucketWithBigChangeAndNoRecords() throws IOException {
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(42.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
+ bucket.setMaxNormalizedProbability(50.0);
+ bucket.raiseBigNormalizedUpdateFlag();
+
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(1);
+ verifyBucketWasUpdated(bucket);
+ verifyBucketRecordsWereNotUpdated(bucket.getId());
+ }
+
+ public void testUpdate_GivenSingleBucketWithoutBigChangeAndSomeRecordsWithBigChange() throws IOException {
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(42.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
+ bucket.setMaxNormalizedProbability(50.0);
+
+ List<AnomalyRecord> records = new ArrayList<>();
+ AnomalyRecord record1 = createRecordWithBigChange();
+ AnomalyRecord record2 = createRecordWithoutBigChange();
+ AnomalyRecord record3 = createRecordWithBigChange();
+ records.add(record1);
+ records.add(record2);
+ records.add(record3);
+ bucket.setRecords(records);
+
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(1);
+ verifyBucketWasNotUpdated(bucket);
+ verifyRecordsWereUpdated(bucket.getId(), Arrays.asList(record1, record3));
+ }
+
+ public void testUpdate_GivenSingleBucketWithBigChangeAndSomeRecordsWithBigChange() throws IOException {
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(42.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
+ bucket.setMaxNormalizedProbability(50.0);
+ bucket.raiseBigNormalizedUpdateFlag();
+ List<AnomalyRecord> records = new ArrayList<>();
+ AnomalyRecord record1 = createRecordWithBigChange();
+ AnomalyRecord record2 = createRecordWithoutBigChange();
+ AnomalyRecord record3 = createRecordWithBigChange();
+ records.add(record1);
+ records.add(record2);
+ records.add(record3);
+ bucket.setRecords(records);
+
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(1);
+ verifyBucketWasUpdated(bucket);
+ verifyRecordsWereUpdated(bucket.getId(), Arrays.asList(record1, record3));
+ }
+
+ public void testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() throws IOException {
+ Deque<Bucket> batch1 = new ArrayDeque<>();
+ for (int i = 0; i < 10000; ++i) {
+ Bucket bucket = generateBucket(new Date(i * 1000));
+ bucket.setAnomalyScore(42.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
+ bucket.setMaxNormalizedProbability(50.0);
+ bucket.raiseBigNormalizedUpdateFlag();
+ batch1.add(bucket);
+ }
+
+ Bucket secondBatchBucket = generateBucket(new Date(10000 * 1000));
+ secondBatchBucket.addBucketInfluencer(createTimeBucketInfluencer(secondBatchBucket.getTimestamp(), 0.04, 42.0));
+ secondBatchBucket.setAnomalyScore(42.0);
+ secondBatchBucket.setMaxNormalizedProbability(50.0);
+ secondBatchBucket.raiseBigNormalizedUpdateFlag();
+ Deque<Bucket> batch2 = new ArrayDeque<>();
+ batch2.add(secondBatchBucket);
+
+ givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, batch1, batch2);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(1);
+
+ // Batch 1 - Just verify the first and last were updated as Mockito
+ // is prohibitively slow when trying to verify all 10000
+ verifyBucketWasUpdated(batch1.getFirst());
+ verifyBucketRecordsWereNotUpdated(batch1.getFirst().getId());
+ verifyBucketWasUpdated(batch1.getLast());
+ verifyBucketRecordsWereNotUpdated(batch1.getLast().getId());
+
+ verifyBucketWasUpdated(secondBatchBucket);
+ verifyBucketRecordsWereNotUpdated(secondBatchBucket.getId());
+ }
+
+ public void testUpdate_GivenTwoBucketsWithFirstHavingEnoughRecordsToForceSecondNormalization() throws IOException {
+ Bucket bucket1 = generateBucket(new Date(0));
+ bucket1.setAnomalyScore(42.0);
+ bucket1.addBucketInfluencer(createTimeBucketInfluencer(bucket1.getTimestamp(), 0.04, 42.0));
+ bucket1.setMaxNormalizedProbability(50.0);
+ bucket1.raiseBigNormalizedUpdateFlag();
+ when(jobProvider.expandBucket(JOB_ID, false, bucket1)).thenReturn(100000);
+
+ Bucket bucket2 = generateBucket(new Date(10000 * 1000));
+ bucket2.addBucketInfluencer(createTimeBucketInfluencer(bucket2.getTimestamp(), 0.04, 42.0));
+ bucket2.setAnomalyScore(42.0);
+ bucket2.setMaxNormalizedProbability(50.0);
+ bucket2.raiseBigNormalizedUpdateFlag();
+
+ Deque<Bucket> batch = new ArrayDeque<>();
+ batch.add(bucket1);
+ batch.add(bucket2);
+ givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, batch);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(2);
+
+ verifyBucketWasUpdated(bucket1);
+ verifyBucketRecordsWereNotUpdated(bucket1.getId());
+ verifyBucketWasUpdated(bucket2);
+ verifyBucketRecordsWereNotUpdated(bucket2.getId());
+ }
+
+ public void testUpdate_GivenInfluencerWithBigChange() throws IOException {
+ Influencer influencer = new Influencer(JOB_ID, "n", "v", new Date(DEFAULT_START_TIME), 600, 1);
+ influencer.raiseBigNormalizedUpdateFlag();
+
+ Deque<Influencer> influencers = new ArrayDeque<>();
+ influencers.add(influencer);
+ givenProviderReturnsInfluencers(DEFAULT_START_TIME, DEFAULT_END_TIME, influencers);
+
+ scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
+
+ verifyNormalizerWasInvoked(1);
+ verifyInfluencerWasUpdated(influencer);
+ }
+
+ public void testDefaultRenormalizationWindowBasedOnTime() throws IOException {
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(42.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
+ bucket.setMaxNormalizedProbability(50.0);
+ bucket.raiseBigNormalizedUpdateFlag();
+
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(2509200000L, 2595600000L, buckets);
+ givenProviderReturnsNoInfluencers(2509200000L, 2595600000L);
+
+ scoresUpdater.update(QUANTILES_STATE, 2595600000L, 0, false);
+
+ verifyNormalizerWasInvoked(1);
+ verifyBucketWasUpdated(bucket);
+ verifyBucketRecordsWereNotUpdated(bucket.getId());
+ }
+
+ public void testManualRenormalizationWindow() throws IOException {
+
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(42.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
+ bucket.setMaxNormalizedProbability(50.0);
+ bucket.raiseBigNormalizedUpdateFlag();
+
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(3600000, 90000000L, buckets);
+ givenProviderReturnsNoInfluencers(3600000, 90000000L);
+
+ scoresUpdater.update(QUANTILES_STATE, 90000000L, 0, false);
+
+ verifyNormalizerWasInvoked(1);
+ verifyBucketWasUpdated(bucket);
+ verifyBucketRecordsWereNotUpdated(bucket.getId());
+ }
+
+ public void testManualRenormalizationWindow_GivenExtension() throws IOException {
+
+ Bucket bucket = generateBucket(new Date(0));
+ bucket.setAnomalyScore(42.0);
+ bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
+ bucket.setMaxNormalizedProbability(50.0);
+ bucket.raiseBigNormalizedUpdateFlag();
+
+ Deque<Bucket> buckets = new ArrayDeque<>();
+ buckets.add(bucket);
+ givenProviderReturnsBuckets(2700000, 90000000L, buckets);
+ givenProviderReturnsNoInfluencers(2700000, 90000000L);
+
+ scoresUpdater.update(QUANTILES_STATE, 90000000L, 900000, false);
+
+ verifyNormalizerWasInvoked(1);
+ verifyBucketWasUpdated(bucket);
+ verifyBucketRecordsWereNotUpdated(bucket.getId());
+ }
+
+ private void verifyNormalizerWasInvoked(int times) throws IOException {
+ int bucketSpan = job.getAnalysisConfig() == null ? 0
+ : job.getAnalysisConfig().getBucketSpan().intValue();
+ verify(normalizer, times(times)).normalize(
+ eq(bucketSpan), eq(false), anyListOf(Normalizable.class),
+ eq(QUANTILES_STATE));
+ }
+
+ private BucketInfluencer createTimeBucketInfluencer(Date timestamp, double probability, double anomalyScore) {
+ BucketInfluencer influencer = new BucketInfluencer(JOB_ID, timestamp, DEFAULT_BUCKET_SPAN, 1);
+ influencer.setProbability(probability);
+ influencer.setAnomalyScore(anomalyScore);
+ influencer.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME);
+ return influencer;
+ }
+
+ private void givenNormalizerFactoryReturnsMock() {
+ when(normalizerFactory.create(JOB_ID)).thenReturn(normalizer);
+ }
+
+ private void givenProviderReturnsNoBuckets(long startTime, long endTime) {
+ givenBuckets(startTime, endTime, Collections.emptyList());
+ }
+
+ private void givenProviderReturnsBuckets(long startTime, long endTime, Deque<Bucket> batch1, Deque<Bucket> batch2) {
+ List<Deque<Bucket>> batches = new ArrayList<>();
+ batches.add(new ArrayDeque<>(batch1));
+ batches.add(new ArrayDeque<>(batch2));
+ givenBuckets(startTime, endTime, batches);
+ }
+
+ private void givenProviderReturnsBuckets(long startTime, long endTime, Deque<Bucket> buckets) {
+ List<Deque<Bucket>> batches = new ArrayList<>();
+ batches.add(new ArrayDeque<>(buckets));
+ givenBuckets(startTime, endTime, batches);
+ }
+
+ private void givenBuckets(long startTime, long endTime, List<Deque<Bucket>> batches) {
+ BatchedDocumentsIterator<Bucket> iterator = new MockBatchedDocumentsIterator<>(startTime,
+ endTime, batches);
+ when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(iterator);
+ }
+
+ private void givenProviderReturnsNoInfluencers(long startTime, long endTime) {
+ givenProviderReturnsInfluencers(startTime, endTime, new ArrayDeque<>());
+ }
+
+ private void givenProviderReturnsInfluencers(long startTime, long endTime,
+ Deque<Influencer> influencers) {
+ List<Deque<Influencer>> batches = new ArrayList<>();
+ batches.add(new ArrayDeque<>(influencers));
+ BatchedDocumentsIterator<Influencer> iterator = new MockBatchedDocumentsIterator<>(
+ startTime, endTime, batches);
+ when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator);
+ }
+
+ private void verifyBucketWasUpdated(Bucket bucket) {
+ verify(jobRenormalizer).updateBucket(bucket);
+ }
+
+ private void verifyRecordsWereUpdated(String bucketId, List<AnomalyRecord> records) {
+ verify(jobRenormalizer).updateRecords(bucketId, records);
+ }
+
+ private void verifyBucketWasNotUpdated(Bucket bucket) {
+ verify(jobRenormalizer, never()).updateBucket(bucket);
+ }
+
+ private void verifyBucketRecordsWereNotUpdated(String bucketId) {
+ verify(jobRenormalizer, never()).updateRecords(eq(bucketId),
+ anyListOf(AnomalyRecord.class));
+ }
+
+ private static AnomalyRecord createRecordWithoutBigChange() {
+ return createRecord(false);
+ }
+
+ private static AnomalyRecord createRecordWithBigChange() {
+ return createRecord(true);
+ }
+
+ private static AnomalyRecord createRecord(boolean hadBigChange) {
+ AnomalyRecord anomalyRecord = mock(AnomalyRecord.class);
+ when(anomalyRecord.hadBigNormalizedUpdate()).thenReturn(hadBigChange);
+ when(anomalyRecord.getId()).thenReturn("someId");
+ return anomalyRecord;
+ }
+
+ private void verifyInfluencerWasUpdated(Influencer influencer) {
+ List<Influencer> list = new ArrayList<>();
+ list.add(influencer);
+ verify(jobRenormalizer).updateInfluencer(eq(JOB_ID), eq(list));
+ }
+}
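The window arithmetic behind the last three tests is worth spelling out. The job is built with setRenormalizationWindowDays(1L), a lookback of 86,400,000 ms. In testDefaultRenormalizationWindowBasedOnTime the end time is 2,595,600,000 ms, so the stubbed provider range starts at 2,595,600,000 − 86,400,000 = 2,509,200,000 ms. testManualRenormalizationWindow applies the same one-day lookback to an end time of 90,000,000 ms, giving a start of 3,600,000 ms, and testManualRenormalizationWindow_GivenExtension passes a 900,000 ms extension that pushes the start back further, to 3,600,000 − 900,000 = 2,700,000 ms.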
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/output/NormalizerResultHandlerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/output/NormalizerResultHandlerTests.java
new file mode 100644
index 00000000000..d200d650bc3
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/output/NormalizerResultHandlerTests.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.prelert.job.process.normalizer.output;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerResult;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class NormalizerResultHandlerTests extends ESTestCase {
+
+ private static final double EPSILON = 0.0000001;
+
+ public void testParse() throws IOException {
+
+ String testData = "{\"level\":\"leaf\",\"partition_field_name\":\"part\",\"partition_field_value\":\"v1\","
+ + "\"person_field_name\":\"pers\",\"function_name\":\"f\","
+ + "\"value_field_name\":\"x\",\"probability\":0.01,\"normalized_score\":88.88}\n"
+ + "{\"level\":\"leaf\",\"partition_field_name\":\"part\",\"partition_field_value\":\"v2\","
+ + "\"person_field_name\":\"pers\",\"function_name\":\"f\","
+ + "\"value_field_name\":\"x\",\"probability\":0.02,\"normalized_score\":44.44}\n"
+ + "{\"level\":\"leaf\",\"partition_field_name\":\"part\",\"partition_field_value\":\"v3\","
+ + "\"person_field_name\":\"pers\",\"function_name\":\"f\","
+ + "\"value_field_name\":\"x\",\"probability\":0.03,\"normalized_score\":22.22}\n";
+
+ InputStream is = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+ NormalizerResultHandler handler = new NormalizerResultHandler(Settings.EMPTY, is);
+ handler.process();
+ List<NormalizerResult> results = handler.getNormalizedResults();
+ assertEquals(3, results.size());
+ assertEquals(88.88, results.get(0).getNormalizedScore(), EPSILON);
+ assertEquals(44.44, results.get(1).getNormalizedScore(), EPSILON);
+ assertEquals(22.22, results.get(2).getNormalizedScore(), EPSILON);
+ }
+}
+
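
The test above feeds the handler three newline-delimited JSON documents and expects one NormalizerResult per line. The handler's implementation lives elsewhere in this PR; purely as an illustration of that line-per-document framing, a minimal reader might look like the following (the class and method names here are invented for the sketch):

    import java.io.BufferedReader;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class NdJsonFramingSketch {

        // Split a stream into one raw JSON document per line; parsing each
        // line into a NormalizerResult is left to the real handler.
        static List<String> readDocuments(InputStream is) throws IOException {
            List<String> docs = new ArrayList<>();
            try (BufferedReader reader =
                    new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.isEmpty() == false) {
                        docs.add(line);
                    }
                }
            }
            return docs;
        }

        public static void main(String[] args) throws IOException {
            String twoDocs = "{\"normalized_score\":88.88}\n{\"normalized_score\":44.44}\n";
            InputStream is = new ByteArrayInputStream(twoDocs.getBytes(StandardCharsets.UTF_8));
            System.out.println(readDocuments(is).size()); // prints 2
        }
    }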
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java
index 998e64ab1c9..0b6f5a941fd 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java
@@ -255,56 +255,56 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
assertEquals(bucket1.hashCode(), bucket2.hashCode());
}
- public void testIsNormalisable_GivenNullBucketInfluencers() {
+ public void testIsNormalizable_GivenNullBucketInfluencers() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setBucketInfluencers(null);
bucket.setAnomalyScore(90.0);
- assertFalse(bucket.isNormalisable());
+ assertFalse(bucket.isNormalizable());
}
- public void testIsNormalisable_GivenEmptyBucketInfluencers() {
+ public void testIsNormalizable_GivenEmptyBucketInfluencers() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setBucketInfluencers(Collections.emptyList());
bucket.setAnomalyScore(90.0);
- assertFalse(bucket.isNormalisable());
+ assertFalse(bucket.isNormalizable());
}
- public void testIsNormalisable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() {
+ public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(0.0);
bucket.setRecordCount(0);
- assertFalse(bucket.isNormalisable());
+ assertFalse(bucket.isNormalizable());
}
- public void testIsNormalisable_GivenAnomalyScoreIsZeroAndRecordCountIsNonZero() {
+ public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsNonZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(0.0);
bucket.setRecordCount(1);
- assertTrue(bucket.isNormalisable());
+ assertTrue(bucket.isNormalizable());
}
- public void testIsNormalisable_GivenAnomalyScoreIsNonZeroAndRecordCountIsZero() {
+ public void testIsNormalizable_GivenAnomalyScoreIsNonZeroAndRecordCountIsZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(1.0);
bucket.setRecordCount(0);
- assertTrue(bucket.isNormalisable());
+ assertTrue(bucket.isNormalizable());
}
- public void testIsNormalisable_GivenAnomalyScoreIsNonZeroAndRecordCountIsNonZero() {
+ public void testIsNormalizable_GivenAnomalyScoreIsNonZeroAndRecordCountIsNonZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(1.0);
bucket.setRecordCount(1);
- assertTrue(bucket.isNormalisable());
+ assertTrue(bucket.isNormalizable());
}
public void testPartitionAnomalyScore() {
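
Taken together, the five renamed tests pin down the isNormalizable contract: a bucket with no bucket influencers is never normalizable, and otherwise it is normalizable whenever its anomaly score or record count is non-zero. A sketch of logic consistent with those assertions (the real Bucket implementation is not shown in this hunk, so the signature here is an assumption):

    import java.util.Arrays;
    import java.util.List;

    public class IsNormalizableSketch {

        // Mirrors the behaviour the five tests above assert; field names
        // are stand-ins for the real Bucket state.
        static boolean isNormalizable(List<?> bucketInfluencers,
                                      double anomalyScore, int recordCount) {
            if (bucketInfluencers == null || bucketInfluencers.isEmpty()) {
                return false; // nothing contributed to this bucket's score
            }
            return anomalyScore > 0.0 || recordCount > 0;
        }

        public static void main(String[] args) {
            System.out.println(isNormalizable(null, 90.0, 0));               // false
            System.out.println(isNormalizable(Arrays.asList("inf"), 0.0, 1)); // true
        }
    }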
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilitiesTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilitiesTests.java
index 7135d52ffd3..49677696a5f 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilitiesTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/PerPartitionMaxProbabilitiesTests.java
@@ -53,10 +53,9 @@ public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCa
assertEquals(2, pProbs.size());
for (PerPartitionMaxProbabilities.PartitionProbability pProb : pProbs) {
if (pProb.getPartitionValue().equals("A")) {
- assertEquals(40.0, pProb.getMaxNormalisedProbability(), 0.0001);
- }
- else {
- assertEquals(90.0, pProb.getMaxNormalisedProbability(), 0.0001);
+ assertEquals(40.0, pProb.getMaxNormalizedProbability(), 0.0001);
+ } else {
+ assertEquals(90.0, pProb.getMaxNormalizedProbability(), 0.0001);
}
}
}
@@ -81,4 +80,4 @@ public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCa
record.setNormalizedProbability(normalizedProbability);
return record;
}
-}
\ No newline at end of file
+}
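
For context on what the assertions in the first hunk exercise: each PartitionProbability pairs a partition value with the maximum normalized probability across that partition's records. A rough stand-alone sketch of that aggregation, with an invented record type rather than the real AnomalyRecord:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class PerPartitionMaxSketch {

        // Minimal record stand-in; the real AnomalyRecord carries many more fields.
        static class Rec {
            final String partition;
            final double normalizedProbability;
            Rec(String partition, double normalizedProbability) {
                this.partition = partition;
                this.normalizedProbability = normalizedProbability;
            }
        }

        public static void main(String[] args) {
            List<Rec> records = Arrays.asList(
                    new Rec("A", 10.0), new Rec("A", 40.0), new Rec("B", 90.0));

            // Group by partition value and keep the maximum normalized
            // probability, matching what the test expects per partition.
            Map<String, Double> maxByPartition = records.stream().collect(
                    Collectors.groupingBy(r -> r.partition,
                            Collectors.reducing(0.0, r -> r.normalizedProbability, Math::max)));

            System.out.println(maxByPartition); // e.g. {A=40.0, B=90.0}
        }
    }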