[ML] Throttle the delete-by-query of expired results (#47177)

Due to #47003 many clusters will have built up a
large backlog of expired results. On upgrading to
a version where that bug is fixed users could find
that the first ML daily maintenance task deletes
a very large amount of documents.

This change introduces throttling to the
delete-by-query that the ML daily maintenance uses
to delete expired results to limit it to deleting an
average 200 documents per second. (There is no
throttling for state/forecast documents as these
are expected to be lower volume.)

Additionally a rough time limit of 8 hours is applied
to the whole delete expired data action. (This is only
rough as it won't stop part way through a single
operation - it only checks the timeout between
operations.)

Relates #47103
This commit is contained in:
David Roberts 2019-10-02 08:58:56 +01:00
parent 42c5054e52
commit 4379a3c52b
14 changed files with 316 additions and 50 deletions

View File

@ -65,6 +65,10 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
deleted = in.readBoolean();
}
public boolean isDeleted() {
return deleted;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(deleted);

View File

@ -116,7 +116,13 @@ public class MlDailyMaintenanceService implements Releasable {
LOGGER.info("triggering scheduled [ML] maintenance tasks");
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(),
ActionListener.wrap(
response -> LOGGER.info("Successfully completed [ML] maintenance tasks"),
response -> {
if (response.isDeleted()) {
LOGGER.info("Successfully completed [ML] maintenance tasks");
} else {
LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great");
}
},
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
scheduleNext();
}

View File

@ -26,34 +26,54 @@ import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
public class TransportDeleteExpiredDataAction extends HandledTransportAction<DeleteExpiredDataAction.Request,
DeleteExpiredDataAction.Response> {
// TODO: make configurable in the request
static final Duration MAX_DURATION = Duration.ofHours(8);
private final ThreadPool threadPool;
private final String executor;
private final Client client;
private final ClusterService clusterService;
private final Clock clock;
@Inject
public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new);
this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService,
Clock.systemUTC());
}
TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService, Clock clock) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
this.client = ClientHelper.clientWithOrigin(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.clock = clock;
}
@Override
protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
ActionListener<DeleteExpiredDataAction.Response> listener) {
logger.info("Deleting expired data");
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener));
Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION);
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier));
}
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor),
@ -62,25 +82,32 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
new UnusedStateRemover(client, clusterService)
);
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
deleteExpiredData(dataRemoversIterator, listener);
deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true);
}
private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
ActionListener<DeleteExpiredDataAction.Response> listener) {
if (mlDataRemoversIterator.hasNext()) {
void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier,
boolean haveAllPreviousDeletionsCompleted) {
if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) {
MlDataRemover remover = mlDataRemoversIterator.next();
ActionListener<Boolean> nextListener = ActionListener.wrap(
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), listener::onFailure);
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse),
listener::onFailure);
// Removing expired ML data and artifacts requires multiple operations.
// These are queued up and executed sequentially in the action listener,
// the chained calls must all run the ML utility thread pool NOT the thread
// the previous action returned in which in the case of a transport_client_boss
// thread is a disaster.
remover.remove(new ThreadedActionListener<>(logger, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, nextListener,
false));
remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false),
isTimedOutSupplier);
} else {
logger.info("Completed deletion of expired data");
listener.onResponse(new DeleteExpiredDataAction.Response(true));
if (haveAllPreviousDeletionsCompleted) {
logger.info("Completed deletion of expired ML data");
} else {
logger.info("Halted deletion of expired ML data until next invocation");
}
listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted));
}
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Result;
@ -79,6 +80,9 @@ public class JobDataDeleter {
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0])));
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
try {
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
} catch (Exception e) {
@ -101,6 +105,10 @@ public class JobDataDeleter {
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.dbqRequest.setQuery(query);
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
}
@ -116,6 +124,9 @@ public class JobDataDeleter {
QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true);
deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb));
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get();
} catch (Exception e) {
@ -134,6 +145,9 @@ public class JobDataDeleter {
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(DatafeedTimingStats.documentId(jobId)));
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
}

