diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/RevertModelSnapshotAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/RevertModelSnapshotAction.java index c266ea40778..a52817b0ea4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/RevertModelSnapshotAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/RevertModelSnapshotAction.java @@ -334,9 +334,10 @@ extends Action listener) throws Exception { - logger.debug("Received request to revert to time '" + request.getTime() + "' description '" + request.getDescription() - + "' snapshot id '" + request.getSnapshotId() + "' for job '" + request.getJobId() + "', deleting intervening " - + " results: " + request.getDeleteInterveningResults()); + logger.debug("Received request to revert to time '{}' description '{}' snapshot id '{}' for job '{}', deleting intervening " + + "results: {}", + request.getTime(), request.getDescription(), request.getSnapshotId(), request.getJobId(), + request.getDeleteInterveningResults()); if (request.getTime() == null && request.getSnapshotId() == null && request.getDescription() == null) { throw new IllegalStateException(Messages.getMessage(Messages.REST_INVALID_REVERT_PARAMS)); @@ -344,7 +345,7 @@ extends Action job = jobManager.getJob(request.getJobId(), clusterService.state()); Allocation allocation = jobManager.getJobAllocation(request.getJobId()); - if (job.count() > 0 && allocation.getStatus().equals(JobStatus.RUNNING)) { + if (job.count() > 0 && allocation.getStatus().equals(JobStatus.CLOSED) == false) { throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT)); } @@ -386,12 +387,12 @@ extends Action() { @Override public void onResponse(BulkResponse bulkItemResponses) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleter.java index 14198bacec8..8a663c2872a 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleter.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.prelert.job.persistence; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -12,28 +13,23 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.delete.DeleteRequestBuilder; -import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; import org.elasticsearch.xpack.prelert.job.ModelSnapshot; import org.elasticsearch.xpack.prelert.job.ModelState; -import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; import org.elasticsearch.xpack.prelert.job.results.Bucket; -import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer; -import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; import org.elasticsearch.xpack.prelert.job.results.Result; +import java.util.Date; import java.util.Objects; -import java.util.function.LongSupplier; public class ElasticsearchBulkDeleter implements JobDataDeleter { private static final Logger LOGGER = Loggers.getLogger(ElasticsearchBulkDeleter.class); @@ -44,117 +40,63 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter { private final Client client; private final String jobId; private final BulkRequestBuilder bulkRequestBuilder; - private long deletedBucketCount; - private long deletedRecordCount; - private long deletedBucketInfluencerCount; - private long deletedInfluencerCount; + private long deletedResultCount; private long deletedModelSnapshotCount; private long deletedModelStateCount; private boolean quiet; - public ElasticsearchBulkDeleter(Client client, String jobId, boolean quiet) { - this.client = Objects.requireNonNull(client); - this.jobId = Objects.requireNonNull(jobId); - bulkRequestBuilder = client.prepareBulk(); - deletedBucketCount = 0; - deletedRecordCount = 0; - deletedBucketInfluencerCount = 0; - deletedInfluencerCount = 0; - deletedModelSnapshotCount = 0; - deletedModelStateCount = 0; - this.quiet = quiet; - } - public ElasticsearchBulkDeleter(Client client, String jobId) { this(client, jobId, false); } - @Override - public void deleteBucket(Bucket bucket) { - deleteRecords(bucket); - deleteBucketInfluencers(bucket); - bulkRequestBuilder.add( - client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Result.TYPE.getPreferredName(), bucket.getId())); - ++deletedBucketCount; + public ElasticsearchBulkDeleter(Client client, String jobId, boolean quiet) { + this.client = Objects.requireNonNull(client); + this.jobId = Objects.requireNonNull(jobId); + bulkRequestBuilder = client.prepareBulk(); + deletedResultCount = 0; + deletedModelSnapshotCount = 0; + deletedModelStateCount = 0; + this.quiet = quiet; } @Override - public void deleteRecords(Bucket bucket) { - // Find the records using the time stamp rather than a parent-child - // relationship. The parent-child filter involves two queries behind - // the scenes, and Elasticsearch documentation claims it's significantly - // slower. Here we rely on the record timestamps being identical to the - // bucket timestamp. - deleteResultTypeByBucket(bucket, AnomalyRecord.RESULT_TYPE_VALUE, () -> ++deletedRecordCount); - } + public void deleteResultsFromTime(long cutoffEpochMs) { + String index = JobResultsPersister.getJobIndexName(jobId); - private void deleteResultTypeByBucket(Bucket bucket, String resultType, LongSupplier deleteCounter) { - QueryBuilder timeQuery = QueryBuilders.termQuery(ElasticsearchMappings.ES_TIMESTAMP, bucket.getTimestamp().getTime()); - QueryBuilder boolQuery = new BoolQueryBuilder() - .filter(timeQuery) - .filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType)); + RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP); + timeRange.gte(cutoffEpochMs); + timeRange.lt(new Date().getTime()); - int done = 0; - boolean finished = false; - while (finished == false) { - SearchResponse searchResponse = SearchAction.INSTANCE.newRequestBuilder(client) - .setIndices(JobResultsPersister.getJobIndexName(jobId)) - .setTypes(Result.TYPE.getPreferredName()) - .setQuery(boolQuery) - .addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)) - .setSize(SCROLL_SIZE) - .setFrom(done) - .execute().actionGet(); + SearchResponse searchResponse = client.prepareSearch(index) + .setTypes(Result.TYPE.getPreferredName()) + .setFetchSource(false) + .setQuery(timeRange) + .setScroll(SCROLL_CONTEXT_DURATION) + .setSize(SCROLL_SIZE) + .get(); + String scrollId = searchResponse.getScrollId(); + long totalHits = searchResponse.getHits().totalHits(); + long totalDeletedCount = 0; + while (totalDeletedCount < totalHits) { for (SearchHit hit : searchResponse.getHits()) { - ++done; - addDeleteRequest(hit); - deleteCounter.getAsLong(); - } - if (searchResponse.getHits().getTotalHits() == done) { - finished = true; + LOGGER.trace("Search hit for result: {}", hit.getId()); + ++totalDeletedCount; + addDeleteRequest(hit, index); + ++deletedResultCount; } + searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get(); } } - private void addDeleteRequest(SearchHit hit) { + private void addDeleteRequest(SearchHit hit, String index) { DeleteRequestBuilder deleteRequest = DeleteAction.INSTANCE.newRequestBuilder(client) - .setIndex(JobResultsPersister.getJobIndexName(jobId)) + .setIndex(index) .setType(hit.getType()) .setId(hit.getId()); bulkRequestBuilder.add(deleteRequest); } - public void deleteBucketInfluencers(Bucket bucket) { - // Find the bucket influencers using the time stamp, relying on the - // bucket influencer timestamps being identical to the bucket timestamp. - deleteResultTypeByBucket(bucket, BucketInfluencer.RESULT_TYPE_VALUE, () -> ++deletedBucketInfluencerCount); - } - - public void deleteInfluencers(Bucket bucket) { - // Find the influencers using the time stamp, relying on the influencer - // timestamps being identical to the bucket timestamp. - deleteResultTypeByBucket(bucket, Influencer.RESULT_TYPE_VALUE, () -> ++deletedInfluencerCount); - } - - public void deleteBucketByTime(Bucket bucket) { - deleteResultTypeByBucket(bucket, Bucket.RESULT_TYPE_VALUE, () -> ++deletedBucketCount); - } - - @Override - public void deleteInfluencer(Influencer influencer) { - String id = influencer.getId(); - if (id == null) { - LOGGER.error("Cannot delete specific influencer without an ID", - // This means we get a stack trace to show where the request came from - new NullPointerException()); - return; - } - bulkRequestBuilder.add( - client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Result.TYPE.getPreferredName(), id)); - ++deletedInfluencerCount; - } - @Override public void deleteModelSnapshot(ModelSnapshot modelSnapshot) { String snapshotId = modelSnapshot.getSnapshotId(); @@ -165,13 +107,11 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter { // too big and has no mappings for (int i = 0; i < docCount; ++i) { String stateId = snapshotId + '_' + i; - bulkRequestBuilder.add( - client.prepareDelete(indexName, ModelState.TYPE, stateId)); + bulkRequestBuilder.add(client.prepareDelete(indexName, ModelState.TYPE, stateId)); ++deletedModelStateCount; } - bulkRequestBuilder.add( - client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId)); + bulkRequestBuilder.add(client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId)); ++deletedModelSnapshotCount; } @@ -188,12 +128,16 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter { JobResultsPersister.getJobIndexName(jobId), ModelSizeStats.TYPE.getPreferredName(), modelSizeStats.getId())); } + @Override public void deleteInterimResults() { + String index = JobResultsPersister.getJobIndexName(jobId); + QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true); - SearchResponse searchResponse = client.prepareSearch(JobResultsPersister.getJobIndexName(jobId)) + SearchResponse searchResponse = client.prepareSearch(index) .setTypes(Result.RESULT_TYPE.getPreferredName()) .setQuery(qb) + .setFetchSource(false) .addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)) .setScroll(SCROLL_CONTEXT_DURATION) .setSize(SCROLL_SIZE) @@ -204,19 +148,10 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter { long totalDeletedCount = 0; while (totalDeletedCount < totalHits) { for (SearchHit hit : searchResponse.getHits()) { - LOGGER.trace("Search hit for bucket: {}, {}", hit.toString(), hit.getId()); - String type = (String) hit.getSource().get(Result.RESULT_TYPE.getPreferredName()); - if (Bucket.RESULT_TYPE_VALUE.equals(type)) { - ++deletedBucketCount; - } else if (AnomalyRecord.RESULT_TYPE_VALUE.equals(type)) { - ++deletedRecordCount; - } else if (BucketInfluencer.RESULT_TYPE_VALUE.equals(type)) { - ++deletedBucketInfluencerCount; - } else if (Influencer.RESULT_TYPE_VALUE.equals(type)) { - ++deletedInfluencerCount; - } + LOGGER.trace("Search hit for result: {}", hit.getId()); ++totalDeletedCount; - addDeleteRequest(hit); + addDeleteRequest(hit, index); + ++deletedResultCount; } searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get(); @@ -234,16 +169,9 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter { return; } - if (!quiet) { - LOGGER.debug("Requesting deletion of " - + deletedBucketCount + " buckets, " - + deletedRecordCount + " records, " - + deletedBucketInfluencerCount + " bucket influencers, " - + deletedInfluencerCount + " influencers, " - + deletedModelSnapshotCount + " model snapshots, " - + " and " - + deletedModelStateCount + " model state documents"); - } + Level logLevel = quiet ? Level.DEBUG : Level.INFO; + LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents", + deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount); try { bulkRequestBuilder.execute(listener); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java index 5c1c8eda2a1..fb2d6a958bb 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java @@ -9,31 +9,16 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; import org.elasticsearch.xpack.prelert.job.ModelSnapshot; -import org.elasticsearch.xpack.prelert.job.results.Bucket; -import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; public interface JobDataDeleter { - /** - * Delete a {@code Bucket} and its records - * - * @param bucket the bucket to delete - */ - void deleteBucket(Bucket bucket); /** - * Delete the records of a {@code Bucket} + * Delete all result types (Buckets, Records, Influencers) from {@code cutOffTime} * - * @param bucket the bucket whose records to delete + * @param cutoffEpochMs Results at and after this time will be deleted */ - void deleteRecords(Bucket bucket); - - /** - * Delete an {@code Influencer} - * - * @param influencer the influencer to delete - */ - void deleteInfluencer(Influencer influencer); + void deleteResultsFromTime(long cutoffEpochMs); /** * Delete a {@code ModelSnapshot} @@ -56,6 +41,11 @@ public interface JobDataDeleter { */ void deleteModelSizeStats(ModelSizeStats modelSizeStats); + /** + * Delete all results marked as interim + */ + void deleteInterimResults(); + /** * Commit the deletions without enforcing the removal of data from disk */ diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/OldDataRemover.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/OldDataRemover.java index 82d324bf1e4..ae3db64503c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/OldDataRemover.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/OldDataRemover.java @@ -8,10 +8,7 @@ package org.elasticsearch.xpack.prelert.job.persistence; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkResponse; -import java.util.Date; -import java.util.Deque; import java.util.Objects; -import java.util.function.Consumer; import java.util.function.Function; /** @@ -20,11 +17,9 @@ import java.util.function.Function; */ public class OldDataRemover { - private final JobProvider jobProvider; private final Function dataDeleterFactory; - public OldDataRemover(JobProvider jobProvider, Function dataDeleterFactory) { - this.jobProvider = Objects.requireNonNull(jobProvider); + public OldDataRemover(Function dataDeleterFactory) { this.dataDeleterFactory = Objects.requireNonNull(dataDeleterFactory); } @@ -32,34 +27,8 @@ public class OldDataRemover { * Removes results between the time given and the current time */ public void deleteResultsAfter(ActionListener listener, String jobId, long cutoffEpochMs) { - Date now = new Date(); JobDataDeleter deleter = dataDeleterFactory.apply(jobId); - deleteResultsWithinRange(jobId, deleter, cutoffEpochMs, now.getTime()); + deleter.deleteResultsFromTime(cutoffEpochMs); deleter.commit(listener); } - - private void deleteResultsWithinRange(String jobId, JobDataDeleter deleter, long start, long end) { - deleteBatchedData( - jobProvider.newBatchedInfluencersIterator(jobId).timeRange(start, end), - deleter::deleteInfluencer - ); - deleteBatchedData( - jobProvider.newBatchedBucketsIterator(jobId).timeRange(start, end), - deleter::deleteBucket - ); - } - - private void deleteBatchedData(BatchedDocumentsIterator resultsIterator, - Consumer deleteFunction) { - while (resultsIterator.hasNext()) { - Deque batch = resultsIterator.next(); - if (batch.isEmpty()) { - return; - } - for (T result : batch) { - deleteFunction.accept(result); - } - } - } - } diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_records.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_records.yaml index 413ec20a776..fe1a53825b2 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_records.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/jobs_get_result_records.yaml @@ -37,8 +37,7 @@ setup: "jobId": "farequote", "result_type": "record", "timestamp": "2016-06-01T00:00:00Z", - "anomalyScore": 60.0, - "result_type": "record" + "anomalyScore": 60.0 } - do: @@ -51,8 +50,7 @@ setup: "jobId": "farequote", "result_type": "record", "timestamp": "2016-06-02T00:00:00Z", - "anomalyScore": 80.0, - "result_type": "record" + "anomalyScore": 80.0 } - do: diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml index 31a1f0af116..6d754ae019d 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml @@ -68,6 +68,49 @@ setup: id: "foo_1462060800000_1" body: { "jobId": "foo", "result_type": "bucket", "timestamp": "2016-05-01T00:00:00Z", "bucketSpan":1 } + - do: + index: + index: prelertresults-foo + type: result + id: "foo_1464825600000_1_record" + body: { "jobId": "foo", "result_type": "record", "timestamp": "2016-06-02T00:00:00Z" } + + - do: + index: + index: prelertresults-foo + type: result + id: "foo_1462060800000_1_record" + body: { "jobId": "foo", "result_type": "record", "timestamp": "2016-05-01T00:00:00Z" } + + - do: + index: + index: prelertresults-foo + type: result + id: "foo_1464825600000_1_influencer" + body: { + "jobId": "foo", + "result_type": "influencer", + "timestamp": "2016-06-02T00:00:00Z", + "influencerFieldName": "foo", + "influencerFieldValue": "zoo", + "anomalyScore": 50.0 + } + + - do: + index: + index: prelertresults-foo + type: result + id: "foo_1462060800000_1_influencer" + body: + { + "jobId": "foo", + "result_type": "influencer", + "timestamp": "2016-05-01T00:00:00Z", + "influencerFieldName": "foo", + "influencerFieldValue": "zoo", + "anomalyScore": 50.0 + } + - do: indices.refresh: index: prelertresults-foo @@ -211,6 +254,26 @@ setup: - match: { buckets.0.jobId: "foo" } - match: { buckets.0.timestamp: 1462060800000 } + - do: + xpack.prelert.get_records: + job_id: "foo" + start: "2016-01-01T00:00:00Z" + end: "2016-12-01T00:00:00Z" + + - match: { count: 1 } + - match: { records.0.jobId: "foo" } + - match: { records.0.timestamp: 1462060800000 } + + - do: + xpack.prelert.get_influencers: + job_id: "foo" + start: "2016-01-01T00:00:00Z" + end: "2016-12-01T01:00:00Z" + + - match: { count: 1 } + - match: { influencers.0.jobId: "foo" } + - match: { influencers.0.timestamp: 1462060800000 } + - do: xpack.prelert.get_jobs: job_id: foo