diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 769da9bd284..0dda4f21347 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -291,7 +291,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { jobProvider, jobManager, dataProcessor, - new MlInitializationService(settings, threadPool, clusterService, jobProvider), + new MlInitializationService(settings, threadPool, clusterService, client, jobProvider), jobDataCountsPersister, datafeedJobRunner, persistentActionService, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyManagementService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyManagementService.java new file mode 100644 index 00000000000..3b7a0e4ffa4 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyManagementService.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.ThreadPool; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Supplier; + +/** + * A service that runs once a day and triggers management tasks. + */ +public class MlDailyManagementService implements Releasable { + + private static final Logger LOGGER = Loggers.getLogger(MlDailyManagementService.class); + + private final ThreadPool threadPool; + + /** + * An interface to abstract the calculation of the delay to the next execution. + * Needed to enable testing. + */ + private final Supplier schedulerProvider; + + private final List listeners; + private volatile ScheduledFuture future; + + MlDailyManagementService(ThreadPool threadPool, List listeners, Supplier scheduleProvider) { + this.threadPool = Objects.requireNonNull(threadPool); + this.listeners = new ArrayList<>(listeners); + this.schedulerProvider = Objects.requireNonNull(scheduleProvider); + } + + public MlDailyManagementService(ThreadPool threadPool, List listeners) { + this(threadPool, listeners, createAfterMidnightScheduleProvider()); + } + + private static Supplier createAfterMidnightScheduleProvider() { + return () -> { + DateTime now = DateTime.now(ISOChronology.getInstance()); + DateTime next = now.plusDays(1).withTimeAtStartOfDay().plusMinutes(30); + return TimeValue.timeValueMillis(next.getMillis() - now.getMillis()); + }; + } + + public void start() { + LOGGER.debug("Starting ML daily management service"); + scheduleNext(); + } + + public void stop() { + LOGGER.debug("Stopping ML daily management service"); + if (future != null && future.isCancelled() == false) { + FutureUtils.cancel(future); + } + } + + public boolean isStarted() { + return future != null; + } + + @Override + public void close() { + stop(); + } + + private void scheduleNext() { + try { + future = threadPool.schedule(schedulerProvider.get(), ThreadPool.Names.GENERIC, () -> triggerListeners()); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + LOGGER.debug("failed to schedule next management task; shutting down", e); + } else { + throw e; + } + } + } + + private void triggerListeners() { + LOGGER.info("triggering scheduled [ML] management tasks"); + for (Listener listener : listeners) { + listener.onTrigger(); + } + scheduleNext(); + } + + public interface Listener { + void onTrigger(); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 228aa83c587..50e57b2f240 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -14,19 +15,23 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; +import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; public class MlInitializationService extends AbstractComponent implements ClusterStateListener { private final ThreadPool threadPool; private final ClusterService clusterService; + private final Client client; private final JobProvider jobProvider; private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false); @@ -34,13 +39,22 @@ public class MlInitializationService extends AbstractComponent implements Cluste private final AtomicBoolean createMlMetaIndexCheck = new AtomicBoolean(false); private final AtomicBoolean createStateIndexCheck = new AtomicBoolean(false); - public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, - JobProvider jobProvider) { + private volatile MlDailyManagementService mlDailyManagementService; + + public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, + JobProvider jobProvider) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; + this.client = client; this.jobProvider = jobProvider; clusterService.addListener(this); + clusterService.addLifecycleListener(new LifecycleListener() { + @Override + public void beforeStop() { + super.beforeStop(); + } + }); } @Override @@ -51,6 +65,9 @@ public class MlInitializationService extends AbstractComponent implements Cluste createMlAuditIndex(metaData); createMlMetaIndex(metaData); createStateIndex(metaData); + installDailyManagementService(); + } else { + uninstallDailyManagementService(); } } @@ -144,4 +161,37 @@ public class MlInitializationService extends AbstractComponent implements Cluste } } } + + private void installDailyManagementService() { + if (mlDailyManagementService == null) { + mlDailyManagementService = new MlDailyManagementService(threadPool, Arrays.asList( + new ExpiredResultsRemover(client, clusterService, jobId -> jobProvider.audit(jobId)), + new ExpiredModelSnapshotsRemover(client, clusterService) + )); + mlDailyManagementService.start(); + clusterService.addLifecycleListener(new LifecycleListener() { + @Override + public void beforeStop() { + uninstallDailyManagementService(); + } + }); + } + } + + private void uninstallDailyManagementService() { + if (mlDailyManagementService != null) { + mlDailyManagementService.stop(); + mlDailyManagementService = null; + } + } + + /** For testing */ + MlDailyManagementService getDailyManagementService() { + return mlDailyManagementService; + } + + /** For testing */ + void setDailyManagementService(MlDailyManagementService service) { + mlDailyManagementService = service; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java index 2b6c0df9316..69f38f99938 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java @@ -159,8 +159,8 @@ public class DeleteModelSnapshotAction extends Action request.jobId = jobId, Job.ID); - PARSER.declareString(Request::setTimestamp, Bucket.TIMESTAMP); + PARSER.declareString(Request::setTimestamp, Result.TIMESTAMP); PARSER.declareString(Request::setPartitionValue, PARTITION_VALUE); PARSER.declareBoolean(Request::setExpand, EXPAND); PARSER.declareBoolean(Request::setIncludeInterim, INCLUDE_INTERIM); @@ -127,7 +128,7 @@ public class GetBucketsAction extends Action { getModelSnapshots(request.getJobId(), request.getSnapshotId(), null, changeCandidates -> { if (changeCandidates == null || changeCandidates.isEmpty()) { - errorHandler.accept(new ResourceNotFoundException( - Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId()))); + errorHandler.accept(new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, + request.getSnapshotId(), request.getJobId()))); } else { handler.accept(changeCandidates); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 9ef37b8c079..8b72575ca8e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; @@ -196,7 +197,7 @@ public class DatafeedJobRunner extends AbstractComponent { private void gatherInformation(String jobId, BiConsumer, DataCounts> handler, Consumer errorHandler) { BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder() - .sortField(Bucket.TIMESTAMP.getPreferredName()) + .sortField(Result.TIMESTAMP.getPreferredName()) .sortDescending(true).size(1) .includeInterim(false) .build(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java index 6df96a20b2a..55ace759e38 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java @@ -525,14 +525,16 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent this.backgroundPersistInterval = job.getBackgroundPersistInterval(); this.modelSnapshotRetentionDays = job.getModelSnapshotRetentionDays(); this.resultsRetentionDays = job.getResultsRetentionDays(); + this.modelSnapshotRetentionDays = job.getModelSnapshotRetentionDays(); this.customSettings = job.getCustomSettings(); this.modelSnapshotId = job.getModelSnapshotId(); this.resultsIndexName = job.getResultsIndexName(); this.deleted = job.isDeleted(); } - public void setId(String id) { + public Builder setId(String id) { this.id = id; + return this; } public String getId() { @@ -543,19 +545,22 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent return createTime; } - public void setCustomSettings(Map customSettings) { + public Builder setCustomSettings(Map customSettings) { this.customSettings = customSettings; + return this; } - public void setDescription(String description) { + public Builder setDescription(String description) { this.description = description; + return this; } - public void setAnalysisConfig(AnalysisConfig.Builder configBuilder) { + public Builder setAnalysisConfig(AnalysisConfig.Builder configBuilder) { analysisConfig = ExceptionsHelper.requireNonNull(configBuilder, ANALYSIS_CONFIG.getPreferredName()).build(); + return this; } - public void setAnalysisLimits(AnalysisLimits analysisLimits) { + public Builder setAnalysisLimits(AnalysisLimits analysisLimits) { if (this.analysisLimits != null) { long oldMemoryLimit = this.analysisLimits.getModelMemoryLimit(); long newMemoryLimit = analysisLimits.getModelMemoryLimit(); @@ -566,58 +571,71 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent } } this.analysisLimits = analysisLimits; + return this; } - public void setCreateTime(Date createTime) { + public Builder setCreateTime(Date createTime) { this.createTime = createTime; + return this; } - public void setFinishedTime(Date finishedTime) { + public Builder setFinishedTime(Date finishedTime) { this.finishedTime = finishedTime; + return this; } /** * Set the wall clock time of the last data upload * @param lastDataTime Wall clock time */ - public void setLastDataTime(Date lastDataTime) { + public Builder setLastDataTime(Date lastDataTime) { this.lastDataTime = lastDataTime; + return this; } - public void setDataDescription(DataDescription.Builder description) { + public Builder setDataDescription(DataDescription.Builder description) { dataDescription = ExceptionsHelper.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build(); + return this; } - public void setModelDebugConfig(ModelDebugConfig modelDebugConfig) { + public Builder setModelDebugConfig(ModelDebugConfig modelDebugConfig) { this.modelDebugConfig = modelDebugConfig; + return this; } - public void setBackgroundPersistInterval(Long backgroundPersistInterval) { + public Builder setBackgroundPersistInterval(Long backgroundPersistInterval) { this.backgroundPersistInterval = backgroundPersistInterval; + return this; } - public void setRenormalizationWindowDays(Long renormalizationWindowDays) { + public Builder setRenormalizationWindowDays(Long renormalizationWindowDays) { this.renormalizationWindowDays = renormalizationWindowDays; + return this; } - public void setModelSnapshotRetentionDays(Long modelSnapshotRetentionDays) { + public Builder setModelSnapshotRetentionDays(Long modelSnapshotRetentionDays) { this.modelSnapshotRetentionDays = modelSnapshotRetentionDays; + return this; } - public void setResultsRetentionDays(Long resultsRetentionDays) { + public Builder setResultsRetentionDays(Long resultsRetentionDays) { this.resultsRetentionDays = resultsRetentionDays; + return this; } - public void setModelSnapshotId(String modelSnapshotId) { + public Builder setModelSnapshotId(String modelSnapshotId) { this.modelSnapshotId = modelSnapshotId; + return this; } - public void setResultsIndexName(String resultsIndexName) { + public Builder setResultsIndexName(String resultsIndexName) { this.resultsIndexName = resultsIndexName; + return this; } - public void setDeleted(boolean deleted) { + public Builder setDeleted(boolean deleted) { this.deleted = deleted; + return this; } public Job build() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java index 63d91b9ef18..03e10aeb372 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java @@ -16,6 +16,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.xpack.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.job.results.Result; import java.util.ArrayDeque; import java.util.Arrays; @@ -66,7 +67,7 @@ public abstract class BatchedDocumentsIterator { * @return the iterator itself */ public BatchedDocumentsIterator timeRange(long startEpochMs, long endEpochMs) { - filterBuilder.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs); + filterBuilder.timeRange(Result.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs); return this; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BucketsQueryBuilder.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BucketsQueryBuilder.java index f6fcdf3726e..d5603952842 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BucketsQueryBuilder.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BucketsQueryBuilder.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.elasticsearch.common.Strings; import org.elasticsearch.xpack.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.job.results.Result; import java.util.Objects; @@ -138,7 +139,7 @@ public final class BucketsQueryBuilder { private String end; private String timestamp; private String partitionValue = null; - private String sortField = Bucket.TIMESTAMP.getPreferredName(); + private String sortField = Result.TIMESTAMP.getPreferredName(); private boolean sortDescending = false; public int getFrom() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index f50079787ff..e517dfdc3b0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -6,15 +6,12 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; -import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames; -import org.elasticsearch.xpack.ml.notifications.AuditActivity; -import org.elasticsearch.xpack.ml.notifications.AuditMessage; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.results.AnomalyCause; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; @@ -25,7 +22,10 @@ import org.elasticsearch.xpack.ml.job.results.Influence; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; +import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames; import org.elasticsearch.xpack.ml.job.results.Result; +import org.elasticsearch.xpack.ml.notifications.AuditActivity; +import org.elasticsearch.xpack.ml.notifications.AuditMessage; import java.io.IOException; import java.util.Collection; @@ -137,7 +137,7 @@ public class ElasticsearchMappings { .field(TYPE, KEYWORD) .field(COPY_TO, ALL_FIELD_VALUES) .endObject() - .startObject(Bucket.TIMESTAMP.getPreferredName()) + .startObject(Result.TIMESTAMP.getPreferredName()) .field(TYPE, DATE) .endObject() .startObject(Bucket.ANOMALY_SCORE.getPreferredName()) @@ -212,7 +212,7 @@ public class ElasticsearchMappings { .startObject(BucketInfluencer.PROBABILITY.getPreferredName()) .field(TYPE, DOUBLE) .endObject() - .startObject(BucketInfluencer.TIMESTAMP.getPreferredName()) + .startObject(Result.TIMESTAMP.getPreferredName()) .field(TYPE, DATE) .endObject() .startObject(BucketInfluencer.BUCKET_SPAN.getPreferredName()) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 48e82efb6fc..1fe00118ddd 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -69,7 +69,7 @@ public class JobDataDeleter { public void deleteResultsFromTime(long cutoffEpochMs, ActionListener listener) { String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId); - RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Bucket.TIMESTAMP.getPreferredName()); + RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()); timeRange.gte(cutoffEpochMs); timeRange.lt(new Date().getTime()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index e1c18ea3ccc..32a91b72442 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -509,9 +509,9 @@ public class JobProvider { throws ResourceNotFoundException { ResultsFilterBuilder rfb = new ResultsFilterBuilder(); if (query.getTimestamp() != null) { - rfb.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getTimestamp()); + rfb.timeRange(Result.TIMESTAMP.getPreferredName(), query.getTimestamp()); } else { - rfb.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd()) + rfb.timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd()) .score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter()) .score(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), query.getNormalizedProbability()) .interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim()); @@ -645,7 +645,7 @@ public class JobProvider { private SearchRequest createPartitionMaxNormailizedProbabilitiesRequest(String jobId, Object epochStart, Object epochEnd, String partitionFieldValue) { QueryBuilder timeRangeQuery = new ResultsFilterBuilder() - .timeRange(Bucket.TIMESTAMP.getPreferredName(), epochStart, epochEnd) + .timeRange(Result.TIMESTAMP.getPreferredName(), epochStart, epochEnd) .build(); QueryBuilder boolQuery = new BoolQueryBuilder() @@ -653,7 +653,7 @@ public class JobProvider { .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), PerPartitionMaxProbabilities.RESULT_TYPE_VALUE)) .filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue)); - FieldSortBuilder sb = new FieldSortBuilder(Bucket.TIMESTAMP.getPreferredName()).order(SortOrder.ASC); + FieldSortBuilder sb = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC); String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.sort(sb); @@ -752,7 +752,7 @@ public class JobProvider { // the scenes, and Elasticsearch documentation claims it's significantly // slower. Here we rely on the record timestamps being identical to the // bucket timestamp. - QueryBuilder recordFilter = QueryBuilders.termQuery(Bucket.TIMESTAMP.getPreferredName(), bucket.getTimestamp().getTime()); + QueryBuilder recordFilter = QueryBuilders.termQuery(Result.TIMESTAMP.getPreferredName(), bucket.getTimestamp().getTime()); ResultsFilterBuilder builder = new ResultsFilterBuilder(recordFilter) .interim(AnomalyRecord.IS_INTERIM.getPreferredName(), includeInterim); @@ -835,7 +835,7 @@ public class JobProvider { public void records(String jobId, RecordsQueryBuilder.RecordsQuery query, Consumer> handler, Consumer errorHandler) { QueryBuilder fb = new ResultsFilterBuilder() - .timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd()) + .timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd()) .score(AnomalyRecord.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreThreshold()) .score(AnomalyRecord.NORMALIZED_PROBABILITY.getPreferredName(), query.getNormalizedProbabilityThreshold()) .interim(AnomalyRecord.IS_INTERIM.getPreferredName(), query.isIncludeInterim()) @@ -911,7 +911,7 @@ public class JobProvider { public void influencers(String jobId, InfluencersQuery query, Consumer> handler, Consumer errorHandler) { QueryBuilder fb = new ResultsFilterBuilder() - .timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd()) + .timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd()) .score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter()) .interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim()) .build(); @@ -1039,7 +1039,7 @@ public class JobProvider { fb = new ResultsFilterBuilder(); } - QueryBuilder qb = fb.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs).build(); + QueryBuilder qb = fb.timeRange(Result.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs).build(); modelSnapshots(jobId, from, size, sortField, sortDescending, qb, handler, errorHandler); } @@ -1073,13 +1073,7 @@ public class JobProvider { client.search(searchRequest, ActionListener.wrap(searchResponse -> { List results = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits().getHits()) { - BytesReference source = hit.getSourceRef(); - try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) { - ModelSnapshot modelSnapshot = ModelSnapshot.PARSER.apply(parser, null); - results.add(modelSnapshot); - } catch (IOException e) { - throw new ElasticsearchParseException("failed to parse modelSnapshot", e); - } + results.add(ModelSnapshot.fromJson(hit.getSourceRef())); } QueryPage result = diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java index ee58efa35cd..a110663dd4f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java @@ -5,15 +5,22 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.state; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.support.ToXContentToBytes; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser.Token; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.json.JsonXContentParser; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.utils.time.TimeUtils; @@ -288,4 +295,12 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { public static String documentId(String jobId, String snapshotId) { return jobId + "-" + snapshotId; } + + public static ModelSnapshot fromJson(BytesReference bytesReference) { + try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, bytesReference)) { + return PARSER.apply(parser, null); + } catch (IOException e) { + throw new ElasticsearchParseException("failed to parse modelSnapshot", e); + } + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/AnomalyRecord.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/AnomalyRecord.java index 7b3fead55b5..9485a32e9d0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/AnomalyRecord.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/AnomalyRecord.java @@ -58,7 +58,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { public static final ParseField IS_INTERIM = new ParseField("is_interim"); public static final ParseField INFLUENCERS = new ParseField("influencers"); public static final ParseField BUCKET_SPAN = new ParseField("bucket_span"); - public static final ParseField TIMESTAMP = new ParseField("timestamp"); // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("records"); @@ -94,8 +93,9 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { } else if (p.currentToken() == Token.VALUE_STRING) { return new Date(TimeUtils.dateStringToEpoch(p.text())); } - throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); - }, TIMESTAMP, ValueType.VALUE); + throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + + Result.TIMESTAMP.getPreferredName() + "]"); + }, Result.TIMESTAMP, ValueType.VALUE); PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN); PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM); PARSER.declareString((anomalyRecord, s) -> {}, Result.RESULT_TYPE); @@ -154,7 +154,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { public AnomalyRecord(String jobId, Date timestamp, long bucketSpan, int sequenceNum) { this.jobId = jobId; - this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName()); + this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Result.TIMESTAMP.getPreferredName()); this.bucketSpan = bucketSpan; this.sequenceNum = sequenceNum; } @@ -252,7 +252,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable { builder.field(DETECTOR_INDEX.getPreferredName(), detectorIndex); builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum); builder.field(IS_INTERIM.getPreferredName(), isInterim); - builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); + builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); if (byFieldName != null) { builder.field(BY_FIELD_NAME.getPreferredName(), byFieldName); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java index d5fa9ed84f5..0f3af3f5cfe 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java @@ -36,7 +36,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { */ private static final ParseField JOB_ID = Job.ID; - public static final ParseField TIMESTAMP = new ParseField("timestamp"); public static final ParseField ANOMALY_SCORE = new ParseField("anomaly_score"); public static final ParseField INITIAL_ANOMALY_SCORE = new ParseField("initial_anomaly_score"); public static final ParseField MAX_NORMALIZED_PROBABILITY = new ParseField("max_normalized_probability"); @@ -69,8 +68,9 @@ public class Bucket extends ToXContentToBytes implements Writeable { } else if (p.currentToken() == Token.VALUE_STRING) { return new Date(TimeUtils.dateStringToEpoch(p.text())); } - throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); - }, TIMESTAMP, ValueType.VALUE); + throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + + Result.TIMESTAMP.getPreferredName() + "]"); + }, Result.TIMESTAMP, ValueType.VALUE); PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN); PARSER.declareDouble(Bucket::setAnomalyScore, ANOMALY_SCORE); PARSER.declareDouble(Bucket::setInitialAnomalyScore, INITIAL_ANOMALY_SCORE); @@ -102,7 +102,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { public Bucket(String jobId, Date timestamp, long bucketSpan) { this.jobId = jobId; - this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName()); + this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Result.TIMESTAMP.getPreferredName()); this.bucketSpan = bucketSpan; } @@ -163,7 +163,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(JOB_ID.getPreferredName(), jobId); - builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); + builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore); builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan); builder.field(INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/BucketInfluencer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/BucketInfluencer.java index c16dad00f43..3ee7f88d0c4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/BucketInfluencer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/BucketInfluencer.java @@ -38,7 +38,6 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { public static final ParseField RAW_ANOMALY_SCORE = new ParseField("raw_anomaly_score"); public static final ParseField PROBABILITY = new ParseField("probability"); public static final ParseField IS_INTERIM = new ParseField("is_interim"); - public static final ParseField TIMESTAMP = new ParseField("timestamp"); public static final ParseField BUCKET_SPAN = new ParseField("bucket_span"); public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num"); @@ -59,8 +58,9 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { } else if (p.currentToken() == Token.VALUE_STRING) { return new Date(TimeUtils.dateStringToEpoch(p.text())); } - throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); - }, TIMESTAMP, ValueType.VALUE); + throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + + Result.TIMESTAMP.getPreferredName() + "]"); + }, Result.TIMESTAMP, ValueType.VALUE); PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN); PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM); PARSER.declareString((bucketInfluencer, s) -> {}, Result.RESULT_TYPE); @@ -85,7 +85,7 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { public BucketInfluencer(String jobId, Date timestamp, long bucketSpan, int sequenceNum) { this.jobId = jobId; - this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName()); + this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Result.TIMESTAMP.getPreferredName()); this.bucketSpan = bucketSpan; this.sequenceNum = sequenceNum; } @@ -129,7 +129,7 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable { builder.field(ANOMALY_SCORE.getPreferredName(), anomalyScore); builder.field(RAW_ANOMALY_SCORE.getPreferredName(), rawAnomalyScore); builder.field(PROBABILITY.getPreferredName(), probability); - builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); + builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan); builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum); builder.field(IS_INTERIM.getPreferredName(), isInterim); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Influencer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Influencer.java index b7e0c4d99c6..8bbbaa161d3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Influencer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Influencer.java @@ -34,7 +34,6 @@ public class Influencer extends ToXContentToBytes implements Writeable { */ public static final ParseField PROBABILITY = new ParseField("probability"); public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num"); - public static final ParseField TIMESTAMP = new ParseField("timestamp"); public static final ParseField BUCKET_SPAN = new ParseField("bucket_span"); public static final ParseField INFLUENCER_FIELD_NAME = new ParseField("influencer_field_name"); public static final ParseField INFLUENCER_FIELD_VALUE = new ParseField("influencer_field_value"); @@ -58,8 +57,9 @@ public class Influencer extends ToXContentToBytes implements Writeable { } else if (p.currentToken() == Token.VALUE_STRING) { return new Date(TimeUtils.dateStringToEpoch(p.text())); } - throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); - }, TIMESTAMP, ValueType.VALUE); + throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + + Result.TIMESTAMP.getPreferredName() + "]"); + }, Result.TIMESTAMP, ValueType.VALUE); PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN); PARSER.declareInt(ConstructingObjectParser.constructorArg(), SEQUENCE_NUM); PARSER.declareString((influencer, s) -> {}, Result.RESULT_TYPE); @@ -84,7 +84,7 @@ public class Influencer extends ToXContentToBytes implements Writeable { this.jobId = jobId; influenceField = fieldName; influenceValue = fieldValue; - this.timestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP.getPreferredName()); + this.timestamp = ExceptionsHelper.requireNonNull(timestamp, Result.TIMESTAMP.getPreferredName()); this.bucketSpan = bucketSpan; this.sequenceNum = sequenceNum; } @@ -132,7 +132,7 @@ public class Influencer extends ToXContentToBytes implements Writeable { builder.field(SEQUENCE_NUM.getPreferredName(), sequenceNum); builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan); builder.field(Bucket.IS_INTERIM.getPreferredName(), isInterim); - builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); + builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); builder.endObject(); return builder; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ModelDebugOutput.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ModelDebugOutput.java index f589b91f93e..e86fbbe59fb 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ModelDebugOutput.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ModelDebugOutput.java @@ -34,7 +34,6 @@ public class ModelDebugOutput extends ToXContentToBytes implements Writeable { public static final String RESULT_TYPE_VALUE = "model_debug_output"; public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE); - public static final ParseField TIMESTAMP = new ParseField("timestamp"); public static final ParseField PARTITION_FIELD_NAME = new ParseField("partition_field_name"); public static final ParseField PARTITION_FIELD_VALUE = new ParseField("partition_field_value"); public static final ParseField OVER_FIELD_NAME = new ParseField("over_field_name"); @@ -59,8 +58,9 @@ public class ModelDebugOutput extends ToXContentToBytes implements Writeable { } else if (p.currentToken() == Token.VALUE_STRING) { return new Date(TimeUtils.dateStringToEpoch(p.text())); } - throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); - }, TIMESTAMP, ValueType.VALUE); + throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + + Result.TIMESTAMP.getPreferredName() + "]"); + }, Result.TIMESTAMP, ValueType.VALUE); PARSER.declareString(ModelDebugOutput::setPartitionFieldName, PARTITION_FIELD_NAME); PARSER.declareString(ModelDebugOutput::setPartitionFieldValue, PARTITION_FIELD_VALUE); PARSER.declareString(ModelDebugOutput::setOverFieldName, OVER_FIELD_NAME); @@ -140,7 +140,7 @@ public class ModelDebugOutput extends ToXContentToBytes implements Writeable { builder.field(Job.ID.getPreferredName(), jobId); builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); if (timestamp != null) { - builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); + builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); } if (partitionFieldName != null) { builder.field(PARTITION_FIELD_NAME.getPreferredName(), partitionFieldName); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java index 895b48139c0..4e3a1b3312d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/PerPartitionMaxProbabilities.java @@ -58,8 +58,8 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W return new Date(TimeUtils.dateStringToEpoch(p.text())); } throw new IllegalArgumentException( - "unexpected token [" + p.currentToken() + "] for [" + Bucket.TIMESTAMP.getPreferredName() + "]"); - }, Bucket.TIMESTAMP, ObjectParser.ValueType.VALUE); + "unexpected token [" + p.currentToken() + "] for [" + Result.TIMESTAMP.getPreferredName() + "]"); + }, Result.TIMESTAMP, ObjectParser.ValueType.VALUE); PARSER.declareLong(ConstructingObjectParser.constructorArg(), Bucket.BUCKET_SPAN); PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), PartitionProbability.PARSER, PER_PARTITION_MAX_PROBABILITIES); PARSER.declareString((p, s) -> {}, Result.RESULT_TYPE); @@ -168,7 +168,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(Job.ID.getPreferredName(), jobId); - builder.dateField(Bucket.TIMESTAMP.getPreferredName(), Bucket.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); + builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); builder.field(Bucket.BUCKET_SPAN.getPreferredName(), bucketSpan); builder.field(PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), perPartitionMaxProbabilities); builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index cb08d411d6a..f3bbca2b348 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -88,7 +88,6 @@ public final class ReservedFieldNames { Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(), Bucket.PROCESSING_TIME_MS.getPreferredName(), Bucket.PARTITION_SCORES.getPreferredName(), - Bucket.TIMESTAMP.getPreferredName(), BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.ANOMALY_SCORE.getPreferredName(), BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.PROBABILITY.getPreferredName(), @@ -144,7 +143,8 @@ public final class ReservedFieldNames { PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), - Result.RESULT_TYPE.getPreferredName() + Result.RESULT_TYPE.getPreferredName(), + Result.TIMESTAMP.getPreferredName() }; /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Result.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Result.java index 8dc25a32990..69b2c0ba77d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Result.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Result.java @@ -17,4 +17,5 @@ public class Result { */ public static final ParseField TYPE = new ParseField("result"); public static final ParseField RESULT_TYPE = new ParseField("result_type"); + public static final ParseField TIMESTAMP = new ParseField("timestamp"); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java new file mode 100644 index 00000000000..f1135846517 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.xpack.ml.MlDailyManagementService; +import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.results.Result; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * Removes job data that expired with respect to their retention period. + * + *

The implementation ensures removal happens in asynchronously to avoid + * blocking the thread it was called at for too long. It does so by + * chaining the steps together. + */ +abstract class AbstractExpiredJobDataRemover implements MlDailyManagementService.Listener { + + private final ClusterService clusterService; + + AbstractExpiredJobDataRemover(ClusterService clusterService) { + this.clusterService = Objects.requireNonNull(clusterService); + } + + @Override + public void onTrigger() { + removeData(newJobIterator()); + } + + private Iterator newJobIterator() { + List jobs = new ArrayList<>(); + ClusterState clusterState = clusterService.state(); + MlMetadata mlMetadata = clusterState.getMetaData().custom(MlMetadata.TYPE); + if (mlMetadata != null) { + jobs.addAll(mlMetadata.getJobs().values()); + } + return createVolatileCursorIterator(jobs); + } + + protected static Iterator createVolatileCursorIterator(List items) { + return new VolatileCursorIterator(items); + } + + protected void removeData(Iterator jobIterator) { + if (jobIterator.hasNext() == false) { + return; + } + Job job = jobIterator.next(); + Long retentionDays = getRetentionDays(job); + if (retentionDays == null) { + removeData(jobIterator); + return; + } + long cutoffEpochMs = calcCutoffEpochMs(retentionDays); + removeDataBefore(job, cutoffEpochMs, () -> removeData(jobIterator)); + } + + private long calcCutoffEpochMs(long retentionDays) { + long startOfDayEpochMs = DateTime.now(ISOChronology.getInstance()).withTimeAtStartOfDay().getMillis(); + return startOfDayEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis(); + } + + protected abstract Long getRetentionDays(Job job); + + /** + * Template method to allow implementation details of various types of data (e.g. results, model snapshots). + * Implementors need to call {@code onFinish} when they are done in order to continue to the next job. + */ + protected abstract void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish); + + protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { + return QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis")); + } + + private static class VolatileCursorIterator implements Iterator { + private final List items; + private volatile int cursor; + + private VolatileCursorIterator(List items) { + this.items = items; + this.cursor = 0; + } + + @Override + public boolean hasNext() { + return cursor < items.size(); + } + + @Override + public T next() { + return items.get(cursor++); + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java new file mode 100644 index 00000000000..721df66d671 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.retention; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +/** + * Deletes all model snapshots that have expired the configured retention time + * of their respective job with the exception of the currently used snapshot. + * A snapshot is deleted if its timestamp is earlier than the start of the + * current day (local time-zone) minus the retention period. + */ +public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover { + + private static final Logger LOGGER = Loggers.getLogger(ExpiredModelSnapshotsRemover.class); + + /** + * The max number of snapshots to fetch per job. It is set to 10K, the default for an index as + * we don't change that in our ML indices. It should be more than enough for most cases. If not, + * it will take a few iterations to delete all snapshots, which is OK. + */ + private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000; + + private final Client client; + + public ExpiredModelSnapshotsRemover(Client client, ClusterService clusterService) { + super(clusterService); + this.client = Objects.requireNonNull(client); + } + + @Override + protected Long getRetentionDays(Job job) { + return job.getModelSnapshotRetentionDays(); + } + + @Override + protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) { + if (job.getModelSnapshotId() == null) { + // No snapshot to remove + onFinish.run(); + return; + } + LOGGER.info("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); + QueryBuilder excludeFilter = QueryBuilders.termQuery(ModelSnapshot.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId()); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(AnomalyDetectorsIndex.jobResultsIndexName(job.getId())); + searchRequest.types(ModelSnapshot.TYPE.getPreferredName()); + QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter); + searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE)); + client.execute(SearchAction.INSTANCE, searchRequest, new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + try { + List modelSnapshots = new ArrayList<>(); + for (SearchHit hit : searchResponse.getHits()) { + modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef())); + } + deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), onFinish); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + LOGGER.error("[" + job.getId() + "] Search for expired snapshots failed", e); + onFinish.run(); + } + }); + } + + private void deleteModelSnapshots(Iterator modelSnapshotIterator, Runnable onFinish) { + if (modelSnapshotIterator.hasNext() == false) { + onFinish.run(); + return; + } + ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); + DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request( + modelSnapshot.getJobId(), modelSnapshot.getSnapshotId()); + client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { + @Override + public void onResponse(DeleteModelSnapshotAction.Response response) { + LOGGER.trace("[{}] Deleted expired snapshot [{}]", modelSnapshot.getJobId(), modelSnapshot.getSnapshotId()); + try { + deleteModelSnapshots(modelSnapshotIterator, onFinish); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + LOGGER.error("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" + + modelSnapshot.getSnapshotId() + "]", e); + deleteModelSnapshots(modelSnapshotIterator, onFinish); + } + }); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java new file mode 100644 index 00000000000..d20dac047e1 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.retention; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; +import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.messages.Messages; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.ml.job.results.Result; +import org.elasticsearch.xpack.ml.notifications.Auditor; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Objects; +import java.util.function.Function; + +/** + * Removes all results that have expired the configured retention time + * of their respective job. A result is deleted if its timestamp is earlier + * than the start of the current day (local time-zone) minus the retention + * period. + */ +public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { + + private static final Logger LOGGER = Loggers.getLogger(ExpiredResultsRemover.class); + + private final Client client; + private final Function auditorSupplier; + + public ExpiredResultsRemover(Client client, ClusterService clusterService, Function auditorSupplier) { + super(clusterService); + this.client = Objects.requireNonNull(client); + this.auditorSupplier = Objects.requireNonNull(auditorSupplier); + } + + @Override + protected Long getRetentionDays(Job job) { + return job.getResultsRetentionDays(); + } + + @Override + protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) { + LOGGER.info("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); + QueryBuilder excludeFilter = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE); + DeleteByQueryRequest request = createDBQRequest(job, Result.TYPE.getPreferredName(), cutoffEpochMs, excludeFilter); + + client.execute(MlDeleteByQueryAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + try { + auditResultsWereDeleted(job.getId(), cutoffEpochMs); + onFinish.run(); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + LOGGER.error("Failed to remove expired results for job [" + job.getId() + "]", e); + onFinish.run(); + } + }); + } + + private DeleteByQueryRequest createDBQRequest(Job job, String type, long cutoffEpochMs, QueryBuilder excludeFilter) { + SearchRequest searchRequest = new SearchRequest(); + // We need to create the DeleteByQueryRequest before we modify the SearchRequest + // because the constructor of the former wipes the latter + DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + request.setSlices(5); + + searchRequest.indices(AnomalyDetectorsIndex.jobResultsIndexName(job.getId())); + searchRequest.types(type); + QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter); + searchRequest.source(new SearchSourceBuilder().query(query)); + return request; + } + + private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) { + Instant instant = Instant.ofEpochMilli(cutoffEpochMs); + ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault()); + String formatted = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(zonedDateTime); + String msg = Messages.getMessage(Messages.JOB_AUDIT_OLD_RESULTS_DELETED, formatted); + auditorSupplier.apply(jobId).info(msg); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetBucketsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetBucketsAction.java index da5cd869085..0c1120fc785 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetBucketsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetBucketsAction.java @@ -14,9 +14,9 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.GetBucketsAction; -import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.action.util.PageParams; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.results.Result; import java.io.IOException; @@ -26,10 +26,10 @@ public class RestGetBucketsAction extends BaseRestHandler { super(settings); controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName() - + "}/results/buckets/{" + Bucket.TIMESTAMP.getPreferredName() + "}", this); + + "}/results/buckets/{" + Result.TIMESTAMP.getPreferredName() + "}", this); controller.registerHandler(RestRequest.Method.POST, MachineLearning.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName() - + "}/results/buckets/{" + Bucket.TIMESTAMP.getPreferredName() + "}", this); + + "}/results/buckets/{" + Result.TIMESTAMP.getPreferredName() + "}", this); controller.registerHandler(RestRequest.Method.GET, MachineLearning.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/results/buckets", this); diff --git a/plugin/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties b/plugin/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties index e2ffff46885..ab80463d40d 100644 --- a/plugin/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties +++ b/plugin/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties @@ -24,7 +24,7 @@ job.audit.paused = Job paused job.audit.resumed = Job resumed job.audit.updated = Job updated: {0} job.audit.reverted = Job model snapshot reverted to ''{0}'' -job.audit.old.results.deleted = Deleted results prior to {0} +job.audit.old.results.deleted = Deleted results prior to {1} job.audit.snapshot.deleted = Job model snapshot ''{0}'' deleted job.audit.datafeed.started.from.to = Datafeed started (from: {0} to: {1}) job.audit.datafeed.started.realtime = Datafeed started in real-time diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java new file mode 100644 index 00000000000..39c9bd058ca --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.mock.orig.Mockito.verify; +import static org.mockito.Mockito.mock; + +public class MlDailyManagementServiceTests extends ESTestCase { + + private ThreadPool threadPool; + + @Before + public void setUpTests() { + threadPool = new TestThreadPool("MlDailyManagementServiceTests"); + } + + @After + public void stop() throws InterruptedException { + terminate(threadPool); + } + + public void testScheduledTriggering() throws InterruptedException { + MlDailyManagementService.Listener listener1 = mock(MlDailyManagementService.Listener.class); + MlDailyManagementService.Listener listener2 = mock(MlDailyManagementService.Listener.class); + int triggerCount = randomIntBetween(2, 4); + CountDownLatch latch = new CountDownLatch(triggerCount); + try (MlDailyManagementService service = createService(latch, Arrays.asList(listener1, listener2))) { + service.start(); + latch.await(1, TimeUnit.SECONDS); + } + + verify(listener1, org.mockito.Mockito.atLeast(triggerCount - 1)).onTrigger(); + verify(listener2, org.mockito.Mockito.atLeast(triggerCount - 1)).onTrigger(); + } + + private MlDailyManagementService createService(CountDownLatch latch, List listeners) { + return new MlDailyManagementService(threadPool, listeners, () -> { + latch.countDown(); + return TimeValue.timeValueMillis(100); + }); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index eab7e0fdac2..b118f742cbf 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml; import org.elasticsearch.Version; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -18,17 +19,19 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ml.MlInitializationService; -import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.junit.Before; import java.net.InetAddress; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledFuture; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.times; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -37,19 +40,32 @@ import static org.mockito.Mockito.when; public class MlInitializationServiceTests extends ESTestCase { - public void testInitialize() throws Exception { - ThreadPool threadPool = mock(ThreadPool.class); - ExecutorService executorService = mock(ExecutorService.class); + private ThreadPool threadPool; + private ExecutorService executorService; + private ClusterService clusterService; + private Client client; + private JobProvider jobProvider; + + @Before + public void setUpMocks() { + threadPool = mock(ThreadPool.class); + executorService = mock(ExecutorService.class); + clusterService = mock(ClusterService.class); + client = mock(Client.class); + jobProvider = mock(JobProvider.class); + doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); return null; }).when(executorService).execute(any(Runnable.class)); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); + ScheduledFuture scheduledFuture = mock(ScheduledFuture.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture); + } - ClusterService clusterService = mock(ClusterService.class); - JobProvider jobProvider = mock(JobProvider.class); + public void testInitialize() throws Exception { MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -64,21 +80,12 @@ public class MlInitializationServiceTests extends ESTestCase { verify(jobProvider, times(1)).createNotificationMessageIndex(any()); verify(jobProvider, times(1)).createMetaIndex(any()); verify(jobProvider, times(1)).createJobStateIndex(any()); + assertThat(initializationService.getDailyManagementService().isStarted(), is(true)); } public void testInitialize_noMasterNode() throws Exception { - ThreadPool threadPool = mock(ThreadPool.class); - ExecutorService executorService = mock(ExecutorService.class); - doAnswer(invocation -> { - ((Runnable) invocation.getArguments()[0]).run(); - return null; - }).when(executorService).execute(any(Runnable.class)); - when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); - - ClusterService clusterService = mock(ClusterService.class); - JobProvider jobProvider = mock(JobProvider.class); MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -91,21 +98,12 @@ public class MlInitializationServiceTests extends ESTestCase { verify(jobProvider, times(0)).createNotificationMessageIndex(any()); verify(jobProvider, times(0)).createMetaIndex(any()); verify(jobProvider, times(0)).createJobStateIndex(any()); + assertThat(initializationService.getDailyManagementService(), is(nullValue())); } public void testInitialize_alreadyInitialized() throws Exception { - ThreadPool threadPool = mock(ThreadPool.class); - ExecutorService executorService = mock(ExecutorService.class); - doAnswer(invocation -> { - ((Runnable) invocation.getArguments()[0]).run(); - return null; - }).when(executorService).execute(any(Runnable.class)); - when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); - - ClusterService clusterService = mock(ClusterService.class); - JobProvider jobProvider = mock(JobProvider.class); MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -130,27 +128,20 @@ public class MlInitializationServiceTests extends ESTestCase { )) .putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build())) .build(); + MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class); + initializationService.setDailyManagementService(initialDailyManagementService); initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any()); verify(jobProvider, times(0)).createNotificationMessageIndex(any()); verify(jobProvider, times(0)).createMetaIndex(any()); verify(jobProvider, times(0)).createJobStateIndex(any()); + assertSame(initialDailyManagementService, initializationService.getDailyManagementService()); } public void testInitialize_onlyOnce() throws Exception { - ThreadPool threadPool = mock(ThreadPool.class); - ExecutorService executorService = mock(ExecutorService.class); - doAnswer(invocation -> { - ((Runnable) invocation.getArguments()[0]).run(); - return null; - }).when(executorService).execute(any(Runnable.class)); - when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); - - ClusterService clusterService = mock(ClusterService.class); - JobProvider jobProvider = mock(JobProvider.class); MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -167,4 +158,32 @@ public class MlInitializationServiceTests extends ESTestCase { verify(jobProvider, times(1)).createMetaIndex(any()); verify(jobProvider, times(1)).createJobStateIndex(any()); } + + public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception { + MlInitializationService initializationService = + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); + MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class); + initializationService.setDailyManagementService(initialDailyManagementService); + + ClusterState masterCs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) + .metaData(MetaData.builder()) + .build(); + ClusterState noMasterCs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))) + .metaData(MetaData.builder()) + .build(); + initializationService.clusterChanged(new ClusterChangedEvent("_source", noMasterCs, masterCs)); + + verify(initialDailyManagementService).stop(); + + initializationService.clusterChanged(new ClusterChangedEvent("_source", masterCs, noMasterCs)); + MlDailyManagementService finalDailyManagementService = initializationService.getDailyManagementService(); + assertNotSame(initialDailyManagementService, finalDailyManagementService); + assertThat(initializationService.getDailyManagementService().isStarted(), is(true)); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ResultsFilterBuilderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ResultsFilterBuilderTests.java index 718001f1e0a..47d8ab46b1f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ResultsFilterBuilderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ResultsFilterBuilderTests.java @@ -8,7 +8,7 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.job.results.Result; public class ResultsFilterBuilderTests extends ESTestCase { private static final String TIMESTAMP = "timestamp"; @@ -28,11 +28,11 @@ public class ResultsFilterBuilderTests extends ESTestCase { public void testBuild_GivenOnlyStartTime() { QueryBuilder expected = QueryBuilders - .rangeQuery(Bucket.TIMESTAMP.getPreferredName()) + .rangeQuery(Result.TIMESTAMP.getPreferredName()) .gte(1000); QueryBuilder fb = new ResultsFilterBuilder() - .timeRange(Bucket.TIMESTAMP.getPreferredName(), 1000, null) + .timeRange(Result.TIMESTAMP.getPreferredName(), 1000, null) .build(); assertEquals(expected.toString(), fb.toString()); @@ -123,7 +123,7 @@ public class ResultsFilterBuilderTests extends ESTestCase { public void testBuild_GivenCombination() { QueryBuilder originalFilter = QueryBuilders.existsQuery("someField"); QueryBuilder timeFilter = QueryBuilders - .rangeQuery(Bucket.TIMESTAMP.getPreferredName()) + .rangeQuery(Result.TIMESTAMP.getPreferredName()) .gte(1000) .lt(2000); QueryBuilder score1Filter = new ResultsFilterBuilder() @@ -144,7 +144,7 @@ public class ResultsFilterBuilderTests extends ESTestCase { .filter(termFilter); QueryBuilder fb = new ResultsFilterBuilder(originalFilter) - .timeRange(Bucket.TIMESTAMP.getPreferredName(), 1000, 2000) + .timeRange(Result.TIMESTAMP.getPreferredName(), 1000, 2000) .score("score1", 50.0) .score("score2", 80.0) .interim("isInterim", false) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java new file mode 100644 index 00000000000..9e3d3f8c78c --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -0,0 +1,259 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.mock.orig.Mockito; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobTests; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; +import org.junit.Before; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { + + private Client client; + private ClusterService clusterService; + private ClusterState clusterState; + private List capturedSearchRequests; + private List capturedDeleteModelSnapshotRequests; + private List searchResponsesPerCall; + + @Before + public void setUpTests() { + capturedSearchRequests = new ArrayList<>(); + capturedDeleteModelSnapshotRequests = new ArrayList<>(); + searchResponsesPerCall = new ArrayList<>(); + clusterService = mock(ClusterService.class); + clusterState = mock(ClusterState.class); + when(clusterService.state()).thenReturn(clusterState); + client = mock(Client.class); + } + + public void testOnTrigger_GivenJobsWithoutRetentionPolicy() { + givenClientRequestsSucceed(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("foo").build(), + JobTests.buildJobBuilder("bar").build() + )); + + createExpiredModelSnapshotsRemover().onTrigger(); + + Mockito.verifyNoMoreInteractions(client); + } + + public void testOnTrigger_GivenJobWithoutActiveSnapshot() { + givenClientRequestsSucceed(); + givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); + + createExpiredModelSnapshotsRemover().onTrigger(); + + Mockito.verifyNoMoreInteractions(client); + } + + public void testOnTrigger_GivenJobsWithMixedRetentionPolicies() throws IOException { + givenClientRequestsSucceed(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("none").build(), + JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), + JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() + )); + + List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), + createModelSnapshot("snapshots-1", "snapshots-1_2")); + List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); + searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); + + createExpiredModelSnapshotsRemover().onTrigger(); + + assertThat(capturedSearchRequests.size(), equalTo(2)); + SearchRequest searchRequest = capturedSearchRequests.get(0); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")})); + searchRequest = capturedSearchRequests.get(1); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")})); + + assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3)); + DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); + assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); + assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); + deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1); + assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); + assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2")); + deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2); + assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2")); + assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); + } + + public void testOnTrigger_GivenClientSearchRequestsFail() throws IOException { + givenClientSearchRequestsFail(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("none").build(), + JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), + JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() + )); + + List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), + createModelSnapshot("snapshots-1", "snapshots-1_2")); + List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); + searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); + + createExpiredModelSnapshotsRemover().onTrigger(); + + assertThat(capturedSearchRequests.size(), equalTo(2)); + SearchRequest searchRequest = capturedSearchRequests.get(0); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")})); + searchRequest = capturedSearchRequests.get(1); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")})); + + assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); + } + + public void testOnTrigger_GivenClientDeleteSnapshotRequestsFail() throws IOException { + givenClientDeleteModelSnapshotRequestsFail(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("none").build(), + JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), + JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() + )); + + List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), + createModelSnapshot("snapshots-1", "snapshots-1_2")); + List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); + searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); + + createExpiredModelSnapshotsRemover().onTrigger(); + + assertThat(capturedSearchRequests.size(), equalTo(2)); + SearchRequest searchRequest = capturedSearchRequests.get(0); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")})); + searchRequest = capturedSearchRequests.get(1); + assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")})); + + assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3)); + DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); + assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); + assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); + deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1); + assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); + assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2")); + deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2); + assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2")); + assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); + } + + private void givenJobs(List jobs) { + Map jobsMap = new HashMap<>(); + jobs.stream().forEach(job -> jobsMap.put(job.getId(), job)); + MlMetadata mlMetadata = mock(MlMetadata.class); + when(mlMetadata.getJobs()).thenReturn(jobsMap); + MetaData metadata = mock(MetaData.class); + when(metadata.custom(MlMetadata.TYPE)).thenReturn(mlMetadata); + when(clusterState.getMetaData()).thenReturn(metadata); + } + + private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { + return new ExpiredModelSnapshotsRemover(client, clusterService); + } + + private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) { + ModelSnapshot modelSnapshot = new ModelSnapshot(jobId); + modelSnapshot.setSnapshotId(snapshotId); + return modelSnapshot; + } + + private static SearchResponse createSearchResponse(List modelSnapshots) throws IOException { + SearchHit[] hitsArray = new SearchHit[modelSnapshots.size()]; + for (int i = 0; i < modelSnapshots.size(); i++) { + hitsArray[i] = new SearchHit(randomInt()); + XContentBuilder jsonBuilder = JsonXContent.contentBuilder(); + modelSnapshots.get(i).toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + hitsArray[i].sourceRef(jsonBuilder.bytes()); + } + SearchHits hits = new SearchHits(hitsArray, hitsArray.length, 1.0f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(hits); + return searchResponse; + } + + private void givenClientRequestsSucceed() { + givenClientRequests(true, true); + } + + private void givenClientSearchRequestsFail() { + givenClientRequests(false, true); + } + + private void givenClientDeleteModelSnapshotRequestsFail() { + givenClientRequests(true, false); + } + + private void givenClientRequests(boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) { + doAnswer(new Answer() { + int callCount = 0; + + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1]; + capturedSearchRequests.add(searchRequest); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + if (shouldSearchRequestsSucceed) { + listener.onResponse(searchResponsesPerCall.get(callCount++)); + } else { + listener.onFailure(new RuntimeException("search failed")); + } + return null; + } + }).when(client).execute(same(SearchAction.INSTANCE), any(), any()); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]); + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + if (shouldDeleteSnapshotRequestsSucceed) { + listener.onResponse(null); + } else { + listener.onFailure(new RuntimeException("delete snapshot failed")); + } + return null; + } + }).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java new file mode 100644 index 00000000000..8bc40327d52 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -0,0 +1,161 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; +import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.mock.orig.Mockito; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobTests; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.junit.Before; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ExpiredResultsRemoverTests extends ESTestCase { + + private Client client; + private ClusterService clusterService; + private ClusterState clusterState; + private List capturedDeleteByQueryRequests; + + @Before + public void setUpTests() { + capturedDeleteByQueryRequests = new ArrayList<>(); + clusterService = mock(ClusterService.class); + clusterState = mock(ClusterState.class); + when(clusterService.state()).thenReturn(clusterState); + client = mock(Client.class); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(null); + return null; + } + }).when(client).execute(same(MlDeleteByQueryAction.INSTANCE), any(), any()); + } + + public void testOnTrigger_GivenNoJobs() { + givenClientRequestsSucceed(); + givenJobs(Collections.emptyList()); + + createExpiredResultsRemover().onTrigger(); + + Mockito.verifyNoMoreInteractions(client); + } + + public void testOnTrigger_GivenJobsWithoutRetentionPolicy() { + givenClientRequestsSucceed(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("foo").build(), + JobTests.buildJobBuilder("bar").build() + )); + + createExpiredResultsRemover().onTrigger(); + + Mockito.verifyNoMoreInteractions(client); + } + + public void testOnTrigger_GivenJobsWithAndWithoutRetentionPolicy() throws IOException { + givenClientRequestsSucceed(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("none").build(), + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() + )); + + createExpiredResultsRemover().onTrigger(); + + assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); + DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); + assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-1")})); + dbqRequest = capturedDeleteByQueryRequests.get(1); + assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-2")})); + } + + public void testOnTrigger_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException { + givenClientRequestsFailed(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("none").build(), + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() + )); + + createExpiredResultsRemover().onTrigger(); + + assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); + DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); + assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-1")})); + dbqRequest = capturedDeleteByQueryRequests.get(1); + assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-2")})); + } + + private void givenClientRequestsSucceed() { + givenClientRequests(true); + } + + private void givenClientRequestsFailed() { + givenClientRequests(false); + } + + private void givenClientRequests(boolean shouldSucceed) { + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + if (shouldSucceed) { + listener.onResponse(null); + } else { + listener.onFailure(new RuntimeException("failed")); + } + return null; + } + }).when(client).execute(same(MlDeleteByQueryAction.INSTANCE), any(), any()); + } + + private void givenJobs(List jobs) { + Map jobsMap = new HashMap<>(); + jobs.stream().forEach(job -> jobsMap.put(job.getId(), job)); + MlMetadata mlMetadata = mock(MlMetadata.class); + when(mlMetadata.getJobs()).thenReturn(jobsMap); + MetaData metadata = mock(MetaData.class); + when(metadata.custom(MlMetadata.TYPE)).thenReturn(mlMetadata); + when(clusterState.getMetaData()).thenReturn(metadata); + } + + private ExpiredResultsRemover createExpiredResultsRemover() { + return new ExpiredResultsRemover(client, clusterService, jobId -> mock(Auditor.class)); + } +} \ No newline at end of file