[ML] Audit message when nightly maintenance times out (#63252) (#63330)

During deletion of old ml data set the delete by query timeout to 8 hours and
audit a job message when the nightly maintenance task times out.
This commit is contained in:
David Kyle 2020-10-06 16:19:37 +01:00 committed by GitHub
parent 058c55da6a
commit ea32b4ab82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 127 additions and 21 deletions

View File

@ -100,6 +100,14 @@ dynamic setting in version 7.1. As a result, changes to its value after node
startup are used only after every node in the cluster is running version 7.1 or
higher. The maximum permitted value is `512`.
`xpack.ml.nightly_maintenance_requests_per_second`::
(<<cluster-update-settings,Dynamic>>) The rate at which the nightly maintenance task
deletes expired model snapshots and results. The setting is a proxy to the
[requests_per_second](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html#_throttling_delete_requests)
parameter used in the Delete by query requests and controls throttling.
Valid values must be greater than `0.0` or equal to `-1.0` where `-1.0` means a default value
is used. Defaults to `-1.0`
`xpack.ml.node_concurrent_job_allocations`::
(<<cluster-update-settings,Dynamic>>) The maximum number of jobs that can
concurrently be in the `opening` state on each node. Typically, jobs spend a

View File

@ -33,6 +33,9 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
// The special ID that means the message applies to all jobs/resources.
public static final String All_RESOURCES_ID = "";
private static final Logger logger = LogManager.getLogger(AbstractAuditor.class);
static final int MAX_BUFFER_SIZE = 1000;

View File

@ -62,6 +62,7 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
private Float requestsPerSecond;
private TimeValue timeout;
private String jobId;
private String [] expandedJobIds;
public Request() {}
@ -111,6 +112,20 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
return this;
}
/**
* Not serialized, the expanded job Ids should only be used
* on the executing node.
* @return The expanded Ids in the case where {@code jobId} is not `_all`
* otherwise null.
*/
public String [] getExpandedJobIds() {
return expandedJobIds;
}
public void setExpandedJobIds(String [] expandedJobIds) {
this.expandedJobIds = expandedJobIds;
}
@Override
public ActionRequestValidationException validate() {
if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) {
@ -128,12 +143,13 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
Request request = (Request) o;
return Objects.equals(requestsPerSecond, request.requestsPerSecond)
&& Objects.equals(jobId, request.jobId)
&& Objects.equals(expandedJobIds, request.expandedJobIds)
&& Objects.equals(timeout, request.timeout);
}
@Override
public int hashCode() {
return Objects.hash(requestsPerSecond, timeout, jobId);
return Objects.hash(requestsPerSecond, timeout, jobId, expandedJobIds);
}
@Override
@ -146,6 +162,7 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeOptionalString(jobId);
}
// expandedJobIds are set on the node and not part of serialisation
}
}
@ -159,7 +176,7 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
private static final ParseField DELETED = new ParseField("deleted");
private boolean deleted;
private final boolean deleted;
public Response(boolean deleted) {
this.deleted = deleted;

View File

@ -16,12 +16,14 @@ import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
@ -53,8 +55,6 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
private static final Logger logger = LogManager.getLogger(TransportDeleteExpiredDataAction.class);
static final Duration DEFAULT_MAX_DURATION = Duration.ofHours(8);
private final ThreadPool threadPool;
private final String executor;
private final OriginSettingClient client;
@ -62,18 +62,21 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
private final Clock clock;
private final JobConfigProvider jobConfigProvider;
private final JobResultsProvider jobResultsProvider;
private final AnomalyDetectionAuditor auditor;
@Inject
public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService,
JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider) {
JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider,
AnomalyDetectionAuditor auditor) {
this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService,
jobConfigProvider, jobResultsProvider, Clock.systemUTC());
jobConfigProvider, jobResultsProvider, auditor, Clock.systemUTC());
}
TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService,
JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider, Clock clock) {
JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider,
AnomalyDetectionAuditor auditor, Clock clock) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
@ -82,6 +85,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
this.clock = clock;
this.jobConfigProvider = jobConfigProvider;
this.jobResultsProvider = jobResultsProvider;
this.auditor = auditor;
}
@Override
@ -89,7 +93,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
ActionListener<DeleteExpiredDataAction.Response> listener) {
logger.info("Deleting expired data");
Instant timeoutTime = Instant.now(clock).plus(
request.getTimeout() == null ? DEFAULT_MAX_DURATION : Duration.ofMillis(request.getTimeout().millis())
request.getTimeout() == null ? Duration.ofMillis(MlDataRemover.DEFAULT_MAX_DURATION.getMillis()) :
Duration.ofMillis(request.getTimeout().millis())
);
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
@ -97,7 +102,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService);
if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(new String[]{request.getJobId()})) {
if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(request.getJobId())) {
List<MlDataRemover> dataRemovers = createDataRemovers(client, taskId, auditor);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
() -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier)
@ -107,6 +112,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
jobBuilders -> {
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
String [] jobIds = jobs.stream().map(Job::getId).toArray(String[]::new);
request.setExpandedJobIds(jobIds);
List<MlDataRemover> dataRemovers = createDataRemovers(jobs, taskId, auditor);
deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier);
}
@ -133,10 +140,11 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
(float) (AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5) * numberOfDatanodes :
Float.POSITIVE_INFINITY;
}
deleteExpiredData(dataRemoversIterator, requestsPerSec, listener, isTimedOutSupplier, true);
deleteExpiredData(request, dataRemoversIterator, requestsPerSec, listener, isTimedOutSupplier, true);
}
void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
void deleteExpiredData(DeleteExpiredDataAction.Request request,
Iterator<MlDataRemover> mlDataRemoversIterator,
float requestsPerSecond,
ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier,
@ -146,6 +154,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
ActionListener<Boolean> nextListener = ActionListener.wrap(
booleanResponse ->
deleteExpiredData(
request,
mlDataRemoversIterator,
requestsPerSecond,
listener,
@ -163,9 +172,28 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
} else {
if (haveAllPreviousDeletionsCompleted) {
logger.info("Completed deletion of expired ML data");
} else {
if (isTimedOutSupplier.get()) {
TimeValue timeoutPeriod = request.getTimeout() == null ? MlDataRemover.DEFAULT_MAX_DURATION :
request.getTimeout();
String msg = "Deleting expired ML data was cancelled after the timeout period of [" +
timeoutPeriod + "] was exceeded. The setting [xpack.ml.nightly_maintenance_requests_per_second] " +
"controls the deletion rate, consider increasing the value to assist in pruning old data";
logger.warn(msg);
if (Strings.isNullOrEmpty(request.getJobId())
|| Strings.isAllOrWildcard(request.getJobId())
|| request.getExpandedJobIds() == null) {
auditor.warning(AbstractAuditor.All_RESOURCES_ID, msg);
} else {
for (String jobId : request.getExpandedJobIds()) {
auditor.warning(jobId, msg);
}
}
} else {
logger.info("Halted deletion of expired ML data until next invocation");
}
}
listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted));
}
}

