From 46be9959a066fbfb37caa3c68602b99ad7604a3d Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 23 Apr 2020 20:46:35 +0100 Subject: [PATCH] [ML] Audit when unassigned datafeeds are stopped (#55667) Previously audit messages were indexed when datafeeds that were assigned to a node were stopped, but not datafeeds that were unassigned at the time they were stopped. This change adds auditing for the unassigned case. Backport of #55656 --- .../action/TransportStopDatafeedAction.java | 40 ++++++++++++++----- .../integration/MlDistributedFailureIT.java | 21 +++++++--- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index a5fa49029dc..8c2d5a1d4c6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -29,16 +29,20 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -52,17 +56,18 @@ public class TransportStopDatafeedAction extends TransportTasksAction {}, e -> {})); + // The listener here doesn't need to call the final listener, as waitForDatafeedStopped() + // already waits for these persistent tasks to disappear. + persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap( + r -> auditDatafeedStopped(datafeedTask), + e -> logger.error("[" + datafeedId + "] failed to remove task to stop unassigned datafeed", e)) + ); } } @@ -204,8 +212,15 @@ public class TransportStopDatafeedAction extends TransportTasksAction datafeedTask) { + @SuppressWarnings("unchecked") + String jobId = + ((PersistentTasksCustomMetadata.PersistentTask) datafeedTask).getParams().getJobId(); + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); + } + private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener listener, - PersistentTasksCustomMetadata tasks, final List notStoppedDatafeeds) { + PersistentTasksCustomMetadata tasks, DiscoveryNodes nodes, final List notStoppedDatafeeds) { final AtomicInteger counter = new AtomicInteger(); final AtomicArray failures = new AtomicArray<>(notStoppedDatafeeds.size()); @@ -216,6 +231,11 @@ public class TransportStopDatafeedAction extends TransportTasksAction>() { @Override public void onResponse(PersistentTasksCustomMetadata.PersistentTask persistentTask) { + // For force stop, only audit here if the datafeed was unassigned at the time of the stop, hence inactive. + // If the datafeed was active then it audits itself on being cancelled. + if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes)) { + auditDatafeedStopped(datafeedTask); + } if (counter.incrementAndGet() == notStoppedDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index fbe78556449..3144bf60438 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -187,13 +187,24 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); assertTrue(closeJobResponse.isClosed()); - // We should have an audit message indicating that the job was closed - String expectedAuditMessage = closeWithForce ? "Job is closing (forced)" : "Job is closing"; - SearchRequest searchRequest = new SearchRequest(NotificationsIndex.NOTIFICATIONS_INDEX); - searchRequest.source().query(new TermsQueryBuilder("message.raw", expectedAuditMessage)); + // We should have an audit message indicating that the datafeed was stopped + SearchRequest datafeedAuditSearchRequest = new SearchRequest(NotificationsIndex.NOTIFICATIONS_INDEX); + datafeedAuditSearchRequest.source().query(new TermsQueryBuilder("message.raw", "Datafeed stopped")); assertBusy(() -> { assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX)); - SearchResponse searchResponse = client().search(searchRequest).actionGet(); + SearchResponse searchResponse = client().search(datafeedAuditSearchRequest).actionGet(); + assertThat(searchResponse.getHits(), notNullValue()); + assertThat(searchResponse.getHits().getHits(), arrayWithSize(1)); + assertThat(searchResponse.getHits().getHits()[0].getSourceAsMap().get("job_id"), is(jobId)); + }); + + // We should have an audit message indicating that the job was closed + String expectedAuditMessage = closeWithForce ? "Job is closing (forced)" : "Job is closing"; + SearchRequest jobAuditSearchRequest = new SearchRequest(NotificationsIndex.NOTIFICATIONS_INDEX); + jobAuditSearchRequest.source().query(new TermsQueryBuilder("message.raw", expectedAuditMessage)); + assertBusy(() -> { + assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX)); + SearchResponse searchResponse = client().search(jobAuditSearchRequest).actionGet(); assertThat(searchResponse.getHits(), notNullValue()); assertThat(searchResponse.getHits().getHits(), arrayWithSize(1)); assertThat(searchResponse.getHits().getHits()[0].getSourceAsMap().get("job_id"), is(jobId));