[7.x] Add audit messages for Data Frame Analytics (#46521) (#46738)

This commit is contained in:
Przemysław Witek 2019-09-16 21:21:38 +02:00 committed by GitHub
parent a9eb538e93
commit e49be611ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 226 additions and 33 deletions

View File

@ -167,20 +167,20 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
public class MachineLearningIT extends ESRestHighLevelClientTestCase {

View File

@ -1304,7 +1304,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
/**
* Returns <code>true</code> iff the given index exists, otherwise <code>false</code>
*/
protected boolean indexExists(String index) {
/** Checks index existence via the admin indices API; static so subclasses can call it from static helpers. */
protected static boolean indexExists(String index) {
    return client().admin().indices().prepareExists(index).execute().actionGet().isExists();
}

View File

@ -56,6 +56,17 @@ public final class Messages {
public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable";
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";
// Audit messages emitted over the data frame analytics job lifecycle.
// Numbered placeholders ({0}, ...) are filled in via Messages.getMessage (java.text.MessageFormat style).
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATED = "Created analytics with analysis type [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED = "Started analytics";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STOPPED = "Stopped analytics";
public static final String DATA_FRAME_ANALYTICS_AUDIT_DELETED = "Deleted analytics";
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE = "Successfully updated analytics task state to [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING = "Finished reindexing to destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS = "Finished analysis";
// Filter-related error messages (unrelated to the audit group above).
public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";
public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists";

View File

@ -5,11 +5,18 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
@ -23,15 +30,19 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
/**
* Base class of ML integration tests that use a native data_frame_analytics process
@ -151,4 +162,43 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
configBuilder.setAnalysis(regression);
return configBuilder.build();
}
/**
 * Asserts that the audit messages fetched from the notifications index match the provided prefixes.
 * More specifically, in order to pass:
 * 1. the number of fetched messages must equal the number of provided prefixes
 * AND
 * 2. each fetched message must start with the corresponding prefix (compared in index order,
 *    with messages sorted by timestamp ascending)
 *
 * @param configId                    id of the data frame analytics config whose audit messages are checked
 * @param expectedAuditMessagePrefixes expected message prefixes, in chronological order
 */
protected static void assertThatAuditMessagesMatch(String configId, String... expectedAuditMessagePrefixes) throws Exception {
// Make sure the notifications index exists before searching it.
// Since calls that write to the AbstractAuditor are fire-and-forget (async), the job may have started and
// finished (as this is a very short analytics job) before the audit messages are fully written,
// hence the assertBusy retries below.
assertBusy(() -> assertTrue(indexExists(AuditorField.NOTIFICATIONS_INDEX)));
assertBusy(() -> {
String[] actualAuditMessages = fetchAllAuditMessages(configId);
assertThat(actualAuditMessages.length, equalTo(expectedAuditMessagePrefixes.length));
for (int i = 0; i < actualAuditMessages.length; i++) {
assertThat(actualAuditMessages[i], startsWith(expectedAuditMessagePrefixes[i]));
}
});
}
/**
 * Fetches all audit messages for the given analytics id from the notifications index,
 * sorted by timestamp ascending. Refreshes the index first so recently written
 * documents are visible to the search.
 */
@SuppressWarnings("unchecked")
private static String[] fetchAllAuditMessages(String dataFrameAnalyticsId) throws Exception {
// Refresh so that audit documents indexed just before this call are searchable.
RefreshRequest refreshRequest = new RefreshRequest(AuditorField.NOTIFICATIONS_INDEX);
RefreshResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet();
assertThat(refreshResponse.getStatus().getStatus(), anyOf(equalTo(200), equalTo(201)));
// Audit documents use "job_id" as the field holding the analytics id.
SearchRequest searchRequest = new SearchRequestBuilder(client(), SearchAction.INSTANCE)
.setIndices(AuditorField.NOTIFICATIONS_INDEX)
.addSort("timestamp", SortOrder.ASC)
.setQuery(QueryBuilders.termQuery("job_id", dataFrameAnalyticsId))
.request();
SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet();
// Extract just the human-readable message text from each hit.
return Arrays.stream(searchResponse.getHits().getHits())
.map(hit -> (String) hit.getSourceAsMap().get("message"))
.toArray(String[]::new);
}
}

View File

@ -102,6 +102,13 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertProgress(jobId, 100, 100, 100, 100);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [regression]",
"Estimated memory usage for this analytics to be",
"Started analytics",
"Creating destination index [regression_single_numeric_feature_and_mixed_data_set_source_index_results]",
"Finished reindexing to destination index [regression_single_numeric_feature_and_mixed_data_set_source_index_results]",
"Finished analysis");
}
public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Exception {
@ -161,6 +168,13 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertProgress(jobId, 100, 100, 100, 100);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [regression]",
"Estimated memory usage for this analytics to be",
"Started analytics",
"Creating destination index [regression_only_training_data_and_training_percent_is_hundred_source_index_results]",
"Finished reindexing to destination index [regression_only_training_data_and_training_percent_is_hundred_source_index_results]",
"Finished analysis");
}
public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception {
@ -230,5 +244,12 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertProgress(jobId, 100, 100, 100, 100);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [regression]",
"Estimated memory usage for this analytics to be",
"Started analytics",
"Creating destination index [regression_only_training_data_and_training_percent_is_fifty_source_index_results]",
"Finished reindexing to destination index [regression_only_training_data_and_training_percent_is_fifty_source_index_results]",
"Finished analysis");
}
}

