From 279f951700ddb32908fecf2af383c14c10976579 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 2 Oct 2020 10:14:10 +0100 Subject: [PATCH] [ML] Set parent task Id on ml expired data removers (#62854) (#62966) Setting the parent task Id (of the delete expired data action) on the ML expired data removers makes it easier to track and cancel long running tasks --- .../ml/integration/UnusedStatsRemoverIT.java | 3 +- .../TransportDeleteExpiredDataAction.java | 39 +++++++++++-------- .../AbstractExpiredJobDataRemover.java | 9 ++++- .../job/retention/EmptyStateIndexRemover.java | 12 ++++-- .../retention/ExpiredForecastsRemover.java | 8 +++- .../ExpiredModelSnapshotsRemover.java | 9 ++++- .../job/retention/ExpiredResultsRemover.java | 7 +++- .../ml/job/retention/UnusedStateRemover.java | 9 ++++- .../ml/job/retention/UnusedStatsRemover.java | 6 ++- .../AbstractExpiredJobDataRemoverTests.java | 3 +- .../EmptyStateIndexRemoverTests.java | 3 +- .../ExpiredModelSnapshotsRemoverTests.java | 3 +- .../retention/ExpiredResultsRemoverTests.java | 4 +- 13 files changed, 82 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java index 166978ef5d4..9fbc6b73ab7 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.MlStatsIndex; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; @@ -116,7 +117,7 @@ public class UnusedStatsRemoverIT extends BaseMlIntegTestCase { client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get(); PlainActionFuture deletionListener = new PlainActionFuture<>(); - UnusedStatsRemover statsRemover = new UnusedStatsRemover(client); + UnusedStatsRemover statsRemover = new UnusedStatsRemover(client, new TaskId("test", 0L)); statsRemover.remove(10000.0f, deletionListener, () -> false); deletionListener.actionGet(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 2d9f00d1bdf..d8c97d165da 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; @@ -88,11 +89,13 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime); AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(new String[]{request.getJobId()})) { - List dataRemovers = createDataRemovers(client, auditor); + List dataRemovers = createDataRemovers(client, taskId, auditor); threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute( () -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier) ); @@ -101,7 +104,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { List jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); - List dataRemovers = createDataRemovers(jobs, auditor); + List dataRemovers = createDataRemovers(jobs, taskId, auditor); deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier); } ); @@ -164,24 +167,28 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction createDataRemovers(OriginSettingClient client, AnomalyDetectionAuditor auditor) { + private List createDataRemovers(OriginSettingClient client, + TaskId parentTaskId, + AnomalyDetectionAuditor auditor) { return Arrays.asList( - new ExpiredResultsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), auditor, threadPool), - new ExpiredForecastsRemover(client, threadPool), - new ExpiredModelSnapshotsRemover(client, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool), - new UnusedStateRemover(client, clusterService), - new EmptyStateIndexRemover(client), - new UnusedStatsRemover(client)); + new ExpiredResultsRemover(client, + new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), parentTaskId, auditor, threadPool), + new ExpiredForecastsRemover(client, threadPool, parentTaskId), + new ExpiredModelSnapshotsRemover(client, + new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)), threadPool, parentTaskId), + new UnusedStateRemover(client, clusterService, parentTaskId), + new EmptyStateIndexRemover(client, parentTaskId), + new UnusedStatsRemover(client, parentTaskId)); } - private List createDataRemovers(List jobs, AnomalyDetectionAuditor auditor) { + private List createDataRemovers(List jobs, TaskId parentTaskId, AnomalyDetectionAuditor auditor) { return Arrays.asList( - new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), auditor, threadPool), - new ExpiredForecastsRemover(client, threadPool), - new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool), - new UnusedStateRemover(client, clusterService), - new EmptyStateIndexRemover(client), - new UnusedStatsRemover(client)); + new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool), + new ExpiredForecastsRemover(client, threadPool, parentTaskId), + new ExpiredModelSnapshotsRemover(client, new VolatileCursorIterator<>(jobs), threadPool, parentTaskId), + new UnusedStateRemover(client, clusterService, parentTaskId), + new EmptyStateIndexRemover(client, parentTaskId), + new UnusedStatsRemover(client, parentTaskId)); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index b001e8b537d..9487e14345f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -27,10 +28,16 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { protected final OriginSettingClient client; private final Iterator jobIterator; + private final TaskId parentTaskId; - AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator jobIterator) { + AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator jobIterator, TaskId parentTaskId) { this.client = client; this.jobIterator = jobIterator; + this.parentTaskId = parentTaskId; + } + + protected TaskId getParentTaskId() { + return parentTaskId; } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java index 4522ff8b4b6..c7f39842a24 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import java.util.Arrays; @@ -25,11 +26,13 @@ import static java.util.stream.Collectors.toSet; * This class deletes empty indices matching .ml-state* pattern that are not pointed at by the .ml-state-write alias. */ public class EmptyStateIndexRemover implements MlDataRemover { - - private final OriginSettingClient client; - public EmptyStateIndexRemover(OriginSettingClient client) { + private final OriginSettingClient client; + private final TaskId parentTaskId; + + public EmptyStateIndexRemover(OriginSettingClient client, TaskId parentTaskId) { this.client = Objects.requireNonNull(client); + this.parentTaskId = parentTaskId; } @Override @@ -70,6 +73,7 @@ public class EmptyStateIndexRemover implements MlDataRemover { private void getEmptyStateIndices(ActionListener> listener) { IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(AnomalyDetectorsIndex.jobStateIndexPattern()); + indicesStatsRequest.setParentTask(parentTaskId); client.admin().indices().stats( indicesStatsRequest, ActionListener.wrap( @@ -88,6 +92,7 @@ public class EmptyStateIndexRemover implements MlDataRemover { private void getCurrentStateIndices(ActionListener> listener) { GetIndexRequest getIndexRequest = new GetIndexRequest().indices(AnomalyDetectorsIndex.jobStateIndexWriteAlias()); + getIndexRequest.setParentTask(parentTaskId); client.admin().indices().getIndex( getIndexRequest, ActionListener.wrap( @@ -102,6 +107,7 @@ public class EmptyStateIndexRemover implements MlDataRemover { private void executeDeleteEmptyStateIndices(Set emptyStateIndices, ActionListener listener) { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(emptyStateIndices.toArray(new String[0])); + deleteIndexRequest.setParentTask(parentTaskId); client.admin().indices().delete( deleteIndexRequest, ActionListener.wrap( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index dbe2dc4dea7..a6ee96278cb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -24,6 +24,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -59,11 +60,13 @@ public class ExpiredForecastsRemover implements MlDataRemover { private final OriginSettingClient client; private final ThreadPool threadPool; private final long cutoffEpochMs; + private final TaskId parentTaskId; - public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool) { + public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool, TaskId parentTaskId) { this.client = Objects.requireNonNull(client); this.threadPool = Objects.requireNonNull(threadPool); this.cutoffEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli(); + this.parentTaskId = parentTaskId; } @Override @@ -90,6 +93,7 @@ public class ExpiredForecastsRemover implements MlDataRemover { SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN); searchRequest.source(source); + searchRequest.setParentTask(parentTaskId); client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false)); } @@ -114,6 +118,8 @@ public class ExpiredForecastsRemover implements MlDataRemover { DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete) .setRequestsPerSecond(requestsPerSec) .setAbortOnVersionConflict(false); + + request.setParentTask(parentTaskId); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index b3e7a74d85b..b6006a7f941 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; @@ -65,8 +66,9 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private final ThreadPool threadPool; - public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator jobIterator, ThreadPool threadPool) { - super(client, jobIterator); + public ExpiredModelSnapshotsRemover(OriginSettingClient client, Iterator jobIterator, + ThreadPool threadPool, TaskId parentTaskId) { + super(client, jobIterator, parentTaskId); this.threadPool = Objects.requireNonNull(threadPool); } @@ -118,6 +120,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.source(searchSourceBuilder); searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); + searchRequest.setParentTask(getParentTaskId()); client.search(searchRequest, ActionListener.wrap( response -> { @@ -176,6 +179,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null); source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis"); searchRequest.source(source); + searchRequest.setParentTask(getParentTaskId()); long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null) ? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis(); @@ -233,6 +237,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover JobSnapshotId idPair = modelSnapshotIterator.next(); DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); + deleteSnapshotRequest.setParentTask(getParentTaskId()); client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { @Override public void onResponse(AcknowledgedResponse response) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 0d61fcaa852..fd736f299e6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -30,6 +30,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; @@ -71,9 +72,9 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { private final AnomalyDetectionAuditor auditor; private final ThreadPool threadPool; - public ExpiredResultsRemover(OriginSettingClient client, Iterator jobIterator, + public ExpiredResultsRemover(OriginSettingClient client, Iterator jobIterator, TaskId parentTaskId, AnomalyDetectionAuditor auditor, ThreadPool threadPool) { - super(client, jobIterator); + super(client, jobIterator, parentTaskId); this.auditor = Objects.requireNonNull(auditor); this.threadPool = Objects.requireNonNull(threadPool); } @@ -93,6 +94,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { ) { LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs); + request.setParentTask(getParentTaskId()); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener() { @Override @@ -167,6 +169,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.source(searchSourceBuilder); searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)); + searchRequest.setParentTask(getParentTaskId()); client.search(searchRequest, ActionListener.wrap( response -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index c27f6da09ec..4f69b7c021b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -51,10 +52,13 @@ public class UnusedStateRemover implements MlDataRemover { private final OriginSettingClient client; private final ClusterService clusterService; + private final TaskId parentTaskId; - public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService) { + public UnusedStateRemover(OriginSettingClient client, ClusterService clusterService, + TaskId parentTaskId) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); + this.parentTaskId = Objects.requireNonNull(parentTaskId); } @Override @@ -142,6 +146,7 @@ public class UnusedStateRemover implements MlDataRemover { // _doc is the most efficient sort order and will also disable scoring deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + deleteByQueryRequest.setParentTask(parentTaskId); client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( response -> { @@ -163,7 +168,7 @@ public class UnusedStateRemover implements MlDataRemover { private static class JobIdExtractor { - private static List> extractors = Arrays.asList( + private static final List> extractors = Arrays.asList( ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java index fa7cb6ae6b2..8487be88af1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java @@ -16,6 +16,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlStatsIndex; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -41,9 +42,11 @@ public class UnusedStatsRemover implements MlDataRemover { private static final Logger LOGGER = LogManager.getLogger(UnusedStatsRemover.class); private final OriginSettingClient client; + private final TaskId parentTaskId; - public UnusedStatsRemover(OriginSettingClient client) { + public UnusedStatsRemover(OriginSettingClient client, TaskId parentTaskId) { this.client = Objects.requireNonNull(client); + this.parentTaskId = Objects.requireNonNull(parentTaskId); } @Override @@ -97,6 +100,7 @@ public class UnusedStatsRemover implements MlDataRemover { .setAbortOnVersionConflict(false) .setRequestsPerSecond(requestsPerSec) .setQuery(dbq); + deleteByQueryRequest.setParentTask(parentTaskId); client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( response -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 08ae5471139..56be5ba9036 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -48,7 +49,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase { private int getRetentionDaysCallCount = 0; ConcreteExpiredJobDataRemover(OriginSettingClient client, Iterator jobIterator) { - super(client, jobIterator); + super(client, jobIterator, new TaskId("test", 0L)); } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java index cf253cb8e42..9a2ed3fe209 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; @@ -59,7 +60,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase { listener = mock(ActionListener.class); deleteIndexRequestCaptor = ArgumentCaptor.forClass(DeleteIndexRequest.class); - remover = new EmptyStateIndexRemover(originSettingClient); + remover = new EmptyStateIndexRemover(originSettingClient, new TaskId("test", 0L)); } @After diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 97c23286ea8..263e56ce7c6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; @@ -252,7 +253,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { return null; } ).when(executor).execute(any()); - return new ExpiredModelSnapshotsRemover(originSettingClient, jobIterator, threadPool); + return new ExpiredModelSnapshotsRemover(originSettingClient, jobIterator, threadPool, new TaskId("test", 0L)); } private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index a8ee0e5848d..8aafbaaafcc 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; @@ -197,6 +198,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { } ).when(executor).execute(any()); - return new ExpiredResultsRemover(originSettingClient, jobIterator, mock(AnomalyDetectionAuditor.class), threadPool); + return new ExpiredResultsRemover(originSettingClient, jobIterator, new TaskId("test", 0L), + mock(AnomalyDetectionAuditor.class), threadPool); } }