[7.x] Delete expired data by job (#57337) (#57796)

Deleting expired data can take a long time, leading to timeouts if there
are many jobs. Often the problem is due to a few large jobs that
prevent the regular maintenance of the remaining jobs. This change adds
a job_id parameter to the delete expired data endpoint to help clean up
those problematic jobs.
David Kyle 2020-06-08 13:00:23 +01:00 committed by GitHub
parent 004eb8bd7e
commit 08d1286de7
18 changed files with 339 additions and 95 deletions
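
For orientation before the per-file diffs, a minimal sketch of how a caller
might build the new request in Java (illustrative values; "large-job" is a
hypothetical job id, and the setters are the ones added to
DeleteExpiredDataAction.Request below):

    DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request()
        .setJobId("large-job")                    // new in this change; defaults to "_all"
        .setRequestsPerSecond(1000.0f)            // throttles the underlying delete-by-query
        .setTimeout(TimeValue.timeValueHours(8)); // cancel the delete processes after 8h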

docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc

@@ -11,7 +11,9 @@ Deletes expired and unused machine learning data.
[[ml-delete-expired-data-request]]
==== {api-request-title}
`DELETE _ml/_delete_expired_data`
`DELETE _ml/_delete_expired_data` +
`DELETE _ml/_delete_expired_data/<job_id>`
[[ml-delete-expired-data-prereqs]]
==== {api-prereq-title}
@@ -27,6 +29,19 @@ Deletes all job results, model snapshots and forecast data that have exceeded
their `retention days` period. Machine learning state documents that are not
associated with any job are also deleted.
You can limit the request to a single {anomaly-job} or a set of {anomaly-jobs}
by using a job identifier, a group name, a comma-separated list of jobs, or a
wildcard expression.
You can delete expired data for all {anomaly-jobs} by using `_all`, by specifying
`*` as the `<job_id>`, or by omitting the `<job_id>`.
[[ml-delete-expired-data-path-parms]]
==== {api-path-parms-title}
`<job_id>`::
(Optional, string)
Identifier for an {anomaly-job}. It can be a job identifier, a group name, or a
wildcard expression.
[[ml-delete-expired-data-request-body]]
==== {api-request-body-title}
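
An illustrative request for the new route (not part of the documentation diff
above): `DELETE _ml/_delete_expired_data/foo*` cleans up expired data only for
jobs whose identifiers match `foo*`, while the bare
`DELETE _ml/_delete_expired_data` keeps the old delete-everything behavior.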

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -19,6 +20,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.io.IOException;
import java.util.Objects;
@@ -46,10 +48,12 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
PARSER.declareFloat(Request::setRequestsPerSecond, REQUESTS_PER_SECOND);
PARSER.declareString((obj, value) -> obj.setTimeout(TimeValue.parseTimeValue(value, TIMEOUT.getPreferredName())),
TIMEOUT);
PARSER.declareString(Request::setJobId, Job.ID);
}
private Float requestsPerSecond;
private TimeValue timeout;
private String jobId = Metadata.ALL;
public Request() {}
@@ -67,6 +71,9 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
this.requestsPerSecond = null;
this.timeout = null;
}
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
jobId = in.readString();
}
}
public Float getRequestsPerSecond() {
@@ -77,6 +84,10 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
return timeout;
}
public String getJobId() {
return jobId;
}
public Request setRequestsPerSecond(Float requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond;
return this;
@@ -87,6 +98,11 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
return this;
}
public Request setJobId(String jobId) {
this.jobId = jobId;
return this;
}
@Override
public ActionRequestValidationException validate() {
if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) {
@@ -103,12 +119,13 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(requestsPerSecond, request.requestsPerSecond)
&& Objects.equals(jobId, request.jobId)
&& Objects.equals(timeout, request.timeout);
}
@Override
public int hashCode() {
return Objects.hash(requestsPerSecond, timeout);
return Objects.hash(requestsPerSecond, timeout, jobId);
}
@Override
@@ -118,6 +135,9 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
out.writeOptionalFloat(requestsPerSecond);
out.writeOptionalTimeValue(timeout);
}
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeString(jobId);
}
}
}
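
The two Version.V_7_9_0 gates above keep the wire format compatible with older
nodes. A hedged sketch of the round trip, test-style, assuming
BytesStreamOutput as in the existing BWC wire tests:

    BytesStreamOutput out = new BytesStreamOutput();
    out.setVersion(Version.V_7_8_0); // simulate sending to a pre-7.9 node
    new DeleteExpiredDataAction.Request().setJobId("foo").writeTo(out);

    StreamInput in = out.bytes().streamInput();
    in.setVersion(Version.V_7_8_0);
    DeleteExpiredDataAction.Request received = new DeleteExpiredDataAction.Request(in);
    // the job id never went over the wire, so the default Metadata.ALL applies
    assert "_all".equals(received.getJobId());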

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

@@ -270,6 +270,15 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return ANOMALY_DETECTOR_JOB_TYPE + "-" + jobId;
}
/**
* Returns the job id from the doc id. Returns {@code null} if the doc id is invalid.
*/
@Nullable
public static String extractJobIdFromDocumentId(String docId) {
String jobId = docId.replaceAll("^" + ANOMALY_DETECTOR_JOB_TYPE +"-", "");
return jobId.equals(docId) ? null : jobId;
}
/**
* Return the Job Id.

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java

@@ -15,10 +15,17 @@ public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializ
@Override
protected Request createTestInstance() {
return new Request(
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test")
);
Request request = new Request();
if (randomBoolean()) {
request.setRequestsPerSecond(randomFloat());
}
if (randomBoolean()) {
request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "test"));
}
if (randomBoolean()) {
request.setJobId(randomAlphaOfLength(5));
}
return request;
}
@Override
@@ -31,6 +38,12 @@ public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializ
if (version.before(Version.V_7_8_0)) {
return new Request();
}
if (version.before(Version.V_7_9_0)) {
Request request = new Request();
request.setRequestsPerSecond(instance.getRequestsPerSecond());
request.setTimeout(instance.getTimeout());
return request;
}
return instance;
}
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java

@@ -584,6 +584,15 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
}
}
public void testDocumentId() {
String jobFoo = "foo";
assertEquals("anomaly_detector-" + jobFoo, Job.documentId(jobFoo));
assertEquals(jobFoo, Job.extractJobIdFromDocumentId(
Job.documentId(jobFoo)
));
assertNull(Job.extractJobIdFromDocumentId("some_other_type-foo"));
}
public static Job.Builder buildJobBuilder(String id, Date date) {
Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(date);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

@@ -88,9 +88,9 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor, threadPool),
new ExpiredResultsRemover(client, request.getJobId(), auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, request.getJobId(), threadPool),
new UnusedStateRemover(client, clusterService),
new EmptyStateIndexRemover(client)
);
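
Note that only ExpiredResultsRemover and ExpiredModelSnapshotsRemover become
job-aware here; ExpiredForecastsRemover, UnusedStateRemover, and
EmptyStateIndexRemover still operate across all jobs regardless of the
requested job_id.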

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java

@@ -7,13 +7,13 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
@@ -23,13 +23,17 @@ import java.io.InputStream;
public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> {
public BatchedJobsIterator(OriginSettingClient client, String index) {
private final String jobIdExpression;
public BatchedJobsIterator(OriginSettingClient client, String index, String jobIdExpression) {
super(client, index);
this.jobIdExpression = jobIdExpression;
}
@Override
protected QueryBuilder getQuery() {
return new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
String [] tokens = Strings.tokenizeToStringArray(jobIdExpression, ",");
return JobConfigProvider.buildJobWildcardQuery(tokens, true);
}
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java

@@ -518,7 +518,7 @@ public class JobConfigProvider {
boolean allowMissingConfigs,
ActionListener<SortedSet<String>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(Job.ID.getPreferredName(), null);
@@ -579,7 +579,7 @@ public class JobConfigProvider {
*/
public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
@@ -773,7 +773,7 @@ public class JobConfigProvider {
}
}
private QueryBuilder buildQuery(String [] tokens, boolean excludeDeleting) {
public static QueryBuilder buildJobWildcardQuery(String [] tokens, boolean excludeDeleting) {
QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
if (Strings.isAllOrWildcard(tokens) && excludeDeleting == false) {
// match all
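
With buildJobWildcardQuery now public and static, BatchedJobsIterator above can
reuse it. A small sketch of the expression-to-query flow (the expressions are
illustrative):

    // a concrete id plus a wildcard, excluding jobs marked as deleting
    String[] tokens = Strings.tokenizeToStringArray("foo,bar-*", ",");
    QueryBuilder query = JobConfigProvider.buildJobWildcardQuery(tokens, true);

    // "*" or "_all" with excludeDeleting == false short-circuits to the bare
    // job_type term query, i.e. every anomaly detector job matches
    QueryBuilder all = JobConfigProvider.buildJobWildcardQuery(new String[]{"*"}, false);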

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java

@@ -31,9 +31,11 @@ import java.util.stream.Collectors;
*/
abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
private final OriginSettingClient client;
private final String jobIdExpression;
protected final OriginSettingClient client;
AbstractExpiredJobDataRemover(OriginSettingClient client) {
AbstractExpiredJobDataRemover(String jobIdExpression, OriginSettingClient client) {
this.jobIdExpression = jobIdExpression;
this.client = client;
}
@@ -85,7 +87,7 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
}
private WrappedBatchedJobsIterator newJobIterator() {
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName(), jobIdExpression);
return new WrappedBatchedJobsIterator(jobsIterator);
}
@@ -112,8 +114,44 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
}
/**
* BatchedJobsIterator efficiently returns batches of jobs using a scroll
* search but AbstractExpiredJobDataRemover works with one job at a time.
* The latest time that cutoffs are measured from is not wall clock time,
* but some other reference point that makes sense for the type of data
* being removed. This class groups the cutoff time with its "latest"
* reference point.
*/
protected static final class CutoffDetails {
public final long latestTimeMs;
public final long cutoffEpochMs;
public CutoffDetails(long latestTimeMs, long cutoffEpochMs) {
this.latestTimeMs = latestTimeMs;
this.cutoffEpochMs = cutoffEpochMs;
}
@Override
public int hashCode() {
return Objects.hash(latestTimeMs, cutoffEpochMs);
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other instanceof CutoffDetails == false) {
return false;
}
CutoffDetails that = (CutoffDetails) other;
return this.latestTimeMs == that.latestTimeMs &&
this.cutoffEpochMs == that.cutoffEpochMs;
}
}
/**
* A wrapper around {@link BatchedJobsIterator} that allows iterating jobs one
* at a time from the batches returned by {@code BatchedJobsIterator}
*
* This class abstracts away the logic of pulling one job at a time from
* multiple batches.
*/
@@ -155,39 +193,4 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
return new VolatileCursorIterator<>(jobs);
}
}
/**
* The latest time that cutoffs are measured from is not wall clock time,
* but some other reference point that makes sense for the type of data
* being removed. This class groups the cutoff time with its "latest"
* reference point.
*/
protected static final class CutoffDetails {
public final long latestTimeMs;
public final long cutoffEpochMs;
public CutoffDetails(long latestTimeMs, long cutoffEpochMs) {
this.latestTimeMs = latestTimeMs;
this.cutoffEpochMs = cutoffEpochMs;
}
@Override
public int hashCode() {
return Objects.hash(latestTimeMs, cutoffEpochMs);
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other instanceof CutoffDetails == false) {
return false;
}
CutoffDetails that = (CutoffDetails) other;
return this.latestTimeMs == that.latestTimeMs &&
this.cutoffEpochMs == that.cutoffEpochMs;
}
}
}
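
The wrapper's job is pure batch flattening. A self-contained sketch of the same
idea (a hypothetical, simplified stand-in for WrappedBatchedJobsIterator, which
works with Deque batches from the scroll search):

    import java.util.Collections;
    import java.util.Deque;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    final class FlatteningIterator<T> implements Iterator<T> {
        private final Iterator<Deque<T>> batches;
        private Iterator<T> current = Collections.emptyIterator();

        FlatteningIterator(Iterator<Deque<T>> batches) {
            this.batches = batches;
        }

        @Override
        public boolean hasNext() {
            // skip empty batches until an element or the end is found
            while (current.hasNext() == false && batches.hasNext()) {
                current = batches.next().iterator();
            }
            return current.hasNext();
        }

        @Override
        public T next() {
            if (hasNext() == false) {
                throw new NoSuchElementException();
            }
            return current.next();
        }
    }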

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

@@ -63,12 +63,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
*/
private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;
private final OriginSettingClient client;
private final ThreadPool threadPool;
public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threadPool) {
super(client);
this.client = Objects.requireNonNull(client);
public ExpiredModelSnapshotsRemover(OriginSettingClient client, String jobIdExpression, ThreadPool threadPool) {
super(jobIdExpression, client);
this.threadPool = Objects.requireNonNull(threadPool);
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java

@@ -67,13 +67,12 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
private static final Logger LOGGER = LogManager.getLogger(ExpiredResultsRemover.class);
private final OriginSettingClient client;
private final AnomalyDetectionAuditor auditor;
private final ThreadPool threadPool;
public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
super(client);
this.client = Objects.requireNonNull(client);
public ExpiredResultsRemover(OriginSettingClient client, String jobIdExpression,
AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
super(jobIdExpression, client);
this.auditor = Objects.requireNonNull(auditor);
this.threadPool = Objects.requireNonNull(threadPool);
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java

@@ -26,7 +26,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerS
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
import org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator;
@@ -110,10 +109,11 @@ public class UnusedStateRemover implements MlDataRemover {
// and remove cluster service as a member all together.
jobIds.addAll(MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet());
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
while (jobsIterator.hasNext()) {
Deque<Job.Builder> jobs = jobsIterator.next();
jobs.stream().map(Job.Builder::getId).forEach(jobIds::add);
DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, AnomalyDetectorsIndex.configIndexName(),
QueryBuilders.termQuery(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE));
while (iterator.hasNext()) {
Deque<String> docIds = iterator.next();
docIds.stream().map(Job::extractJobIdFromDocumentId).filter(Objects::nonNull).forEach(jobIds::add);
}
return jobIds;
}
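
Fetching only document ids, rather than parsing every job config, is enough
here because the id encodes the job name. The round trip relies on the
convention added to Job above ("foo" and "some_other_type-foo" mirror JobTests):

    assert Job.documentId("foo").equals("anomaly_detector-foo");
    assert "foo".equals(Job.extractJobIdFromDocumentId("anomaly_detector-foo"));
    assert Job.extractJobIdFromDocumentId("some_other_type-foo") == null; // not a job config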

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java

@@ -6,10 +6,12 @@
package org.elasticsearch.xpack.ml.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
@@ -22,7 +24,8 @@ public class RestDeleteExpiredDataAction extends BaseRestHandler {
@Override
public List<Route> routes() {
return Collections.emptyList();
return Collections.singletonList(
new Route(DELETE, MachineLearning.BASE_PATH + "_delete_expired_data/{" + Job.ID.getPreferredName() + "}"));
}
@Override
@@ -41,9 +44,34 @@ public class RestDeleteExpiredDataAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
DeleteExpiredDataAction.Request request = restRequest.hasContent() ?
DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null) :
new DeleteExpiredDataAction.Request();
DeleteExpiredDataAction.Request request;
if (restRequest.hasContent()) {
request = DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null);
} else {
request = new DeleteExpiredDataAction.Request();
String perSecondParam = restRequest.param(DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName());
if (perSecondParam != null) {
try {
request.setRequestsPerSecond(Float.parseFloat(perSecondParam));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Failed to parse float parameter [" +
DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName() +
"] with value [" + perSecondParam + "]", e);
}
}
String timeoutParam = restRequest.param(DeleteExpiredDataAction.Request.TIMEOUT.getPreferredName());
if (timeoutParam != null) {
request.setTimeout(restRequest.paramAsTime(timeoutParam, null));
}
}
String jobId = restRequest.param(Job.ID.getPreferredName());
if (Strings.isNullOrEmpty(jobId) == false) {
request.setJobId(jobId);
}
return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}
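
Note the precedence in the handler above: the requests_per_second and timeout
query parameters are read only when the request has no body, a body supplies
those two settings itself, and a job_id in the path overrides whatever the body
parser set.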

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java

@@ -51,8 +51,8 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
private int getRetentionDaysCallCount = 0;
ConcreteExpiredJobDataRemover(OriginSettingClient client) {
super(client);
ConcreteExpiredJobDataRemover(String jobId, OriginSettingClient client) {
super(jobId, client);
}
@Override
@@ -101,17 +101,6 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
return searchResponse;
}
@SuppressWarnings("unchecked")
static void givenJobs(Client client, List<Job> jobs) throws IOException {
SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
doAnswer(invocationOnMock -> {
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
}
private static SearchResponse createSearchResponse(List<? extends ToXContent> toXContents, int totalHits) throws IOException {
SearchHit[] hitsArray = new SearchHit[toXContents.size()];
for (int i = 0; i < toXContents.size(); i++) {
@@ -131,7 +120,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
mockSearchResponse(response);
TestListener listener = new TestListener();
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient);
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover("*", originSettingClient);
remover.remove(1.0f,listener, () -> false);
listener.waitToCompletion();
@@ -170,7 +159,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
TestListener listener = new TestListener();
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient);
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover("*", originSettingClient);
remover.remove(1.0f,listener, () -> false);
listener.waitToCompletion();
@@ -194,7 +183,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
mockSearchResponse(response);
TestListener listener = new TestListener();
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient);
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover("*", originSettingClient);
remover.remove(1.0f,listener, () -> attemptsLeft.getAndDecrement() <= 0);
listener.waitToCompletion();

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java

@@ -256,7 +256,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
return null;
}
).when(executor).execute(any());
return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool);
return new ExpiredModelSnapshotsRemover(originSettingClient, "*", threadPool);
}
private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java

@@ -63,7 +63,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
public void testRemove_GivenNoJobs() throws IOException {
givenDBQRequestsSucceed();
AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList());
givenJobs(client, Collections.emptyList());
createExpiredResultsRemover().remove(1.0f, listener, () -> false);
@@ -73,7 +73,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException {
givenDBQRequestsSucceed();
AbstractExpiredJobDataRemoverTests.givenJobs(client,
givenJobs(client,
Arrays.asList(
JobTests.buildJobBuilder("foo").build(),
JobTests.buildJobBuilder("bar").build()
@@ -153,6 +153,17 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(latest.getTime(), expectedCutoffTime)));
}
@SuppressWarnings("unchecked")
static void givenJobs(Client client, List<Job> jobs) throws IOException {
SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
doAnswer(invocationOnMock -> {
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
}
private void givenDBQRequestsSucceed() {
givenDBQRequest(true);
}
@@ -208,6 +219,6 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
}
).when(executor).execute(any());
return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class), threadPool);
return new ExpiredResultsRemover(originSettingClient, "*", mock(AnomalyDetectionAuditor.class), threadPool);
}
}

x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json

@@ -7,6 +7,18 @@
"stability":"stable",
"url":{
"paths":[
{
"path":"/_ml/_delete_expired_data/{job_id}",
"methods":[
"DELETE"
],
"parts":{
"job_id":{
"type":"string",
"description":"The ID of the job(s) to perform expired data hygiene for"
}
}
},
{
"path":"/_ml/_delete_expired_data",
"methods":[
@@ -15,6 +27,18 @@
}
]
},
"params":{
"requests_per_second":{
"type":"number",
"required":false,
"description":"The desired requests per second for the deletion processes."
},
"timeout":{
"type":"time",
"required":false,
"description":"How long can the underlying delete processes run until they are canceled"
}
},
"body":{
"description":"deleting expired data parameters"
}
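
Both new URL params mirror the request-body fields, so the same settings fit on
an illustrative bodiless call such as
`DELETE /_ml/_delete_expired_data/foo?requests_per_second=1000&timeout=1h`.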

x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml

@@ -3,12 +3,11 @@ setup:
features: headers
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
ml.put_job:
job_id: delete-expired-data
job_id: delete-expired-data-a
body: >
{
"job_id": "delete-expired-data",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span" : "1h",
@@ -18,7 +17,30 @@
"field_delimiter":",",
"time_field":"time",
"time_format":"yyyy-MM-dd HH:mm:ssX"
}
},
"results_retention_days" : 1,
"model_snapshot_retention_days" : 1
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
ml.put_job:
job_id: delete-expired-data-b
body: >
{
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span" : "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"field_delimiter":",",
"time_field":"time",
"time_format":"yyyy-MM-dd HH:mm:ssX"
},
"results_retention_days" : 1,
"model_snapshot_retention_days" : 1
}
---
@@ -34,3 +56,103 @@
body: >
{ "timeout": "10h", "requests_per_second": 100000.0 }
- match: { deleted: true}
---
"Test delete expired data with path parameters":
- do:
ml.delete_expired_data:
timeout: "10h"
requests_per_second: 100000.0
- match: { deleted: true}
---
"Test delete expired data with job id":
- do:
headers:
Content-Type: application/json
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
index:
index: .ml-anomalies-shared
id: "delete-expired-data-a_model_snapshot_inactive-snapshot"
body: >
{
"job_id": "delete-expired-data-a",
"timestamp": "2020-05-01T00:00:00Z",
"snapshot_id": "inactive-snapshot",
"description": "first",
"latest_record_time_stamp": "2020-05-01T00:00:00Z",
"latest_result_time_stamp": "2020-05-01T00:00:00Z",
"snapshot_doc_count": 1
}
- do:
headers:
Content-Type: application/json
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
index:
index: .ml-anomalies-shared
id: "delete-expired-data-a_model_snapshot_active-snapshot"
body: >
{
"job_id": "delete-expired-data-a",
"timestamp": "2020-05-10T00:00:00Z",
"snapshot_id": "active-snapshot",
"description": "second",
"latest_record_time_stamp": "2020-05-10T00:00:00Z",
"latest_result_time_stamp": "2020-05-10T00:00:00Z",
"snapshot_doc_count": 1,
"model_size_stats": {
"job_id" : "delete-expired-data-a",
"result_type" : "model_size_stats",
"model_bytes" : 0
},
"quantiles": {
"job_id": "delete-expired-data-a",
"timestamp": 1,
"quantile_state": "quantiles-1"
}
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
indices.refresh:
index: [.ml-anomalies-shared]
- do:
ml.get_model_snapshots:
job_id: delete-expired-data-a
- match: { count: 2 }
# make the above document the current snapshot
- do:
ml.revert_model_snapshot:
job_id: delete-expired-data-a
snapshot_id: active-snapshot
- do:
ml.get_model_snapshots:
job_id: delete-expired-data-a
- match: { count: 2 }
- do:
ml.delete_expired_data:
job_id: delete-expired-data-b
- do:
ml.get_model_snapshots:
job_id: delete-expired-data-a
- match: { count: 2 }
- do:
ml.delete_expired_data:
job_id: delete-expired-data-a
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
indices.refresh: {}
- do:
ml.get_model_snapshots:
job_id: delete-expired-data-a
- match: { count: 1 }
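
The closing assertions capture the point of the change: deleting expired data
scoped to delete-expired-data-b leaves job a's two snapshots untouched, and the
follow-up call scoped to delete-expired-data-a removes only the expired
inactive-snapshot, keeping the current active-snapshot, hence the final count
of 1.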