View File

@ -115,6 +115,13 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertProgress(id, 100, 100, 100, 100);
assertThat(searchStoredProgress(id).getHits().getTotalHits().value, equalTo(1L));
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Started analytics",
"Creating destination index [test-outlier-detection-with-few-docs-results]",
"Finished reindexing to destination index [test-outlier-detection-with-few-docs-results]",
"Finished analysis");
}
public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
@ -162,6 +169,13 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertProgress(id, 100, 100, 100, 100);
assertThat(searchStoredProgress(id).getHits().getTotalHits().value, equalTo(1L));
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Started analytics",
"Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
"Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
"Finished analysis");
}
public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Exception {
@ -234,9 +248,16 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertProgress(id, 100, 100, 100, 100);
assertThat(searchStoredProgress(id).getHits().getTotalHits().value, equalTo(1L));
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Started analytics",
"Creating destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
"Finished reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
"Finished analysis");
}
public void testStopOutlierDetectionWithEnoughDocumentsToScroll() {
public void testStopOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
String sourceIndex = "test-stop-outlier-detection-with-enough-docs-to-scroll";
client().admin().indices().prepareCreate(sourceIndex)
@ -284,6 +305,13 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
} else {
logger.debug("We stopped during reindexing: [{}] < [{}]", searchResponse.getHits().getTotalHits().value, docCount);
}
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Started analytics",
"Creating destination index [test-stop-outlier-detection-with-enough-docs-to-scroll-results]",
"Stopped analytics");
}
public void testOutlierDetectionWithMultipleSourceIndices() throws Exception {
@ -338,6 +366,13 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertProgress(id, 100, 100, 100, 100);
assertThat(searchStoredProgress(id).getHits().getTotalHits().value, equalTo(1L));
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Started analytics",
"Creating destination index [test-outlier-detection-with-multiple-source-indices-results]",
"Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
"Finished analysis");
}
public void testOutlierDetectionWithPreExistingDestIndex() throws Exception {
@ -388,9 +423,16 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertProgress(id, 100, 100, 100, 100);
assertThat(searchStoredProgress(id).getHits().getTotalHits().value, equalTo(1L));
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Started analytics",
"Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
"Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
"Finished analysis");
}
public void testModelMemoryLimitLowerThanEstimatedMemoryUsage() {
public void testModelMemoryLimitLowerThanEstimatedMemoryUsage() throws Exception {
String sourceIndex = "test-model-memory-limit";
client().admin().indices().prepareCreate(sourceIndex)
@ -429,5 +471,9 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
exception.getMessage(),
startsWith("Cannot start because the configured model memory limit [" + modelMemoryLimit +
"] is lower than the expected memory usage"));
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be");
}
}

