[ML] Add endpoint for deleting expired results (elastic/x-pack-elasticsearch#670)

This commit adds an endpoint to force deletion of expired data:

DELETE /_xpack/ml/_delete_expired_data
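
The endpoint takes no request body and, once the expired results and model snapshots have been removed, replies with a single boolean field such as {"deleted": true} (see the new DeleteExpiredDataAction.Response below). As a rough sketch only, not code from this commit, the same cleanup can also be triggered internally through the client API; the helper name here is hypothetical:

    import org.elasticsearch.client.Client;
    import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;

    // Hypothetical helper: runs the same cleanup as the REST endpoint and the daily
    // maintenance service, blocking until both removers have finished.
    static DeleteExpiredDataAction.Response deleteExpiredData(Client client) {
        return client.execute(DeleteExpiredDataAction.INSTANCE,
                new DeleteExpiredDataAction.Request()).actionGet();
    }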

A few other things are changed too:

- Compute the cutoff for expired results from the current time rather than
  from the start of the day (see the sketch after this list)
- Rename MlDaily{Management -> Maintenance}Service
- Refresh job indices when a job is closing to ensure the latest results are
  visible
- Commit results when quantiles are persisted to ensure they are visible
  for renormalization
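
The cutoff change in the first bullet amounts to the calculation below, restated from the new AbstractExpiredJobDataRemover.calcCutoffEpochMs in the diff; anything with a timestamp before the returned value is eligible for deletion:

    import org.elasticsearch.common.unit.TimeValue;
    import org.joda.time.DateTime;
    import org.joda.time.chrono.ISOChronology;
    import java.util.concurrent.TimeUnit;

    // Cutoff is measured from the current time rather than from the start of the day.
    static long calcCutoffEpochMs(long retentionDays) {
        long nowEpochMs = DateTime.now(ISOChronology.getInstance()).getMillis();
        return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
    }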


Original commit: elastic/x-pack-elasticsearch@8ca5272a94
Dimitris Athanasiou 2017-03-03 11:33:22 +00:00 committed by GitHub
parent 440d005b1a
commit a7a36245c9
19 changed files with 618 additions and 137 deletions

View File

@@ -40,6 +40,7 @@ import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.CloseJobService;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
+import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.DeleteModelSnapshotAction;
@@ -90,6 +91,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcess
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
+import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestDeleteDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedStatsAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedsAction;
@@ -308,8 +310,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
jobProvider,
jobManager,
dataProcessor,
-new MlInitializationService(settings, threadPool, clusterService, client, auditor),
new MachineLearningTemplateRegistry(settings, clusterService, client, threadPool),
+new MlInitializationService(settings, threadPool, clusterService, client),
jobDataCountsPersister,
datafeedJobRunner,
persistentActionService,
@@ -367,7 +369,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
new RestPreviewDatafeedAction(settings, restController),
new RestStartDatafeedAction(settings, restController),
new RestStopDatafeedAction(settings, restController),
-new RestDeleteModelSnapshotAction(settings, restController)
+new RestDeleteModelSnapshotAction(settings, restController),
+new RestDeleteExpiredDataAction(settings, restController)
);
}
@@ -413,7 +416,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class),
new ActionHandler<>(MlDeleteByQueryAction.INSTANCE, MlDeleteByQueryAction.TransportAction.class),
-new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class)
+new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class),
+new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, DeleteExpiredDataAction.TransportAction.class)
);
}

View File

@@ -6,29 +6,30 @@
package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.Logger;
+import org.elasticsearch.client.Client;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier;
/**
-* A service that runs once a day and triggers management tasks.
+* A service that runs once a day and triggers maintenance tasks.
*/
-public class MlDailyManagementService implements Releasable {
+public class MlDailyMaintenanceService implements Releasable {
-private static final Logger LOGGER = Loggers.getLogger(MlDailyManagementService.class);
+private static final Logger LOGGER = Loggers.getLogger(MlDailyMaintenanceService.class);
private final ThreadPool threadPool;
+private final Client client;
/**
* An interface to abstract the calculation of the delay to the next execution.
@@ -36,17 +37,16 @@ public class MlDailyManagementService implements Releasable {
*/
private final Supplier<TimeValue> schedulerProvider;
-private final List<Listener> listeners;
private volatile ScheduledFuture<?> future;
-MlDailyManagementService(ThreadPool threadPool, List<Listener> listeners, Supplier<TimeValue> scheduleProvider) {
+MlDailyMaintenanceService(ThreadPool threadPool, Client client, Supplier<TimeValue> scheduleProvider) {
this.threadPool = Objects.requireNonNull(threadPool);
-this.listeners = new ArrayList<>(listeners);
+this.client = Objects.requireNonNull(client);
this.schedulerProvider = Objects.requireNonNull(scheduleProvider);
}
-public MlDailyManagementService(ThreadPool threadPool, List<Listener> listeners) {
+public MlDailyMaintenanceService(ThreadPool threadPool, Client client) {
-this(threadPool, listeners, createAfterMidnightScheduleProvider());
+this(threadPool, client, createAfterMidnightScheduleProvider());
}
private static Supplier<TimeValue> createAfterMidnightScheduleProvider() {
@@ -58,12 +58,12 @@ public class MlDailyManagementService implements Releasable {
}
public void start() {
-LOGGER.debug("Starting ML daily management service");
+LOGGER.debug("Starting ML daily maintenance service");
scheduleNext();
}
public void stop() {
-LOGGER.debug("Stopping ML daily management service");
+LOGGER.debug("Stopping ML daily maintenance service");
if (future != null && future.isCancelled() == false) {
FutureUtils.cancel(future);
}
@@ -80,25 +80,23 @@ public class MlDailyManagementService implements Releasable {
private void scheduleNext() {
try {
-future = threadPool.schedule(schedulerProvider.get(), ThreadPool.Names.GENERIC, () -> triggerListeners());
+future = threadPool.schedule(schedulerProvider.get(), ThreadPool.Names.GENERIC, this::triggerTasks);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
-LOGGER.debug("failed to schedule next management task; shutting down", e);
+LOGGER.debug("failed to schedule next maintenance task; shutting down", e);
} else {
throw e;
}
}
}
-private void triggerListeners() {
+private void triggerTasks() {
-LOGGER.info("triggering scheduled [ML] management tasks");
+LOGGER.info("triggering scheduled [ML] maintenance tasks");
-for (Listener listener : listeners) {
-listener.onTrigger();
+try {
+client.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request());
+} catch (Exception e) {
+LOGGER.error("An error occurred during maintenance tasks execution", e);
}
scheduleNext();
}
-public interface Listener {
-void onTrigger();
-}
}

View File

@@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml;
-import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@@ -17,30 +16,24 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
-import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
-import org.elasticsearch.xpack.ml.notifications.Auditor;
-import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
-public class MlInitializationService extends AbstractComponent implements ClusterStateListener {
+class MlInitializationService extends AbstractComponent implements ClusterStateListener {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final Client client;
-private final Auditor auditor;
private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false);
-private volatile MlDailyManagementService mlDailyManagementService;
+private volatile MlDailyMaintenanceService mlDailyMaintenanceService;
-public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client,
-Auditor auditor) {
+MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.client = client;
-this.auditor = auditor;
clusterService.addListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
@@ -55,9 +48,9 @@ public class MlInitializationService extends AbstractComponent implements Cluste
if (event.localNodeMaster()) {
MetaData metaData = event.state().metaData();
installMlMetadata(metaData);
-installDailyManagementService();
+installDailyMaintenanceService();
} else {
-uninstallDailyManagementService();
+uninstallDailyMaintenanceService();
}
}
@@ -87,37 +80,34 @@ public class MlInitializationService extends AbstractComponent implements Cluste
}
}
-private void installDailyManagementService() {
+private void installDailyMaintenanceService() {
-if (mlDailyManagementService == null) {
+if (mlDailyMaintenanceService == null) {
-mlDailyManagementService = new MlDailyManagementService(threadPool, Arrays.asList((MlDailyManagementService.Listener)
-new ExpiredResultsRemover(client, clusterService, auditor),
-new ExpiredModelSnapshotsRemover(client, clusterService)
-));
-mlDailyManagementService.start();
+mlDailyMaintenanceService = new MlDailyMaintenanceService(threadPool, client);
+mlDailyMaintenanceService.start();
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
-uninstallDailyManagementService();
+uninstallDailyMaintenanceService();
}
});
}
}
-private void uninstallDailyManagementService() {
+private void uninstallDailyMaintenanceService() {
-if (mlDailyManagementService != null) {
+if (mlDailyMaintenanceService != null) {
-mlDailyManagementService.stop();
+mlDailyMaintenanceService.stop();
-mlDailyManagementService = null;
+mlDailyMaintenanceService = null;
}
}
/** For testing */
-MlDailyManagementService getDailyManagementService() {
+MlDailyMaintenanceService getDailyMaintenanceService() {
-return mlDailyManagementService;
+return mlDailyMaintenanceService;
}
/** For testing */
-void setDailyManagementService(MlDailyManagementService service) {
+void setDailyMaintenanceService(MlDailyMaintenanceService service) {
-mlDailyManagementService = service;
+mlDailyMaintenanceService = service;
}
}

View File

@@ -0,0 +1,152 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.io.IOException;
import java.util.Objects;
public class DeleteExpiredDataAction extends Action<DeleteExpiredDataAction.Request, DeleteExpiredDataAction.Response,
DeleteExpiredDataAction.RequestBuilder> {
public static final DeleteExpiredDataAction INSTANCE = new DeleteExpiredDataAction();
public static final String NAME = "cluster:admin/ml/delete_expired_data";
private DeleteExpiredDataAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest {
public Request() {}
@Override
public ActionRequestValidationException validate() {
return null;
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
RequestBuilder(ElasticsearchClient client, DeleteExpiredDataAction action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private static final ParseField DELETED = new ParseField("deleted");
private boolean deleted;
public Response(boolean deleted) {
this.deleted = deleted;
}
Response() {}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
deleted = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(deleted);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DELETED.getPreferredName(), deleted);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return Objects.equals(deleted, response.deleted);
}
@Override
public int hashCode() {
return Objects.hash(deleted);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final ClusterService clusterService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client,
ClusterService clusterService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.clusterService = clusterService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
logger.info("Deleting expired data");
threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener));
}
private void deleteExpiredData(ActionListener<Response> listener) {
Auditor auditor = new Auditor(client, clusterService);
ExpiredResultsRemover resultsRemover = new ExpiredResultsRemover(client, clusterService, auditor);
resultsRemover.trigger(() -> {
ExpiredModelSnapshotsRemover modelSnapshotsRemover = new ExpiredModelSnapshotsRemover(client, clusterService);
modelSnapshotsRemover.trigger(() -> {
logger.debug("Finished deleting expired data");
listener.onResponse(new Response(true));
});
});
}
}
}

View File

@@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
@@ -98,9 +99,16 @@ public class AutoDetectResultProcessor {
} catch (Exception e) {
LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}), e);
} finally {
-waitUntilRenormalizerIsIdle();
-flushListener.clear();
-completionLatch.countDown();
+try {
+waitUntilRenormalizerIsIdle();
+persister.commitResultWrites(jobId);
+persister.commitStateWrites(jobId);
+} catch (IndexNotFoundException e) {
+LOGGER.error("[{}] Error while closing: no such index [{}]", jobId, e.getIndex().getName());
+} finally {
+flushListener.clear();
+completionLatch.countDown();
+}
}
}
@@ -157,6 +165,9 @@ public class AutoDetectResultProcessor {
Quantiles quantiles = result.getQuantiles();
if (quantiles != null) {
persister.persistQuantiles(quantiles);
+// We need to make all results written up to these quantiles available for renormalization
+context.bulkResultsPersister.executeRequest();
+persister.commitResultWrites(context.jobId);
LOGGER.debug("[{}] Quantiles parsed from output - will trigger renormalization of scores", context.jobId);
renormalizer.renormalize(quantiles);

View File

@@ -10,7 +10,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.xpack.ml.MlDailyManagementService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.results.Result;
@@ -18,7 +17,6 @@ import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -31,7 +29,7 @@ import java.util.concurrent.TimeUnit;
* blocking the thread it was called at for too long. It does so by
* chaining the steps together.
*/
-abstract class AbstractExpiredJobDataRemover implements MlDailyManagementService.Listener {
+abstract class AbstractExpiredJobDataRemover {
private final ClusterService clusterService;
@@ -39,9 +37,23 @@ abstract class AbstractExpiredJobDataRemover implements MlDailyManagementService
this.clusterService = Objects.requireNonNull(clusterService);
}
-@Override
-public void onTrigger() {
-removeData(newJobIterator());
+public void trigger(Runnable onFinish) {
+removeData(newJobIterator(), onFinish);
+}
+private void removeData(Iterator<Job> jobIterator, Runnable onFinish) {
+if (jobIterator.hasNext() == false) {
+onFinish.run();
+return;
+}
+Job job = jobIterator.next();
+Long retentionDays = getRetentionDays(job);
+if (retentionDays == null) {
+removeData(jobIterator, () -> removeData(jobIterator, onFinish));
+return;
+}
+long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
+removeDataBefore(job, cutoffEpochMs, () -> removeData(jobIterator, onFinish));
}
private Iterator<Job> newJobIterator() {
@@ -58,23 +70,9 @@ abstract class AbstractExpiredJobDataRemover implements MlDailyManagementService
return new VolatileCursorIterator<T>(items);
}
-protected void removeData(Iterator<Job> jobIterator) {
-if (jobIterator.hasNext() == false) {
-return;
-}
-Job job = jobIterator.next();
-Long retentionDays = getRetentionDays(job);
-if (retentionDays == null) {
-removeData(jobIterator);
-return;
-}
-long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
-removeDataBefore(job, cutoffEpochMs, () -> removeData(jobIterator));
-}
private long calcCutoffEpochMs(long retentionDays) {
-long startOfDayEpochMs = DateTime.now(ISOChronology.getInstance()).withTimeAtStartOfDay().getMillis();
-return startOfDayEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
+long nowEpochMs = DateTime.now(ISOChronology.getInstance()).getMillis();
+return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
}
protected abstract Long getRetentionDays(Job job);

View File

@@ -58,8 +58,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) {
LOGGER.info("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
-QueryBuilder excludeFilter = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE);
-DeleteByQueryRequest request = createDBQRequest(job, Result.TYPE.getPreferredName(), cutoffEpochMs, excludeFilter);
+DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs);
client.execute(MlDeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
@@ -80,7 +79,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
});
}
-private DeleteByQueryRequest createDBQRequest(Job job, String type, long cutoffEpochMs, QueryBuilder excludeFilter) {
+private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
SearchRequest searchRequest = new SearchRequest();
// We need to create the DeleteByQueryRequest before we modify the SearchRequest
// because the constructor of the former wipes the latter
@@ -88,7 +87,8 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
request.setSlices(5);
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
-searchRequest.types(type);
+searchRequest.types(Result.TYPE.getPreferredName());
+QueryBuilder excludeFilter = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter);
searchRequest.source(new SearchSourceBuilder().query(query));
return request;

View File

@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
import java.io.IOException;
public class RestDeleteExpiredDataAction extends BaseRestHandler {
public RestDeleteExpiredDataAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.DELETE, MachineLearning.BASE_PATH + "_delete_expired_data", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request();
return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@@ -142,7 +142,6 @@ public class MachineLearningTemplateRegistryTests extends ESTestCase {
AnomalyDetectorsIndex.jobResultsIndexPrefix()).version(Version.CURRENT.id).build())
.putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
.build();
-MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
templateRegistry.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(threadPool, times(0)).executor(anyString());

View File

@@ -5,28 +5,32 @@
*/
package org.elasticsearch.xpack.ml;
+import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
import org.junit.After;
import org.junit.Before;
-import java.util.Arrays;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.mock.orig.Mockito.verify;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
public class MlDailyManagementServiceTests extends ESTestCase {
private ThreadPool threadPool;
+private Client client;
@Before
public void setUpTests() {
threadPool = new TestThreadPool("MlDailyManagementServiceTests");
+client = mock(Client.class);
}
@After
@@ -35,21 +39,18 @@ public class MlDailyManagementServiceTests extends ESTestCase {
}
public void testScheduledTriggering() throws InterruptedException {
-MlDailyManagementService.Listener listener1 = mock(MlDailyManagementService.Listener.class);
-MlDailyManagementService.Listener listener2 = mock(MlDailyManagementService.Listener.class);
int triggerCount = randomIntBetween(2, 4);
CountDownLatch latch = new CountDownLatch(triggerCount);
-try (MlDailyManagementService service = createService(latch, Arrays.asList(listener1, listener2))) {
+try (MlDailyMaintenanceService service = createService(latch, client)) {
service.start();
latch.await(1, TimeUnit.SECONDS);
}
-verify(listener1, org.mockito.Mockito.atLeast(triggerCount - 1)).onTrigger();
-verify(listener2, org.mockito.Mockito.atLeast(triggerCount - 1)).onTrigger();
+verify(client, org.mockito.Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any());
}
-private MlDailyManagementService createService(CountDownLatch latch, List<MlDailyManagementService.Listener> listeners) {
+private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) {
-return new MlDailyManagementService(threadPool, listeners, () -> {
+return new MlDailyMaintenanceService(threadPool, client, () -> {
latch.countDown();
return TimeValue.timeValueMillis(100);
});

View File

@@ -18,7 +18,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import java.net.InetAddress;
@@ -41,7 +40,6 @@ public class MlInitializationServiceTests extends ESTestCase {
private ExecutorService executorService;
private ClusterService clusterService;
private Client client;
-private Auditor auditor;
@Before
public void setUpMocks() {
@@ -49,7 +47,6 @@ public class MlInitializationServiceTests extends ESTestCase {
executorService = mock(ExecutorService.class);
clusterService = mock(ClusterService.class);
client = mock(Client.class);
-auditor = mock(Auditor.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
@@ -62,8 +59,7 @@ public class MlInitializationServiceTests extends ESTestCase {
}
public void testInitialize() throws Exception {
-MlInitializationService initializationService =
-new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
+MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@@ -75,12 +71,11 @@ public class MlInitializationServiceTests extends ESTestCase {
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
-assertThat(initializationService.getDailyManagementService().isStarted(), is(true));
+assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true));
}
public void testInitialize_noMasterNode() throws Exception {
-MlInitializationService initializationService =
-new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
+MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@@ -90,13 +85,12 @@ public class MlInitializationServiceTests extends ESTestCase {
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any());
-assertThat(initializationService.getDailyManagementService(), is(nullValue()));
+assertThat(initializationService.getDailyMaintenanceService(), is(nullValue()));
}
public void testInitialize_alreadyInitialized() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
-MlInitializationService initializationService =
-new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
+MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@@ -106,18 +100,17 @@ public class MlInitializationServiceTests extends ESTestCase {
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
.build();
-MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
+MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
-initializationService.setDailyManagementService(initialDailyManagementService);
+initializationService.setDailyMaintenanceService(initialDailyMaintenanceService);
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any());
-assertSame(initialDailyManagementService, initializationService.getDailyManagementService());
+assertSame(initialDailyMaintenanceService, initializationService.getDailyMaintenanceService());
}
public void testInitialize_onlyOnce() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
-MlInitializationService initializationService =
-new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
+MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@@ -133,10 +126,9 @@ public class MlInitializationServiceTests extends ESTestCase {
}
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
-MlInitializationService initializationService =
-new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
-MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
-initializationService.setDailyManagementService(initialDailyManagementService);
+MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
+MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
+initializationService.setDailyMaintenanceService(initialDailyMaintenanceService);
ClusterState masterCs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@@ -152,11 +144,11 @@ public class MlInitializationServiceTests extends ESTestCase {
.build();
initializationService.clusterChanged(new ClusterChangedEvent("_source", noMasterCs, masterCs));
-verify(initialDailyManagementService).stop();
+verify(initialDailyMaintenanceService).stop();
initializationService.clusterChanged(new ClusterChangedEvent("_source", masterCs, noMasterCs));
-MlDailyManagementService finalDailyManagementService = initializationService.getDailyManagementService();
+MlDailyMaintenanceService finalDailyMaintenanceService = initializationService.getDailyMaintenanceService();
-assertNotSame(initialDailyManagementService, finalDailyManagementService);
+assertNotSame(initialDailyMaintenanceService, finalDailyMaintenanceService);
-assertThat(initializationService.getDailyManagementService().isStarted(), is(true));
+assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true));
}
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction.Response;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
public class DeleteExpiredDataActionResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}
@Override
protected Response createBlankInstance() {
return new Response();
}
}

View File

@@ -0,0 +1,254 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.security.Security;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class DeleteExpiredDataIT extends SecurityIntegTestCase {
private static final String DATA_INDEX = "delete-expired-data-test-data";
private static final String DATA_TYPE = "my_type";
private List<Job> jobs;
@Override
protected Settings externalClusterClientSettings() {
Settings.Builder builder = Settings.builder();
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, Security.NAME4);
builder.put(Security.USER_SETTING.getKey(), "elastic:changeme");
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
return builder.build();
}
@Before
public void setUpData() throws IOException {
jobs = new ArrayList<>();
client().admin().indices().prepareCreate(DATA_INDEX)
.addMapping(DATA_TYPE, "time", "type=date,format=epoch_millis")
.get();
// We are going to create data for last 2 days
long nowMillis = System.currentTimeMillis();
int totalBuckets = 3 * 24;
int normalRate = 10;
int anomalousRate = 100;
int anomalousBucket = 30;
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int bucket = 0; bucket < totalBuckets; bucket++) {
long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
for (int point = 0; point < bucketRate; point++) {
IndexRequest indexRequest = new IndexRequest(DATA_INDEX, DATA_TYPE);
indexRequest.source("time", timestamp);
bulkRequestBuilder.add(indexRequest);
}
}
BulkResponse bulkResponse = bulkRequestBuilder
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertThat(bulkResponse.hasFailures(), is(false));
// Ensure all data is searchable
client().admin().indices().prepareRefresh(DATA_INDEX).get();
}
@After
public void tearDownData() throws Exception {
client().admin().indices().prepareDelete(DATA_INDEX).get();
for (Job job : jobs) {
DeleteDatafeedAction.Request deleteDatafeedRequest = new DeleteDatafeedAction.Request(job.getId() + "-feed");
client().execute(DeleteDatafeedAction.INSTANCE, deleteDatafeedRequest).get();
DeleteJobAction.Request deleteJobRequest = new DeleteJobAction.Request(job.getId());
client().execute(DeleteJobAction.INSTANCE, deleteJobRequest).get();
}
}
public void testDeleteExpiredData() throws Exception {
jobs.add(newJobBuilder("no-retention").build());
jobs.add(newJobBuilder("results-retention").setResultsRetentionDays(1L).build());
jobs.add(newJobBuilder("snapshots-retention").setModelSnapshotRetentionDays(2L).build());
jobs.add(newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L).build());
long now = System.currentTimeMillis();
for (Job job : jobs) {
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
String datafeedId = job.getId() + "-feed";
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId());
datafeedConfig.setIndexes(Arrays.asList(DATA_INDEX));
datafeedConfig.setTypes(Arrays.asList(DATA_TYPE));
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(datafeedConfig.build());
client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get();
// Run up to a day ago
openJob(job.getId());
startDatafeed(datafeedId, 0, now - TimeValue.timeValueHours(24).getMillis());
waitUntilJobIsClosed(job.getId());
assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(47)));
assertThat(getRecords(job.getId()).size(), equalTo(1));
List<ModelSnapshot> modelSnapshots = getModelSnapshots(job.getId());
assertThat(modelSnapshots.size(), equalTo(1));
String snapshotDocId = job.getId() + "-" + modelSnapshots.get(0).getSnapshotId();
// Update snapshot timestamp to force it out of snapshot retention window
String snapshotUpdate = "{ \"timestamp\": " + (now - TimeValue.timeValueHours(48).getMillis() - 1) + "}";
UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + job.getId(), "model_snapshot", snapshotDocId);
updateSnapshotRequest.doc(snapshotUpdate.getBytes(StandardCharsets.UTF_8), XContentType.JSON);
client().execute(UpdateAction.INSTANCE, updateSnapshotRequest).get();
}
// Refresh to ensure the snapshot timestamp updates are visible
client().admin().indices().prepareRefresh("*").get();
// We need to wait a second to ensure the second time around model snapshots will have a different ID (it depends on epoch seconds)
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
for (Job job : jobs) {
// Run up to now
openJob(job.getId());
startDatafeed(job.getId() + "-feed", 0, now);
waitUntilJobIsClosed(job.getId());
assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(70)));
assertThat(getRecords(job.getId()).size(), equalTo(1));
List<ModelSnapshot> modelSnapshots = getModelSnapshots(job.getId());
assertThat(modelSnapshots.size(), equalTo(2));
}
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
// We need to refresh to ensure the deletion is visible
client().admin().indices().prepareRefresh("*").get();
// no-retention job should have kept all data
assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70)));
assertThat(getRecords("no-retention").size(), equalTo(1));
assertThat(getModelSnapshots("no-retention").size(), equalTo(2));
List<Bucket> buckets = getBuckets("results-retention");
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(getRecords("results-retention").size(), equalTo(0));
assertThat(getModelSnapshots("results-retention").size(), equalTo(2));
assertThat(getBuckets("snapshots-retention").size(), is(greaterThanOrEqualTo(70)));
assertThat(getRecords("snapshots-retention").size(), equalTo(1));
assertThat(getModelSnapshots("snapshots-retention").size(), equalTo(1));
buckets = getBuckets("results-and-snapshots-retention");
assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0));
assertThat(getModelSnapshots("results-and-snapshots-retention").size(), equalTo(1));
}
private static Job.Builder newJobBuilder(String id) {
Detector.Builder detector = new Detector.Builder();
detector.setFunction("count");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
analysisConfig.setBucketSpan(3600L);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = new Job.Builder(id);
jobBuilder.setAnalysisConfig(analysisConfig);
jobBuilder.setDataDescription(dataDescription);
jobBuilder.setCreateTime(new Date());
return jobBuilder;
}
private void openJob(String jobId) throws Exception {
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(jobId);
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
}
private void startDatafeed(String datafeedId, long start, long end) throws Exception {
StartDatafeedAction.Request startRequest = new StartDatafeedAction.Request(datafeedId, start);
startRequest.setEndTime(end);
client().execute(StartDatafeedAction.INSTANCE, startRequest).get();
}
private void waitUntilJobIsClosed(String jobId) throws Exception {
assertBusy(() -> {
try {
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).get();
assertThat(response.getResponse().results().get(0).getState(), equalTo(JobState.CLOSED));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
private List<Bucket> getBuckets(String jobId) throws Exception {
GetBucketsAction.Request request = new GetBucketsAction.Request(jobId);
GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).get();
return response.getBuckets().results();
}
private List<AnomalyRecord> getRecords(String jobId) throws Exception {
GetRecordsAction.Request request = new GetRecordsAction.Request(jobId);
GetRecordsAction.Response response = client().execute(GetRecordsAction.INSTANCE, request).get();
return response.getRecords().results();
}
private List<ModelSnapshot> getModelSnapshots(String jobId) throws Exception {
GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
GetModelSnapshotsAction.Response response = client().execute(GetModelSnapshotsAction.INSTANCE, request).get();
return response.getPage().results();
}
@Override
protected void ensureClusterStateConsistency() throws IOException {
// this method in ESIntegTestCase is not plugin-friendly - it does not account for plugin NamedWritableRegistries
}
}

View File

@@ -286,6 +286,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
+verify(bulkBuilder).executeRequest();
+verify(persister).commitResultWrites(JOB_ID);
verify(renormalizer, times(1)).renormalize(quantiles);
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormalizer);

View File

@ -42,6 +42,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
@ -52,6 +53,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
private List<SearchRequest> capturedSearchRequests; private List<SearchRequest> capturedSearchRequests;
private List<DeleteModelSnapshotAction.Request> capturedDeleteModelSnapshotRequests; private List<DeleteModelSnapshotAction.Request> capturedDeleteModelSnapshotRequests;
private List<SearchResponse> searchResponsesPerCall; private List<SearchResponse> searchResponsesPerCall;
private Runnable onFinish;
@Before @Before
public void setUpTests() { public void setUpTests() {
@ -62,30 +64,33 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
client = mock(Client.class);
onFinish = mock(Runnable.class);
}
public void testOnTrigger_GivenJobsWithoutRetentionPolicy() {
public void testTrigger_GivenJobsWithoutRetentionPolicy() {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("foo").build(),
JobTests.buildJobBuilder("bar").build()
));
createExpiredModelSnapshotsRemover().onTrigger();
createExpiredModelSnapshotsRemover().trigger(onFinish);
verify(onFinish).run();
Mockito.verifyNoMoreInteractions(client);
}
public void testOnTrigger_GivenJobWithoutActiveSnapshot() {
public void testTrigger_GivenJobWithoutActiveSnapshot() {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build()));
createExpiredModelSnapshotsRemover().onTrigger();
createExpiredModelSnapshotsRemover().trigger(onFinish);
verify(onFinish).run();
Mockito.verifyNoMoreInteractions(client);
}
public void testOnTrigger_GivenJobsWithMixedRetentionPolicies() throws IOException {
public void testTrigger_GivenJobsWithMixedRetentionPolicies() throws IOException {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -99,7 +104,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().onTrigger();
createExpiredModelSnapshotsRemover().trigger(onFinish);
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
@ -117,9 +122,11 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
verify(onFinish).run();
}
public void testOnTrigger_GivenClientSearchRequestsFail() throws IOException {
public void testTrigger_GivenClientSearchRequestsFail() throws IOException {
givenClientSearchRequestsFail();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -133,7 +140,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().onTrigger();
createExpiredModelSnapshotsRemover().trigger(onFinish);
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
@ -142,9 +149,11 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0));
verify(onFinish).run();
}
public void testOnTrigger_GivenClientDeleteSnapshotRequestsFail() throws IOException {
public void testTrigger_GivenClientDeleteSnapshotRequestsFail() throws IOException {
givenClientDeleteModelSnapshotRequestsFail();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -158,7 +167,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().onTrigger();
createExpiredModelSnapshotsRemover().trigger(onFinish);
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
@ -176,6 +185,8 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
verify(onFinish).run();
}
private void givenJobs(List<Job> jobs) {

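The rename from onTrigger() to trigger(onFinish), together with the verify(onFinish).run() assertion in every branch, establishes a contract: a remover must invoke its callback exactly once, whether its client requests succeed or fail. One motivation for such a contract is that several removers can then be chained and the caller notified only when the last one completes. The sketch below is a hypothetical illustration of that idea; the interface and class names are not taken from the commit.

    import java.util.Iterator;
    import java.util.List;

    // Hypothetical helper: runs each remover in turn and fires onAllFinished once
    // the last remover has reported completion via its callback.
    final class DataRemoverChain {
        interface Remover {
            void trigger(Runnable onFinish); // must call onFinish exactly once, success or failure
        }

        static void runAll(List<Remover> removers, Runnable onAllFinished) {
            runNext(removers.iterator(), onAllFinished);
        }

        private static void runNext(Iterator<Remover> it, Runnable onAllFinished) {
            if (!it.hasNext()) {
                onAllFinished.run();
                return;
            }
            it.next().trigger(() -> runNext(it, onAllFinished));
        }
    }

A caller such as a delete-expired-data transport action could, under this sketch, pass the snapshots remover and the results remover in order and complete its own listener from onAllFinished.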
View File

@ -37,6 +37,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ExpiredResultsRemoverTests extends ESTestCase {
@ -45,6 +46,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
private ClusterService clusterService;
private ClusterState clusterState;
private List<DeleteByQueryRequest> capturedDeleteByQueryRequests;
private Runnable onFinish;
@Before
public void setUpTests() {
@ -63,30 +65,33 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
return null;
}
}).when(client).execute(same(MlDeleteByQueryAction.INSTANCE), any(), any());
onFinish = mock(Runnable.class);
}
public void testOnTrigger_GivenNoJobs() {
public void testTrigger_GivenNoJobs() {
givenClientRequestsSucceed();
givenJobs(Collections.emptyList());
createExpiredResultsRemover().onTrigger();
createExpiredResultsRemover().trigger(onFinish);
verify(onFinish).run();
Mockito.verifyNoMoreInteractions(client);
}
public void testOnTrigger_GivenJobsWithoutRetentionPolicy() {
public void testTrigger_GivenJobsWithoutRetentionPolicy() {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("foo").build(),
JobTests.buildJobBuilder("bar").build()
));
createExpiredResultsRemover().onTrigger();
createExpiredResultsRemover().trigger(onFinish);
verify(onFinish).run();
Mockito.verifyNoMoreInteractions(client);
}
public void testOnTrigger_GivenJobsWithAndWithoutRetentionPolicy() throws IOException {
public void testTrigger_GivenJobsWithAndWithoutRetentionPolicy() throws IOException {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -94,16 +99,17 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
)); ));
createExpiredResultsRemover().onTrigger(); createExpiredResultsRemover().trigger(onFinish);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")})); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1); dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")})); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")}));
verify(onFinish).run();
} }
public void testOnTrigger_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException { public void testTrigger_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException {
givenClientRequestsFailed(); givenClientRequestsFailed();
givenJobs(Arrays.asList( givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(), JobTests.buildJobBuilder("none").build(),
@ -111,13 +117,14 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
)); ));
createExpiredResultsRemover().onTrigger(); createExpiredResultsRemover().trigger(onFinish);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")})); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1); dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")})); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")}));
verify(onFinish).run();
} }
private void givenClientRequestsSucceed() { private void givenClientRequestsSucceed() {

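These tests capture one DeleteByQueryRequest per job that has a results retention policy, each aimed at that job's results alias, and the commit also moves the expiry cutoff to be measured from the current time rather than from the start of the day. A minimal sketch of that cutoff calculation, assuming the results timestamp field is named "timestamp" (an assumption, not taken from the diff):

    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;

    final class ExpiredResultsQuery {
        private static final long MS_PER_DAY = 24L * 60 * 60 * 1000;

        // Selects results older than the retention period, measured from "now"
        // rather than from midnight of the current day.
        static QueryBuilder olderThanRetention(long nowMs, long retentionDays) {
            long cutoffMs = nowMs - retentionDays * MS_PER_DAY;
            return QueryBuilders.rangeQuery("timestamp").lt(cutoffMs);
        }
    }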
View File

@ -138,6 +138,7 @@ cluster:admin/ml/anomaly_detectors/open
cluster:admin/ml/job/update
indices:data/write/delete/mlbyquery
cluster:admin/ml/job/update/process
cluster:admin/ml/delete_expired_data
cluster:admin/persistent/create
cluster:admin/persistent/start
cluster:admin/persistent/completion

View File

@ -0,0 +1,10 @@
{
"xpack.ml.delete_expired_data": {
"methods": [ "DELETE" ],
"url": {
"path": "/_xpack/ml/_delete_expired_data",
"paths": [ "/_xpack/ml/_delete_expired_data" ]
},
"body": null
}
}

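The spec above registers the new DELETE end-point with a fixed path, no URL parameters and no body. For reference, a minimal way to call it from Java, assuming the 5.x low-level REST client where performRequest(method, endpoint) is available; host and port are placeholders:

    import java.io.IOException;
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    public final class DeleteExpiredDataExample {
        public static void main(String[] args) throws IOException {
            try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
                // Equivalent to: DELETE /_xpack/ml/_delete_expired_data
                Response response = client.performRequest("DELETE", "/_xpack/ml/_delete_expired_data");
                System.out.println(response.getStatusLine());
            }
        }
    }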
View File

@ -6,12 +6,10 @@
package org.elasticsearch.xpack.ml.transforms;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;