View File

@ -22,6 +22,7 @@ import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@ -40,11 +41,12 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
}
@Override
public void remove(ActionListener<Boolean> listener) {
removeData(newJobIterator(), listener);
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
removeData(newJobIterator(), listener, isTimedOutSupplier);
}
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener) {
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier) {
if (jobIterator.hasNext() == false) {
listener.onResponse(true);
return;
@ -56,13 +58,19 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
return;
}
if (isTimedOutSupplier.get()) {
listener.onResponse(false);
return;
}
Long retentionDays = getRetentionDays(job);
if (retentionDays == null) {
removeData(jobIterator, listener);
removeData(jobIterator, listener, isTimedOutSupplier);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure));
removeDataBefore(job, cutoffEpochMs,
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));
}
private WrappedBatchedJobsIterator newJobIterator() {

View File

@ -32,6 +32,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result;
@ -44,6 +45,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
/**
* Removes up to {@link #MAX_FORECASTS} forecasts (stats + forecasts docs) that have expired.
@ -71,10 +73,10 @@ public class ExpiredForecastsRemover implements MlDataRemover {
}
@Override
public void remove(ActionListener<Boolean> listener) {
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs);
ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap(
searchResponse -> deleteForecasts(searchResponse, listener),
searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier),
e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e)));
SearchSourceBuilder source = new SearchSourceBuilder();
@ -84,13 +86,16 @@ public class ExpiredForecastsRemover implements MlDataRemover {
source.size(MAX_FORECASTS);
source.trackTotalHits(true);
// _doc is the most efficient sort order and will also disable scoring
source.sort(ElasticsearchMappings.ES_DOC);
SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN);
searchRequest.source(source);
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false));
}
private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener) {
private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
List<ForecastRequestStats> forecastsToDelete;
try {
forecastsToDelete = findForecastsToDelete(searchResponse);
@ -99,6 +104,11 @@ public class ExpiredForecastsRemover implements MlDataRemover {
return;
}
if (isTimedOutSupplier.get()) {
listener.onResponse(false);
return;
}
DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete);
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
@ -157,6 +167,10 @@ public class ExpiredForecastsRemover implements MlDataRemover {
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
request.setQuery(query);
// _doc is the most efficient sort order and will also disable scoring
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
return request;
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
@ -88,7 +89,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE));
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));

View File

@ -19,6 +19,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
@ -88,6 +89,11 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
// Delete the documents gradually.
// With DEFAULT_SCROLL_SIZE = 1000 this implies we spread deletion of 1 million documents over 5000 seconds ~= 83 minutes.
request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE);
request.setRequestsPerSecond(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5);
request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE);
@ -95,6 +101,9 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.mustNot(excludeFilter);
request.setQuery(query);
// _doc is the most efficient sort order and will also disable scoring
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
return request;
}

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
import java.util.function.Supplier;
public interface MlDataRemover {
void remove(ActionListener<Boolean> listener);
void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
@ -32,6 +33,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* If for any reason a job is deleted by some of its state documents
@ -51,13 +53,17 @@ public class UnusedStateRemover implements MlDataRemover {
}
@Override
public void remove(ActionListener<Boolean> listener) {
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
try {
List<String> unusedStateDocIds = findUnusedStateDocIds();
if (unusedStateDocIds.size() > 0) {
executeDeleteUnusedStateDocs(unusedStateDocIds, listener);
if (isTimedOutSupplier.get()) {
listener.onResponse(false);
} else {
listener.onResponse(true);
if (unusedStateDocIds.size() > 0) {
executeDeleteUnusedStateDocs(unusedStateDocIds, listener);
} else {
listener.onResponse(true);
}
}
} catch (Exception e) {
listener.onFailure(e);
@ -106,6 +112,10 @@ public class UnusedStateRemover implements MlDataRemover {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0])));
// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
response -> {
if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) {

View File

@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
import org.junit.After;
import org.junit.Before;
import java.time.Clock;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TransportDeleteExpiredDataActionTests extends ESTestCase {
private ThreadPool threadPool;
private TransportDeleteExpiredDataAction transportDeleteExpiredDataAction;
/**
* A data remover that only checks for timeouts.
*/
private static class DummyDataRemover implements MlDataRemover {
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
listener.onResponse(isTimedOutSupplier.get() == false);
}
}
@Before
public void setup() {
threadPool = new TestThreadPool("TransportDeleteExpiredDataActionTests thread pool");
TransportService transportService = mock(TransportService.class);
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
when(client.threadPool()).thenReturn(threadPool);
ClusterService clusterService = mock(ClusterService.class);
transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction(threadPool, ThreadPool.Names.SAME, transportService,
new ActionFilters(Collections.emptySet()), client, clusterService, Clock.systemUTC());
}
@After
public void teardown() {
threadPool.shutdown();
}
public void testDeleteExpiredDataIterationNoTimeout() {
final int numRemovers = randomIntBetween(2, 5);
List<MlDataRemover> removers = Stream.generate(DummyDataRemover::new).limit(numRemovers).collect(Collectors.toList());
AtomicBoolean succeeded = new AtomicBoolean();
ActionListener<DeleteExpiredDataAction.Response> finalListener = ActionListener.wrap(
response -> succeeded.set(response.isDeleted()),
e -> fail(e.getMessage())
);
Supplier<Boolean> isTimedOutSupplier = () -> false;
transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true);
assertTrue(succeeded.get());
}
public void testDeleteExpiredDataIterationWithTimeout() {
final int numRemovers = randomIntBetween(2, 5);
AtomicInteger removersRemaining = new AtomicInteger(randomIntBetween(0, numRemovers - 1));
List<MlDataRemover> removers = Stream.generate(DummyDataRemover::new).limit(numRemovers).collect(Collectors.toList());
AtomicBoolean succeeded = new AtomicBoolean();
ActionListener<DeleteExpiredDataAction.Response> finalListener = ActionListener.wrap(
response -> succeeded.set(response.isDeleted()),
e -> fail(e.getMessage())
);
Supplier<Boolean> isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0);
transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true);
assertFalse(succeeded.get());
}
}