View File

@ -368,6 +368,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
private final SetOnce<AutodetectProcessManager> autodetectProcessManager = new SetOnce<>();
private final SetOnce<DatafeedManager> datafeedManager = new SetOnce<>();
private final SetOnce<DataFrameAnalyticsManager> dataFrameAnalyticsManager = new SetOnce<>();
private final SetOnce<DataFrameAnalyticsAuditor> dataFrameAnalyticsAuditor = new SetOnce<>();
private final SetOnce<MlMemoryTracker> memoryTracker = new SetOnce<>();
public MachineLearning(Settings settings, Path configPath) {
@ -472,6 +473,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
@ -562,8 +564,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory);
DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client);
assert client instanceof NodeClient;
DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager((NodeClient) client,
dataFrameAnalyticsConfigProvider, analyticsProcessManager);
DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager(
(NodeClient) client, dataFrameAnalyticsConfigProvider, analyticsProcessManager, dataFrameAnalyticsAuditor);
this.dataFrameAnalyticsManager.set(dataFrameAnalyticsManager);
// Components shared by anomaly detection and data frame analytics
@ -617,7 +619,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
memoryTracker.get(), client),
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get()),
new TransportStartDataFrameAnalyticsAction.TaskExecutor(settings, client, clusterService, dataFrameAnalyticsManager.get(),
memoryTracker.get())
dataFrameAnalyticsAuditor.get(), memoryTracker.get())
);
}

View File

@ -40,14 +40,17 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@ -65,17 +68,20 @@ public class TransportDeleteDataFrameAnalyticsAction
private final Client client;
private final MlMemoryTracker memoryTracker;
private final DataFrameAnalyticsConfigProvider configProvider;
private final DataFrameAnalyticsAuditor auditor;
@Inject
public TransportDeleteDataFrameAnalyticsAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client,
MlMemoryTracker memoryTracker, DataFrameAnalyticsConfigProvider configProvider) {
MlMemoryTracker memoryTracker, DataFrameAnalyticsConfigProvider configProvider,
DataFrameAnalyticsAuditor auditor) {
super(DeleteDataFrameAnalyticsAction.NAME, transportService, clusterService, threadPool, actionFilters,
DeleteDataFrameAnalyticsAction.Request::new, indexNameExpressionResolver);
this.client = client;
this.memoryTracker = memoryTracker;
this.configProvider = configProvider;
this.auditor = Objects.requireNonNull(auditor);
}
@Override
@ -112,7 +118,7 @@ public class TransportDeleteDataFrameAnalyticsAction
// We clean up the memory tracker on delete because there is no stop; the task stops by itself
memoryTracker.removeDataFrameAnalyticsJob(id);
// Step 2. Delete the config
// Step 3. Delete the config
ActionListener<BulkByScrollResponse> deleteStateHandler = ActionListener.wrap(
bulkByScrollResponse -> {
if (bulkByScrollResponse.isTimedOut()) {
@ -130,7 +136,7 @@ public class TransportDeleteDataFrameAnalyticsAction
listener::onFailure
);
// Step 1. Delete state
// Step 2. Delete state
ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
config -> deleteState(parentTaskClient, id, deleteStateHandler),
listener::onFailure
@ -152,6 +158,7 @@ public class TransportDeleteDataFrameAnalyticsAction
}
assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
LOGGER.info("[{}] Deleted", id);
auditor.info(id, Messages.DATA_FRAME_ANALYTICS_AUDIT_DELETED);
listener.onResponse(new AcknowledgedResponse(true));
},
listener::onFailure

View File