View File

@ -175,6 +175,7 @@ public class ExpiredForecastsRemover implements MlDataRemover {
private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
request.setTimeout(DEFAULT_MAX_DURATION);
request.indices(RESULTS_INDEX_PATTERN);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);

View File

@ -123,6 +123,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE)
// We are deleting old data, we should simply proceed as a version conflict could mean that another deletion is taking place
.setAbortOnVersionConflict(false)
.setTimeout(DEFAULT_MAX_DURATION)
.setRequestsPerSecond(requestsPerSec);
request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));

View File

@ -7,11 +7,14 @@ package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import java.util.function.Supplier;
public interface MlDataRemover {
TimeValue DEFAULT_MAX_DURATION = TimeValue.timeValueHours(8L);
void remove(float requestsPerSecond, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
/**

View File

@ -142,6 +142,7 @@ public class UnusedStateRemover implements MlDataRemover {
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setAbortOnVersionConflict(false)
.setRequestsPerSecond(requestsPerSec)
.setTimeout(DEFAULT_MAX_DURATION)
.setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0])));
// _doc is the most efficient sort order and will also disable scoring

View File

@ -99,6 +99,7 @@ public class UnusedStatsRemover implements MlDataRemover {
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setAbortOnVersionConflict(false)
.setRequestsPerSecond(requestsPerSec)
.setTimeout(DEFAULT_MAX_DURATION)
.setQuery(dbq);
deleteByQueryRequest.setParentTask(parentTaskId);

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.junit.After;
import org.junit.Before;
@ -30,13 +31,20 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class TransportDeleteExpiredDataActionTests extends ESTestCase {
private ThreadPool threadPool;
private TransportDeleteExpiredDataAction transportDeleteExpiredDataAction;
private AnomalyDetectionAuditor auditor;
/**
* A data remover that only checks for timeouts.
@ -60,9 +68,10 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase {
when(client.settings()).thenReturn(Settings.EMPTY);
when(client.threadPool()).thenReturn(threadPool);
ClusterService clusterService = mock(ClusterService.class);
auditor = mock(AnomalyDetectionAuditor.class);
transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction(threadPool, ThreadPool.Names.SAME, transportService,
new ActionFilters(Collections.emptySet()), client, clusterService, mock(JobConfigProvider.class),
mock(JobResultsProvider.class),
mock(JobResultsProvider.class), auditor,
Clock.systemUTC());
}
@ -85,7 +94,8 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase {
Supplier<Boolean> isTimedOutSupplier = () -> false;
transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(null, null);
transportDeleteExpiredDataAction.deleteExpiredData(request, removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
assertTrue(succeeded.get());
}
@ -105,8 +115,41 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase {
Supplier<Boolean> isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0);
transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(null, null);
request.setJobId("_all");
transportDeleteExpiredDataAction.deleteExpiredData(request, removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
assertFalse(succeeded.get());
verify(auditor, times(1)).warning("",
"Deleting expired ML data was cancelled after the timeout period of [8h] was exceeded. " +
"The setting [xpack.ml.nightly_maintenance_requests_per_second] " +
"controls the deletion rate, consider increasing the value to assist in pruning old data");
verifyNoMoreInteractions(auditor);
}
public void testDeleteExpiredDataIterationWithTimeout_GivenJobIds() {
final int numRemovers = randomIntBetween(2, 5);
AtomicInteger removersRemaining = new AtomicInteger(randomIntBetween(0, numRemovers - 1));
List<MlDataRemover> removers = Stream.generate(DummyDataRemover::new).limit(numRemovers).collect(Collectors.toList());
AtomicBoolean succeeded = new AtomicBoolean();
ActionListener<DeleteExpiredDataAction.Response> finalListener = ActionListener.wrap(
response -> succeeded.set(response.isDeleted()),
e -> fail(e.getMessage())
);
Supplier<Boolean> isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0);
DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(null, null);
request.setJobId("foo*");
request.setExpandedJobIds(new String[] {"foo1", "foo2"});
transportDeleteExpiredDataAction.deleteExpiredData(request, removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
assertFalse(succeeded.get());
verify(auditor, times(1)).warning(eq("foo1"), anyString());
verify(auditor, times(1)).warning(eq("foo2"), anyString());
verifyNoMoreInteractions(auditor);
}
}