[ML] Set parent task Id on ml expired data removers (#62854) (#62966)

Setting the parent task Id (of the delete expired data action) on the ML
expired data removers makes it easier to track and cancel long running
tasks
This commit is contained in:
David Kyle 2020-10-02 10:14:10 +01:00 committed by GitHub
parent d9d024c17f
commit 279f951700
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 82 additions and 33 deletions

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
@ -116,7 +117,7 @@ public class UnusedStatsRemoverIT extends BaseMlIntegTestCase {
client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
UnusedStatsRemover statsRemover = new UnusedStatsRemover(client);
UnusedStatsRemover statsRemover = new UnusedStatsRemover(client, new TaskId("test", 0L));
statsRemover.remove(10000.0f, deletionListener, () -> false);
deletionListener.actionGet();

View File

@ -18,6 +18,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
@ -88,11 +89,13 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
request.getTimeout() == null ? DEFAULT_MAX_DURATION : Duration.ofMillis(request.getTimeout().millis())
);
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(new String[]{request.getJobId()})) {
List<MlDataRemover> dataRemovers = createDataRemovers(client, auditor);
List<MlDataRemover> dataRemovers = createDataRemovers(client, taskId, auditor);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
() -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier)
);
@ -101,7 +104,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
jobBuilders -> {
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
List<MlDataRemover> dataRemovers = createDataRemovers(jobs, auditor);
List<MlDataRemover> dataRemovers = createDataRemovers(jobs, taskId, auditor);
deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier);
}
);
@ -164,24 +167,28 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
}
}
private List<MlDataRemover> createDataRemovers(OriginSettingClient client, AnomalyDetectionAuditor auditor) {
private List<MlDataRemover> createDataRemovers(OriginSettingClient client,
TaskId parentTaskId,
AnomalyDetectionAuditor auditor) {
return Arrays.asList(
new ExpiredResultsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool),
new UnusedStateRemover(client, clusterService),
new EmptyStateIndexRemover(client),
new UnusedStatsRemover(client));
new ExpiredResultsRemover(client,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), parentTaskId, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(client,
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId),
new UnusedStateRemover(client, clusterService, parentTaskId),
new EmptyStateIndexRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId));
}
private List<MlDataRemover> createDataRemovers(List<Job> jobs, AnomalyDetectionAuditor auditor) {
private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTaskId, AnomalyDetectionAuditor auditor) {
return Arrays.asList(
new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool),
new UnusedStateRemover(client, clusterService),
new EmptyStateIndexRemover(client),
new UnusedStatsRemover(client));
new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool, parentTaskId),
new UnusedStateRemover(client, clusterService, parentTaskId),
new EmptyStateIndexRemover(client, parentTaskId),
new UnusedStatsRemover(client, parentTaskId));
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Result;
@ -27,10 +28,16 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
protected final OriginSettingClient client;
private final Iterator<Job> jobIterator;
private final TaskId parentTaskId;
AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator<Job> jobIterator) {
AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator<Job> jobIterator, TaskId parentTaskId) {
this.client = client;
this.jobIterator = jobIterator;
this.parentTaskId = parentTaskId;
}
protected TaskId getParentTaskId() {
return parentTaskId;
}
@Override

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import java.util.Arrays;
@ -27,9 +28,11 @@ import static java.util.stream.Collectors.toSet;
public class EmptyStateIndexRemover implements MlDataRemover {
private final OriginSettingClient client;
private final TaskId parentTaskId;
public EmptyStateIndexRemover(OriginSettingClient client) {
public EmptyStateIndexRemover(OriginSettingClient client, TaskId parentTaskId) {
this.client = Objects.requireNonNull(client);
this.parentTaskId = parentTaskId;
}
@Override
@ -70,6 +73,7 @@ public class EmptyStateIndexRemover implements MlDataRemover {
private void getEmptyStateIndices(ActionListener<Set<String>> listener) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(AnomalyDetectorsIndex.jobStateIndexPattern());
indicesStatsRequest.setParentTask(parentTaskId);
client.admin().indices().stats(
indicesStatsRequest,
ActionListener.wrap(
@ -88,6 +92,7 @@ public class EmptyStateIndexRemover implements MlDataRemover {
private void getCurrentStateIndices(ActionListener<Set<String>> listener) {
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
getIndexRequest.setParentTask(parentTaskId);
client.admin().indices().getIndex(
getIndexRequest,
ActionListener.wrap(
@ -102,6 +107,7 @@ public class EmptyStateIndexRemover implements MlDataRemover {
private void executeDeleteEmptyStateIndices(Set<String> emptyStateIndices, ActionListener<Boolean> listener) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(emptyStateIndices.toArray(new String[0]));
deleteIndexRequest.setParentTask(parentTaskId);
client.admin().indices().delete(
deleteIndexRequest,
ActionListener.wrap(

View File

@ -24,6 +24,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@ -59,11 +60,13 @@ public class ExpiredForecastsRemover implements MlDataRemover {
private final OriginSettingClient client;
private final ThreadPool threadPool;
private final long cutoffEpochMs;
private final TaskId parentTaskId;
public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool) {
public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool, TaskId parentTaskId) {
this.client = Objects.requireNonNull(client);
this.threadPool = Objects.requireNonNull(threadPool);
this.cutoffEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
this.parentTaskId = parentTaskId;
}
@Override
@ -90,6 +93,7 @@ public class ExpiredForecastsRemover implements MlDataRemover {
SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN);
searchRequest.source(source);
searchRequest.setParentTask(parentTaskId);
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false));
}
@ -114,6 +118,8 @@ public class ExpiredForecastsRemover implements MlDataRemover {
DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete)
.setRequestsPerSecond(requestsPerSec)
.setAbortOnVersionConflict(false);
request.setParentTask(parentTaskId);
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
@ -65,8 +66,9 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
private final ThreadPool threadPool;
public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator<Job> jobIterator, ThreadPool threadPool) {
super(client, jobIterator);
public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator<Job> jobIterator,
ThreadPool threadPool, TaskId parentTaskId) {
super(client, jobIterator, parentTaskId);
this.threadPool = Objects.requireNonNull(threadPool);
}
@ -118,6 +120,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
searchRequest.setParentTask(getParentTaskId());
client.search(searchRequest, ActionListener.wrap(
response -> {
@ -176,6 +179,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null);
source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis");
searchRequest.source(source);
searchRequest.setParentTask(getParentTaskId());
long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null)
? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis();
@ -233,6 +237,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
JobSnapshotId idPair = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest =
new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId);
deleteSnapshotRequest.setParentTask(getParentTaskId());
client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
@ -71,9 +72,9 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
private final AnomalyDetectionAuditor auditor;
private final ThreadPool threadPool;
public ExpiredResultsRemover(OriginSettingClient client, Iterator<Job> jobIterator,
public ExpiredResultsRemover(OriginSettingClient client, Iterator<Job> jobIterator, TaskId parentTaskId,
AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
super(client, jobIterator);
super(client, jobIterator, parentTaskId);
this.auditor = Objects.requireNonNull(auditor);
this.threadPool = Objects.requireNonNull(threadPool);
}
@ -93,6 +94,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
) {
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs);
request.setParentTask(getParentTaskId());
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
@ -167,6 +169,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
searchRequest.setParentTask(getParentTaskId());
client.search(searchRequest, ActionListener.wrap(
response -> {

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@ -51,10 +52,13 @@ public class UnusedStateRemover implements MlDataRemover {
private final OriginSettingClient client;
private final ClusterService clusterService;
private final TaskId parentTaskId;
public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService) {
public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService,
TaskId parentTaskId) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.parentTaskId = Objects.requireNonNull(parentTaskId);
}
@Override
@ -142,6 +146,7 @@ public class UnusedStateRemover implements MlDataRemover {
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
deleteByQueryRequest.setParentTask(parentTaskId);
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
response -> {
@ -163,7 +168,7 @@ public class UnusedStateRemover implements MlDataRemover {
private static class JobIdExtractor {
private static List<Function<String, String>> extractors = Arrays.asList(
private static final List<Function<String, String>> extractors = Arrays.asList(
ModelState::extractJobId,
Quantiles::extractJobId,
CategorizerState::extractJobId,

View File

@ -16,6 +16,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@ -41,9 +42,11 @@ public class UnusedStatsRemover implements MlDataRemover {
private static final Logger LOGGER = LogManager.getLogger(UnusedStatsRemover.class);
private final OriginSettingClient client;
private final TaskId parentTaskId;
public UnusedStatsRemover(OriginSettingClient client) {
public UnusedStatsRemover(OriginSettingClient client, TaskId parentTaskId) {
this.client = Objects.requireNonNull(client);
this.parentTaskId = Objects.requireNonNull(parentTaskId);
}
@Override
@ -97,6 +100,7 @@ public class UnusedStatsRemover implements MlDataRemover {
.setAbortOnVersionConflict(false)
.setRequestsPerSecond(requestsPerSec)
.setQuery(dbq);
deleteByQueryRequest.setParentTask(parentTaskId);
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
response -> {

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@ -48,7 +49,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
private int getRetentionDaysCallCount = 0;
ConcreteExpiredJobDataRemover(OriginSettingClient client, Iterator<Job> jobIterator) {
super(client, jobIterator);
super(client, jobIterator, new TaskId("test", 0L));
}
@Override

View File

@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
@ -59,7 +60,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase {
listener = mock(ActionListener.class);
deleteIndexRequestCaptor = ArgumentCaptor.forClass(DeleteIndexRequest.class);
remover = new EmptyStateIndexRemover(originSettingClient);
remover = new EmptyStateIndexRemover(originSettingClient, new TaskId("test", 0L));
}
@After

View File

@ -14,6 +14,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
@ -252,7 +253,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
return null;
}
).when(executor).execute(any());
return new ExpiredModelSnapshotsRemover(originSettingClient, jobIterator, threadPool);
return new ExpiredModelSnapshotsRemover(originSettingClient, jobIterator, threadPool, new TaskId("test", 0L));
}
private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) {

View File

@ -13,6 +13,7 @@ import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
@ -197,6 +198,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
}
).when(executor).execute(any());
return new ExpiredResultsRemover(originSettingClient, jobIterator, mock(AnomalyDetectionAuditor.class), threadPool);
return new ExpiredResultsRemover(originSettingClient, jobIterator, new TaskId("test", 0L),
mock(AnomalyDetectionAuditor.class), threadPool);
}
}