From ed7dcff7c49592363f6e6d024bde0ad18d3cbc3e Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 28 Jul 2020 16:33:35 +0300 Subject: [PATCH] [7.x][ML] Audit updates on data frame analytics jobs (#60126) (#60287) Closes #59652 Backport of #60126 --- .../DataFrameAnalyticsConfigUpdate.java | 19 ++++++++ .../xpack/core/ml/job/messages/Messages.java | 1 + .../DataFrameAnalyticsConfigUpdateTests.java | 44 +++++++++++++++++++ .../DataFrameAnalyticsConfigProviderIT.java | 4 +- .../xpack/ml/MachineLearning.java | 3 +- .../DataFrameAnalyticsConfigProvider.java | 14 +++++- 6 files changed, 81 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java index c6206edf767..5eeeb8628ac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java @@ -18,6 +18,8 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; import static org.elasticsearch.common.xcontent.ObjectParser.ValueType.VALUE; @@ -159,6 +161,23 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje || (getMaxNumThreads() != null && getMaxNumThreads().equals(source.getMaxNumThreads()) == false); } + public Set getUpdatedFields() { + Set updatedFields = new TreeSet<>(); + if (description != null) { + updatedFields.add(DataFrameAnalyticsConfig.DESCRIPTION.getPreferredName()); + } + if (modelMemoryLimit != null) { + updatedFields.add(DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName()); + } + if (allowLazyStart != null) { + updatedFields.add(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName()); + } + if (maxNumThreads != null) { + updatedFields.add(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName()); + } + return updatedFields; + } + @Override public boolean equals(Object other) { if (this == other) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 8cbb18e5c06..e3369e461d5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -58,6 +58,7 @@ public final class Messages { public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected"; public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATED = "Created analytics with analysis type [{0}]"; + public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED = "Updated analytics settings: {0}"; public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED = "Started analytics"; public static final String DATA_FRAME_ANALYTICS_AUDIT_STOPPED = "Stopped analytics"; public static final String DATA_FRAME_ANALYTICS_AUDIT_DELETED = "Deleted analytics"; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java index 6f974fbdc8f..277a7cc450d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.util.Objects; import static org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests.randomValidId; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -197,6 +198,49 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer")); } + public void testGetUpdatedFields_GivenAll() { + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job") + .setDescription("new description") + .setModelMemoryLimit(new ByteSizeValue(1024)) + .setAllowLazyStart(true) + .setMaxNumThreads(8) + .build(); + + assertThat(update.getUpdatedFields(), contains("allow_lazy_start", "description", "max_num_threads", "model_memory_limit")); + } + + public void testGetUpdatedFields_GivenAllowLazyStart() { + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job") + .setAllowLazyStart(false) + .build(); + + assertThat(update.getUpdatedFields(), contains("allow_lazy_start")); + } + + public void testGetUpdatedFields_GivenDescription() { + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job") + .setDescription("new description") + .build(); + + assertThat(update.getUpdatedFields(), contains("description")); + } + + public void testGetUpdatedFields_GivenMaxNumThreads() { + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job") + .setMaxNumThreads(3) + .build(); + + assertThat(update.getUpdatedFields(), contains("max_num_threads")); + } + + public void testGetUpdatedFields_GivenModelMemoryLimit() { + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job") + .setModelMemoryLimit(new ByteSizeValue(1024)) + .build(); + + assertThat(update.getUpdatedFields(), contains("model_memory_limit")); + } + private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) { return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription())) && (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit())) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java index 1b4b3dc8fa1..0721d56290d 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; +import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import org.junit.Before; import java.util.ArrayList; @@ -51,7 +52,8 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase { @Before public void createComponents() throws Exception { - configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry()); + configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry(), + new DataFrameAnalyticsAuditor(client(), node().getNodeEnvironment().nodeId())); waitForMlTemplates(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index bf0a987a3de..8a320749fc8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -707,7 +707,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, MemoryUsageEstimationProcessManager memoryEstimationProcessManager = new MemoryUsageEstimationProcessManager( threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory); - DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry); + DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry, + dataFrameAnalyticsAuditor); assert client instanceof NodeClient; DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager((NodeClient) client, dataFrameAnalyticsConfigProvider, analyticsProcessManager, dataFrameAnalyticsAuditor, indexNameExpressionResolver); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java index 7e4f3e19f40..0d8f50f17c3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java @@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; +import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import java.io.IOException; import java.io.InputStream; @@ -71,10 +72,12 @@ public class DataFrameAnalyticsConfigProvider { private final Client client; private final NamedXContentRegistry xContentRegistry; + private final DataFrameAnalyticsAuditor auditor; - public DataFrameAnalyticsConfigProvider(Client client, NamedXContentRegistry xContentRegistry) { + public DataFrameAnalyticsConfigProvider(Client client, NamedXContentRegistry xContentRegistry, DataFrameAnalyticsAuditor auditor) { this.client = Objects.requireNonNull(client); this.xContentRegistry = xContentRegistry; + this.auditor = Objects.requireNonNull(auditor); } /** @@ -98,6 +101,7 @@ public class DataFrameAnalyticsConfigProvider { ClusterState clusterState, ActionListener listener) { String id = update.getId(); + GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), DataFrameAnalyticsConfig.documentId(id)); executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap( getResponse -> { @@ -133,7 +137,13 @@ public class DataFrameAnalyticsConfigProvider { DataFrameAnalyticsConfig updatedConfig = updatedConfigBuilder.build(); // Index the update config - index(updatedConfig, getResponse, listener); + index(updatedConfig, getResponse, ActionListener.wrap( + indexedConfig -> { + auditor.info(id, Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED, update.getUpdatedFields())); + listener.onResponse(indexedConfig); + }, + listener::onFailure + )); }, listener::onFailure ));