[7.x][ML] Audit updates on data frame analytics jobs (#60126) (#60287)

Closes #59652

Backport of #60126
This commit is contained in:
Dimitris Athanasiou 2020-07-28 16:33:35 +03:00 committed by GitHub
parent 4eee6d274d
commit ed7dcff7c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 81 additions and 4 deletions

View File

@ -18,6 +18,8 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import static org.elasticsearch.common.xcontent.ObjectParser.ValueType.VALUE;
@ -159,6 +161,23 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|| (getMaxNumThreads() != null && getMaxNumThreads().equals(source.getMaxNumThreads()) == false);
}
public Set<String> getUpdatedFields() {
Set<String> updatedFields = new TreeSet<>();
if (description != null) {
updatedFields.add(DataFrameAnalyticsConfig.DESCRIPTION.getPreferredName());
}
if (modelMemoryLimit != null) {
updatedFields.add(DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName());
}
if (allowLazyStart != null) {
updatedFields.add(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName());
}
if (maxNumThreads != null) {
updatedFields.add(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName());
}
return updatedFields;
}
@Override
public boolean equals(Object other) {
if (this == other) {

View File

@ -58,6 +58,7 @@ public final class Messages {
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATED = "Created analytics with analysis type [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED = "Updated analytics settings: {0}";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED = "Started analytics";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STOPPED = "Stopped analytics";
public static final String DATA_FRAME_ANALYTICS_AUDIT_DELETED = "Deleted analytics";

View File

@ -16,6 +16,7 @@ import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests.randomValidId;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -197,6 +198,49 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
}
public void testGetUpdatedFields_GivenAll() {
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
.setDescription("new description")
.setModelMemoryLimit(new ByteSizeValue(1024))
.setAllowLazyStart(true)
.setMaxNumThreads(8)
.build();
assertThat(update.getUpdatedFields(), contains("allow_lazy_start", "description", "max_num_threads", "model_memory_limit"));
}
public void testGetUpdatedFields_GivenAllowLazyStart() {
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
.setAllowLazyStart(false)
.build();
assertThat(update.getUpdatedFields(), contains("allow_lazy_start"));
}
public void testGetUpdatedFields_GivenDescription() {
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
.setDescription("new description")
.build();
assertThat(update.getUpdatedFields(), contains("description"));
}
public void testGetUpdatedFields_GivenMaxNumThreads() {
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
.setMaxNumThreads(3)
.build();
assertThat(update.getUpdatedFields(), contains("max_num_threads"));
}
public void testGetUpdatedFields_GivenModelMemoryLimit() {
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder("test_job")
.setModelMemoryLimit(new ByteSizeValue(1024))
.build();
assertThat(update.getUpdatedFields(), contains("model_memory_limit"));
}
private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) {
return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription()))
&& (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit()))

View File

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.junit.Before;
import java.util.ArrayList;
@ -51,7 +52,8 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
@Before
public void createComponents() throws Exception {
configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry());
configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry(),
new DataFrameAnalyticsAuditor(client(), node().getNodeEnvironment().nodeId()));
waitForMlTemplates();
}

View File

@ -707,7 +707,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
MemoryUsageEstimationProcessManager memoryEstimationProcessManager =
new MemoryUsageEstimationProcessManager(
threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory);
DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry);
DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry,
dataFrameAnalyticsAuditor);
assert client instanceof NodeClient;
DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager((NodeClient) client,
dataFrameAnalyticsConfigProvider, analyticsProcessManager, dataFrameAnalyticsAuditor, indexNameExpressionResolver);

View File

@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import java.io.IOException;
import java.io.InputStream;
@ -71,10 +72,12 @@ public class DataFrameAnalyticsConfigProvider {
private final Client client;
private final NamedXContentRegistry xContentRegistry;
private final DataFrameAnalyticsAuditor auditor;
public DataFrameAnalyticsConfigProvider(Client client, NamedXContentRegistry xContentRegistry) {
public DataFrameAnalyticsConfigProvider(Client client, NamedXContentRegistry xContentRegistry, DataFrameAnalyticsAuditor auditor) {
this.client = Objects.requireNonNull(client);
this.xContentRegistry = xContentRegistry;
this.auditor = Objects.requireNonNull(auditor);
}
/**
@ -98,6 +101,7 @@ public class DataFrameAnalyticsConfigProvider {
ClusterState clusterState,
ActionListener<DataFrameAnalyticsConfig> listener) {
String id = update.getId();
GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), DataFrameAnalyticsConfig.documentId(id));
executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(
getResponse -> {
@ -133,7 +137,13 @@ public class DataFrameAnalyticsConfigProvider {
DataFrameAnalyticsConfig updatedConfig = updatedConfigBuilder.build();
// Index the update config
index(updatedConfig, getResponse, listener);
index(updatedConfig, getResponse, ActionListener.wrap(
indexedConfig -> {
auditor.info(id, Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED, update.getUpdatedFields()));
listener.onResponse(indexedConfig);
},
listener::onFailure
));
},
listener::onFailure
));