View File

@ -86,25 +86,24 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
return searchResponse;
}
@SuppressWarnings("unchecked")
public void testRemoveGivenNoJobs() throws IOException {
SearchResponse response = createSearchResponse(Collections.emptyList());
@SuppressWarnings("unchecked")
ActionFuture<SearchResponse> future = mock(ActionFuture.class);
when(future.actionGet()).thenReturn(response);
when(client.search(any())).thenReturn(future);
TestListener listener = new TestListener();
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client);
remover.remove(listener);
remover.remove(listener, () -> false);
listener.waitToCompletion();
assertThat(listener.success, is(true));
assertEquals(remover.getRetentionDaysCallCount, 0);
assertEquals(0, remover.getRetentionDaysCallCount);
}
@SuppressWarnings("unchecked")
public void testRemoveGivenMulipleBatches() throws IOException {
public void testRemoveGivenMultipleBatches() throws IOException {
// This is testing AbstractExpiredJobDataRemover.WrappedBatchedJobsIterator
int totalHits = 7;
List<SearchResponse> responses = new ArrayList<>();
@ -127,18 +126,45 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
AtomicInteger searchCount = new AtomicInteger(0);
@SuppressWarnings("unchecked")
ActionFuture<SearchResponse> future = mock(ActionFuture.class);
doAnswer(invocationOnMock -> responses.get(searchCount.getAndIncrement())).when(future).actionGet();
when(client.search(any())).thenReturn(future);
TestListener listener = new TestListener();
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client);
remover.remove(listener);
remover.remove(listener, () -> false);
listener.waitToCompletion();
assertThat(listener.success, is(true));
assertEquals(searchCount.get(), 3);
assertEquals(remover.getRetentionDaysCallCount, 7);
assertEquals(3, searchCount.get());
assertEquals(7, remover.getRetentionDaysCallCount);
}
public void testRemoveGivenTimeOut() throws IOException {
int totalHits = 3;
SearchResponse response = createSearchResponse(Arrays.asList(
JobTests.buildJobBuilder("job1").build(),
JobTests.buildJobBuilder("job2").build(),
JobTests.buildJobBuilder("job3").build()
), totalHits);
final int timeoutAfter = randomIntBetween(0, totalHits - 1);
AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter);
@SuppressWarnings("unchecked")
ActionFuture<SearchResponse> future = mock(ActionFuture.class);
when(future.actionGet()).thenReturn(response);
when(client.search(any())).thenReturn(future);
TestListener listener = new TestListener();
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client);
remover.remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0));
listener.waitToCompletion();
assertThat(listener.success, is(false));
assertEquals(timeoutAfter, remover.getRetentionDaysCallCount);
}
static class TestListener implements ActionListener<Boolean> {
@ -157,7 +183,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
latch.countDown();
}
public void waitToCompletion() {
void waitToCompletion() {
try {
latch.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {

View File

@ -31,7 +31,9 @@ import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener;
import static org.hamcrest.Matchers.equalTo;
@ -69,7 +71,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
}
@After
public void shutdownThreadPool() throws InterruptedException {
public void shutdownThreadPool() {
terminate(threadPool);
}
@ -80,7 +82,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("bar").build()
));
createExpiredModelSnapshotsRemover().remove(listener);
createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion();
assertThat(listener.success, is(true));
@ -90,9 +92,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build()));
givenJobs(Collections.singletonList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build()));
createExpiredModelSnapshotsRemover().remove(listener);
createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion();
assertThat(listener.success, is(true));
@ -110,11 +112,11 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().remove(listener);
createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion();
assertThat(listener.success, is(true));
@ -137,6 +139,28 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
}
public void testRemove_GivenTimeout() throws IOException {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
final int timeoutAfter = randomIntBetween(0, 1);
AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter);
createExpiredModelSnapshotsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0));
listener.waitToCompletion();
assertThat(listener.success, is(false));
}
public void testRemove_GivenClientSearchRequestsFail() throws IOException {
givenClientSearchRequestsFail();
givenJobs(Arrays.asList(
@ -147,11 +171,11 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().remove(listener);
createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion();
assertThat(listener.success, is(false));
@ -173,11 +197,11 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().remove(listener);
createExpiredModelSnapshotsRemover().remove(listener, () -> false);
listener.waitToCompletion();
assertThat(listener.success, is(false));

View File

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
@ -70,7 +71,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
givenClientRequestsSucceed();
givenJobs(Collections.emptyList());
createExpiredResultsRemover().remove(listener);
createExpiredResultsRemover().remove(listener, () -> false);
verify(listener).onResponse(true);
verify(client).search(any());
@ -84,7 +85,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("bar").build()
));
createExpiredResultsRemover().remove(listener);
createExpiredResultsRemover().remove(listener, () -> false);
verify(listener).onResponse(true);
verify(client).search(any());
@ -99,7 +100,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
));
createExpiredResultsRemover().remove(listener);
createExpiredResultsRemover().remove(listener, () -> false);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
@ -109,6 +110,22 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
verify(listener).onResponse(true);
}
public void testRemove_GivenTimeout() throws Exception {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
));
final int timeoutAfter = randomIntBetween(0, 1);
AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter);
createExpiredResultsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0));
assertThat(capturedDeleteByQueryRequests.size(), equalTo(timeoutAfter));
verify(listener).onResponse(false);
}
public void testRemove_GivenClientRequestsFailed() throws IOException {
givenClientRequestsFailed();
givenJobs(Arrays.asList(
@ -117,7 +134,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
));
createExpiredResultsRemover().remove(listener);
createExpiredResultsRemover().remove(listener, () -> false);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(1));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);