diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 4cad2a8468f..8851a2a965f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -44,7 +44,6 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; -import java.util.Collections; import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -225,8 +224,8 @@ public class AutodetectCommunicator implements Closeable { } // Filters have to be written before detectors - if (update.getFilter() != null) { - autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter())); + if (update.getFilters() != null) { + autodetectProcess.writeUpdateFiltersMessage(update.getFilters()); } // Add detector rules diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index cd8ba103b90..2016d32900f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; +import joptsimple.internal.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -32,6 +33,7 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetFiltersAction; @@ -78,9 +80,11 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.Date; import java.util.Iterator; +import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -329,10 +333,10 @@ public class AutodetectProcessManager implements ClusterStateListener { }, handler ); - // Step 2. Set the filter on the message and get scheduled events - ActionListener filterListener = ActionListener.wrap( - filter -> { - updateProcessMessage.setFilter(filter); + // Step 2. Set the filters on the message and get scheduled events + ActionListener> filtersListener = ActionListener.wrap( + filters -> { + updateProcessMessage.setFilters(filters); if (updateParams.isUpdateScheduledEvents()) { jobManager.getJob(jobTask.getJobId(), new ActionListener() { @@ -356,13 +360,17 @@ public class AutodetectProcessManager implements ClusterStateListener { }, handler ); - // Step 1. Get the filter - if (updateParams.getFilter() == null) { - filterListener.onResponse(null); + // All referenced filters must also be updated + Set filterIds = updateParams.extractReferencedFilters(); + + // Step 1. Get the filters + if (filterIds.isEmpty()) { + filtersListener.onResponse(null); } else { - GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request(updateParams.getFilter().getId()); + GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request(Strings.join(filterIds, ",")); + getFilterRequest.setPageParams(new PageParams(0, filterIds.size())); executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest, ActionListener.wrap( - getFilterResponse -> filterListener.onResponse(getFilterResponse.getFilters().results().get(0)), + getFilterResponse -> filtersListener.onResponse(getFilterResponse.getFilters().results()), handler )); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java index 20a2ddd6831..b20541fd2cf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java @@ -11,8 +11,10 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; public final class UpdateParams { @@ -72,6 +74,23 @@ public final class UpdateParams { return updateScheduledEvents; } + /** + * Returns all filters referenced by this update + * @return all referenced filters + */ + public Set extractReferencedFilters() { + Set filterIds = new HashSet<>(); + if (filter != null) { + filterIds.add(filter.getId()); + } + if (detectorUpdates != null) { + detectorUpdates.forEach( + detectorUpdate -> detectorUpdate.getRules().forEach( + rule -> filterIds.addAll(rule.extractReferencedFilters()))); + } + return filterIds; + } + public static UpdateParams fromJobUpdate(JobUpdate jobUpdate) { return new Builder(jobUpdate.getJobId()) .modelPlotConfig(jobUpdate.getModelPlotConfig()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java index a0f9662e3a5..1b34b73a1fb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java @@ -19,17 +19,17 @@ public final class UpdateProcessMessage { @Nullable private final ModelPlotConfig modelPlotConfig; @Nullable private final PerPartitionCategorizationConfig perPartitionCategorizationConfig; @Nullable private final List detectorUpdates; - @Nullable private final MlFilter filter; + @Nullable private final List filters; @Nullable private final List scheduledEvents; private UpdateProcessMessage(@Nullable ModelPlotConfig modelPlotConfig, @Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig, @Nullable List detectorUpdates, - @Nullable MlFilter filter, List scheduledEvents) { + @Nullable List filters, List scheduledEvents) { this.modelPlotConfig = modelPlotConfig; this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; this.detectorUpdates = detectorUpdates; - this.filter = filter; + this.filters = filters; this.scheduledEvents = scheduledEvents; } @@ -49,8 +49,8 @@ public final class UpdateProcessMessage { } @Nullable - public MlFilter getFilter() { - return filter; + public List getFilters() { + return filters; } @Nullable @@ -63,7 +63,7 @@ public final class UpdateProcessMessage { @Nullable private ModelPlotConfig modelPlotConfig; @Nullable private PerPartitionCategorizationConfig perPartitionCategorizationConfig; @Nullable private List detectorUpdates; - @Nullable private MlFilter filter; + @Nullable private List filters; @Nullable private List scheduledEvents; public Builder setModelPlotConfig(ModelPlotConfig modelPlotConfig) { @@ -81,8 +81,8 @@ public final class UpdateProcessMessage { return this; } - public Builder setFilter(MlFilter filter) { - this.filter = filter; + public Builder setFilters(List filters) { + this.filters = filters; return this; } @@ -92,7 +92,7 @@ public final class UpdateProcessMessage { } public UpdateProcessMessage build() { - return new UpdateProcessMessage(modelPlotConfig, perPartitionCategorizationConfig, detectorUpdates, filter, scheduledEvents); + return new UpdateProcessMessage(modelPlotConfig, perPartitionCategorizationConfig, detectorUpdates, filters, scheduledEvents); } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParamsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParamsTests.java index a406136b8f6..e519ead5e7e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParamsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParamsTests.java @@ -8,14 +8,19 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.core.ml.job.config.Operator; import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; +import org.elasticsearch.xpack.core.ml.job.config.RuleScope; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.hamcrest.Matchers.containsInAnyOrder; + public class UpdateParamsTests extends ESTestCase { @@ -44,4 +49,22 @@ public class UpdateParamsTests extends ESTestCase { assertTrue(params.isJobUpdate()); } + public void testExtractReferencedFilters() { + JobUpdate.DetectorUpdate detectorUpdate1 = new JobUpdate.DetectorUpdate(0, "", + Arrays.asList( + new DetectionRule.Builder(RuleScope.builder().include("a", "filter_1")).build(), + new DetectionRule.Builder(RuleScope.builder().include("b", "filter_2")).build() + ) + ); + JobUpdate.DetectorUpdate detectorUpdate2 = new JobUpdate.DetectorUpdate(0, "", + Collections.singletonList(new DetectionRule.Builder(RuleScope.builder().include("c", "filter_3")).build()) + ); + + UpdateParams updateParams = new UpdateParams.Builder("test_job") + .detectorUpdates(Arrays.asList(detectorUpdate1, detectorUpdate2)) + .filter(MlFilter.builder("filter_4").build()) + .build(); + + assertThat(updateParams.extractReferencedFilters(), containsInAnyOrder("filter_1", "filter_2", "filter_3", "filter_4")); + } }