@ -46,6 +46,7 @@ import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges
import org.elasticsearch.xpack.core.security.support.Exceptions;
import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import java.io.IOException;
import java.time.Instant;
@ -64,6 +65,7 @@ public class TransportPutDataFrameAnalyticsAction
private final Client client;
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final DataFrameAnalyticsAuditor auditor;
private volatile ByteSizeValue maxModelMemoryLimit;
@ -71,7 +73,7 @@ public class TransportPutDataFrameAnalyticsAction
public TransportPutDataFrameAnalyticsAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
XPackLicenseState licenseState, Client client, ThreadPool threadPool,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameAnalyticsConfigProvider configProvider) {
DataFrameAnalyticsConfigProvider configProvider, DataFrameAnalyticsAuditor auditor) {
super(PutDataFrameAnalyticsAction.NAME, transportService, actionFilters, PutDataFrameAnalyticsAction.Request::new);
this.licenseState = licenseState;
this.configProvider = configProvider;
@ -81,6 +83,7 @@ public class TransportPutDataFrameAnalyticsAction
this.client = client;
this.clusterService = clusterService;
this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
this.auditor = Objects.requireNonNull(auditor);
maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings);
clusterService.getClusterSettings()
@ -179,7 +182,14 @@ public class TransportPutDataFrameAnalyticsAction
client,
clusterState,
ActionListener.wrap(
unused -> configProvider.put(config, headers, listener),
unused -> configProvider.put(config, headers, ActionListener.wrap(
indexResponse -> {
auditor.info(
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATED, config.getAnalysis().getWriteableName()));
listener.onResponse(indexResponse);
},
listener::onFailure)),
listener::onFailure));
}

View File

