[ML] Audit when unassigned datafeeds are stopped (#55667)

Previously audit messages were indexed when datafeeds that were
assigned to a node were stopped, but not datafeeds that were
unassigned at the time they were stopped.

This change adds auditing for the unassigned case.

Backport of #55656
This commit is contained in:
David Roberts 2020-04-23 20:46:35 +01:00 committed by GitHub
parent dd5c96c2ed
commit 46be9959a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 15 deletions

View File

@ -29,16 +29,20 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -52,17 +56,18 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final PersistentTasksService persistentTasksService; private final PersistentTasksService persistentTasksService;
private final DatafeedConfigProvider datafeedConfigProvider; private final DatafeedConfigProvider datafeedConfigProvider;
private final AnomalyDetectionAuditor auditor;
@Inject @Inject
public TransportStopDatafeedAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, public TransportStopDatafeedAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
ClusterService clusterService, PersistentTasksService persistentTasksService, ClusterService clusterService, PersistentTasksService persistentTasksService,
DatafeedConfigProvider datafeedConfigProvider) { DatafeedConfigProvider datafeedConfigProvider, AnomalyDetectionAuditor auditor) {
super(StopDatafeedAction.NAME, clusterService, transportService, actionFilters, StopDatafeedAction.Request::new, super(StopDatafeedAction.NAME, clusterService, transportService, actionFilters, StopDatafeedAction.Request::new,
StopDatafeedAction.Response::new, StopDatafeedAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME); StopDatafeedAction.Response::new, StopDatafeedAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.threadPool = threadPool; this.threadPool = Objects.requireNonNull(threadPool);
this.persistentTasksService = persistentTasksService; this.persistentTasksService = Objects.requireNonNull(persistentTasksService);
this.datafeedConfigProvider = datafeedConfigProvider; this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider);
this.auditor = Objects.requireNonNull(auditor);
} }
/** /**
@ -143,7 +148,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()])); request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));
if (request.isForce()) { if (request.isForce()) {
forceStopDatafeed(request, listener, tasks, notStoppedDatafeeds); forceStopDatafeed(request, listener, tasks, nodes, notStoppedDatafeeds);
} else { } else {
normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds); normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds);
} }
@ -171,9 +176,12 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
// so can be gracefully stopped simply by removing its persistent task. (Usually // so can be gracefully stopped simply by removing its persistent task. (Usually
// a graceful stop cannot be achieved by simply removing the persistent task, but // a graceful stop cannot be achieved by simply removing the persistent task, but
// if the datafeed has no running code then graceful/forceful are the same.) // if the datafeed has no running code then graceful/forceful are the same.)
// The listener here can be a no-op, as waitForDatafeedStopped() already waits for // The listener here doesn't need to call the final listener, as waitForDatafeedStopped()
// these persistent tasks to disappear. // already waits for these persistent tasks to disappear.
persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(r -> {}, e -> {})); persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(
r -> auditDatafeedStopped(datafeedTask),
e -> logger.error("[" + datafeedId + "] failed to remove task to stop unassigned datafeed", e))
);
} }
} }
@ -204,8 +212,15 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
super.doExecute(task, request, finalListener); super.doExecute(task, request, finalListener);
} }
private void auditDatafeedStopped(PersistentTasksCustomMetadata.PersistentTask<?> datafeedTask) {
@SuppressWarnings("unchecked")
String jobId =
((PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams>) datafeedTask).getParams().getJobId();
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
}
private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener<StopDatafeedAction.Response> listener, private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener<StopDatafeedAction.Response> listener,
PersistentTasksCustomMetadata tasks, final List<String> notStoppedDatafeeds) { PersistentTasksCustomMetadata tasks, DiscoveryNodes nodes, final List<String> notStoppedDatafeeds) {
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(notStoppedDatafeeds.size()); final AtomicArray<Exception> failures = new AtomicArray<>(notStoppedDatafeeds.size());
@ -216,6 +231,11 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) { public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
// For force stop, only audit here if the datafeed was unassigned at the time of the stop, hence inactive.
// If the datafeed was active then it audits itself on being cancelled.
if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes)) {
auditDatafeedStopped(datafeedTask);
}
if (counter.incrementAndGet() == notStoppedDatafeeds.size()) { if (counter.incrementAndGet() == notStoppedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures); sendResponseOrFailure(request.getDatafeedId(), listener, failures);
} }

View File

@ -187,13 +187,24 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
assertTrue(closeJobResponse.isClosed()); assertTrue(closeJobResponse.isClosed());
// We should have an audit message indicating that the job was closed // We should have an audit message indicating that the datafeed was stopped
String expectedAuditMessage = closeWithForce ? "Job is closing (forced)" : "Job is closing"; SearchRequest datafeedAuditSearchRequest = new SearchRequest(NotificationsIndex.NOTIFICATIONS_INDEX);
SearchRequest searchRequest = new SearchRequest(NotificationsIndex.NOTIFICATIONS_INDEX); datafeedAuditSearchRequest.source().query(new TermsQueryBuilder("message.raw", "Datafeed stopped"));
searchRequest.source().query(new TermsQueryBuilder("message.raw", expectedAuditMessage));
assertBusy(() -> { assertBusy(() -> {
assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX)); assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX));
SearchResponse searchResponse = client().search(searchRequest).actionGet(); SearchResponse searchResponse = client().search(datafeedAuditSearchRequest).actionGet();
assertThat(searchResponse.getHits(), notNullValue());
assertThat(searchResponse.getHits().getHits(), arrayWithSize(1));
assertThat(searchResponse.getHits().getHits()[0].getSourceAsMap().get("job_id"), is(jobId));
});
// We should have an audit message indicating that the job was closed
String expectedAuditMessage = closeWithForce ? "Job is closing (forced)" : "Job is closing";
SearchRequest jobAuditSearchRequest = new SearchRequest(NotificationsIndex.NOTIFICATIONS_INDEX);
jobAuditSearchRequest.source().query(new TermsQueryBuilder("message.raw", expectedAuditMessage));
assertBusy(() -> {
assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX));
SearchResponse searchResponse = client().search(jobAuditSearchRequest).actionGet();
assertThat(searchResponse.getHits(), notNullValue()); assertThat(searchResponse.getHits(), notNullValue());
assertThat(searchResponse.getHits().getHits(), arrayWithSize(1)); assertThat(searchResponse.getHits().getHits(), arrayWithSize(1));
assertThat(searchResponse.getHits().getHits()[0].getSourceAsMap().get("job_id"), is(jobId)); assertThat(searchResponse.getHits().getHits()[0].getSourceAsMap().get("job_id"), is(jobId));