[ML] Ensure immutability of MlMetadata (#31957)
The test failure in #31916 revealed that updating rules on a job was modifying the detectors list in-place. That meant the old cluster state and the updated cluster state had no difference and thus the change was not propagated to non-master nodes. This commit fixes that and also reviews all of ML metadata in order to ensure immutability. Closes #31916
This commit is contained in:
parent
e3707efe74
commit
2cfe703299
|
@ -156,14 +156,14 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
|||
this.jobId = jobId;
|
||||
this.queryDelay = queryDelay;
|
||||
this.frequency = frequency;
|
||||
this.indices = indices;
|
||||
this.types = types;
|
||||
this.indices = indices == null ? null : Collections.unmodifiableList(indices);
|
||||
this.types = types == null ? null : Collections.unmodifiableList(types);
|
||||
this.query = query;
|
||||
this.aggregations = aggregations;
|
||||
this.scriptFields = scriptFields;
|
||||
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
|
||||
this.scrollSize = scrollSize;
|
||||
this.chunkingConfig = chunkingConfig;
|
||||
this.headers = Objects.requireNonNull(headers);
|
||||
this.headers = Collections.unmodifiableMap(headers);
|
||||
}
|
||||
|
||||
public DatafeedConfig(StreamInput in) throws IOException {
|
||||
|
@ -172,19 +172,19 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
|||
this.queryDelay = in.readOptionalTimeValue();
|
||||
this.frequency = in.readOptionalTimeValue();
|
||||
if (in.readBoolean()) {
|
||||
this.indices = in.readList(StreamInput::readString);
|
||||
this.indices = Collections.unmodifiableList(in.readList(StreamInput::readString));
|
||||
} else {
|
||||
this.indices = null;
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
this.types = in.readList(StreamInput::readString);
|
||||
this.types = Collections.unmodifiableList(in.readList(StreamInput::readString));
|
||||
} else {
|
||||
this.types = null;
|
||||
}
|
||||
this.query = in.readNamedWriteable(QueryBuilder.class);
|
||||
this.aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
|
||||
if (in.readBoolean()) {
|
||||
this.scriptFields = in.readList(SearchSourceBuilder.ScriptField::new);
|
||||
this.scriptFields = Collections.unmodifiableList(in.readList(SearchSourceBuilder.ScriptField::new));
|
||||
} else {
|
||||
this.scriptFields = null;
|
||||
}
|
||||
|
@ -195,7 +195,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
|
|||
}
|
||||
this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
|
||||
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
|
||||
this.headers = in.readMap(StreamInput::readString, StreamInput::readString);
|
||||
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
|
||||
} else {
|
||||
this.headers = Collections.emptyMap();
|
||||
}
|
||||
|
|
|
@ -352,6 +352,18 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
boolean isNoop(DatafeedConfig datafeed) {
|
||||
return (frequency == null || Objects.equals(frequency, datafeed.getFrequency()))
|
||||
&& (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay()))
|
||||
&& (indices == null || Objects.equals(indices, datafeed.getIndices()))
|
||||
&& (types == null || Objects.equals(types, datafeed.getTypes()))
|
||||
&& (query == null || Objects.equals(query, datafeed.getQuery()))
|
||||
&& (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay()))
|
||||
&& (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations()))
|
||||
&& (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
|
||||
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
private String id;
|
||||
|
|
|
@ -144,20 +144,20 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
|
|||
this.latency = latency;
|
||||
this.categorizationFieldName = categorizationFieldName;
|
||||
this.categorizationAnalyzerConfig = categorizationAnalyzerConfig;
|
||||
this.categorizationFilters = categorizationFilters;
|
||||
this.categorizationFilters = categorizationFilters == null ? null : Collections.unmodifiableList(categorizationFilters);
|
||||
this.summaryCountFieldName = summaryCountFieldName;
|
||||
this.influencers = influencers;
|
||||
this.influencers = Collections.unmodifiableList(influencers);
|
||||
this.overlappingBuckets = overlappingBuckets;
|
||||
this.resultFinalizationWindow = resultFinalizationWindow;
|
||||
this.multivariateByFields = multivariateByFields;
|
||||
this.multipleBucketSpans = multipleBucketSpans;
|
||||
this.multipleBucketSpans = multipleBucketSpans == null ? null : Collections.unmodifiableList(multipleBucketSpans);
|
||||
this.usePerPartitionNormalization = usePerPartitionNormalization;
|
||||
}
|
||||
|
||||
public AnalysisConfig(StreamInput in) throws IOException {
|
||||
bucketSpan = in.readTimeValue();
|
||||
categorizationFieldName = in.readOptionalString();
|
||||
categorizationFilters = in.readBoolean() ? in.readList(StreamInput::readString) : null;
|
||||
categorizationFilters = in.readBoolean() ? Collections.unmodifiableList(in.readList(StreamInput::readString)) : null;
|
||||
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
|
||||
categorizationAnalyzerConfig = in.readOptionalWriteable(CategorizationAnalyzerConfig::new);
|
||||
} else {
|
||||
|
@ -165,8 +165,8 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
|
|||
}
|
||||
latency = in.readOptionalTimeValue();
|
||||
summaryCountFieldName = in.readOptionalString();
|
||||
detectors = in.readList(Detector::new);
|
||||
influencers = in.readList(StreamInput::readString);
|
||||
detectors = Collections.unmodifiableList(in.readList(Detector::new));
|
||||
influencers = Collections.unmodifiableList(in.readList(StreamInput::readString));
|
||||
overlappingBuckets = in.readOptionalBoolean();
|
||||
resultFinalizationWindow = in.readOptionalLong();
|
||||
multivariateByFields = in.readOptionalBoolean();
|
||||
|
@ -176,7 +176,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
|
|||
for (int i = 0; i < arraySize; i++) {
|
||||
spans.add(in.readTimeValue());
|
||||
}
|
||||
multipleBucketSpans = spans;
|
||||
multipleBucketSpans = Collections.unmodifiableList(spans);
|
||||
} else {
|
||||
multipleBucketSpans = null;
|
||||
}
|
||||
|
@ -487,18 +487,20 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
|
|||
}
|
||||
|
||||
public Builder(AnalysisConfig analysisConfig) {
|
||||
this.detectors = analysisConfig.detectors;
|
||||
this.detectors = new ArrayList<>(analysisConfig.detectors);
|
||||
this.bucketSpan = analysisConfig.bucketSpan;
|
||||
this.latency = analysisConfig.latency;
|
||||
this.categorizationFieldName = analysisConfig.categorizationFieldName;
|
||||
this.categorizationFilters = analysisConfig.categorizationFilters;
|
||||
this.categorizationFilters = analysisConfig.categorizationFilters == null ? null
|
||||
: new ArrayList<>(analysisConfig.categorizationFilters);
|
||||
this.categorizationAnalyzerConfig = analysisConfig.categorizationAnalyzerConfig;
|
||||
this.summaryCountFieldName = analysisConfig.summaryCountFieldName;
|
||||
this.influencers = analysisConfig.influencers;
|
||||
this.influencers = new ArrayList<>(analysisConfig.influencers);
|
||||
this.overlappingBuckets = analysisConfig.overlappingBuckets;
|
||||
this.resultFinalizationWindow = analysisConfig.resultFinalizationWindow;
|
||||
this.multivariateByFields = analysisConfig.multivariateByFields;
|
||||
this.multipleBucketSpans = analysisConfig.multipleBucketSpans;
|
||||
this.multipleBucketSpans = analysisConfig.multipleBucketSpans == null ? null
|
||||
: new ArrayList<>(analysisConfig.multipleBucketSpans);
|
||||
this.usePerPartitionNormalization = analysisConfig.usePerPartitionNormalization;
|
||||
}
|
||||
|
||||
|
@ -518,6 +520,10 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
|
|||
this.detectors = sequentialIndexDetectors;
|
||||
}
|
||||
|
||||
public void setDetector(int detectorIndex, Detector detector) {
|
||||
detectors.set(detectorIndex, detector);
|
||||
}
|
||||
|
||||
public void setBucketSpan(TimeValue bucketSpan) {
|
||||
this.bucketSpan = bucketSpan;
|
||||
}
|
||||
|
@ -543,7 +549,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
|
|||
}
|
||||
|
||||
public void setInfluencers(List<String> influencers) {
|
||||
this.influencers = influencers;
|
||||
this.influencers = ExceptionsHelper.requireNonNull(influencers, INFLUENCERS.getPreferredName());
|
||||
}
|
||||
|
||||
public void setOverlappingBuckets(Boolean overlappingBuckets) {
|
||||
|
|
|
@ -252,7 +252,7 @@ public class Detector implements ToXContentObject, Writeable {
|
|||
partitionFieldName = in.readOptionalString();
|
||||
useNull = in.readBoolean();
|
||||
excludeFrequent = in.readBoolean() ? ExcludeFrequent.readFromStream(in) : null;
|
||||
rules = in.readList(DetectionRule::new);
|
||||
rules = Collections.unmodifiableList(in.readList(DetectionRule::new));
|
||||
if (in.getVersion().onOrAfter(Version.V_5_5_0)) {
|
||||
detectorIndex = in.readInt();
|
||||
} else {
|
||||
|
@ -508,7 +508,7 @@ public class Detector implements ToXContentObject, Writeable {
|
|||
partitionFieldName = detector.partitionFieldName;
|
||||
useNull = detector.useNull;
|
||||
excludeFrequent = detector.excludeFrequent;
|
||||
rules = new ArrayList<>(detector.getRules());
|
||||
rules = new ArrayList<>(detector.rules);
|
||||
detectorIndex = detector.detectorIndex;
|
||||
}
|
||||
|
||||
|
|
|
@ -193,7 +193,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
|||
this.jobId = jobId;
|
||||
this.jobType = jobType;
|
||||
this.jobVersion = jobVersion;
|
||||
this.groups = groups;
|
||||
this.groups = Collections.unmodifiableList(groups);
|
||||
this.description = description;
|
||||
this.createTime = createTime;
|
||||
this.finishedTime = finishedTime;
|
||||
|
@ -207,7 +207,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
|||
this.backgroundPersistInterval = backgroundPersistInterval;
|
||||
this.modelSnapshotRetentionDays = modelSnapshotRetentionDays;
|
||||
this.resultsRetentionDays = resultsRetentionDays;
|
||||
this.customSettings = customSettings;
|
||||
this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings);
|
||||
this.modelSnapshotId = modelSnapshotId;
|
||||
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
|
||||
this.resultsIndexName = resultsIndexName;
|
||||
|
@ -223,7 +223,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
|||
jobVersion = null;
|
||||
}
|
||||
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
|
||||
groups = in.readList(StreamInput::readString);
|
||||
groups = Collections.unmodifiableList(in.readList(StreamInput::readString));
|
||||
} else {
|
||||
groups = Collections.emptyList();
|
||||
}
|
||||
|
@ -244,7 +244,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
|||
backgroundPersistInterval = in.readOptionalTimeValue();
|
||||
modelSnapshotRetentionDays = in.readOptionalLong();
|
||||
resultsRetentionDays = in.readOptionalLong();
|
||||
customSettings = in.readMap();
|
||||
Map<String, Object> readCustomSettings = in.readMap();
|
||||
customSettings = readCustomSettings == null ? null : Collections.unmodifiableMap(readCustomSettings);
|
||||
modelSnapshotId = in.readOptionalString();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1) && in.readBoolean()) {
|
||||
modelSnapshotMinVersion = Version.readVersion(in);
|
||||
|
@ -627,7 +628,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
|||
&& Objects.equals(this.lastDataTime, that.lastDataTime)
|
||||
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
|
||||
&& Objects.equals(this.analysisConfig, that.analysisConfig)
|
||||
&& Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription)
|
||||
&& Objects.equals(this.analysisLimits, that.analysisLimits)
|
||||
&& Objects.equals(this.dataDescription, that.dataDescription)
|
||||
&& Objects.equals(this.modelPlotConfig, that.modelPlotConfig)
|
||||
&& Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays)
|
||||
&& Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval)
|
||||
|
@ -1055,6 +1057,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
|||
return Objects.equals(this.id, that.id)
|
||||
&& Objects.equals(this.jobType, that.jobType)
|
||||
&& Objects.equals(this.jobVersion, that.jobVersion)
|
||||
&& Objects.equals(this.groups, that.groups)
|
||||
&& Objects.equals(this.description, that.description)
|
||||
&& Objects.equals(this.analysisConfig, that.analysisConfig)
|
||||
&& Objects.equals(this.analysisLimits, that.analysisLimits)
|
||||
|
@ -1077,7 +1080,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, jobType, jobVersion, description, analysisConfig, analysisLimits, dataDescription, createTime,
|
||||
return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription, createTime,
|
||||
finishedTime, lastDataTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays,
|
||||
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
|
||||
modelSnapshotMinVersion, resultsIndexName, deleted);
|
||||
|
|
|
@ -373,6 +373,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
|
|||
*/
|
||||
public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
|
||||
Job.Builder builder = new Job.Builder(source);
|
||||
AnalysisConfig currentAnalysisConfig = source.getAnalysisConfig();
|
||||
AnalysisConfig.Builder newAnalysisConfig = new AnalysisConfig.Builder(currentAnalysisConfig);
|
||||
|
||||
if (groups != null) {
|
||||
builder.setGroups(groups);
|
||||
}
|
||||
|
@ -380,26 +383,23 @@ public class JobUpdate implements Writeable, ToXContentObject {
|
|||
builder.setDescription(description);
|
||||
}
|
||||
if (detectorUpdates != null && detectorUpdates.isEmpty() == false) {
|
||||
AnalysisConfig ac = source.getAnalysisConfig();
|
||||
int numDetectors = ac.getDetectors().size();
|
||||
int numDetectors = currentAnalysisConfig.getDetectors().size();
|
||||
for (DetectorUpdate dd : detectorUpdates) {
|
||||
if (dd.getDetectorIndex() >= numDetectors) {
|
||||
throw ExceptionsHelper.badRequestException("Supplied detector_index [{}] is >= the number of detectors [{}]",
|
||||
dd.getDetectorIndex(), numDetectors);
|
||||
}
|
||||
|
||||
Detector.Builder detectorbuilder = new Detector.Builder(ac.getDetectors().get(dd.getDetectorIndex()));
|
||||
Detector.Builder detectorBuilder = new Detector.Builder(currentAnalysisConfig.getDetectors().get(dd.getDetectorIndex()));
|
||||
if (dd.getDescription() != null) {
|
||||
detectorbuilder.setDetectorDescription(dd.getDescription());
|
||||
detectorBuilder.setDetectorDescription(dd.getDescription());
|
||||
}
|
||||
if (dd.getRules() != null) {
|
||||
detectorbuilder.setRules(dd.getRules());
|
||||
detectorBuilder.setRules(dd.getRules());
|
||||
}
|
||||
ac.getDetectors().set(dd.getDetectorIndex(), detectorbuilder.build());
|
||||
}
|
||||
|
||||
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(ac);
|
||||
builder.setAnalysisConfig(acBuilder);
|
||||
newAnalysisConfig.setDetector(dd.getDetectorIndex(), detectorBuilder.build());
|
||||
}
|
||||
}
|
||||
if (modelPlotConfig != null) {
|
||||
builder.setModelPlotConfig(modelPlotConfig);
|
||||
|
@ -422,9 +422,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
|
|||
builder.setResultsRetentionDays(resultsRetentionDays);
|
||||
}
|
||||
if (categorizationFilters != null) {
|
||||
AnalysisConfig.Builder analysisConfigBuilder = new AnalysisConfig.Builder(source.getAnalysisConfig());
|
||||
analysisConfigBuilder.setCategorizationFilters(categorizationFilters);
|
||||
builder.setAnalysisConfig(analysisConfigBuilder);
|
||||
newAnalysisConfig.setCategorizationFilters(categorizationFilters);
|
||||
}
|
||||
if (customSettings != null) {
|
||||
builder.setCustomSettings(customSettings);
|
||||
|
@ -446,9 +444,48 @@ public class JobUpdate implements Writeable, ToXContentObject {
|
|||
if (jobVersion != null) {
|
||||
builder.setJobVersion(jobVersion);
|
||||
}
|
||||
|
||||
builder.setAnalysisConfig(newAnalysisConfig);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
boolean isNoop(Job job) {
|
||||
return (groups == null || Objects.equals(groups, job.getGroups()))
|
||||
&& (description == null || Objects.equals(description, job.getDescription()))
|
||||
&& (modelPlotConfig == null || Objects.equals(modelPlotConfig, job.getModelPlotConfig()))
|
||||
&& (analysisLimits == null || Objects.equals(analysisLimits, job.getAnalysisLimits()))
|
||||
&& updatesDetectors(job) == false
|
||||
&& (renormalizationWindowDays == null || Objects.equals(renormalizationWindowDays, job.getRenormalizationWindowDays()))
|
||||
&& (backgroundPersistInterval == null || Objects.equals(backgroundPersistInterval, job.getBackgroundPersistInterval()))
|
||||
&& (modelSnapshotRetentionDays == null || Objects.equals(modelSnapshotRetentionDays, job.getModelSnapshotRetentionDays()))
|
||||
&& (resultsRetentionDays == null || Objects.equals(resultsRetentionDays, job.getResultsRetentionDays()))
|
||||
&& (categorizationFilters == null
|
||||
|| Objects.equals(categorizationFilters, job.getAnalysisConfig().getCategorizationFilters()))
|
||||
&& (customSettings == null || Objects.equals(customSettings, job.getCustomSettings()))
|
||||
&& (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId()))
|
||||
&& (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion()))
|
||||
&& (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory()))
|
||||
&& (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion()));
|
||||
}
|
||||
|
||||
boolean updatesDetectors(Job job) {
|
||||
AnalysisConfig analysisConfig = job.getAnalysisConfig();
|
||||
if (detectorUpdates == null) {
|
||||
return false;
|
||||
}
|
||||
for (DetectorUpdate detectorUpdate : detectorUpdates) {
|
||||
if (detectorUpdate.description == null && detectorUpdate.rules == null) {
|
||||
continue;
|
||||
}
|
||||
Detector detector = analysisConfig.getDetectors().get(detectorUpdate.detectorIndex);
|
||||
if (Objects.equals(detectorUpdate.description, detector.getDetectorDescription()) == false
|
||||
|| Objects.equals(detectorUpdate.rules, detector.getRules()) == false) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (this == other) {
|
||||
|
|
|
@ -60,7 +60,7 @@ public class RuleScope implements ToXContentObject, Writeable {
|
|||
}
|
||||
|
||||
public RuleScope(Map<String, FilterRef> scope) {
|
||||
this.scope = Objects.requireNonNull(scope);
|
||||
this.scope = Collections.unmodifiableMap(scope);
|
||||
}
|
||||
|
||||
public RuleScope(StreamInput in) throws IOException {
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.ml.datafeed;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -24,6 +25,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|||
import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig.Mode;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -31,6 +33,7 @@ import java.util.List;
|
|||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpdate> {
|
||||
|
||||
|
@ -40,8 +43,12 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
}
|
||||
|
||||
public static DatafeedUpdate createRandomized(String datafeedId) {
|
||||
return createRandomized(datafeedId, null);
|
||||
}
|
||||
|
||||
public static DatafeedUpdate createRandomized(String datafeedId, @Nullable DatafeedConfig datafeed) {
|
||||
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(datafeedId);
|
||||
if (randomBoolean()) {
|
||||
if (randomBoolean() && datafeed == null) {
|
||||
builder.setJobId(randomAlphaOfLength(10));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
|
@ -68,7 +75,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
}
|
||||
builder.setScriptFields(scriptFields);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
if (randomBoolean() && datafeed == null) {
|
||||
// can only test with a single agg as the xcontent order gets randomized by test base class and then
|
||||
// the actual xcontent isn't the same and test fail.
|
||||
// Testing with a single agg is ok as we don't have special list writeable / xconent logic
|
||||
|
@ -184,6 +191,25 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
|||
AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime))));
|
||||
}
|
||||
|
||||
public void testApply_GivenRandomUpdates_AssertImmutability() {
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig(JobTests.randomValidJobId());
|
||||
if (datafeed.getAggregations() != null) {
|
||||
DatafeedConfig.Builder withoutAggs = new DatafeedConfig.Builder(datafeed);
|
||||
withoutAggs.setAggregations(null);
|
||||
datafeed = withoutAggs.build();
|
||||
}
|
||||
DatafeedUpdate update = createRandomized(datafeed.getId(), datafeed);
|
||||
while (update.isNoop(datafeed)) {
|
||||
update = createRandomized(datafeed.getId(), datafeed);
|
||||
}
|
||||
|
||||
DatafeedConfig updatedDatafeed = update.apply(datafeed, Collections.emptyMap());
|
||||
|
||||
assertThat(datafeed, not(equalTo(updatedDatafeed)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) {
|
||||
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance);
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ml.job.config;
|
|||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
@ -18,10 +19,13 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
|
||||
|
@ -30,7 +34,15 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
|
|||
|
||||
@Override
|
||||
protected JobUpdate createTestInstance() {
|
||||
JobUpdate.Builder update = new JobUpdate.Builder(randomAlphaOfLength(4));
|
||||
return createRandom(randomAlphaOfLength(4), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a completely random update when the job is null
|
||||
* or a random update that is is valid for the given job
|
||||
*/
|
||||
public JobUpdate createRandom(String jobId, @Nullable Job job) {
|
||||
JobUpdate.Builder update = new JobUpdate.Builder(jobId);
|
||||
if (randomBoolean()) {
|
||||
int groupsNum = randomIntBetween(0, 10);
|
||||
List<String> groups = new ArrayList<>(groupsNum);
|
||||
|
@ -43,28 +55,16 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
|
|||
update.setDescription(randomAlphaOfLength(20));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
int size = randomInt(10);
|
||||
List<JobUpdate.DetectorUpdate> detectorUpdates = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
String detectorDescription = null;
|
||||
if (randomBoolean()) {
|
||||
detectorDescription = randomAlphaOfLength(12);
|
||||
}
|
||||
List<DetectionRule> detectionRules = null;
|
||||
if (randomBoolean()) {
|
||||
detectionRules = new ArrayList<>();
|
||||
detectionRules.add(new DetectionRule.Builder(
|
||||
Collections.singletonList(new RuleCondition(RuleCondition.AppliesTo.ACTUAL, Operator.GT, 5))).build());
|
||||
}
|
||||
detectorUpdates.add(new JobUpdate.DetectorUpdate(i, detectorDescription, detectionRules));
|
||||
}
|
||||
List<JobUpdate.DetectorUpdate> detectorUpdates = job == null ? createRandomDetectorUpdates()
|
||||
: createRandomDetectorUpdatesForJob(job);
|
||||
update.setDetectorUpdates(detectorUpdates);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
update.setModelPlotConfig(new ModelPlotConfig(randomBoolean(), randomAlphaOfLength(10)));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
update.setAnalysisLimits(AnalysisLimitsTests.createRandomized());
|
||||
update.setAnalysisLimits(AnalysisLimits.validateAndSetDefaults(AnalysisLimitsTests.createRandomized(), null,
|
||||
AnalysisLimits.DEFAULT_MODEL_MEMORY_LIMIT_MB));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
update.setRenormalizationWindowDays(randomNonNegativeLong());
|
||||
|
@ -78,7 +78,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
|
|||
if (randomBoolean()) {
|
||||
update.setResultsRetentionDays(randomNonNegativeLong());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
if (randomBoolean() && jobSupportsCategorizationFilters(job)) {
|
||||
update.setCategorizationFilters(Arrays.asList(generateRandomStringArray(10, 10, false)));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
|
@ -100,6 +100,77 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
|
|||
return update.build();
|
||||
}
|
||||
|
||||
private static boolean jobSupportsCategorizationFilters(@Nullable Job job) {
|
||||
if (job == null) {
|
||||
return true;
|
||||
}
|
||||
if (job.getAnalysisConfig().getCategorizationFieldName() == null) {
|
||||
return false;
|
||||
}
|
||||
if (job.getAnalysisConfig().getCategorizationAnalyzerConfig() != null) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static List<JobUpdate.DetectorUpdate> createRandomDetectorUpdates() {
|
||||
int size = randomInt(10);
|
||||
List<JobUpdate.DetectorUpdate> detectorUpdates = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
String detectorDescription = null;
|
||||
if (randomBoolean()) {
|
||||
detectorDescription = randomAlphaOfLength(12);
|
||||
}
|
||||
List<DetectionRule> detectionRules = null;
|
||||
if (randomBoolean()) {
|
||||
detectionRules = new ArrayList<>();
|
||||
detectionRules.add(new DetectionRule.Builder(
|
||||
Collections.singletonList(new RuleCondition(RuleCondition.AppliesTo.ACTUAL, Operator.GT, 5))).build());
|
||||
}
|
||||
detectorUpdates.add(new JobUpdate.DetectorUpdate(i, detectorDescription, detectionRules));
|
||||
}
|
||||
return detectorUpdates;
|
||||
}
|
||||
|
||||
private static List<JobUpdate.DetectorUpdate> createRandomDetectorUpdatesForJob(Job job) {
|
||||
AnalysisConfig analysisConfig = job.getAnalysisConfig();
|
||||
int size = randomInt(analysisConfig.getDetectors().size());
|
||||
List<JobUpdate.DetectorUpdate> detectorUpdates = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
String detectorDescription = null;
|
||||
if (randomBoolean()) {
|
||||
detectorDescription = randomAlphaOfLength(12);
|
||||
}
|
||||
int rulesSize = randomBoolean() ? randomIntBetween(1, 5) : 0;
|
||||
List<DetectionRule> detectionRules = rulesSize == 0 ? null : new ArrayList<>(rulesSize);
|
||||
for (int ruleIndex = 0; ruleIndex < rulesSize; ++ruleIndex) {
|
||||
int detectorIndex = randomInt(analysisConfig.getDetectors().size() - 1);
|
||||
Detector detector = analysisConfig.getDetectors().get(detectorIndex);
|
||||
List<String> analysisFields = detector.extractAnalysisFields();
|
||||
if (randomBoolean() || analysisFields.isEmpty()) {
|
||||
detectionRules.add(new DetectionRule.Builder(Collections.singletonList(new RuleCondition(
|
||||
randomFrom(RuleCondition.AppliesTo.values()), randomFrom(Operator.values()), randomDouble()))).build());
|
||||
} else {
|
||||
RuleScope.Builder ruleScope = RuleScope.builder();
|
||||
int scopeSize = randomIntBetween(1, analysisFields.size());
|
||||
Set<String> analysisFieldsPickPot = new HashSet<>(analysisFields);
|
||||
for (int scopeIndex = 0; scopeIndex < scopeSize; ++scopeIndex) {
|
||||
String scopedField = randomFrom(analysisFieldsPickPot);
|
||||
analysisFieldsPickPot.remove(scopedField);
|
||||
if (randomBoolean()) {
|
||||
ruleScope.include(scopedField, MlFilterTests.randomValidFilterId());
|
||||
} else {
|
||||
ruleScope.exclude(scopedField, MlFilterTests.randomValidFilterId());
|
||||
}
|
||||
}
|
||||
detectionRules.add(new DetectionRule.Builder(ruleScope).build());
|
||||
}
|
||||
}
|
||||
detectorUpdates.add(new JobUpdate.DetectorUpdate(i, detectorDescription, detectionRules));
|
||||
}
|
||||
return detectorUpdates;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<JobUpdate> instanceReader() {
|
||||
return JobUpdate::new;
|
||||
|
@ -156,8 +227,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
|
|||
jobBuilder.setAnalysisConfig(ac);
|
||||
jobBuilder.setDataDescription(new DataDescription.Builder());
|
||||
jobBuilder.setCreateTime(new Date());
|
||||
Job job = jobBuilder.build();
|
||||
|
||||
Job updatedJob = update.mergeWithJob(jobBuilder.build(), new ByteSizeValue(0L));
|
||||
Job updatedJob = update.mergeWithJob(job, new ByteSizeValue(0L));
|
||||
|
||||
assertEquals(update.getGroups(), updatedJob.getGroups());
|
||||
assertEquals(update.getDescription(), updatedJob.getDescription());
|
||||
|
@ -172,12 +244,26 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
|
|||
assertEquals(update.getModelSnapshotId(), updatedJob.getModelSnapshotId());
|
||||
assertEquals(update.getJobVersion(), updatedJob.getJobVersion());
|
||||
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
|
||||
assertNotNull(updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getDetectorDescription());
|
||||
assertEquals(detectorUpdate.getDescription(),
|
||||
updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getDetectorDescription());
|
||||
assertNotNull(updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getDetectorDescription());
|
||||
assertEquals(detectorUpdate.getRules(),
|
||||
updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getRules());
|
||||
Detector updatedDetector = updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex());
|
||||
assertNotNull(updatedDetector);
|
||||
assertEquals(detectorUpdate.getDescription(), updatedDetector.getDetectorDescription());
|
||||
assertEquals(detectorUpdate.getRules(), updatedDetector.getRules());
|
||||
}
|
||||
|
||||
assertThat(job, not(equalTo(updatedJob)));
|
||||
}
|
||||
|
||||
public void testMergeWithJob_GivenRandomUpdates_AssertImmutability() {
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
Job job = JobTests.createRandomizedJob();
|
||||
JobUpdate update = createRandom(job.getId(), job);
|
||||
while (update.isNoop(job)) {
|
||||
update = createRandom(job.getId(), job);
|
||||
}
|
||||
|
||||
Job updatedJob = update.mergeWithJob(job, new ByteSizeValue(0L));
|
||||
|
||||
assertThat(job, not(equalTo(updatedJob)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,6 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase {
|
|||
cleanUp();
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/31916")
|
||||
public void testCondition() throws Exception {
|
||||
DetectionRule rule = new DetectionRule.Builder(Arrays.asList(
|
||||
new RuleCondition(RuleCondition.AppliesTo.ACTUAL, Operator.LT, 100.0)
|
||||
|
|
Loading…
Reference in New Issue