@ -53,6 +53,7 @@ import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
@ -63,6 +64,7 @@ import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.job.JobNodeSelector;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import java.io.IOException;
@ -91,13 +93,15 @@ public class TransportStartDataFrameAnalyticsAction
private final PersistentTasksService persistentTasksService;
private final DataFrameAnalyticsConfigProvider configProvider;
private final MlMemoryTracker memoryTracker;
private final DataFrameAnalyticsAuditor auditor;
@Inject
public TransportStartDataFrameAnalyticsAction(TransportService transportService, Client client, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, XPackLicenseState licenseState,
IndexNameExpressionResolver indexNameExpressionResolver,
PersistentTasksService persistentTasksService,
DataFrameAnalyticsConfigProvider configProvider, MlMemoryTracker memoryTracker) {
DataFrameAnalyticsConfigProvider configProvider, MlMemoryTracker memoryTracker,
DataFrameAnalyticsAuditor auditor) {
super(StartDataFrameAnalyticsAction.NAME, transportService, clusterService, threadPool, actionFilters,
StartDataFrameAnalyticsAction.Request::new, indexNameExpressionResolver);
this.licenseState = licenseState;
@ -105,6 +109,7 @@ public class TransportStartDataFrameAnalyticsAction
this.persistentTasksService = persistentTasksService;
this.configProvider = configProvider;
this.memoryTracker = memoryTracker;
this.auditor = Objects.requireNonNull(auditor);
}
@Override
@ -146,8 +151,8 @@ public class TransportStartDataFrameAnalyticsAction
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException("Cannot open data frame analytics [" + request.getId() +
"] because it has already been opened", RestStatus.CONFLICT, e);
e = new ElasticsearchStatusException("Cannot start data frame analytics [" + request.getId() +
"] because it has already been started", RestStatus.CONFLICT, e);
}
listener.onFailure(e);
}
@ -169,6 +174,11 @@ public class TransportStartDataFrameAnalyticsAction
// Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks
ActionListener<EstimateMemoryUsageAction.Response> estimateMemoryUsageListener = ActionListener.wrap(
estimateMemoryUsageResponse -> {
auditor.info(
request.getId(),
Messages.getMessage(
Messages.DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE,
estimateMemoryUsageResponse.getExpectedMemoryWithoutDisk()));
// Validate that model memory limit is sufficient to run the analysis
if (configHolder.get().getModelMemoryLimit()
.compareTo(estimateMemoryUsageResponse.getExpectedMemoryWithoutDisk()) < 0) {
@ -302,6 +312,7 @@ public class TransportStartDataFrameAnalyticsAction
// what would have happened if the error had been detected in the "fast fail" validation
cancelAnalyticsStart(task, predicate.exception, listener);
} else {
auditor.info(task.getParams().getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED);
listener.onResponse(new AcknowledgedResponse(true));
}
}
@ -313,8 +324,8 @@ public class TransportStartDataFrameAnalyticsAction
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new ElasticsearchException("Starting data frame analytics [" + task.getParams().getId()
+ "] timed out after [" + timeout + "]"));
listener.onFailure(new ElasticsearchException(
"Starting data frame analytics [" + task.getParams().getId() + "] timed out after [" + timeout + "]"));
}
});
}
@ -323,7 +334,7 @@ public class TransportStartDataFrameAnalyticsAction
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
* of endpoints waiting for a condition tested by this predicate would never get a response.
*/
private class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
private volatile Exception exception;
@ -407,6 +418,7 @@ public class TransportStartDataFrameAnalyticsAction
private final Client client;
private final ClusterService clusterService;
private final DataFrameAnalyticsManager manager;
private final DataFrameAnalyticsAuditor auditor;
private final MlMemoryTracker memoryTracker;
private volatile int maxMachineMemoryPercent;
@ -415,11 +427,12 @@ public class TransportStartDataFrameAnalyticsAction
private volatile ClusterState clusterState;
public TaskExecutor(Settings settings, Client client, ClusterService clusterService, DataFrameAnalyticsManager manager,
MlMemoryTracker memoryTracker) {
DataFrameAnalyticsAuditor auditor, MlMemoryTracker memoryTracker) {
super(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.manager = Objects.requireNonNull(manager);
this.auditor = Objects.requireNonNull(auditor);
this.memoryTracker = Objects.requireNonNull(memoryTracker);
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
@ -436,8 +449,8 @@ public class TransportStartDataFrameAnalyticsAction
long id, String type, String action, TaskId parentTaskId,
PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> persistentTask,
Map<String, String> headers) {
return new DataFrameAnalyticsTask(id, type, action, parentTaskId, headers, client, clusterService, manager,
persistentTask.getParams());
return new DataFrameAnalyticsTask(
id, type, action, parentTaskId, headers, client, clusterService, manager, auditor, persistentTask.getParams());
}
@Override

View File

@ -32,16 +32,19 @@ import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@ -59,17 +62,20 @@ public class TransportStopDataFrameAnalyticsAction
private final ThreadPool threadPool;
private final PersistentTasksService persistentTasksService;
private final DataFrameAnalyticsConfigProvider configProvider;
private final DataFrameAnalyticsAuditor auditor;
@Inject
public TransportStopDataFrameAnalyticsAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, ThreadPool threadPool,
PersistentTasksService persistentTasksService,
DataFrameAnalyticsConfigProvider configProvider) {
DataFrameAnalyticsConfigProvider configProvider,
DataFrameAnalyticsAuditor auditor) {
super(StopDataFrameAnalyticsAction.NAME, clusterService, transportService, actionFilters, StopDataFrameAnalyticsAction.Request::new,
StopDataFrameAnalyticsAction.Response::new, StopDataFrameAnalyticsAction.Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
this.persistentTasksService = persistentTasksService;
this.configProvider = configProvider;
this.auditor = Objects.requireNonNull(auditor);
}
@Override
@ -258,7 +264,10 @@ public class TransportStopDataFrameAnalyticsAction
persistentTasksService.waitForPersistentTasksCondition(persistentTasks ->
filterPersistentTasks(persistentTasks, analyticsIds).isEmpty(),
request.getTimeout(), ActionListener.wrap(
booleanResponse -> listener.onResponse(response),
booleanResponse -> {
auditor.info(request.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STOPPED);
listener.onResponse(response);
},
listener::onFailure
));
}

