[7.x][ML] Fix online updates with custom rules referencing filters (#63057) (#63064)

When an opened anomaly detection job is updated with a detection
rule that references a filter, apart from updating the c++ process
with the rule, we also need to update it with the referenced filter.

This commit fixes a bug which led to the job not applying such updates
on-the-fly.

Fixes #62948

Backport of #63057
This commit is contained in:
Dimitris Athanasiou 2020-09-30 16:01:06 +03:00 committed by GitHub
parent a6b903b783
commit e09074d382
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 70 additions and 21 deletions

View File

@ -44,7 +44,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -225,8 +224,8 @@ public class AutodetectCommunicator implements Closeable {
}
// Filters have to be written before detectors
if (update.getFilter() != null) {
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter()));
if (update.getFilters() != null) {
autodetectProcess.writeUpdateFiltersMessage(update.getFilters());
}
// Add detector rules

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import joptsimple.internal.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
@ -32,6 +33,7 @@ import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
@ -78,9 +80,11 @@ import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@ -329,10 +333,10 @@ public class AutodetectProcessManager implements ClusterStateListener {
}, handler
);
// Step 2. Set the filter on the message and get scheduled events
ActionListener<MlFilter> filterListener = ActionListener.wrap(
filter -> {
updateProcessMessage.setFilter(filter);
// Step 2. Set the filters on the message and get scheduled events
ActionListener<List<MlFilter>> filtersListener = ActionListener.wrap(
filters -> {
updateProcessMessage.setFilters(filters);
if (updateParams.isUpdateScheduledEvents()) {
jobManager.getJob(jobTask.getJobId(), new ActionListener<Job>() {
@ -356,13 +360,17 @@ public class AutodetectProcessManager implements ClusterStateListener {
}, handler
);
// Step 1. Get the filter
if (updateParams.getFilter() == null) {
filterListener.onResponse(null);
// All referenced filters must also be updated
Set<String> filterIds = updateParams.extractReferencedFilters();
// Step 1. Get the filters
if (filterIds.isEmpty()) {
filtersListener.onResponse(null);
} else {
GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request(updateParams.getFilter().getId());
GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request(Strings.join(filterIds, ","));
getFilterRequest.setPageParams(new PageParams(0, filterIds.size()));
executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest, ActionListener.wrap(
getFilterResponse -> filterListener.onResponse(getFilterResponse.getFilters().results().get(0)),
getFilterResponse -> filtersListener.onResponse(getFilterResponse.getFilters().results()),
handler
));
}

View File

@ -11,8 +11,10 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
public final class UpdateParams {
@ -72,6 +74,23 @@ public final class UpdateParams {
return updateScheduledEvents;
}
/**
* Returns all filters referenced by this update
* @return all referenced filters
*/
public Set<String> extractReferencedFilters() {
Set<String> filterIds = new HashSet<>();
if (filter != null) {
filterIds.add(filter.getId());
}
if (detectorUpdates != null) {
detectorUpdates.forEach(
detectorUpdate -> detectorUpdate.getRules().forEach(
rule -> filterIds.addAll(rule.extractReferencedFilters())));
}
return filterIds;
}
public static UpdateParams fromJobUpdate(JobUpdate jobUpdate) {
return new Builder(jobUpdate.getJobId())
.modelPlotConfig(jobUpdate.getModelPlotConfig())

View File

@ -19,17 +19,17 @@ public final class UpdateProcessMessage {
@Nullable private final ModelPlotConfig modelPlotConfig;
@Nullable private final PerPartitionCategorizationConfig perPartitionCategorizationConfig;
@Nullable private final List<JobUpdate.DetectorUpdate> detectorUpdates;
@Nullable private final MlFilter filter;
@Nullable private final List<MlFilter> filters;
@Nullable private final List<ScheduledEvent> scheduledEvents;
private UpdateProcessMessage(@Nullable ModelPlotConfig modelPlotConfig,
@Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig,
@Nullable List<JobUpdate.DetectorUpdate> detectorUpdates,
@Nullable MlFilter filter, List<ScheduledEvent> scheduledEvents) {
@Nullable List<MlFilter> filters, List<ScheduledEvent> scheduledEvents) {
this.modelPlotConfig = modelPlotConfig;
this.perPartitionCategorizationConfig = perPartitionCategorizationConfig;
this.detectorUpdates = detectorUpdates;
this.filter = filter;
this.filters = filters;
this.scheduledEvents = scheduledEvents;
}
@ -49,8 +49,8 @@ public final class UpdateProcessMessage {
}
@Nullable
public MlFilter getFilter() {
return filter;
public List<MlFilter> getFilters() {
return filters;
}
@Nullable
@ -63,7 +63,7 @@ public final class UpdateProcessMessage {
@Nullable private ModelPlotConfig modelPlotConfig;
@Nullable private PerPartitionCategorizationConfig perPartitionCategorizationConfig;
@Nullable private List<JobUpdate.DetectorUpdate> detectorUpdates;
@Nullable private MlFilter filter;
@Nullable private List<MlFilter> filters;
@Nullable private List<ScheduledEvent> scheduledEvents;
public Builder setModelPlotConfig(ModelPlotConfig modelPlotConfig) {
@ -81,8 +81,8 @@ public final class UpdateProcessMessage {
return this;
}
public Builder setFilter(MlFilter filter) {
this.filter = filter;
public Builder setFilters(List<MlFilter> filters) {
this.filters = filters;
return this;
}
@ -92,7 +92,7 @@ public final class UpdateProcessMessage {
}
public UpdateProcessMessage build() {
return new UpdateProcessMessage(modelPlotConfig, perPartitionCategorizationConfig, detectorUpdates, filter, scheduledEvents);
return new UpdateProcessMessage(modelPlotConfig, perPartitionCategorizationConfig, detectorUpdates, filters, scheduledEvents);
}
}
}

View File

@ -8,14 +8,19 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.containsInAnyOrder;
public class UpdateParamsTests extends ESTestCase {
@ -44,4 +49,22 @@ public class UpdateParamsTests extends ESTestCase {
assertTrue(params.isJobUpdate());
}
public void testExtractReferencedFilters() {
JobUpdate.DetectorUpdate detectorUpdate1 = new JobUpdate.DetectorUpdate(0, "",
Arrays.asList(
new DetectionRule.Builder(RuleScope.builder().include("a", "filter_1")).build(),
new DetectionRule.Builder(RuleScope.builder().include("b", "filter_2")).build()
)
);
JobUpdate.DetectorUpdate detectorUpdate2 = new JobUpdate.DetectorUpdate(0, "",
Collections.singletonList(new DetectionRule.Builder(RuleScope.builder().include("c", "filter_3")).build())
);
UpdateParams updateParams = new UpdateParams.Builder("test_job")
.detectorUpdates(Arrays.asList(detectorUpdate1, detectorUpdate2))
.filter(MlFilter.builder("filter_4").build())
.build();
assertThat(updateParams.extractReferencedFilters(), containsInAnyOrder("filter_1", "filter_2", "filter_3", "filter_4"));
}
}