Refactor deleting old results (elastic/elasticsearch#431)

* Refactor deleting old results

* Reinstate quiet logging when deleting interim results

Original commit: elastic/x-pack-elasticsearch@01ea95469c
David Kyle 2016-12-01 10:32:41 +00:00 committed by GitHub
parent 2fdf848df5
commit 53adc100ad
6 changed files with 130 additions and 181 deletions


@@ -334,9 +334,10 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
logger.debug("Received request to revert to time '" + request.getTime() + "' description '" + request.getDescription()
+ "' snapshot id '" + request.getSnapshotId() + "' for job '" + request.getJobId() + "', deleting intervening "
+ " results: " + request.getDeleteInterveningResults());
logger.debug("Received request to revert to time '{}' description '{}' snapshot id '{}' for job '{}', deleting intervening " +
"results: {}",
request.getTime(), request.getDescription(), request.getSnapshotId(), request.getJobId(),
request.getDeleteInterveningResults());
if (request.getTime() == null && request.getSnapshotId() == null && request.getDescription() == null) {
throw new IllegalStateException(Messages.getMessage(Messages.REST_INVALID_REVERT_PARAMS));
@@ -344,7 +345,7 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
QueryPage<Job> job = jobManager.getJob(request.getJobId(), clusterService.state());
Allocation allocation = jobManager.getJobAllocation(request.getJobId());
if (job.count() > 0 && allocation.getStatus().equals(JobStatus.RUNNING)) {
if (job.count() > 0 && allocation.getStatus().equals(JobStatus.CLOSED) == false) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
}
@@ -386,12 +387,12 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
logger.debug("Removing intervening records: last record: " + deleteAfter + ", last result: "
+ modelSnapshot.getLatestResultTimeStamp());
logger.info("Deleting buckets after '" + deleteAfter + "'");
logger.info("Deleting results after '" + deleteAfter + "'");
// NORELEASE: OldDataRemover is basically delete-by-query.
// We should replace this
// whole abstraction with DBQ eventually
OldDataRemover remover = new OldDataRemover(jobProvider, bulkDeleterFactory);
OldDataRemover remover = new OldDataRemover(bulkDeleterFactory);
remover.deleteResultsAfter(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {

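Note on the NORELEASE comment above: OldDataRemover is, in effect, a hand-rolled delete-by-query. A minimal sketch of what the eventual replacement could look like, assuming the reindex module's delete-by-query client API of this era; cutoffEpochMs stands for the same epoch-millisecond cutoff passed to deleteResultsAfter, and this is illustrative only, not part of the commit:

    // Hypothetical sketch: delete all results at/after the cutoff with delete-by-query.
    DeleteByQueryRequestBuilder deleteByQuery = DeleteByQueryAction.INSTANCE.newRequestBuilder(client);
    deleteByQuery.source(JobResultsPersister.getJobIndexName(request.getJobId()))
            .filter(QueryBuilders.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP).gte(cutoffEpochMs));
    long deleted = deleteByQuery.get().getDeleted();   // synchronous for brevity
    logger.debug("Deleted {} results after '{}'", deleted, cutoffEpochMs);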

@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -12,28 +13,23 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.ModelState;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.Result;
import java.util.Date;
import java.util.Objects;
import java.util.function.LongSupplier;
public class ElasticsearchBulkDeleter implements JobDataDeleter {
private static final Logger LOGGER = Loggers.getLogger(ElasticsearchBulkDeleter.class);
@@ -44,117 +40,63 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
private final Client client;
private final String jobId;
private final BulkRequestBuilder bulkRequestBuilder;
private long deletedBucketCount;
private long deletedRecordCount;
private long deletedBucketInfluencerCount;
private long deletedInfluencerCount;
private long deletedResultCount;
private long deletedModelSnapshotCount;
private long deletedModelStateCount;
private boolean quiet;
public ElasticsearchBulkDeleter(Client client, String jobId, boolean quiet) {
this.client = Objects.requireNonNull(client);
this.jobId = Objects.requireNonNull(jobId);
bulkRequestBuilder = client.prepareBulk();
deletedBucketCount = 0;
deletedRecordCount = 0;
deletedBucketInfluencerCount = 0;
deletedInfluencerCount = 0;
deletedModelSnapshotCount = 0;
deletedModelStateCount = 0;
this.quiet = quiet;
}
public ElasticsearchBulkDeleter(Client client, String jobId) {
this(client, jobId, false);
}
@Override
public void deleteBucket(Bucket bucket) {
deleteRecords(bucket);
deleteBucketInfluencers(bucket);
bulkRequestBuilder.add(
client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Result.TYPE.getPreferredName(), bucket.getId()));
++deletedBucketCount;
public ElasticsearchBulkDeleter(Client client, String jobId, boolean quiet) {
this.client = Objects.requireNonNull(client);
this.jobId = Objects.requireNonNull(jobId);
bulkRequestBuilder = client.prepareBulk();
deletedResultCount = 0;
deletedModelSnapshotCount = 0;
deletedModelStateCount = 0;
this.quiet = quiet;
}
@Override
public void deleteRecords(Bucket bucket) {
// Find the records using the time stamp rather than a parent-child
// relationship. The parent-child filter involves two queries behind
// the scenes, and Elasticsearch documentation claims it's significantly
// slower. Here we rely on the record timestamps being identical to the
// bucket timestamp.
deleteResultTypeByBucket(bucket, AnomalyRecord.RESULT_TYPE_VALUE, () -> ++deletedRecordCount);
}
public void deleteResultsFromTime(long cutoffEpochMs) {
String index = JobResultsPersister.getJobIndexName(jobId);
private void deleteResultTypeByBucket(Bucket bucket, String resultType, LongSupplier deleteCounter) {
QueryBuilder timeQuery = QueryBuilders.termQuery(ElasticsearchMappings.ES_TIMESTAMP, bucket.getTimestamp().getTime());
QueryBuilder boolQuery = new BoolQueryBuilder()
.filter(timeQuery)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP);
timeRange.gte(cutoffEpochMs);
timeRange.lt(new Date().getTime());
int done = 0;
boolean finished = false;
while (finished == false) {
SearchResponse searchResponse = SearchAction.INSTANCE.newRequestBuilder(client)
.setIndices(JobResultsPersister.getJobIndexName(jobId))
.setTypes(Result.TYPE.getPreferredName())
.setQuery(boolQuery)
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))
.setSize(SCROLL_SIZE)
.setFrom(done)
.execute().actionGet();
SearchResponse searchResponse = client.prepareSearch(index)
.setTypes(Result.TYPE.getPreferredName())
.setFetchSource(false)
.setQuery(timeRange)
.setScroll(SCROLL_CONTEXT_DURATION)
.setSize(SCROLL_SIZE)
.get();
String scrollId = searchResponse.getScrollId();
long totalHits = searchResponse.getHits().totalHits();
long totalDeletedCount = 0;
while (totalDeletedCount < totalHits) {
for (SearchHit hit : searchResponse.getHits()) {
++done;
addDeleteRequest(hit);
deleteCounter.getAsLong();
}
if (searchResponse.getHits().getTotalHits() == done) {
finished = true;
LOGGER.trace("Search hit for result: {}", hit.getId());
++totalDeletedCount;
addDeleteRequest(hit, index);
++deletedResultCount;
}
searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get();
}
}
private void addDeleteRequest(SearchHit hit) {
private void addDeleteRequest(SearchHit hit, String index) {
DeleteRequestBuilder deleteRequest = DeleteAction.INSTANCE.newRequestBuilder(client)
.setIndex(JobResultsPersister.getJobIndexName(jobId))
.setIndex(index)
.setType(hit.getType())
.setId(hit.getId());
bulkRequestBuilder.add(deleteRequest);
}
public void deleteBucketInfluencers(Bucket bucket) {
// Find the bucket influencers using the time stamp, relying on the
// bucket influencer timestamps being identical to the bucket timestamp.
deleteResultTypeByBucket(bucket, BucketInfluencer.RESULT_TYPE_VALUE, () -> ++deletedBucketInfluencerCount);
}
public void deleteInfluencers(Bucket bucket) {
// Find the influencers using the time stamp, relying on the influencer
// timestamps being identical to the bucket timestamp.
deleteResultTypeByBucket(bucket, Influencer.RESULT_TYPE_VALUE, () -> ++deletedInfluencerCount);
}
public void deleteBucketByTime(Bucket bucket) {
deleteResultTypeByBucket(bucket, Bucket.RESULT_TYPE_VALUE, () -> ++deletedBucketCount);
}
@Override
public void deleteInfluencer(Influencer influencer) {
String id = influencer.getId();
if (id == null) {
LOGGER.error("Cannot delete specific influencer without an ID",
// This means we get a stack trace to show where the request came from
new NullPointerException());
return;
}
bulkRequestBuilder.add(
client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Result.TYPE.getPreferredName(), id));
++deletedInfluencerCount;
}
@Override
public void deleteModelSnapshot(ModelSnapshot modelSnapshot) {
String snapshotId = modelSnapshot.getSnapshotId();
@@ -165,13 +107,11 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
// too big and has no mappings
for (int i = 0; i < docCount; ++i) {
String stateId = snapshotId + '_' + i;
bulkRequestBuilder.add(
client.prepareDelete(indexName, ModelState.TYPE, stateId));
bulkRequestBuilder.add(client.prepareDelete(indexName, ModelState.TYPE, stateId));
++deletedModelStateCount;
}
bulkRequestBuilder.add(
client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId));
bulkRequestBuilder.add(client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId));
++deletedModelSnapshotCount;
}
@@ -188,12 +128,16 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
JobResultsPersister.getJobIndexName(jobId), ModelSizeStats.TYPE.getPreferredName(), modelSizeStats.getId()));
}
@Override
public void deleteInterimResults() {
String index = JobResultsPersister.getJobIndexName(jobId);
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
SearchResponse searchResponse = client.prepareSearch(JobResultsPersister.getJobIndexName(jobId))
SearchResponse searchResponse = client.prepareSearch(index)
.setTypes(Result.RESULT_TYPE.getPreferredName())
.setQuery(qb)
.setFetchSource(false)
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))
.setScroll(SCROLL_CONTEXT_DURATION)
.setSize(SCROLL_SIZE)
@@ -204,19 +148,10 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
long totalDeletedCount = 0;
while (totalDeletedCount < totalHits) {
for (SearchHit hit : searchResponse.getHits()) {
LOGGER.trace("Search hit for bucket: {}, {}", hit.toString(), hit.getId());
String type = (String) hit.getSource().get(Result.RESULT_TYPE.getPreferredName());
if (Bucket.RESULT_TYPE_VALUE.equals(type)) {
++deletedBucketCount;
} else if (AnomalyRecord.RESULT_TYPE_VALUE.equals(type)) {
++deletedRecordCount;
} else if (BucketInfluencer.RESULT_TYPE_VALUE.equals(type)) {
++deletedBucketInfluencerCount;
} else if (Influencer.RESULT_TYPE_VALUE.equals(type)) {
++deletedInfluencerCount;
}
LOGGER.trace("Search hit for result: {}", hit.getId());
++totalDeletedCount;
addDeleteRequest(hit);
addDeleteRequest(hit, index);
++deletedResultCount;
}
searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get();
@@ -234,16 +169,9 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
return;
}
if (!quiet) {
LOGGER.debug("Requesting deletion of "
+ deletedBucketCount + " buckets, "
+ deletedRecordCount + " records, "
+ deletedBucketInfluencerCount + " bucket influencers, "
+ deletedInfluencerCount + " influencers, "
+ deletedModelSnapshotCount + " model snapshots, "
+ " and "
+ deletedModelStateCount + " model state documents");
}
Level logLevel = quiet ? Level.DEBUG : Level.INFO;
LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents",
deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount);
try {
bulkRequestBuilder.execute(listener);

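For orientation, a short usage sketch of the refactored deleter (class, constructor and method names are taken from this diff; the listener body is illustrative):

    // Hedged sketch: client, jobId and cutoffEpochMs are assumed to come from the caller.
    ElasticsearchBulkDeleter deleter = new ElasticsearchBulkDeleter(client, jobId, true);  // quiet = log at DEBUG
    deleter.deleteInterimResults();                // scrolls over interim results and queues delete requests
    deleter.deleteResultsFromTime(cutoffEpochMs);  // queues deletes for all results at/after the cutoff
    deleter.commit(new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
            // every queued delete was sent in a single bulk request
        }

        @Override
        public void onFailure(Exception e) {
            // handle the bulk failure
        }
    });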

@@ -9,31 +9,16 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
public interface JobDataDeleter {
/**
* Delete a {@code Bucket} and its records
*
* @param bucket the bucket to delete
*/
void deleteBucket(Bucket bucket);
/**
* Delete the records of a {@code Bucket}
* Delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}
*
* @param bucket the bucket whose records to delete
* @param cutoffEpochMs Results at and after this time will be deleted
*/
void deleteRecords(Bucket bucket);
/**
* Delete an {@code Influencer}
*
* @param influencer the influencer to delete
*/
void deleteInfluencer(Influencer influencer);
void deleteResultsFromTime(long cutoffEpochMs);
/**
* Delete a {@code ModelSnapshot}
@@ -56,6 +41,11 @@ public interface JobDataDeleter {
*/
void deleteModelSizeStats(ModelSizeStats modelSizeStats);
/**
* Delete all results marked as interim
*/
void deleteInterimResults();
/**
* Commit the deletions without enforcing the removal of data from disk
*/


@@ -8,10 +8,7 @@ package org.elasticsearch.xpack.prelert.job.persistence;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import java.util.Date;
import java.util.Deque;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
/**
@@ -20,11 +17,9 @@ import java.util.function.Function;
*/
public class OldDataRemover {
private final JobProvider jobProvider;
private final Function<String, ElasticsearchBulkDeleter> dataDeleterFactory;
public OldDataRemover(JobProvider jobProvider, Function<String, ElasticsearchBulkDeleter> dataDeleterFactory) {
this.jobProvider = Objects.requireNonNull(jobProvider);
public OldDataRemover(Function<String, ElasticsearchBulkDeleter> dataDeleterFactory) {
this.dataDeleterFactory = Objects.requireNonNull(dataDeleterFactory);
}
@@ -32,34 +27,8 @@ public class OldDataRemover {
* Removes results between the time given and the current time
*/
public void deleteResultsAfter(ActionListener<BulkResponse> listener, String jobId, long cutoffEpochMs) {
Date now = new Date();
JobDataDeleter deleter = dataDeleterFactory.apply(jobId);
deleteResultsWithinRange(jobId, deleter, cutoffEpochMs, now.getTime());
deleter.deleteResultsFromTime(cutoffEpochMs);
deleter.commit(listener);
}
private void deleteResultsWithinRange(String jobId, JobDataDeleter deleter, long start, long end) {
deleteBatchedData(
jobProvider.newBatchedInfluencersIterator(jobId).timeRange(start, end),
deleter::deleteInfluencer
);
deleteBatchedData(
jobProvider.newBatchedBucketsIterator(jobId).timeRange(start, end),
deleter::deleteBucket
);
}
private <T> void deleteBatchedData(BatchedDocumentsIterator<T> resultsIterator,
Consumer<T> deleteFunction) {
while (resultsIterator.hasNext()) {
Deque<T> batch = resultsIterator.next();
if (batch.isEmpty()) {
return;
}
for (T result : batch) {
deleteFunction.accept(result);
}
}
}
}

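A sketch of how the slimmed-down remover can now be wired up (the factory shape matches the Function<String, ElasticsearchBulkDeleter> that the transport action passes in; client, listener and cutoffEpochMs are assumed to come from the caller):

    Function<String, ElasticsearchBulkDeleter> bulkDeleterFactory =
            jobId -> new ElasticsearchBulkDeleter(client, jobId);
    OldDataRemover remover = new OldDataRemover(bulkDeleterFactory);   // no JobProvider dependency any more
    remover.deleteResultsAfter(listener, "foo", cutoffEpochMs);        // delegates to deleteResultsFromTime + commit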

@@ -37,8 +37,7 @@ setup:
"jobId": "farequote",
"result_type": "record",
"timestamp": "2016-06-01T00:00:00Z",
"anomalyScore": 60.0,
"result_type": "record"
"anomalyScore": 60.0
}
- do:
@@ -51,8 +50,7 @@ setup:
"jobId": "farequote",
"result_type": "record",
"timestamp": "2016-06-02T00:00:00Z",
"anomalyScore": 80.0,
"result_type": "record"
"anomalyScore": 80.0
}
- do:


@@ -68,6 +68,49 @@ setup:
id: "foo_1462060800000_1"
body: { "jobId": "foo", "result_type": "bucket", "timestamp": "2016-05-01T00:00:00Z", "bucketSpan":1 }
- do:
index:
index: prelertresults-foo
type: result
id: "foo_1464825600000_1_record"
body: { "jobId": "foo", "result_type": "record", "timestamp": "2016-06-02T00:00:00Z" }
- do:
index:
index: prelertresults-foo
type: result
id: "foo_1462060800000_1_record"
body: { "jobId": "foo", "result_type": "record", "timestamp": "2016-05-01T00:00:00Z" }
- do:
index:
index: prelertresults-foo
type: result
id: "foo_1464825600000_1_influencer"
body: {
"jobId": "foo",
"result_type": "influencer",
"timestamp": "2016-06-02T00:00:00Z",
"influencerFieldName": "foo",
"influencerFieldValue": "zoo",
"anomalyScore": 50.0
}
- do:
index:
index: prelertresults-foo
type: result
id: "foo_1462060800000_1_influencer"
body:
{
"jobId": "foo",
"result_type": "influencer",
"timestamp": "2016-05-01T00:00:00Z",
"influencerFieldName": "foo",
"influencerFieldValue": "zoo",
"anomalyScore": 50.0
}
- do:
indices.refresh:
index: prelertresults-foo
@@ -211,6 +254,26 @@ setup:
- match: { buckets.0.jobId: "foo" }
- match: { buckets.0.timestamp: 1462060800000 }
- do:
xpack.prelert.get_records:
job_id: "foo"
start: "2016-01-01T00:00:00Z"
end: "2016-12-01T00:00:00Z"
- match: { count: 1 }
- match: { records.0.jobId: "foo" }
- match: { records.0.timestamp: 1462060800000 }
- do:
xpack.prelert.get_influencers:
job_id: "foo"
start: "2016-01-01T00:00:00Z"
end: "2016-12-01T01:00:00Z"
- match: { count: 1 }
- match: { influencers.0.jobId: "foo" }
- match: { influencers.0.timestamp: 1462060800000 }
- do:
xpack.prelert.get_jobs:
job_id: foo