[ML] Removing old per-partition normalization code (#32816)

Per-partition normalization is an old, undocumented feature that was
never used by clients. It has been superseded by per-partition maximum
scoring.

To maintain communication compatibility with nodes prior to 6.5, the old wire format still has to be supported: obsolete fields are read and discarded from older streams, and placeholder values are written back to them.
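
The pattern applied throughout the commit is sketched below (illustrative only, with a placeholder class name; it is not the actual AnalysisConfig or Bucket code). The removed flag is no longer stored, but a reader still consumes and discards the boolean that a pre-removal node sends, and a writer emits a hard-coded false in its old position. The version compared against is temporarily V_7_0_0_alpha1 and is intended to become V_6_5_0 once the change is backported to 6.x.

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

// Illustrative sketch of the BWC pattern used in this commit, not the production classes.
class PerPartitionNormalizationBwcSketch {

    void readFrom(StreamInput in) throws IOException {
        // ... read the fields that still exist ...
        // BWC for removed per-partition normalization:
        // older nodes still send the flag, so read it and throw it away.
        if (in.getVersion().before(Version.V_7_0_0_alpha1)) { // TODO change to V_6_5_0 after backport
            in.readBoolean();
        }
    }

    void writeTo(StreamOutput out) throws IOException {
        // ... write the fields that still exist ...
        // Older nodes still expect the flag, so always send "disabled".
        if (out.getVersion().before(Version.V_7_0_0_alpha1)) { // TODO change to V_6_5_0 after backport
            out.writeBoolean(false);
        }
    }
}

Bucket applies the same idea to the richer partition_scores list: readOldPerPartitionNormalization consumes the five old PartitionScore fields per entry from pre-6.5 streams, and writeTo sends an empty list back to pre-6.5 nodes.
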
Ed Savage 2018-08-15 13:13:32 +01:00 committed by GitHub
parent 7ff37ae232
commit 8ce1ab3ed9
29 changed files with 99 additions and 596 deletions

View File

@ -64,7 +64,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
private static final ParseField OVERLAPPING_BUCKETS = new ParseField("overlapping_buckets");
private static final ParseField RESULT_FINALIZATION_WINDOW = new ParseField("result_finalization_window");
private static final ParseField MULTIVARIATE_BY_FIELDS = new ParseField("multivariate_by_fields");
private static final ParseField USER_PER_PARTITION_NORMALIZATION = new ParseField("use_per_partition_normalization");
public static final String ML_CATEGORY_FIELD = "mlcategory";
public static final Set<String> AUTO_CREATED_FIELDS = new HashSet<>(Collections.singletonList(ML_CATEGORY_FIELD));
@ -98,7 +97,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
parser.declareBoolean(Builder::setOverlappingBuckets, OVERLAPPING_BUCKETS);
parser.declareLong(Builder::setResultFinalizationWindow, RESULT_FINALIZATION_WINDOW);
parser.declareBoolean(Builder::setMultivariateByFields, MULTIVARIATE_BY_FIELDS);
parser.declareBoolean(Builder::setUsePerPartitionNormalization, USER_PER_PARTITION_NORMALIZATION);
return parser;
}
@ -117,12 +115,11 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
private final Boolean overlappingBuckets;
private final Long resultFinalizationWindow;
private final Boolean multivariateByFields;
private final boolean usePerPartitionNormalization;
private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, List<String> categorizationFilters,
CategorizationAnalyzerConfig categorizationAnalyzerConfig, TimeValue latency, String summaryCountFieldName,
List<Detector> detectors, List<String> influencers, Boolean overlappingBuckets, Long resultFinalizationWindow,
Boolean multivariateByFields, boolean usePerPartitionNormalization) {
Boolean multivariateByFields) {
this.detectors = detectors;
this.bucketSpan = bucketSpan;
this.latency = latency;
@ -134,7 +131,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
this.overlappingBuckets = overlappingBuckets;
this.resultFinalizationWindow = resultFinalizationWindow;
this.multivariateByFields = multivariateByFields;
this.usePerPartitionNormalization = usePerPartitionNormalization;
}
public AnalysisConfig(StreamInput in) throws IOException {
@ -165,7 +161,12 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
}
}
usePerPartitionNormalization = in.readBoolean();
// BWC for removed per-partition normalization
// Version check is temporarily against the latest to satisfy CI tests
// TODO change to V_6_5_0 after successful backport to 6.x
if (in.getVersion().before(Version.V_7_0_0_alpha1)) {
in.readBoolean();
}
}
@Override
@ -195,7 +196,12 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
out.writeBoolean(false);
}
out.writeBoolean(usePerPartitionNormalization);
// BWC for removed per-partition normalization
// Version check is temporarily against the latest to satisfy CI tests
// TODO change to V_6_5_0 after successful backport to 6.x
if (out.getVersion().before(Version.V_7_0_0_alpha1)) {
out.writeBoolean(false);
}
}
/**
@ -299,10 +305,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
return multivariateByFields;
}
public boolean getUsePerPartitionNormalization() {
return usePerPartitionNormalization;
}
/**
* Return the set of fields required by the analysis.
* These are the influencer fields, metric field, partition field,
@ -403,9 +405,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
if (multivariateByFields != null) {
builder.field(MULTIVARIATE_BY_FIELDS.getPreferredName(), multivariateByFields);
}
if (usePerPartitionNormalization) {
builder.field(USER_PER_PARTITION_NORMALIZATION.getPreferredName(), usePerPartitionNormalization);
}
builder.endObject();
return builder;
}
@ -416,7 +415,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
if (o == null || getClass() != o.getClass()) return false;
AnalysisConfig that = (AnalysisConfig) o;
return Objects.equals(latency, that.latency) &&
usePerPartitionNormalization == that.usePerPartitionNormalization &&
Objects.equals(bucketSpan, that.bucketSpan) &&
Objects.equals(categorizationFieldName, that.categorizationFieldName) &&
Objects.equals(categorizationFilters, that.categorizationFilters) &&
@ -434,7 +432,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
return Objects.hash(
bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, latency,
summaryCountFieldName, detectors, influencers, overlappingBuckets, resultFinalizationWindow,
multivariateByFields, usePerPartitionNormalization
multivariateByFields
);
}
@ -453,7 +451,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
private Boolean overlappingBuckets;
private Long resultFinalizationWindow;
private Boolean multivariateByFields;
private boolean usePerPartitionNormalization = false;
public Builder(List<Detector> detectors) {
setDetectors(detectors);
@ -472,7 +469,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
this.overlappingBuckets = analysisConfig.overlappingBuckets;
this.resultFinalizationWindow = analysisConfig.resultFinalizationWindow;
this.multivariateByFields = analysisConfig.multivariateByFields;
this.usePerPartitionNormalization = analysisConfig.usePerPartitionNormalization;
}
public void setDetectors(List<Detector> detectors) {
@ -535,10 +531,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
this.multivariateByFields = multivariateByFields;
}
public void setUsePerPartitionNormalization(boolean usePerPartitionNormalization) {
this.usePerPartitionNormalization = usePerPartitionNormalization;
}
/**
* Checks the configuration is valid
* <ol>
@ -571,16 +563,11 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
overlappingBuckets = verifyOverlappingBucketsConfig(overlappingBuckets, detectors);
if (usePerPartitionNormalization) {
checkDetectorsHavePartitionFields(detectors);
checkNoInfluencersAreSet(influencers);
}
verifyNoInconsistentNestedFieldNames();
return new AnalysisConfig(bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig,
latency, summaryCountFieldName, detectors, influencers, overlappingBuckets,
resultFinalizationWindow, multivariateByFields, usePerPartitionNormalization);
resultFinalizationWindow, multivariateByFields);
}
private void verifyNoMetricFunctionsWhenSummaryCountFieldNameIsSet() {
@ -704,23 +691,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
}
}
private static void checkDetectorsHavePartitionFields(List<Detector> detectors) {
for (Detector detector : detectors) {
if (!Strings.isNullOrEmpty(detector.getPartitionFieldName())) {
return;
}
}
throw ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD));
}
private static void checkNoInfluencersAreSet(List<String> influencers) {
if (!influencers.isEmpty()) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS));
}
}
private static boolean isValidRegex(String exp) {
try {
Pattern.compile(exp);

View File

@ -130,10 +130,6 @@ public final class Messages {
"over_field_name cannot be used with function ''{0}''";
public static final String JOB_CONFIG_OVERLAPPING_BUCKETS_INCOMPATIBLE_FUNCTION =
"Overlapping buckets cannot be used with function ''{0}''";
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS =
"A job configured with Per-Partition Normalization cannot use influencers";
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD =
"If the job is configured with Per-Partition Normalization enabled a detector must have a partition field";
public static final String JOB_CONFIG_UNKNOWN_FUNCTION = "Unknown function ''{0}''";
public static final String JOB_CONFIG_UPDATE_ANALYSIS_LIMITS_MODEL_MEMORY_LIMIT_CANNOT_BE_DECREASED =
"Invalid update value for analysis_limits: model_memory_limit cannot be decreased below current usage; " +

View File

@ -227,23 +227,6 @@ public class ElasticsearchMappings {
.startObject(Bucket.SCHEDULED_EVENTS.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.PARTITION_SCORES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.INITIAL_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(AnomalyRecord.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()
.startObject(Bucket.BUCKET_INFLUENCERS.getPreferredName())
.field(TYPE, NESTED)
@ -328,7 +311,7 @@ public class ElasticsearchMappings {
}
private static void addForecastFieldsToMapping(XContentBuilder builder) throws IOException {
// Forecast Output
builder.startObject(Forecast.FORECAST_LOWER.getPreferredName())
.field(TYPE, DOUBLE)
@ -370,7 +353,7 @@ public class ElasticsearchMappings {
.field(TYPE, LONG)
.endObject();
}
/**
* AnomalyRecord fields to be added under the 'properties' section of the mapping
* @param builder Add properties to this builder

View File

@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* Bucket Result POJO
@ -43,7 +42,6 @@ public class Bucket implements ToXContentObject, Writeable {
public static final ParseField BUCKET_INFLUENCERS = new ParseField("bucket_influencers");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms");
public static final ParseField PARTITION_SCORES = new ParseField("partition_scores");
public static final ParseField SCHEDULED_EVENTS = new ParseField("scheduled_events");
// Used for QueryPage
@ -58,6 +56,19 @@ public class Bucket implements ToXContentObject, Writeable {
public static final ConstructingObjectParser<Bucket, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<Bucket, Void> LENIENT_PARSER = createParser(true);
/* *
* Read and discard the old (prior to 6.5) perPartitionNormalization values
*/
public static Bucket readOldPerPartitionNormalization(StreamInput in) throws IOException {
in.readString();
in.readString();
in.readDouble();
in.readDouble();
in.readDouble();
return null;
}
private static ConstructingObjectParser<Bucket, Void> createParser(boolean ignoreUnknownFields) {
ConstructingObjectParser<Bucket, Void> parser = new ConstructingObjectParser<>(RESULT_TYPE_VALUE, ignoreUnknownFields,
a -> new Bucket((String) a[0], (Date) a[1], (long) a[2]));
@ -82,8 +93,6 @@ public class Bucket implements ToXContentObject, Writeable {
parser.declareObjectArray(Bucket::setBucketInfluencers, ignoreUnknownFields ?
BucketInfluencer.LENIENT_PARSER : BucketInfluencer.STRICT_PARSER, BUCKET_INFLUENCERS);
parser.declareLong(Bucket::setProcessingTimeMs, PROCESSING_TIME_MS);
parser.declareObjectArray(Bucket::setPartitionScores, ignoreUnknownFields ?
PartitionScore.LENIENT_PARSER : PartitionScore.STRICT_PARSER, PARTITION_SCORES);
parser.declareString((bucket, s) -> {}, Result.RESULT_TYPE);
parser.declareStringArray(Bucket::setScheduledEvents, SCHEDULED_EVENTS);
@ -100,7 +109,6 @@ public class Bucket implements ToXContentObject, Writeable {
private boolean isInterim;
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
private long processingTimeMs;
private List<PartitionScore> partitionScores = Collections.emptyList();
private List<String> scheduledEvents = Collections.emptyList();
public Bucket(String jobId, Date timestamp, long bucketSpan) {
@ -120,7 +128,6 @@ public class Bucket implements ToXContentObject, Writeable {
this.isInterim = other.isInterim;
this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
this.processingTimeMs = other.processingTimeMs;
this.partitionScores = new ArrayList<>(other.partitionScores);
this.scheduledEvents = new ArrayList<>(other.scheduledEvents);
}
@ -143,7 +150,10 @@ public class Bucket implements ToXContentObject, Writeable {
if (in.getVersion().before(Version.V_5_5_0)) {
in.readGenericValue();
}
partitionScores = in.readList(PartitionScore::new);
// bwc for perPartitionNormalization
if (in.getVersion().before(Version.V_6_5_0)) {
in.readList(Bucket::readOldPerPartitionNormalization);
}
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
scheduledEvents = in.readList(StreamInput::readString);
if (scheduledEvents.isEmpty()) {
@ -174,7 +184,10 @@ public class Bucket implements ToXContentObject, Writeable {
if (out.getVersion().before(Version.V_5_5_0)) {
out.writeGenericValue(Collections.emptyMap());
}
out.writeList(partitionScores);
// bwc for perPartitionNormalization
if (out.getVersion().before(Version.V_6_5_0)) {
out.writeList(Collections.emptyList());
}
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeStringList(scheduledEvents);
}
@ -195,9 +208,7 @@ public class Bucket implements ToXContentObject, Writeable {
builder.field(Result.IS_INTERIM.getPreferredName(), isInterim);
builder.field(BUCKET_INFLUENCERS.getPreferredName(), bucketInfluencers);
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTimeMs);
if (partitionScores.isEmpty() == false) {
builder.field(PARTITION_SCORES.getPreferredName(), partitionScores);
}
if (scheduledEvents.isEmpty() == false) {
builder.field(SCHEDULED_EVENTS.getPreferredName(), scheduledEvents);
}
@ -304,14 +315,6 @@ public class Bucket implements ToXContentObject, Writeable {
bucketInfluencers.add(bucketInfluencer);
}
public List<PartitionScore> getPartitionScores() {
return partitionScores;
}
public void setPartitionScores(List<PartitionScore> scores) {
partitionScores = Objects.requireNonNull(scores);
}
public List<String> getScheduledEvents() {
return scheduledEvents;
}
@ -320,24 +323,10 @@ public class Bucket implements ToXContentObject, Writeable {
this.scheduledEvents = ExceptionsHelper.requireNonNull(scheduledEvents, SCHEDULED_EVENTS.getPreferredName());
}
public double partitionInitialAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();
return first.isPresent() ? first.get().getInitialRecordScore() : 0.0;
}
public double partitionAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();
return first.isPresent() ? first.get().getRecordScore() : 0.0;
}
@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, records,
isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs, scheduledEvents);
isInterim, bucketSpan, bucketInfluencers, processingTimeMs, scheduledEvents);
}
/**
@ -360,7 +349,6 @@ public class Bucket implements ToXContentObject, Writeable {
&& (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
&& Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim)
&& Objects.equals(this.bucketInfluencers, that.bucketInfluencers)
&& Objects.equals(this.partitionScores, that.partitionScores)
&& (this.processingTimeMs == that.processingTimeMs)
&& Objects.equals(this.scheduledEvents, that.scheduledEvents);
}
@ -374,6 +362,6 @@ public class Bucket implements ToXContentObject, Writeable {
* @return true if the bucket should be normalized or false otherwise
*/
public boolean isNormalizable() {
return anomalyScore > 0.0 || partitionScores.stream().anyMatch(s -> s.getRecordScore() > 0);
return anomalyScore > 0.0;
}
}

View File

@ -1,131 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.job.results;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public class PartitionScore implements ToXContentObject, Writeable {
public static final ParseField PARTITION_SCORE = new ParseField("partition_score");
private final String partitionFieldValue;
private final String partitionFieldName;
private final double initialRecordScore;
private double recordScore;
private double probability;
public static final ConstructingObjectParser<PartitionScore, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<PartitionScore, Void> LENIENT_PARSER = createParser(true);
private static ConstructingObjectParser<PartitionScore, Void> createParser(boolean ignoreUnknownFields) {
ConstructingObjectParser<PartitionScore, Void> parser = new ConstructingObjectParser<>(PARTITION_SCORE.getPreferredName(),
ignoreUnknownFields, a -> new PartitionScore((String) a[0], (String) a[1], (Double) a[2], (Double) a[3], (Double) a[4]));
parser.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_NAME);
parser.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_VALUE);
parser.declareDouble(ConstructingObjectParser.constructorArg(), AnomalyRecord.INITIAL_RECORD_SCORE);
parser.declareDouble(ConstructingObjectParser.constructorArg(), AnomalyRecord.RECORD_SCORE);
parser.declareDouble(ConstructingObjectParser.constructorArg(), AnomalyRecord.PROBABILITY);
return parser;
}
public PartitionScore(String fieldName, String fieldValue, double initialRecordScore, double recordScore, double probability) {
partitionFieldName = fieldName;
partitionFieldValue = fieldValue;
this.initialRecordScore = initialRecordScore;
this.recordScore = recordScore;
this.probability = probability;
}
public PartitionScore(StreamInput in) throws IOException {
partitionFieldName = in.readString();
partitionFieldValue = in.readString();
initialRecordScore = in.readDouble();
recordScore = in.readDouble();
probability = in.readDouble();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(partitionFieldName);
out.writeString(partitionFieldValue);
out.writeDouble(initialRecordScore);
out.writeDouble(recordScore);
out.writeDouble(probability);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName(), partitionFieldName);
builder.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue);
builder.field(AnomalyRecord.INITIAL_RECORD_SCORE.getPreferredName(), initialRecordScore);
builder.field(AnomalyRecord.RECORD_SCORE.getPreferredName(), recordScore);
builder.field(AnomalyRecord.PROBABILITY.getPreferredName(), probability);
builder.endObject();
return builder;
}
public double getInitialRecordScore() {
return initialRecordScore;
}
public double getRecordScore() {
return recordScore;
}
public void setRecordScore(double recordScore) {
this.recordScore = recordScore;
}
public String getPartitionFieldName() {
return partitionFieldName;
}
public String getPartitionFieldValue() {
return partitionFieldValue;
}
public double getProbability() {
return probability;
}
public void setProbability(double probability) {
this.probability = probability;
}
@Override
public int hashCode() {
return Objects.hash(partitionFieldName, partitionFieldValue, probability, initialRecordScore, recordScore);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof PartitionScore == false) {
return false;
}
PartitionScore that = (PartitionScore) other;
// id is excluded from the test as it is generated by the datastore
return Objects.equals(this.partitionFieldValue, that.partitionFieldValue)
&& Objects.equals(this.partitionFieldName, that.partitionFieldName) && (this.probability == that.probability)
&& (this.initialRecordScore == that.initialRecordScore) && (this.recordScore == that.recordScore);
}
}

View File

@ -81,7 +81,6 @@ public final class ReservedFieldNames {
Bucket.EVENT_COUNT.getPreferredName(),
Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(),
Bucket.PROCESSING_TIME_MS.getPreferredName(),
Bucket.PARTITION_SCORES.getPreferredName(),
Bucket.SCHEDULED_EVENTS.getPreferredName(),
BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.ANOMALY_SCORE.getPreferredName(),

View File

@ -11,7 +11,6 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.core.ml.job.results.PartitionScore;
import java.util.ArrayList;
import java.util.Collections;
@ -53,15 +52,6 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase<Res
if (randomBoolean()) {
bucket.setInterim(randomBoolean());
}
if (randomBoolean()) {
int size = randomInt(10);
List<PartitionScore> partitionScores = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
partitionScores.add(new PartitionScore(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20),
randomDouble(), randomDouble(), randomDouble()));
}
bucket.setPartitionScores(partitionScores);
}
if (randomBoolean()) {
bucket.setProcessingTimeMs(randomLong());
}

View File

@ -97,11 +97,8 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
builder.setResultFinalizationWindow(randomNonNegativeLong());
}
boolean usePerPartitionNormalisation = randomBoolean();
builder.setUsePerPartitionNormalization(usePerPartitionNormalisation);
if (!usePerPartitionNormalisation) { // influencers can't be used with per partition normalisation
builder.setInfluencers(Arrays.asList(generateRandomStringArray(10, 10, false)));
}
builder.setInfluencers(Arrays.asList(generateRandomStringArray(10, 10, false)));
return builder;
}
@ -690,40 +687,15 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY), e.getMessage());
}
public void testCheckDetectorsHavePartitionFields() {
AnalysisConfig.Builder config = createValidConfig();
config.setUsePerPartitionNormalization(true);
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, config::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD), e.getMessage());
}
public void testCheckDetectorsHavePartitionFields_doesntThrowWhenValid() {
AnalysisConfig.Builder config = createValidConfig();
Detector.Builder builder = new Detector.Builder(config.build().getDetectors().get(0));
builder.setPartitionFieldName("pField");
config.build().getDetectors().set(0, builder.build());
config.setUsePerPartitionNormalization(true);
config.build();
}
public void testCheckNoInfluencersAreSet() {
AnalysisConfig.Builder config = createValidConfig();
Detector.Builder builder = new Detector.Builder(config.build().getDetectors().get(0));
builder.setPartitionFieldName("pField");
config.build().getDetectors().set(0, builder.build());
config.setInfluencers(Arrays.asList("inf1", "inf2"));
config.setUsePerPartitionNormalization(true);
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, config::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS), e.getMessage());
}
public void testVerify_GivenCategorizationFiltersContainInvalidRegex() {
AnalysisConfig.Builder config = createValidCategorizationConfig();
config.setCategorizationFilters(Arrays.asList("foo", "("));
@ -756,7 +728,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
@Override
protected AnalysisConfig mutateInstance(AnalysisConfig instance) {
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(instance);
switch (between(0, 11)) {
switch (between(0, 10)) {
case 0:
List<Detector> detectors = new ArrayList<>(instance.getDetectors());
Detector.Builder detector = new Detector.Builder();
@ -832,7 +804,6 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
List<String> influencers = new ArrayList<>(instance.getInfluencers());
influencers.add(randomAlphaOfLengthBetween(5, 10));
builder.setInfluencers(influencers);
builder.setUsePerPartitionNormalization(false);
break;
case 8:
if (instance.getOverlappingBuckets() == null) {
@ -855,13 +826,6 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
builder.setMultivariateByFields(instance.getMultivariateByFields() == false);
}
break;
case 11:
boolean usePerPartitionNormalization = instance.getUsePerPartitionNormalization() == false;
builder.setUsePerPartitionNormalization(usePerPartitionNormalization);
if (usePerPartitionNormalization) {
builder.setInfluencers(Collections.emptyList());
}
break;
default:
throw new AssertionError("Illegal randomisation branch");
}

View File

@ -384,8 +384,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
new BlackHoleAutodetectProcess(job.getId());
// factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization,
executorService) -> new MultiplyingNormalizerProcess(settings, 1.0);
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) ->
new MultiplyingNormalizerProcess(settings, 1.0);
}
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));

View File

@ -61,7 +61,6 @@ public class AutodetectBuilder {
public static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput";
public static final String MODEL_CONFIG_ARG = "--modelconfig=";
public static final String QUANTILES_STATE_PATH_ARG = "--quantilesState=";
public static final String PER_PARTITION_NORMALIZATION = "--perPartitionNormalization";
private static final String CONF_EXTENSION = ".conf";
static final String JOB_ID_ARG = "--jobid=";
@ -207,10 +206,6 @@ public class AutodetectBuilder {
if (Boolean.TRUE.equals(analysisConfig.getMultivariateByFields())) {
command.add(MULTIVARIATE_BY_FIELDS_ARG);
}
if (analysisConfig.getUsePerPartitionNormalization()) {
command.add(PER_PARTITION_NORMALIZATION);
}
}
// Input is always length encoded

View File

@ -499,7 +499,7 @@ public class AutodetectProcessManager extends AbstractComponent {
new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
renormalizerExecutorService);
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService,
onProcessCrash(jobTask));

View File

@ -16,12 +16,10 @@ import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.BUCKET_INFLUENCER;
import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.PARTITION_SCORE;
public class BucketNormalizable extends Normalizable {
private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, PARTITION_SCORE);
private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER);
private final Bucket bucket;
@ -117,11 +115,6 @@ public class BucketNormalizable extends Normalizable {
.map(bi -> new BucketInfluencerNormalizable(bi, getOriginatingIndex()))
.collect(Collectors.toList()));
break;
case PARTITION_SCORE:
children.addAll(bucket.getPartitionScores().stream()
.map(ps -> new PartitionScoreNormalizable(ps, getOriginatingIndex()))
.collect(Collectors.toList()));
break;
default:
throw new IllegalArgumentException("Invalid type: " + type);
}
@ -135,8 +128,6 @@ public class BucketNormalizable extends Normalizable {
double oldScore = bucket.getAnomalyScore();
bucket.setAnomalyScore(maxScore);
return maxScore != oldScore;
case PARTITION_SCORE:
return false;
default:
throw new IllegalArgumentException("Invalid type: " + childrenType);
}

View File

@ -38,20 +38,19 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
@Override
public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan,
boolean perPartitionNormalization, ExecutorService executorService) {
ExecutorService executorService) {
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE, jobId,
true, false, true, true, false, false);
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan, perPartitionNormalization);
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
return new NativeNormalizerProcess(jobId, settings, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), executorService);
}
private void createNativeProcess(String jobId, String quantilesState, ProcessPipes processPipes, Integer bucketSpan,
boolean perPartitionNormalization) {
private void createNativeProcess(String jobId, String quantilesState, ProcessPipes processPipes, Integer bucketSpan) {
try {
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan, perPartitionNormalization).build();
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
processPipes.addArgs(command);
nativeController.startProcess(command);
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);

View File

@ -11,7 +11,7 @@ import java.util.List;
import java.util.Objects;
public abstract class Normalizable implements ToXContentObject {
public enum ChildType {BUCKET_INFLUENCER, RECORD, PARTITION_SCORE};
public enum ChildType {BUCKET_INFLUENCER, RECORD};
private final String indexName;
private boolean hadBigNormalizedUpdate;

View File

@ -46,15 +46,14 @@ public class Normalizer {
* and normalizes the given results.
*
* @param bucketSpan If <code>null</code> the default is used
* @param perPartitionNormalization Is normalization per partition (rather than per job)?
* @param results Will be updated with the normalized results
* @param quantilesState The state to be used to seed the system change
* normalizer
*/
public void normalize(Integer bucketSpan, boolean perPartitionNormalization,
public void normalize(Integer bucketSpan,
List<? extends Normalizable> results, String quantilesState) {
NormalizerProcess process = processFactory.createNormalizerProcess(jobId, quantilesState, bucketSpan,
perPartitionNormalization, executorService);
executorService);
NormalizerResultHandler resultsHandler = process.createNormalizedResultsHandler();
Future<?> resultsHandlerFuture = executorService.submit(() -> {
try {

View File

@ -29,15 +29,12 @@ public class NormalizerBuilder {
private final String jobId;
private final String quantilesState;
private final Integer bucketSpan;
private final boolean perPartitionNormalization;
public NormalizerBuilder(Environment env, String jobId, String quantilesState, Integer bucketSpan,
boolean perPartitionNormalization) {
public NormalizerBuilder(Environment env, String jobId, String quantilesState, Integer bucketSpan) {
this.env = env;
this.jobId = jobId;
this.quantilesState = quantilesState;
this.bucketSpan = bucketSpan;
this.perPartitionNormalization = perPartitionNormalization;
}
/**
@ -49,9 +46,6 @@ public class NormalizerBuilder {
command.add(NORMALIZE_PATH);
addIfNotNull(bucketSpan, AutodetectBuilder.BUCKET_SPAN_ARG, command);
command.add(AutodetectBuilder.LENGTH_ENCODED_INPUT_ARG);
if (perPartitionNormalization) {
command.add(AutodetectBuilder.PER_PARTITION_NORMALIZATION);
}
if (quantilesState != null) {
Path quantilesStateFilePath = AutodetectBuilder.writeNormalizerInitState(jobId, quantilesState, env);

View File

@ -17,6 +17,5 @@ public interface NormalizerProcessFactory {
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @return The process
*/
NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan, boolean perPartitionNormalization,
ExecutorService executorService);
NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan, ExecutorService executorService);
}

View File

@ -1,87 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.normalizer;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.results.PartitionScore;
import java.io.IOException;
import java.util.Objects;
public class PartitionScoreNormalizable extends AbstractLeafNormalizable {
private final PartitionScore score;
public PartitionScoreNormalizable(PartitionScore score, String indexName) {
super(indexName);
this.score = Objects.requireNonNull(score);
}
@Override
public String getId() {
throw new UnsupportedOperationException("PartitionScore has no ID as it should not be persisted outside of the owning bucket");
}
@Override
public Level getLevel() {
return Level.PARTITION;
}
@Override
public String getPartitionFieldName() {
return score.getPartitionFieldName();
}
@Override
public String getPartitionFieldValue() {
return score.getPartitionFieldValue();
}
@Override
public String getPersonFieldName() {
return null;
}
@Override
public String getPersonFieldValue() {
return null;
}
@Override
public String getFunctionName() {
return null;
}
@Override
public String getValueFieldName() {
return null;
}
@Override
public double getProbability() {
return score.getProbability();
}
@Override
public double getNormalizedScore() {
return score.getRecordScore();
}
@Override
public void setNormalizedScore(double normalizedScore) {
score.setRecordScore(normalizedScore);
}
@Override
public void setParentScore(double parentScore) {
// Do nothing as it is not holding the parent score.
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return score.toXContent(builder, params);
}
}

View File

@ -79,12 +79,12 @@ public class ScoresUpdater {
* Update the anomaly score field on all previously persisted buckets
* and all contained records
*/
public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs, boolean perPartitionNormalization) {
public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs) {
Normalizer normalizer = normalizerFactory.create(jobId);
int[] counts = {0, 0};
updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization);
updateRecords(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization);
updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization);
updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts);
updateRecords(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts);
updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts);
// The updates will have been persisted in batches throughout the renormalization
// process - this call just catches any leftovers
@ -94,7 +94,7 @@ public class ScoresUpdater {
}
private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
long windowExtensionMs, int[] counts) {
BatchedDocumentsIterator<Result<Bucket>> bucketsIterator =
jobResultsProvider.newBatchedBucketsIterator(jobId)
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
@ -114,14 +114,14 @@ public class ScoresUpdater {
if (current.result.isNormalizable()) {
bucketsToRenormalize.add(new BucketNormalizable(current.result, current.index));
if (bucketsToRenormalize.size() >= TARGET_BUCKETS_TO_RENORMALIZE) {
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization);
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts);
bucketsToRenormalize.clear();
}
}
}
}
if (!bucketsToRenormalize.isEmpty()) {
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization);
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts);
}
}
@ -130,8 +130,8 @@ public class ScoresUpdater {
}
private void normalizeBuckets(Normalizer normalizer, List<BucketNormalizable> normalizableBuckets,
String quantilesState, int[] counts, boolean perPartitionNormalization) {
normalizer.normalize(bucketSpan, perPartitionNormalization, normalizableBuckets, quantilesState);
String quantilesState, int[] counts) {
normalizer.normalize(bucketSpan, normalizableBuckets, quantilesState);
for (BucketNormalizable bucketNormalizable : normalizableBuckets) {
if (bucketNormalizable.hadBigNormalizedUpdate()) {
@ -144,7 +144,7 @@ public class ScoresUpdater {
}
private void updateRecords(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
long windowExtensionMs, int[] counts) {
BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobResultsProvider.newBatchedRecordsIterator(jobId)
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
.includeInterim(false);
@ -160,14 +160,14 @@ public class ScoresUpdater {
List<Normalizable> asNormalizables = records.stream()
.map(recordResultIndex -> new RecordNormalizable(recordResultIndex.result, recordResultIndex.index))
.collect(Collectors.toList());
normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
normalizer.normalize(bucketSpan, asNormalizables, quantilesState);
persistChanged(counts, asNormalizables);
}
}
private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
long windowExtensionMs, int[] counts) {
BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobResultsProvider.newBatchedInfluencersIterator(jobId)
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
.includeInterim(false);
@ -183,7 +183,7 @@ public class ScoresUpdater {
List<Normalizable> asNormalizables = influencers.stream()
.map(influencerResultIndex -> new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.index))
.collect(Collectors.toList());
normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
normalizer.normalize(bucketSpan, asNormalizables, quantilesState);
persistChanged(counts, asNormalizables);
}

View File

@ -26,7 +26,6 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
private final String jobId;
private final ScoresUpdater scoresUpdater;
private final ExecutorService executorService;
private final boolean isPerPartitionNormalization;
private final Deque<QuantilesWithLatch> quantilesDeque = new ConcurrentLinkedDeque<>();
private final Deque<CountDownLatch> latchDeque = new ConcurrentLinkedDeque<>();
/**
@ -34,12 +33,10 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
*/
private final Semaphore semaphore = new Semaphore(1);
public ShortCircuitingRenormalizer(String jobId, ScoresUpdater scoresUpdater, ExecutorService executorService,
boolean isPerPartitionNormalization) {
public ShortCircuitingRenormalizer(String jobId, ScoresUpdater scoresUpdater, ExecutorService executorService) {
this.jobId = jobId;
this.scoresUpdater = scoresUpdater;
this.executorService = executorService;
this.isPerPartitionNormalization = isPerPartitionNormalization;
}
@Override
@ -161,8 +158,7 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
jobId, latestBucketTimeMs, earliestBucketTimeMs);
windowExtensionMs = 0;
}
scoresUpdater.update(latestQuantiles.getQuantileState(), latestBucketTimeMs, windowExtensionMs,
isPerPartitionNormalization);
scoresUpdater.update(latestQuantiles.getQuantileState(), latestBucketTimeMs, windowExtensionMs);
latch.countDown();
latch = null;
}

View File

@ -56,7 +56,6 @@ public class AutodetectBuilderTests extends ESTestCase {
acBuilder.setSummaryCountFieldName("summaryField");
acBuilder.setOverlappingBuckets(true);
acBuilder.setMultivariateByFields(true);
acBuilder.setUsePerPartitionNormalization(true);
job.setAnalysisConfig(acBuilder);
DataDescription.Builder dd = new DataDescription.Builder();
@ -66,7 +65,7 @@ public class AutodetectBuilderTests extends ESTestCase {
job.setDataDescription(dd);
List<String> command = autodetectBuilder(job.build()).buildAutodetectCommand();
assertEquals(13, command.size());
assertEquals(12, command.size());
assertTrue(command.contains(AutodetectBuilder.AUTODETECT_PATH));
assertTrue(command.contains(AutodetectBuilder.BUCKET_SPAN_ARG + "120"));
assertTrue(command.contains(AutodetectBuilder.LATENCY_ARG + "360"));
@ -80,8 +79,6 @@ public class AutodetectBuilderTests extends ESTestCase {
assertTrue(command.contains(AutodetectBuilder.TIME_FIELD_ARG + "tf"));
assertTrue(command.contains(AutodetectBuilder.JOB_ID_ARG + "unit-test-job"));
assertTrue(command.contains(AutodetectBuilder.PER_PARTITION_NORMALIZATION));
int expectedPersistInterval = 10800 + AutodetectBuilder.calculateStaggeringInterval(job.getId());
assertTrue(command.contains(AutodetectBuilder.PERSIST_INTERVAL_ARG + expectedPersistInterval));
int expectedMaxQuantileInterval = 21600 + AutodetectBuilder.calculateStaggeringInterval(job.getId());
@ -116,4 +113,4 @@ public class AutodetectBuilderTests extends ESTestCase {
private AutodetectBuilder autodetectBuilder(Job job) {
return new AutodetectBuilder(job, filesToDelete, logger, env, settings, nativeController, processPipes);
}
}
}

View File

@ -172,25 +172,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verifyNoMoreInteractions(persister);
}
public void testProcessResult_records_isPerPartitionNormalization() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123);
record1.setPartitionFieldValue("pValue");
AnomalyRecord record2 = new AnomalyRecord("foo", new Date(123), 123);
record2.setPartitionFieldValue("pValue");
List<AnomalyRecord> records = Arrays.asList(record1, record2);
when(result.getRecords()).thenReturn(records);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistRecords(records);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister);
}
public void testProcessResult_influencers() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);

View File

@ -9,10 +9,8 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.core.ml.job.results.PartitionScore;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@ -46,11 +44,6 @@ public class BucketNormalizableTests extends ESTestCase {
AnomalyRecord record2 = new AnomalyRecord("foo", bucket.getTimestamp(), 600);
record2.setRecordScore(2.0);
bucket.setRecords(Arrays.asList(record1, record2));
List<PartitionScore> partitionScores = new ArrayList<>();
partitionScores.add(new PartitionScore("pf1", "pv1", 0.3, 0.2, 0.1));
partitionScores.add(new PartitionScore("pf1", "pv2", 0.5, 0.4, 0.01));
bucket.setPartitionScores(partitionScores);
}
public void testIsContainerOnly() {
@ -106,15 +99,11 @@ public class BucketNormalizableTests extends ESTestCase {
BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
List<Normalizable> children = bn.getChildren();
assertEquals(4, children.size());
assertEquals(2, children.size());
assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON);
assertTrue(children.get(1) instanceof BucketInfluencerNormalizable);
assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
assertTrue(children.get(2) instanceof PartitionScoreNormalizable);
assertEquals(0.2, children.get(2).getNormalizedScore(), EPSILON);
assertTrue(children.get(3) instanceof PartitionScoreNormalizable);
assertEquals(0.4, children.get(3).getNormalizedScore(), EPSILON);
}
public void testGetChildren_GivenTypeBucketInfluencer() {
@ -132,7 +121,6 @@ public class BucketNormalizableTests extends ESTestCase {
BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
assertTrue(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 95.0));
assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 42.0));
assertEquals(95.0, bucket.getAnomalyScore(), EPSILON);
}
@ -141,7 +129,6 @@ public class BucketNormalizableTests extends ESTestCase {
BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 88.0));
assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 2.0));
assertEquals(88.0, bucket.getAnomalyScore(), EPSILON);
}

View File

@ -21,11 +21,10 @@ public class NormalizerBuilderTests extends ESTestCase {
Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build());
String jobId = "unit-test-job";
List<String> command = new NormalizerBuilder(env, jobId, null, 300, true).build();
assertEquals(4, command.size());
List<String> command = new NormalizerBuilder(env, jobId, null, 300).build();
assertEquals(3, command.size());
assertTrue(command.contains("./normalize"));
assertTrue(command.contains(AutodetectBuilder.BUCKET_SPAN_ARG + "300"));
assertTrue(command.contains(AutodetectBuilder.LENGTH_ENCODED_INPUT_ARG));
assertTrue(command.contains(AutodetectBuilder.PER_PARTITION_NORMALIZATION));
}
}
}

View File

@ -49,7 +49,7 @@ public class NormalizerTests extends ESTestCase {
ExecutorService threadpool = Executors.newScheduledThreadPool(1);
try {
NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class);
when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), eq(false),
when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN),
any())).thenReturn(new MultiplyingNormalizerProcess(Settings.EMPTY, FACTOR));
Normalizer normalizer = new Normalizer(JOB_ID, processFactory, threadpool);
@ -58,7 +58,7 @@ public class NormalizerTests extends ESTestCase {
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.07, INITIAL_SCORE));
List<Normalizable> asNormalizables = Arrays.asList(new BucketNormalizable(bucket, INDEX_NAME));
normalizer.normalize(BUCKET_SPAN, false, asNormalizables, QUANTILES_STATE);
normalizer.normalize(BUCKET_SPAN, asNormalizables, QUANTILES_STATE);
assertEquals(1, asNormalizables.size());
assertEquals(FACTOR * INITIAL_SCORE, asNormalizables.get(0).getNormalizedScore(), 0.0001);

View File

@ -33,7 +33,6 @@ import java.util.Deque;
import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyListOf;
@ -95,7 +94,7 @@ public class ScoresUpdaterTests extends ESTestCase {
buckets.add(bucket);
givenProviderReturnsBuckets(buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
scoresUpdater.update(QUANTILES_STATE, 3600, 0);
verifyNormalizerWasInvoked(0);
verifyNothingWasUpdated();
@ -113,7 +112,7 @@ public class ScoresUpdaterTests extends ESTestCase {
givenProviderReturnsBuckets(buckets);
givenProviderReturnsRecords(new ArrayDeque<>());
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
scoresUpdater.update(QUANTILES_STATE, 3600, 0);
verifyNormalizerWasInvoked(1);
verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any());
@ -129,7 +128,7 @@ public class ScoresUpdaterTests extends ESTestCase {
givenProviderReturnsBuckets(buckets);
givenProviderReturnsRecords(new ArrayDeque<>());
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
scoresUpdater.update(QUANTILES_STATE, 3600, 0);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(1);
@ -150,7 +149,7 @@ public class ScoresUpdaterTests extends ESTestCase {
givenProviderReturnsBuckets(buckets);
givenProviderReturnsRecords(records);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
scoresUpdater.update(QUANTILES_STATE, 3600, 0);
verifyNormalizerWasInvoked(2);
verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any());
@ -176,7 +175,7 @@ public class ScoresUpdaterTests extends ESTestCase {
givenProviderReturnsBuckets(batch1, batch2);
givenProviderReturnsRecords(new ArrayDeque<>());
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
scoresUpdater.update(QUANTILES_STATE, 3600, 0);
verifyNormalizerWasInvoked(1);
@ -212,7 +211,7 @@ public class ScoresUpdaterTests extends ESTestCase {
recordIter.requireIncludeInterim(false);
when(jobResultsProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
scoresUpdater.update(QUANTILES_STATE, 3600, 0);
verifyNormalizerWasInvoked(2);
}
@ -224,7 +223,7 @@ public class ScoresUpdaterTests extends ESTestCase {
influencers.add(influencer);
givenProviderReturnsInfluencers(influencers);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
scoresUpdater.update(QUANTILES_STATE, 3600, 0);
verifyNormalizerWasInvoked(1);
verify(jobRenormalizedResultsPersister, times(1)).updateResults(any());
@ -253,7 +252,7 @@ public class ScoresUpdaterTests extends ESTestCase {
givenProviderReturnsRecords(records);
scoresUpdater.shutdown();
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
scoresUpdater.update(QUANTILES_STATE, 3600, 0);
verifyNormalizerWasInvoked(0);
verify(jobRenormalizedResultsPersister, never()).updateBucket(any());
@ -272,7 +271,7 @@ public class ScoresUpdaterTests extends ESTestCase {
givenProviderReturnsRecords(new ArrayDeque<>());
givenProviderReturnsNoInfluencers();
scoresUpdater.update(QUANTILES_STATE, 2595600000L, 0, false);
scoresUpdater.update(QUANTILES_STATE, 2595600000L, 0);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(1);
@ -289,7 +288,7 @@ public class ScoresUpdaterTests extends ESTestCase {
givenProviderReturnsRecords(new ArrayDeque<>());
givenProviderReturnsNoInfluencers();
scoresUpdater.update(QUANTILES_STATE, 90000000L, 0, false);
scoresUpdater.update(QUANTILES_STATE, 90000000L, 0);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(1);
@ -307,7 +306,7 @@ public class ScoresUpdaterTests extends ESTestCase {
givenProviderReturnsRecords(new ArrayDeque<>());
givenProviderReturnsNoInfluencers();
scoresUpdater.update(QUANTILES_STATE, 90000000L, 900000, false);
scoresUpdater.update(QUANTILES_STATE, 90000000L, 900000);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(1);
@ -339,7 +338,7 @@ public class ScoresUpdaterTests extends ESTestCase {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
List<Normalizable> normalizables = (List<Normalizable>) invocationOnMock.getArguments()[2];
List<Normalizable> normalizables = (List<Normalizable>) invocationOnMock.getArguments()[1];
for (Normalizable normalizable : normalizables) {
normalizable.raiseBigChangeFlag();
for (Normalizable child : normalizable.getChildren()) {
@ -348,7 +347,7 @@ public class ScoresUpdaterTests extends ESTestCase {
}
return null;
}
}).when(normalizer).normalize(anyInt(), anyBoolean(), anyList(), anyString());
}).when(normalizer).normalize(anyInt(), anyList(), anyString());
}
private void givenProviderReturnsBuckets(Deque<Bucket> batch1, Deque<Bucket> batch2) {
@ -416,7 +415,7 @@ public class ScoresUpdaterTests extends ESTestCase {
private void verifyNormalizerWasInvoked(int times) throws IOException {
int bucketSpan = job.getAnalysisConfig() == null ? 0 : ((Long) job.getAnalysisConfig().getBucketSpan().seconds()).intValue();
verify(normalizer, times(times)).normalize(
eq(bucketSpan), eq(false), anyListOf(Normalizable.class),
eq(bucketSpan), anyListOf(Normalizable.class),
eq(QUANTILES_STATE));
}

View File

@ -18,7 +18,6 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@ -43,10 +42,7 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase {
public void testNormalize() throws InterruptedException {
ExecutorService threadpool = Executors.newScheduledThreadPool(10);
try {
boolean isPerPartitionNormalization = randomBoolean();
ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, threadpool,
isPerPartitionNormalization);
ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, threadpool);
// Blast through many sets of quantiles in quick succession, faster than the normalizer can process them
for (int i = 1; i < TEST_SIZE / 2; ++i) {
@ -61,7 +57,7 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase {
renormalizer.waitUntilIdle();
ArgumentCaptor<String> stateCaptor = ArgumentCaptor.forClass(String.class);
verify(scoresUpdater, atLeastOnce()).update(stateCaptor.capture(), anyLong(), anyLong(), eq(isPerPartitionNormalization));
verify(scoresUpdater, atLeastOnce()).update(stateCaptor.capture(), anyLong(), anyLong());
List<String> quantilesUsed = stateCaptor.getAllValues();
assertFalse(quantilesUsed.isEmpty());
@ -91,7 +87,7 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase {
public void testIsEnabled_GivenNormalizationWindowIsZero() {
ScoresUpdater scoresUpdater = mock(ScoresUpdater.class);
when(scoresUpdater.getNormalizationWindow()).thenReturn(0L);
ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null, randomBoolean());
ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null);
assertThat(renormalizer.isEnabled(), is(false));
}
@ -99,7 +95,7 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase {
public void testIsEnabled_GivenNormalizationWindowGreaterThanZero() {
ScoresUpdater scoresUpdater = mock(ScoresUpdater.class);
when(scoresUpdater.getNormalizationWindow()).thenReturn(1L);
ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null, randomBoolean());
ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null);
assertThat(renormalizer.isEnabled(), is(true));
}

View File

@ -13,7 +13,6 @@ import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecordTests;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.core.ml.job.results.PartitionScore;
import java.io.IOException;
import java.util.ArrayList;
@ -61,15 +60,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
if (randomBoolean()) {
bucket.setInterim(randomBoolean());
}
if (randomBoolean()) {
int size = randomInt(10);
List<PartitionScore> partitionScores = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
partitionScores.add(new PartitionScore(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20), randomDouble(),
randomDouble(), randomDouble()));
}
bucket.setPartitionScores(partitionScores);
}
if (randomBoolean()) {
bucket.setProcessingTimeMs(randomLong());
}
@ -235,15 +225,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
assertFalse(bucket.isNormalizable());
}
public void testIsNormalizable_GivenAnomalyScoreIsZeroAndPartitionsScoresAreNonZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123));
bucket.setAnomalyScore(0.0);
bucket.setPartitionScores(Collections.singletonList(new PartitionScore("n", "v", 50.0, 40.0, 0.01)));
assertTrue(bucket.isNormalizable());
}
public void testIsNormalizable_GivenAnomalyScoreIsNonZeroAndRecordCountIsZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123));
@ -260,35 +241,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
assertTrue(bucket.isNormalizable());
}
public void testPartitionAnomalyScore() {
List<PartitionScore> pScore = new ArrayList<>();
pScore.add(new PartitionScore("pf", "pv1", 11.0, 10.0, 0.1));
pScore.add(new PartitionScore("pf", "pv3", 51.0, 50.0, 0.1));
pScore.add(new PartitionScore("pf", "pv4", 61.0, 60.0, 0.1));
pScore.add(new PartitionScore("pf", "pv2", 41.0, 40.0, 0.1));
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setPartitionScores(pScore);
double initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv1");
assertEquals(11.0, initialAnomalyScore, 0.001);
double anomalyScore = bucket.partitionAnomalyScore("pv1");
assertEquals(10.0, anomalyScore, 0.001);
initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv2");
assertEquals(41.0, initialAnomalyScore, 0.001);
anomalyScore = bucket.partitionAnomalyScore("pv2");
assertEquals(40.0, anomalyScore, 0.001);
initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv3");
assertEquals(51.0, initialAnomalyScore, 0.001);
anomalyScore = bucket.partitionAnomalyScore("pv3");
assertEquals(50.0, anomalyScore, 0.001);
initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv4");
assertEquals(61.0, initialAnomalyScore, 0.001);
anomalyScore = bucket.partitionAnomalyScore("pv4");
assertEquals(60.0, anomalyScore, 0.001);
}
public void testId() {
Bucket bucket = new Bucket("foo", new Date(123), 60L);
assertEquals("foo_bucket_123_60", bucket.getId());
}

View File

@ -1,54 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.job.results.PartitionScore;
import java.io.IOException;
import static org.hamcrest.Matchers.containsString;
public class PartitionScoreTests extends AbstractSerializingTestCase<PartitionScore> {
@Override
protected PartitionScore createTestInstance() {
return new PartitionScore(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20), randomDouble(), randomDouble(),
randomDouble());
}
@Override
protected Reader<PartitionScore> instanceReader() {
return PartitionScore::new;
}
@Override
protected PartitionScore doParseInstance(XContentParser parser) {
return PartitionScore.STRICT_PARSER.apply(parser, null);
}
public void testStrictParser() throws IOException {
String json = "{\"partition_field_name\":\"field_1\", \"partition_field_value\":\"x\", \"initial_record_score\": 3," +
" \"record_score\": 3, \"probability\": 0.001, \"foo\":\"bar\"}";
try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> PartitionScore.STRICT_PARSER.apply(parser, null));
assertThat(e.getMessage(), containsString("unknown field [foo]"));
}
}
public void testLenientParser() throws IOException {
String json = "{\"partition_field_name\":\"field_1\", \"partition_field_value\":\"x\", \"initial_record_score\": 3," +
" \"record_score\": 3, \"probability\": 0.001, \"foo\":\"bar\"}";
try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) {
PartitionScore.LENIENT_PARSER.apply(parser, null);
}
}
}