[ML] Remove data whose retention period has expired (elastic/x-pack-elasticsearch#610)

This PR introduces a service component that is scheduled to run
once per day, at around 30 minutes past midnight, local time.
Listeners can subscribe to the service to be notified each time
it triggers, so that they can execute their management tasks.

Currently, there are two such listeners: one removes results and
the other removes model snapshots that have outlived their
retention period.

Finally, note that the service is installed only on the master
node, which ensures a single instance runs in the cluster.
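
For illustration, a minimal sketch of wiring up a listener, using the
names introduced in this PR (threadPool is assumed to be in scope, as
it is inside the plugin, and doDailyMaintenance is a hypothetical
callback):

    MlDailyManagementService.Listener listener = () -> doDailyMaintenance();
    MlDailyManagementService service =
            new MlDailyManagementService(threadPool, Arrays.asList(listener));
    service.start(); // schedules the first trigger for ~00:30 local time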

Original commit: elastic/x-pack-elasticsearch@c8edb6016a
Dimitris Athanasiou 2017-02-23 11:06:14 +00:00 committed by GitHub
parent a03e9d36c2
commit d3d73ca095
32 changed files with 1154 additions and 133 deletions

View File: MachineLearning.java

@ -291,7 +291,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
jobProvider,
jobManager,
dataProcessor,
new MlInitializationService(settings, threadPool, clusterService, jobProvider),
new MlInitializationService(settings, threadPool, clusterService, client, jobProvider),
jobDataCountsPersister,
datafeedJobRunner,
persistentActionService,

View File: MlDailyManagementService.java (new)

@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier;
/**
* A service that runs once a day and triggers management tasks.
*/
public class MlDailyManagementService implements Releasable {
private static final Logger LOGGER = Loggers.getLogger(MlDailyManagementService.class);
private final ThreadPool threadPool;
/**
* Supplies the delay until the next execution.
* Abstracted to enable testing.
*/
private final Supplier<TimeValue> schedulerProvider;
private final List<Listener> listeners;
private volatile ScheduledFuture<?> future;
MlDailyManagementService(ThreadPool threadPool, List<Listener> listeners, Supplier<TimeValue> scheduleProvider) {
this.threadPool = Objects.requireNonNull(threadPool);
this.listeners = new ArrayList<>(listeners);
this.schedulerProvider = Objects.requireNonNull(scheduleProvider);
}
public MlDailyManagementService(ThreadPool threadPool, List<Listener> listeners) {
this(threadPool, listeners, createAfterMidnightScheduleProvider());
}
private static Supplier<TimeValue> createAfterMidnightScheduleProvider() {
return () -> {
DateTime now = DateTime.now(ISOChronology.getInstance());
DateTime next = now.plusDays(1).withTimeAtStartOfDay().plusMinutes(30);
return TimeValue.timeValueMillis(next.getMillis() - now.getMillis());
};
}
public void start() {
LOGGER.debug("Starting ML daily management service");
scheduleNext();
}
public void stop() {
LOGGER.debug("Stopping ML daily management service");
if (future != null && future.isCancelled() == false) {
FutureUtils.cancel(future);
}
}
public boolean isStarted() {
return future != null;
}
@Override
public void close() {
stop();
}
private void scheduleNext() {
try {
future = threadPool.schedule(schedulerProvider.get(), ThreadPool.Names.GENERIC, () -> triggerListeners());
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
LOGGER.debug("failed to schedule next management task; shutting down", e);
} else {
throw e;
}
}
}
private void triggerListeners() {
LOGGER.info("triggering scheduled [ML] management tasks");
for (Listener listener : listeners) {
listener.onTrigger();
}
scheduleNext();
}
public interface Listener {
void onTrigger();
}
}
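
To make the scheduling arithmetic concrete: a standalone sketch that
mirrors createAfterMidnightScheduleProvider() with a fixed "now", using
the same Joda-Time calls as the code above:

    import org.joda.time.DateTime;
    import org.joda.time.chrono.ISOChronology;

    public class DelayToNextRun {
        public static void main(String[] args) {
            // Pretend it is 2017-02-23 11:06 local time.
            DateTime now = new DateTime(2017, 2, 23, 11, 6, ISOChronology.getInstance());
            DateTime next = now.plusDays(1).withTimeAtStartOfDay().plusMinutes(30);
            // next is 2017-02-24 00:30 local time, so the delay is roughly 13h24m.
            System.out.println(next.getMillis() - now.getMillis());
        }
    }

Recomputing the delay from the current time on every reschedule keeps the
service anchored to wall-clock midnight, including across DST transitions.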

View File: MlInitializationService.java

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
@ -14,19 +15,23 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
public class MlInitializationService extends AbstractComponent implements ClusterStateListener {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final Client client;
private final JobProvider jobProvider;
private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false);
@ -34,13 +39,22 @@ public class MlInitializationService extends AbstractComponent implements Cluste
private final AtomicBoolean createMlMetaIndexCheck = new AtomicBoolean(false);
private final AtomicBoolean createStateIndexCheck = new AtomicBoolean(false);
public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
JobProvider jobProvider) {
private volatile MlDailyManagementService mlDailyManagementService;
public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client,
JobProvider jobProvider) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.client = client;
this.jobProvider = jobProvider;
clusterService.addListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
super.beforeStop();
}
});
}
@Override
@ -51,6 +65,9 @@ public class MlInitializationService extends AbstractComponent implements Cluste
createMlAuditIndex(metaData);
createMlMetaIndex(metaData);
createStateIndex(metaData);
installDailyManagementService();
} else {
uninstallDailyManagementService();
}
}
@ -144,4 +161,37 @@ public class MlInitializationService extends AbstractComponent implements Cluste
}
}
}
private void installDailyManagementService() {
if (mlDailyManagementService == null) {
mlDailyManagementService = new MlDailyManagementService(threadPool, Arrays.asList(
new ExpiredResultsRemover(client, clusterService, jobId -> jobProvider.audit(jobId)),
new ExpiredModelSnapshotsRemover(client, clusterService)
));
mlDailyManagementService.start();
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
uninstallDailyManagementService();
}
});
}
}
private void uninstallDailyManagementService() {
if (mlDailyManagementService != null) {
mlDailyManagementService.stop();
mlDailyManagementService = null;
}
}
/** For testing */
MlDailyManagementService getDailyManagementService() {
return mlDailyManagementService;
}
/** For testing */
void setDailyManagementService(MlDailyManagementService service) {
mlDailyManagementService = service;
}
}
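
The clusterChanged hunk earlier in this file elides the condition that
gates installDailyManagementService() and uninstallDailyManagementService();
presumably it is the usual local-node-master check. A self-contained sketch
of that pattern (MasterOnlyInstaller is a hypothetical name, not part of
this PR):

    import org.elasticsearch.cluster.ClusterChangedEvent;
    import org.elasticsearch.cluster.ClusterStateListener;

    class MasterOnlyInstaller implements ClusterStateListener {
        private final Runnable install;   // e.g. this::installDailyManagementService
        private final Runnable uninstall; // e.g. this::uninstallDailyManagementService

        MasterOnlyInstaller(Runnable install, Runnable uninstall) {
            this.install = install;
            this.uninstall = uninstall;
        }

        @Override
        public void clusterChanged(ClusterChangedEvent event) {
            if (event.localNodeMaster()) {
                install.run();   // no-op once installed, as in installDailyManagementService()
            } else {
                uninstall.run(); // no-op when nothing is installed
            }
        }
    }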

View File: DeleteModelSnapshotAction.java

@ -159,8 +159,8 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
}
if (deleteCandidates.isEmpty()) {
listener.onFailure(new ResourceNotFoundException(
Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId())));
listener.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT,
request.getSnapshotId(), request.getJobId())));
}
ModelSnapshot deleteCandidate = deleteCandidates.get(0);

View File: GetBucketsAction.java

@ -34,6 +34,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -73,7 +74,7 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket
static {
PARSER.declareString((request, jobId) -> request.jobId = jobId, Job.ID);
PARSER.declareString(Request::setTimestamp, Bucket.TIMESTAMP);
PARSER.declareString(Request::setTimestamp, Result.TIMESTAMP);
PARSER.declareString(Request::setPartitionValue, PARTITION_VALUE);
PARSER.declareBoolean(Request::setExpand, EXPAND);
PARSER.declareBoolean(Request::setIncludeInterim, INCLUDE_INTERIM);
@ -127,7 +128,7 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket
+ ANOMALY_SCORE.getPreferredName() + ","
+ MAX_NORMALIZED_PROBABILITY.getPreferredName() + "]");
}
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Bucket.TIMESTAMP.getPreferredName());
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Result.TIMESTAMP.getPreferredName());
}
public String getTimestamp() {
@ -262,7 +263,7 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
if (timestamp != null) {
builder.field(Bucket.TIMESTAMP.getPreferredName(), timestamp);
builder.field(Result.TIMESTAMP.getPreferredName(), timestamp);
}
builder.field(EXPAND.getPreferredName(), expand);
builder.field(INCLUDE_INTERIM.getPreferredName(), includeInterim);

View File: UpdateModelSnapshotAction.java

@ -275,8 +275,8 @@ UpdateModelSnapshotAction.RequestBuilder> {
getModelSnapshots(request.getJobId(), request.getSnapshotId(), null,
changeCandidates -> {
if (changeCandidates == null || changeCandidates.isEmpty()) {
errorHandler.accept(new ResourceNotFoundException(
Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId())));
errorHandler.accept(new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT,
request.getSnapshotId(), request.getJobId())));
} else {
handler.accept(changeCandidates);
}

View File: DatafeedJobRunner.java

@ -33,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
@ -196,7 +197,7 @@ public class DatafeedJobRunner extends AbstractComponent {
private void gatherInformation(String jobId, BiConsumer<QueryPage<Bucket>, DataCounts> handler, Consumer<Exception> errorHandler) {
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Bucket.TIMESTAMP.getPreferredName())
.sortField(Result.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.includeInterim(false)
.build();

View File: Job.java

@ -525,14 +525,16 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
this.backgroundPersistInterval = job.getBackgroundPersistInterval();
this.modelSnapshotRetentionDays = job.getModelSnapshotRetentionDays();
this.resultsRetentionDays = job.getResultsRetentionDays();
this.modelSnapshotRetentionDays = job.getModelSnapshotRetentionDays();
this.customSettings = job.getCustomSettings();
this.modelSnapshotId = job.getModelSnapshotId();
this.resultsIndexName = job.getResultsIndexName();
this.deleted = job.isDeleted();
}
public void setId(String id) {
public Builder setId(String id) {
this.id = id;
return this;
}
public String getId() {
@ -543,19 +545,22 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
return createTime;
}
public void setCustomSettings(Map<String, Object> customSettings) {
public Builder setCustomSettings(Map<String, Object> customSettings) {
this.customSettings = customSettings;
return this;
}
public void setDescription(String description) {
public Builder setDescription(String description) {
this.description = description;
return this;
}
public void setAnalysisConfig(AnalysisConfig.Builder configBuilder) {
public Builder setAnalysisConfig(AnalysisConfig.Builder configBuilder) {
analysisConfig = ExceptionsHelper.requireNonNull(configBuilder, ANALYSIS_CONFIG.getPreferredName()).build();
return this;
}
public void setAnalysisLimits(AnalysisLimits analysisLimits) {
public Builder setAnalysisLimits(AnalysisLimits analysisLimits) {
if (this.analysisLimits != null) {
long oldMemoryLimit = this.analysisLimits.getModelMemoryLimit();
long newMemoryLimit = analysisLimits.getModelMemoryLimit();
@ -566,58 +571,71 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
}
}
this.analysisLimits = analysisLimits;
return this;
}
public void setCreateTime(Date createTime) {
public Builder setCreateTime(Date createTime) {
this.createTime = createTime;
return this;
}
public void setFinishedTime(Date finishedTime) {
public Builder setFinishedTime(Date finishedTime) {
this.finishedTime = finishedTime;
return this;
}
/**
* Set the wall clock time of the last data upload
* @param lastDataTime Wall clock time
*/
public void setLastDataTime(Date lastDataTime) {
public Builder setLastDataTime(Date lastDataTime) {
this.lastDataTime = lastDataTime;
return this;
}
public void setDataDescription(DataDescription.Builder description) {
public Builder setDataDescription(DataDescription.Builder description) {
dataDescription = ExceptionsHelper.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build();
return this;
}
public void setModelDebugConfig(ModelDebugConfig modelDebugConfig) {
public Builder setModelDebugConfig(ModelDebugConfig modelDebugConfig) {
this.modelDebugConfig = modelDebugConfig;
return this;
}
public void setBackgroundPersistInterval(Long backgroundPersistInterval) {
public Builder setBackgroundPersistInterval(Long backgroundPersistInterval) {
this.backgroundPersistInterval = backgroundPersistInterval;
return this;
}
public void setRenormalizationWindowDays(Long renormalizationWindowDays) {
public Builder setRenormalizationWindowDays(Long renormalizationWindowDays) {
this.renormalizationWindowDays = renormalizationWindowDays;
return this;
}
public void setModelSnapshotRetentionDays(Long modelSnapshotRetentionDays) {
public Builder setModelSnapshotRetentionDays(Long modelSnapshotRetentionDays) {
this.modelSnapshotRetentionDays = modelSnapshotRetentionDays;
return this;
}
public void setResultsRetentionDays(Long resultsRetentionDays) {
public Builder setResultsRetentionDays(Long resultsRetentionDays) {
this.resultsRetentionDays = resultsRetentionDays;
return this;
}
public void setModelSnapshotId(String modelSnapshotId) {
public Builder setModelSnapshotId(String modelSnapshotId) {
this.modelSnapshotId = modelSnapshotId;
return this;
}
public void setResultsIndexName(String resultsIndexName) {
public Builder setResultsIndexName(String resultsIndexName) {
this.resultsIndexName = resultsIndexName;
return this;
}
public void setDeleted(boolean deleted) {
public Builder setDeleted(boolean deleted) {
this.deleted = deleted;
return this;
}
public Job build() {

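With the setters now returning the Builder, job construction can be
chained. A hypothetical sketch, assuming the no-arg Builder constructor
(a real job also needs an analysis config and a data description before
build() produces a valid job):

    Job.Builder builder = new Job.Builder()
            .setId("my-job")
            .setDescription("fluent setters")
            .setCreateTime(new Date());
    Job job = builder.build();
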
View File: BatchedDocumentsIterator.java

@ -16,6 +16,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.util.ArrayDeque;
import java.util.Arrays;
@ -66,7 +67,7 @@ public abstract class BatchedDocumentsIterator<T> {
* @return the iterator itself
*/
public BatchedDocumentsIterator<T> timeRange(long startEpochMs, long endEpochMs) {
filterBuilder.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs);
filterBuilder.timeRange(Result.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs);
return this;
}

View File: BucketsQueryBuilder.java

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.util.Objects;
@ -138,7 +139,7 @@ public final class BucketsQueryBuilder {
private String end;
private String timestamp;
private String partitionValue = null;
private String sortField = Bucket.TIMESTAMP.getPreferredName();
private String sortField = Result.TIMESTAMP.getPreferredName();
private boolean sortDescending = false;
public int getFrom() {

View File: ElasticsearchMappings.java

@ -6,15 +6,12 @@
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.ml.notifications.AuditActivity;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AnomalyCause;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
@ -25,7 +22,10 @@ import org.elasticsearch.xpack.ml.job.results.Influence;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.AuditActivity;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import java.io.IOException;
import java.util.Collection;
@ -137,7 +137,7 @@ public class ElasticsearchMappings {
.field(TYPE, KEYWORD)
.field(COPY_TO, ALL_FIELD_VALUES)
.endObject()
.startObject(Bucket.TIMESTAMP.getPreferredName())
.startObject(Result.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(Bucket.ANOMALY_SCORE.getPreferredName())
@ -212,7 +212,7 @@ public class ElasticsearchMappings {
.startObject(BucketInfluencer.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(BucketInfluencer.TIMESTAMP.getPreferredName())
.startObject(Result.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(BucketInfluencer.BUCKET_SPAN.getPreferredName())

View File: JobDataDeleter.java

@ -69,7 +69,7 @@ public class JobDataDeleter {
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Bucket.TIMESTAMP.getPreferredName());
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName());
timeRange.gte(cutoffEpochMs);
timeRange.lt(new Date().getTime());

View File: JobProvider.java

@ -509,9 +509,9 @@ public class JobProvider {
throws ResourceNotFoundException {
ResultsFilterBuilder rfb = new ResultsFilterBuilder();
if (query.getTimestamp() != null) {
rfb.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getTimestamp());
rfb.timeRange(Result.TIMESTAMP.getPreferredName(), query.getTimestamp());
} else {
rfb.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
rfb.timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
.score(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), query.getNormalizedProbability())
.interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim());
@ -645,7 +645,7 @@ public class JobProvider {
private SearchRequest createPartitionMaxNormailizedProbabilitiesRequest(String jobId, Object epochStart, Object epochEnd,
String partitionFieldValue) {
QueryBuilder timeRangeQuery = new ResultsFilterBuilder()
.timeRange(Bucket.TIMESTAMP.getPreferredName(), epochStart, epochEnd)
.timeRange(Result.TIMESTAMP.getPreferredName(), epochStart, epochEnd)
.build();
QueryBuilder boolQuery = new BoolQueryBuilder()
@ -653,7 +653,7 @@ public class JobProvider {
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), PerPartitionMaxProbabilities.RESULT_TYPE_VALUE))
.filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue));
FieldSortBuilder sb = new FieldSortBuilder(Bucket.TIMESTAMP.getPreferredName()).order(SortOrder.ASC);
FieldSortBuilder sb = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.sort(sb);
@ -752,7 +752,7 @@ public class JobProvider {
// the scenes, and Elasticsearch documentation claims it's significantly
// slower. Here we rely on the record timestamps being identical to the
// bucket timestamp.
QueryBuilder recordFilter = QueryBuilders.termQuery(Bucket.TIMESTAMP.getPreferredName(), bucket.getTimestamp().getTime());
QueryBuilder recordFilter = QueryBuilders.termQuery(Result.TIMESTAMP.getPreferredName(), bucket.getTimestamp().getTime());
ResultsFilterBuilder builder = new ResultsFilterBuilder(recordFilter)
.interim(AnomalyRecord.IS_INTERIM.getPreferredName(), includeInterim);
@ -835,7 +835,7 @@ public class JobProvider {
public void records(String jobId, RecordsQueryBuilder.RecordsQuery query, Consumer<QueryPage<AnomalyRecord>> handler,
Consumer<Exception> errorHandler) {
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(AnomalyRecord.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreThreshold())
.score(AnomalyRecord.NORMALIZED_PROBABILITY.getPreferredName(), query.getNormalizedProbabilityThreshold())
.interim(AnomalyRecord.IS_INTERIM.getPreferredName(), query.isIncludeInterim())
@ -911,7 +911,7 @@ public class JobProvider {
public void influencers(String jobId, InfluencersQuery query, Consumer<QueryPage<Influencer>> handler,
Consumer<Exception> errorHandler) {
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
.interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim())
.build();
@ -1039,7 +1039,7 @@ public class JobProvider {
fb = new ResultsFilterBuilder();
}
QueryBuilder qb = fb.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs).build();
QueryBuilder qb = fb.timeRange(Result.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs).build();
modelSnapshots(jobId, from, size, sortField, sortDescending, qb, handler, errorHandler);
}
@ -1073,13 +1073,7 @@ public class JobProvider {
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
List<ModelSnapshot> results = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
ModelSnapshot modelSnapshot = ModelSnapshot.PARSER.apply(parser, null);
results.add(modelSnapshot);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse modelSnapshot", e);
}
results.add(ModelSnapshot.fromJson(hit.getSourceRef()));
}
QueryPage<ModelSnapshot> result =

View File: ModelSnapshot.java

@ -5,15 +5,22 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.state;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.json.JsonXContentParser;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
@ -288,4 +295,12 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
public static String documentId(String jobId, String snapshotId) {
return jobId + "-" + snapshotId;
}
public static ModelSnapshot fromJson(BytesReference bytesReference) {
try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, bytesReference)) {
return PARSER.apply(parser, null);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse modelSnapshot", e);
}
}
}

View File: AnomalyRecord.java

@ -58,7 +58,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
public static final ParseField IS_INTERIM = new ParseField("is_interim");
public static final ParseField INFLUENCERS = new ParseField("influencers");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("records");
@ -94,8 +93,9 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
} else if (p.currentToken() == Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for ["
+ Result.TIMESTAMP.getPreferredName() + "]");
}, Result.TIMESTAMP, ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM);
PARSER.declareString((anomalyRecord, s) -> {}, Result.RESULT_TYPE);
@ -154,7 +154,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
public AnomalyRecord(String jobId, Date timestamp, long bucketSpan, int sequenceNum) {
this.jobId = jobId;
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName());
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Result.TIMESTAMP.getPreferredName());
this.bucketSpan = bucketSpan;
this.sequenceNum = sequenceNum;
}
@ -252,7 +252,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
builder.field(DETECTOR_INDEX.getPreferredName(), detectorIndex);
builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum);
builder.field(IS_INTERIM.getPreferredName(), isInterim);
builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
if (byFieldName != null) {
builder.field(BY_FIELD_NAME.getPreferredName(), byFieldName);
}

View File: Bucket.java

@ -36,7 +36,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
*/
private static final ParseField JOB_ID = Job.ID;
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField ANOMALY_SCORE = new ParseField("anomaly_score");
public static final ParseField INITIAL_ANOMALY_SCORE = new ParseField("initial_anomaly_score");
public static final ParseField MAX_NORMALIZED_PROBABILITY = new ParseField("max_normalized_probability");
@ -69,8 +68,9 @@ public class Bucket extends ToXContentToBytes implements Writeable {
} else if (p.currentToken() == Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for ["
+ Result.TIMESTAMP.getPreferredName() + "]");
}, Result.TIMESTAMP, ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN);
PARSER.declareDouble(Bucket::setAnomalyScore, ANOMALY_SCORE);
PARSER.declareDouble(Bucket::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE);
@ -102,7 +102,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
public Bucket(String jobId, Date timestamp, long bucketSpan) {
this.jobId = jobId;
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName());
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Result.TIMESTAMP.getPreferredName());
this.bucketSpan = bucketSpan;
}
@ -163,7 +163,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(JOB_ID.getPreferredName(), jobId);
builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore);
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore);

View File: BucketInfluencer.java

@ -38,7 +38,6 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
public static final ParseField RAW_ANOMALY_SCORE = new ParseField("raw_anomaly_score");
public static final ParseField PROBABILITY = new ParseField("probability");
public static final ParseField IS_INTERIM = new ParseField("is_interim");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num");
@ -59,8 +58,9 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
} else if (p.currentToken() == Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for ["
+ Result.TIMESTAMP.getPreferredName() + "]");
}, Result.TIMESTAMP, ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM);
PARSER.declareString((bucketInfluencer, s) -> {}, Result.RESULT_TYPE);
@ -85,7 +85,7 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
public BucketInfluencer(String jobId, Date timestamp, long bucketSpan, int sequenceNum) {
this.jobId = jobId;
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName());
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Result.TIMESTAMP.getPreferredName());
this.bucketSpan = bucketSpan;
this.sequenceNum = sequenceNum;
}
@ -129,7 +129,7 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore);
builder.field(RAW_ANOMALY_SCORE.getPreferredName(), rawAnomalyScore);
builder.field(PROBABILITY.getPreferredName(), probability);
builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum);
builder.field(IS_INTERIM.getPreferredName(), isInterim);

View File: Influencer.java

@ -34,7 +34,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
*/
public static final ParseField PROBABILITY = new ParseField("probability");
public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField INFLUENCER_FIELD_NAME = new ParseField("influencer_field_name");
public static final ParseField INFLUENCER_FIELD_VALUE = new ParseField("influencer_field_value");
@ -58,8 +57,9 @@ public class Influencer extends ToXContentToBytes implements Writeable {
} else if (p.currentToken() == Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for ["
+ Result.TIMESTAMP.getPreferredName() + "]");
}, Result.TIMESTAMP, ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM);
PARSER.declareString((influencer, s) -> {}, Result.RESULT_TYPE);
@ -84,7 +84,7 @@ public class Influencer extends ToXContentToBytes implements Writeable {
this.jobId = jobId;
influenceField = fieldName;
influenceValue = fieldValue;
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName());
this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Result.TIMESTAMP.getPreferredName());
this.bucketSpan = bucketSpan;
this.sequenceNum = sequenceNum;
}
@ -132,7 +132,7 @@ public class Influencer extends ToXContentToBytes implements Writeable {
builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum);
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(Bucket.IS_INTERIM.getPreferredName(), isInterim);
builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.endObject();
return builder;
}

View File: ModelDebugOutput.java

@ -34,7 +34,6 @@ public class ModelDebugOutput extends ToXContentToBytes implements Writeable {
public static final String RESULT_TYPE_VALUE = "model_debug_output";
public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE);
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField PARTITION_FIELD_NAME = new ParseField("partition_field_name");
public static final ParseField PARTITION_FIELD_VALUE = new ParseField("partition_field_value");
public static final ParseField OVER_FIELD_NAME = new ParseField("over_field_name");
@ -59,8 +58,9 @@ public class ModelDebugOutput extends ToXContentToBytes implements Writeable {
} else if (p.currentToken() == Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for ["
+ Result.TIMESTAMP.getPreferredName() + "]");
}, Result.TIMESTAMP, ValueType.VALUE);
PARSER.declareString(ModelDebugOutput::setPartitionFieldName, PARTITION_FIELD_NAME);
PARSER.declareString(ModelDebugOutput::setPartitionFieldValue, PARTITION_FIELD_VALUE);
PARSER.declareString(ModelDebugOutput::setOverFieldName, OVER_FIELD_NAME);
@ -140,7 +140,7 @@ public class ModelDebugOutput extends ToXContentToBytes implements Writeable {
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
if (timestamp != null) {
builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
}
if (partitionFieldName != null) {
builder.field(PARTITION_FIELD_NAME.getPreferredName(), partitionFieldName);

View File: PerPartitionMaxProbabilities.java

@ -58,8 +58,8 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + Bucket.TIMESTAMP.getPreferredName() + "]");
}, Bucket.TIMESTAMP, ObjectParser.ValueType.VALUE);
"unexpected token [" + p.currentToken() + "] for [" + Result.TIMESTAMP.getPreferredName() + "]");
}, Result.TIMESTAMP, ObjectParser.ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), Bucket.BUCKET_SPAN);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), PartitionProbability.PARSER, PER_PARTITION_MAX_PROBABILITIES);
PARSER.declareString((p, s) -> {}, Result.RESULT_TYPE);
@ -168,7 +168,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.dateField(Bucket.TIMESTAMP.getPreferredName(), Bucket.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.field(Bucket.BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), perPartitionMaxProbabilities);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);

View File: ReservedFieldNames.java

@ -88,7 +88,6 @@ public final class ReservedFieldNames {
Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(),
Bucket.PROCESSING_TIME_MS.getPreferredName(),
Bucket.PARTITION_SCORES.getPreferredName(),
Bucket.TIMESTAMP.getPreferredName(),
BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.ANOMALY_SCORE.getPreferredName(),
BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.PROBABILITY.getPreferredName(),
@ -144,7 +143,8 @@ public final class ReservedFieldNames {
PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName(),
Result.RESULT_TYPE.getPreferredName()
Result.RESULT_TYPE.getPreferredName(),
Result.TIMESTAMP.getPreferredName()
};
/**

View File: Result.java

@ -17,4 +17,5 @@ public class Result {
*/
public static final ParseField TYPE = new ParseField("result");
public static final ParseField RESULT_TYPE = new ParseField("result_type");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
}

View File: AbstractExpiredJobDataRemover.java (new)

@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.ml.MlDailyManagementService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* Removes job data that has expired with respect to its retention period.
*
* <p> The implementation ensures removal happens asynchronously to avoid
* blocking the calling thread for too long. It does so by chaining the
* steps together.
*/
abstract class AbstractExpiredJobDataRemover implements MlDailyManagementService.Listener {
private final ClusterService clusterService;
AbstractExpiredJobDataRemover(ClusterService clusterService) {
this.clusterService = Objects.requireNonNull(clusterService);
}
@Override
public void onTrigger() {
removeData(newJobIterator());
}
private Iterator<Job> newJobIterator() {
List<Job> jobs = new ArrayList<>();
ClusterState clusterState = clusterService.state();
MlMetadata mlMetadata = clusterState.getMetaData().custom(MlMetadata.TYPE);
if (mlMetadata != null) {
jobs.addAll(mlMetadata.getJobs().values());
}
return createVolatileCursorIterator(jobs);
}
protected static <T> Iterator<T> createVolatileCursorIterator(List<T> items) {
return new VolatileCursorIterator<T>(items);
}
protected void removeData(Iterator<Job> jobIterator) {
if (jobIterator.hasNext() == false) {
return;
}
Job job = jobIterator.next();
Long retentionDays = getRetentionDays(job);
if (retentionDays == null) {
removeData(jobIterator);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs, () -> removeData(jobIterator));
}
private long calcCutoffEpochMs(long retentionDays) {
long startOfDayEpochMs = DateTime.now(ISOChronology.getInstance()).withTimeAtStartOfDay().getMillis();
return startOfDayEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
}
protected abstract Long getRetentionDays(Job job);
/**
* Template method that lets subclasses supply the removal logic for various types of data (e.g. results, model snapshots).
* Implementors need to call {@code onFinish} when they are done in order to continue to the next job.
*/
protected abstract void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish);
protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
return QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"));
}
private static class VolatileCursorIterator<T> implements Iterator<T> {
private final List<T> items;
private volatile int cursor;
private VolatileCursorIterator(List<T> items) {
this.items = items;
this.cursor = 0;
}
@Override
public boolean hasNext() {
return cursor < items.size();
}
@Override
public T next() {
return items.get(cursor++);
}
}
}
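
Note that calcCutoffEpochMs anchors the cutoff to the start of the current
day (local time), not to the moment the service fires. A standalone sketch
with concrete numbers:

    import org.joda.time.DateTime;
    import org.joda.time.chrono.ISOChronology;
    import java.util.concurrent.TimeUnit;

    public class CutoffExample {
        public static void main(String[] args) {
            long retentionDays = 7;
            // Suppose today is 2017-02-23; the time of day does not matter.
            DateTime startOfDay = new DateTime(2017, 2, 23, 0, 0, ISOChronology.getInstance());
            long cutoffEpochMs = startOfDay.getMillis() - TimeUnit.DAYS.toMillis(retentionDays);
            // The cutoff is 2017-02-16 00:00 local time; data stamped earlier is removed.
            System.out.println(cutoffEpochMs);
        }
    }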

View File: ExpiredModelSnapshotsRemover.java (new)

@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
/**
* Deletes all model snapshots that have exceeded the configured retention time
* of their respective job, with the exception of the currently used snapshot.
* A snapshot is deleted if its timestamp is earlier than the start of the
* current day (local time-zone) minus the retention period.
*/
public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover {
private static final Logger LOGGER = Loggers.getLogger(ExpiredModelSnapshotsRemover.class);
/**
* The max number of snapshots to fetch per job. It is set to 10K, the default
* result window for an index, which we do not change in our ML indices. It should
* be more than enough for most cases. If not, it will take a few iterations to
* delete all snapshots, which is OK.
*/
private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;
private final Client client;
public ExpiredModelSnapshotsRemover(Client client, ClusterService clusterService) {
super(clusterService);
this.client = Objects.requireNonNull(client);
}
@Override
protected Long getRetentionDays(Job job) {
return job.getModelSnapshotRetentionDays();
}
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) {
if (job.getModelSnapshotId() == null) {
// No snapshot to remove
onFinish.run();
return;
}
LOGGER.info("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
QueryBuilder excludeFilter = QueryBuilders.termQuery(ModelSnapshot.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId());
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(AnomalyDetectorsIndex.jobResultsIndexName(job.getId()));
searchRequest.types(ModelSnapshot.TYPE.getPreferredName());
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter);
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE));
client.execute(SearchAction.INSTANCE, searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
List<ModelSnapshot> modelSnapshots = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits()) {
modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
}
deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), onFinish);
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
LOGGER.error("[" + job.getId() + "] Search for expired snapshots failed", e);
onFinish.run();
}
});
}
private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, Runnable onFinish) {
if (modelSnapshotIterator.hasNext() == false) {
onFinish.run();
return;
}
ModelSnapshot modelSnapshot = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request(
modelSnapshot.getJobId(), modelSnapshot.getSnapshotId());
client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<DeleteModelSnapshotAction.Response>() {
@Override
public void onResponse(DeleteModelSnapshotAction.Response response) {
LOGGER.trace("[{}] Deleted expired snapshot [{}]", modelSnapshot.getJobId(), modelSnapshot.getSnapshotId());
try {
deleteModelSnapshots(modelSnapshotIterator, onFinish);
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
LOGGER.error("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
+ modelSnapshot.getSnapshotId() + "]", e);
deleteModelSnapshots(modelSnapshotIterator, onFinish);
}
});
}
}
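
Putting createQuery() and the exclude filter together, the search for
deletable snapshots is, schematically (the string literals stand in for
the Job.ID, Result.TIMESTAMP and ModelSnapshot.SNAPSHOT_ID preferred
names):

    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;

    class SnapshotQuerySketch {
        static QueryBuilder deletableSnapshots(String jobId, long cutoffEpochMs, String activeSnapshotId) {
            return QueryBuilders.boolQuery()
                    .filter(QueryBuilders.termQuery("job_id", jobId))
                    .filter(QueryBuilders.rangeQuery("timestamp").lt(cutoffEpochMs).format("epoch_millis"))
                    .mustNot(QueryBuilders.termQuery("snapshot_id", activeSnapshotId));
        }
    }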

View File: ExpiredResultsRemover.java (new)

@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.function.Function;
/**
* Removes all results that have exceeded the configured retention time
* of their respective job. A result is deleted if its timestamp is earlier
* than the start of the current day (local time-zone) minus the retention
* period.
*/
public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
private static final Logger LOGGER = Loggers.getLogger(ExpiredResultsRemover.class);
private final Client client;
private final Function<String, Auditor> auditorSupplier;
public ExpiredResultsRemover(Client client, ClusterService clusterService, Function<String, Auditor> auditorSupplier) {
super(clusterService);
this.client = Objects.requireNonNull(client);
this.auditorSupplier = Objects.requireNonNull(auditorSupplier);
}
@Override
protected Long getRetentionDays(Job job) {
return job.getResultsRetentionDays();
}
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) {
LOGGER.info("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
QueryBuilder excludeFilter = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE);
DeleteByQueryRequest request = createDBQRequest(job, Result.TYPE.getPreferredName(), cutoffEpochMs, excludeFilter);
client.execute(MlDeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
try {
auditResultsWereDeleted(job.getId(), cutoffEpochMs);
onFinish.run();
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
LOGGER.error("Failed to remove expired results for job [" + job.getId() + "]", e);
onFinish.run();
}
});
}
private DeleteByQueryRequest createDBQRequest(Job job, String type, long cutoffEpochMs, QueryBuilder excludeFilter) {
SearchRequest searchRequest = new SearchRequest();
// We need to create the DeleteByQueryRequest before we modify the SearchRequest
// because the constructor of the former wipes the latter
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
request.setSlices(5);
searchRequest.indices(AnomalyDetectorsIndex.jobResultsIndexName(job.getId()));
searchRequest.types(type);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter);
searchRequest.source(new SearchSourceBuilder().query(query));
return request;
}
private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) {
Instant instant = Instant.ofEpochMilli(cutoffEpochMs);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault());
String formatted = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(zonedDateTime);
String msg = Messages.getMessage(Messages.JOB_AUDIT_OLD_RESULTS_DELETED, formatted);
auditorSupplier.apply(jobId).info(msg);
}
}
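
The audit message renders the cutoff as an ISO-8601 timestamp with the
system offset. A standalone sketch of auditResultsWereDeleted's formatting
step (UTC is used here to keep the output deterministic):

    import java.time.Instant;
    import java.time.ZoneId;
    import java.time.ZonedDateTime;
    import java.time.format.DateTimeFormatter;

    public class AuditTimestampExample {
        public static void main(String[] args) {
            long cutoffEpochMs = 1487203200000L; // 2017-02-16T00:00:00Z
            ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(cutoffEpochMs), ZoneId.of("UTC"));
            // Prints an ISO-8601 timestamp such as 2017-02-16T00:00:00Z
            System.out.println(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(zdt));
        }
    }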

View File: RestGetBucketsAction.java

@ -14,9 +14,9 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.io.IOException;
@ -26,10 +26,10 @@ public class RestGetBucketsAction extends BaseRestHandler {
super(settings);
controller.registerHandler(RestRequest.Method.GET,
MachineLearning.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName()
+ "}/results/buckets/{" + Bucket.TIMESTAMP.getPreferredName() + "}", this);
+ "}/results/buckets/{" + Result.TIMESTAMP.getPreferredName() + "}", this);
controller.registerHandler(RestRequest.Method.POST,
MachineLearning.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName()
+ "}/results/buckets/{" + Bucket.TIMESTAMP.getPreferredName() + "}", this);
+ "}/results/buckets/{" + Result.TIMESTAMP.getPreferredName() + "}", this);
controller.registerHandler(RestRequest.Method.GET,
MachineLearning.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/results/buckets", this);

View File: ML job messages (properties file)

@ -24,7 +24,7 @@ job.audit.paused = Job paused
job.audit.resumed = Job resumed
job.audit.updated = Job updated: {0}
job.audit.reverted = Job model snapshot reverted to ''{0}''
job.audit.old.results.deleted = Deleted results prior to {0}
job.audit.old.results.deleted = Deleted results prior to {1}
job.audit.snapshot.deleted = Job model snapshot ''{0}'' deleted
job.audit.datafeed.started.from.to = Datafeed started (from: {0} to: {1})
job.audit.datafeed.started.realtime = Datafeed started in real-time

View File: MlDailyManagementServiceTests.java (new)

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.mockito.Mockito.mock;
public class MlDailyManagementServiceTests extends ESTestCase {
private ThreadPool threadPool;
@Before
public void setUpTests() {
threadPool = new TestThreadPool("MlDailyManagementServiceTests");
}
@After
public void stop() throws InterruptedException {
terminate(threadPool);
}
public void testScheduledTriggering() throws InterruptedException {
MlDailyManagementService.Listener listener1 = mock(MlDailyManagementService.Listener.class);
MlDailyManagementService.Listener listener2 = mock(MlDailyManagementService.Listener.class);
int triggerCount = randomIntBetween(2, 4);
CountDownLatch latch = new CountDownLatch(triggerCount);
try (MlDailyManagementService service = createService(latch, Arrays.asList(listener1, listener2))) {
service.start();
latch.await(1, TimeUnit.SECONDS);
}
verify(listener1, org.mockito.Mockito.atLeast(triggerCount - 1)).onTrigger();
verify(listener2, org.mockito.Mockito.atLeast(triggerCount - 1)).onTrigger();
}
private MlDailyManagementService createService(CountDownLatch latch, List<MlDailyManagementService.Listener> listeners) {
return new MlDailyManagementService(threadPool, listeners, () -> {
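            // Count each scheduling round, then re-trigger 100ms later.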
latch.countDown();
return TimeValue.timeValueMillis(100);
});
}
}


@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@@ -18,17 +19,19 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlInitializationService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import java.net.InetAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -37,19 +40,32 @@ import static org.mockito.Mockito.when;
public class MlInitializationServiceTests extends ESTestCase {
public void testInitialize() throws Exception {
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
private ThreadPool threadPool;
private ExecutorService executorService;
private ClusterService clusterService;
private Client client;
private JobProvider jobProvider;
@Before
public void setUpMocks() {
threadPool = mock(ThreadPool.class);
executorService = mock(ExecutorService.class);
clusterService = mock(ClusterService.class);
client = mock(Client.class);
jobProvider = mock(JobProvider.class);
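        // Run anything submitted to the GENERIC executor synchronously on the calling thread.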
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
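        // Stub scheduling so the daily management service can start without a real timer thread.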
ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture);
}
ClusterService clusterService = mock(ClusterService.class);
JobProvider jobProvider = mock(JobProvider.class);
public void testInitialize() throws Exception {
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider);
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@@ -64,21 +80,12 @@ public class MlInitializationServiceTests extends ESTestCase {
verify(jobProvider, times(1)).createNotificationMessageIndex(any());
verify(jobProvider, times(1)).createMetaIndex(any());
verify(jobProvider, times(1)).createJobStateIndex(any());
assertThat(initializationService.getDailyManagementService().isStarted(), is(true));
}
public void testInitialize_noMasterNode() throws Exception {
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
ClusterService clusterService = mock(ClusterService.class);
JobProvider jobProvider = mock(JobProvider.class);
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider);
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@@ -91,21 +98,12 @@ public class MlInitializationServiceTests extends ESTestCase {
verify(jobProvider, times(0)).createNotificationMessageIndex(any());
verify(jobProvider, times(0)).createMetaIndex(any());
verify(jobProvider, times(0)).createJobStateIndex(any());
assertThat(initializationService.getDailyManagementService(), is(nullValue()));
}
public void testInitialize_alreadyInitialized() throws Exception {
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
ClusterService clusterService = mock(ClusterService.class);
JobProvider jobProvider = mock(JobProvider.class);
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider);
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@@ -130,27 +128,20 @@ public class MlInitializationServiceTests extends ESTestCase {
))
.putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
.build();
MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
initializationService.setDailyManagementService(initialDailyManagementService);
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any());
verify(jobProvider, times(0)).createNotificationMessageIndex(any());
verify(jobProvider, times(0)).createMetaIndex(any());
verify(jobProvider, times(0)).createJobStateIndex(any());
assertSame(initialDailyManagementService, initializationService.getDailyManagementService());
}
public void testInitialize_onlyOnce() throws Exception {
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
ClusterService clusterService = mock(ClusterService.class);
JobProvider jobProvider = mock(JobProvider.class);
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider);
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@@ -167,4 +158,32 @@ public class MlInitializationServiceTests extends ESTestCase {
verify(jobProvider, times(1)).createMetaIndex(any());
verify(jobProvider, times(1)).createJobStateIndex(any());
}
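    // Losing mastership must stop the daily service; regaining it must create and start a fresh one.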
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
initializationService.setDailyManagementService(initialDailyManagementService);
ClusterState masterCs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.metaData(MetaData.builder())
.build();
ClusterState noMasterCs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)))
.metaData(MetaData.builder())
.build();
initializationService.clusterChanged(new ClusterChangedEvent("_source", noMasterCs, masterCs));
verify(initialDailyManagementService).stop();
initializationService.clusterChanged(new ClusterChangedEvent("_source", masterCs, noMasterCs));
MlDailyManagementService finalDailyManagementService = initializationService.getDailyManagementService();
assertNotSame(initialDailyManagementService, finalDailyManagementService);
assertThat(initializationService.getDailyManagementService().isStarted(), is(true));
}
}


@@ -8,7 +8,7 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
public class ResultsFilterBuilderTests extends ESTestCase {
private static final String TIMESTAMP = "timestamp";
@@ -28,11 +28,11 @@ public class ResultsFilterBuilderTests extends ESTestCase {
public void testBuild_GivenOnlyStartTime() {
QueryBuilder expected = QueryBuilders
.rangeQuery(Bucket.TIMESTAMP.getPreferredName())
.rangeQuery(Result.TIMESTAMP.getPreferredName())
.gte(1000);
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(Bucket.TIMESTAMP.getPreferredName(), 1000, null)
.timeRange(Result.TIMESTAMP.getPreferredName(), 1000, null)
.build();
assertEquals(expected.toString(), fb.toString());
@@ -123,7 +123,7 @@ public class ResultsFilterBuilderTests extends ESTestCase {
public void testBuild_GivenCombination() {
QueryBuilder originalFilter = QueryBuilders.existsQuery("someField");
QueryBuilder timeFilter = QueryBuilders
.rangeQuery(Bucket.TIMESTAMP.getPreferredName())
.rangeQuery(Result.TIMESTAMP.getPreferredName())
.gte(1000)
.lt(2000);
QueryBuilder score1Filter = new ResultsFilterBuilder()
@@ -144,7 +144,7 @@ public class ResultsFilterBuilderTests extends ESTestCase {
.filter(termFilter);
QueryBuilder fb = new ResultsFilterBuilder(originalFilter)
.timeRange(Bucket.TIMESTAMP.getPreferredName(), 1000, 2000)
.timeRange(Result.TIMESTAMP.getPreferredName(), 1000, 2000)
.score("score1", 50.0)
.score("score2", 80.0)
.interim("isInterim", false)


@@ -0,0 +1,259 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
private Client client;
private ClusterService clusterService;
private ClusterState clusterState;
private List<SearchRequest> capturedSearchRequests;
private List<DeleteModelSnapshotAction.Request> capturedDeleteModelSnapshotRequests;
private List<SearchResponse> searchResponsesPerCall;
@Before
public void setUpTests() {
capturedSearchRequests = new ArrayList<>();
capturedDeleteModelSnapshotRequests = new ArrayList<>();
searchResponsesPerCall = new ArrayList<>();
clusterService = mock(ClusterService.class);
clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
client = mock(Client.class);
}
public void testOnTrigger_GivenJobsWithoutRetentionPolicy() {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("foo").build(),
JobTests.buildJobBuilder("bar").build()
));
createExpiredModelSnapshotsRemover().onTrigger();
Mockito.verifyNoMoreInteractions(client);
}
public void testOnTrigger_GivenJobWithoutActiveSnapshot() {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build()));
createExpiredModelSnapshotsRemover().onTrigger();
Mockito.verifyNoMoreInteractions(client);
}
public void testOnTrigger_GivenJobsWithMixedRetentionPolicies() throws IOException {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().onTrigger();
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3));
DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
}
public void testOnTrigger_GivenClientSearchRequestsFail() throws IOException {
givenClientSearchRequestsFail();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().onTrigger();
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0));
}
public void testOnTrigger_GivenClientDeleteSnapshotRequestsFail() throws IOException {
givenClientDeleteModelSnapshotRequestsFail();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));
List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().onTrigger();
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3));
DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
}
private void givenJobs(List<Job> jobs) {
Map<String, Job> jobsMap = new HashMap<>();
jobs.stream().forEach(job -> jobsMap.put(job.getId(), job));
MlMetadata mlMetadata = mock(MlMetadata.class);
when(mlMetadata.getJobs()).thenReturn(jobsMap);
MetaData metadata = mock(MetaData.class);
when(metadata.custom(MlMetadata.TYPE)).thenReturn(mlMetadata);
when(clusterState.getMetaData()).thenReturn(metadata);
}
private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() {
return new ExpiredModelSnapshotsRemover(client, clusterService);
}
private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) {
ModelSnapshot modelSnapshot = new ModelSnapshot(jobId);
modelSnapshot.setSnapshotId(snapshotId);
return modelSnapshot;
}
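    // Wraps the given snapshots in a mock search response, serialising each one as a hit's JSON source.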
private static SearchResponse createSearchResponse(List<ModelSnapshot> modelSnapshots) throws IOException {
SearchHit[] hitsArray = new SearchHit[modelSnapshots.size()];
for (int i = 0; i < modelSnapshots.size(); i++) {
hitsArray[i] = new SearchHit(randomInt());
XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
modelSnapshots.get(i).toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
hitsArray[i].sourceRef(jsonBuilder.bytes());
}
SearchHits hits = new SearchHits(hitsArray, hitsArray.length, 1.0f);
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.getHits()).thenReturn(hits);
return searchResponse;
}
private void givenClientRequestsSucceed() {
givenClientRequests(true, true);
}
private void givenClientSearchRequestsFail() {
givenClientRequests(false, true);
}
private void givenClientDeleteModelSnapshotRequestsFail() {
givenClientRequests(true, false);
}
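    // Stubs the client so search and delete-snapshot requests are captured, and their
    // listeners completed or failed according to the given flags.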
private void givenClientRequests(boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) {
doAnswer(new Answer<Void>() {
int callCount = 0;
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1];
capturedSearchRequests.add(searchRequest);
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
if (shouldSearchRequestsSucceed) {
listener.onResponse(searchResponsesPerCall.get(callCount++));
} else {
listener.onFailure(new RuntimeException("search failed"));
}
return null;
}
}).when(client).execute(same(SearchAction.INSTANCE), any(), any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]);
ActionListener<DeleteModelSnapshotAction.Response> listener =
(ActionListener<DeleteModelSnapshotAction.Response>) invocationOnMock.getArguments()[2];
if (shouldDeleteSnapshotRequestsSucceed) {
listener.onResponse(null);
} else {
listener.onFailure(new RuntimeException("delete snapshot failed"));
}
return null;
}
}).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any());
}
}


@@ -0,0 +1,161 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ExpiredResultsRemoverTests extends ESTestCase {
private Client client;
private ClusterService clusterService;
private ClusterState clusterState;
private List<DeleteByQueryRequest> capturedDeleteByQueryRequests;
@Before
public void setUpTests() {
capturedDeleteByQueryRequests = new ArrayList<>();
clusterService = mock(ClusterService.class);
clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
client = mock(Client.class);
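        // By default, record every delete-by-query request and complete its listener
        // successfully; individual tests re-stub this via givenClientRequests(...).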
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]);
ActionListener<BulkByScrollResponse> listener =
(ActionListener<BulkByScrollResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(null);
return null;
}
}).when(client).execute(same(MlDeleteByQueryAction.INSTANCE), any(), any());
}
public void testOnTrigger_GivenNoJobs() {
givenClientRequestsSucceed();
givenJobs(Collections.emptyList());
createExpiredResultsRemover().onTrigger();
Mockito.verifyNoMoreInteractions(client);
}
public void testOnTrigger_GivenJobsWithoutRetentionPolicy() {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("foo").build(),
JobTests.buildJobBuilder("bar").build()
));
createExpiredResultsRemover().onTrigger();
Mockito.verifyNoMoreInteractions(client);
}
public void testOnTrigger_GivenJobsWithAndWithoutRetentionPolicy() throws IOException {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
));
createExpiredResultsRemover().onTrigger();
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-2")}));
}
public void testOnTrigger_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException {
givenClientRequestsFailed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
));
createExpiredResultsRemover().onTrigger();
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-2")}));
}
private void givenClientRequestsSucceed() {
givenClientRequests(true);
}
private void givenClientRequestsFailed() {
givenClientRequests(false);
}
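    // Captures each delete-by-query request and completes the listener, failing it when shouldSucceed is false.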
private void givenClientRequests(boolean shouldSucceed) {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]);
ActionListener<BulkByScrollResponse> listener =
(ActionListener<BulkByScrollResponse>) invocationOnMock.getArguments()[2];
if (shouldSucceed) {
listener.onResponse(null);
} else {
listener.onFailure(new RuntimeException("failed"));
}
return null;
}
}).when(client).execute(same(MlDeleteByQueryAction.INSTANCE), any(), any());
}
private void givenJobs(List<Job> jobs) {
Map<String, Job> jobsMap = new HashMap<>();
jobs.stream().forEach(job -> jobsMap.put(job.getId(), job));
MlMetadata mlMetadata = mock(MlMetadata.class);
when(mlMetadata.getJobs()).thenReturn(jobsMap);
MetaData metadata = mock(MetaData.class);
when(metadata.custom(MlMetadata.TYPE)).thenReturn(mlMetadata);
when(clusterState.getMetaData()).thenReturn(metadata);
}
private ExpiredResultsRemover createExpiredResultsRemover() {
return new ExpiredResultsRemover(client, clusterService, jobId -> mock(Auditor.class));
}
}