View File

@ -32,11 +32,13 @@ import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessManager;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import java.time.Clock;
import java.util.Objects;
@ -54,12 +56,14 @@ public class DataFrameAnalyticsManager {
private final NodeClient client;
private final DataFrameAnalyticsConfigProvider configProvider;
private final AnalyticsProcessManager processManager;
private final DataFrameAnalyticsAuditor auditor;
public DataFrameAnalyticsManager(NodeClient client, DataFrameAnalyticsConfigProvider configProvider,
AnalyticsProcessManager processManager) {
AnalyticsProcessManager processManager, DataFrameAnalyticsAuditor auditor) {
this.client = Objects.requireNonNull(client);
this.configProvider = Objects.requireNonNull(configProvider);
this.processManager = Objects.requireNonNull(processManager);
this.auditor = Objects.requireNonNull(auditor);
}
public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState, ClusterState clusterState) {
@ -158,6 +162,9 @@ public class DataFrameAnalyticsManager {
return;
}
task.setReindexingFinished();
auditor.info(
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.ML_ORIGIN,
RefreshAction.INSTANCE,
@ -190,6 +197,9 @@ public class DataFrameAnalyticsManager {
// Create destination index if it does not exist
ActionListener<GetIndexResponse> destIndexListener = ActionListener.wrap(
indexResponse -> {
auditor.info(
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX, indexResponse.indices()[0]));
LOGGER.info("[{}] Using existing destination index [{}]", config.getId(), indexResponse.indices()[0]);
DataFrameAnalyticsIndex.updateMappingsToDestIndex(client, config, indexResponse, ActionListener.wrap(
acknowledgedResponse -> copyIndexCreatedListener.onResponse(null),
@ -198,6 +208,9 @@ public class DataFrameAnalyticsManager {
},
e -> {
if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
auditor.info(
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX, config.getDest().getIndex()));
LOGGER.info("[{}] Creating destination index [{}]", config.getId(), config.getDest().getIndex());
DataFrameAnalyticsIndex.createDestinationIndex(client, Clock.systemUTC(), config, copyIndexCreatedListener);
} else {
@ -225,6 +238,7 @@ public class DataFrameAnalyticsManager {
if (error != null) {
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
} else {
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
task.markAsCompleted();
}
}),

View File

@ -31,10 +31,12 @@ import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.core.watcher.watch.Payload;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import java.util.Arrays;
import java.util.List;
@ -52,6 +54,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
private final Client client;
private final ClusterService clusterService;
private final DataFrameAnalyticsManager analyticsManager;
private final DataFrameAnalyticsAuditor auditor;
private final StartDataFrameAnalyticsAction.TaskParams taskParams;
@Nullable
private volatile Long reindexingTaskId;
@ -61,11 +64,12 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
public DataFrameAnalyticsTask(long id, String type, String action, TaskId parentTask, Map<String, String> headers,
Client client, ClusterService clusterService, DataFrameAnalyticsManager analyticsManager,
StartDataFrameAnalyticsAction.TaskParams taskParams) {
DataFrameAnalyticsAuditor auditor, StartDataFrameAnalyticsAction.TaskParams taskParams) {
super(id, type, action, MlTasks.DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + taskParams.getId(), parentTask, headers);
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.analyticsManager = Objects.requireNonNull(analyticsManager);
this.auditor = Objects.requireNonNull(auditor);
this.taskParams = Objects.requireNonNull(taskParams);
}
@ -159,11 +163,17 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
updatePersistentTaskState(newTaskState, ActionListener.wrap(
updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state),
e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]",
getParams().getId(), state, reason), e)
));
updatePersistentTaskState(
newTaskState,
ActionListener.wrap(
updatedTask -> {
auditor.info(getParams().getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE, state));
LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state);
},
e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]",
getParams().getId(), state, reason), e)
)
);
}
public void updateReindexTaskProgress(ActionListener<Void> listener) {