diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index dfe3b78cb04..69c8e742ff1 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -57,7 +57,7 @@ import org.elasticsearch.xpack.prelert.job.metadata.JobAllocator; import org.elasticsearch.xpack.prelert.job.metadata.JobLifeCycleService; import org.elasticsearch.xpack.prelert.job.metadata.PrelertInitializationService; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; -import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory; +import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory; import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider; import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister; @@ -181,7 +181,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { jobManager, new JobAllocator(settings, clusterService, threadPool), new JobLifeCycleService(settings, client, clusterService, scheduledJobService, dataProcessor, threadPool.generic()), - new ElasticsearchBulkDeleterFactory(client), //NORELEASE: this should use Delete-by-query + new JobDataDeleterFactory(client), //NORELEASE: this should use Delete-by-query dataProcessor, new PrelertInitializationService(settings, threadPool, clusterService, jobProvider), jobDataCountsPersister diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/DeleteModelSnapshotAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/DeleteModelSnapshotAction.java index 95ba57a314d..51e4d337267 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/DeleteModelSnapshotAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/DeleteModelSnapshotAction.java @@ -29,9 +29,9 @@ import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.ModelSnapshot; import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.messages.Messages; -import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleter; -import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory; +import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory; import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider; +import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; @@ -134,13 +134,13 @@ public class DeleteModelSnapshotAction extends Action() { @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/RevertModelSnapshotAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/RevertModelSnapshotAction.java index edda4308ae6..212a04a2842 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/RevertModelSnapshotAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/RevertModelSnapshotAction.java @@ -43,7 +43,7 @@ import 
org.elasticsearch.xpack.prelert.job.ModelSnapshot; import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; -import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory; +import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory; import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider; import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; @@ -311,13 +311,13 @@ extends Action listener) { - String index = JobResultsPersister.getJobIndexName(jobId); - - RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP); - timeRange.gte(cutoffEpochMs); - timeRange.lt(new Date().getTime()); - - RepeatingSearchScrollListener scrollSearchListener = new RepeatingSearchScrollListener(index, listener); - - client.prepareSearch(index) - .setTypes(Result.TYPE.getPreferredName()) - .setFetchSource(false) - .setQuery(timeRange) - .setScroll(SCROLL_CONTEXT_DURATION) - .setSize(SCROLL_SIZE) - .execute(scrollSearchListener); - } - - private void addDeleteRequestForSearchHits(SearchHits hits, String index) { - for (SearchHit hit : hits.hits()) { - LOGGER.trace("Search hit for result: {}", hit.getId()); - addDeleteRequest(hit, index); - } - deletedResultCount = hits.getTotalHits(); - } - - private void addDeleteRequest(SearchHit hit, String index) { - DeleteRequestBuilder deleteRequest = DeleteAction.INSTANCE.newRequestBuilder(client) - .setIndex(index) - .setType(hit.getType()) - .setId(hit.getId()); - bulkRequestBuilder.add(deleteRequest); - } - - @Override - public void deleteModelSnapshot(ModelSnapshot modelSnapshot) { - String snapshotId = modelSnapshot.getSnapshotId(); - int docCount = modelSnapshot.getSnapshotDocCount(); - String indexName = JobResultsPersister.getJobIndexName(jobId); - // Deduce the document IDs of the state documents from the information - // in the snapshot document - we cannot query the state itself as it's - // too big and has no mappings - for (int i = 0; i < docCount; ++i) { - String stateId = snapshotId + '_' + i; - bulkRequestBuilder.add(client.prepareDelete(indexName, ModelState.TYPE.getPreferredName(), stateId)); - ++deletedModelStateCount; - } - - bulkRequestBuilder.add(client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId)); - ++deletedModelSnapshotCount; - } - - @Override - public void deleteModelDebugOutput(ModelDebugOutput modelDebugOutput) { - String id = modelDebugOutput.getId(); - bulkRequestBuilder.add( - client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), ModelDebugOutput.TYPE.getPreferredName(), id)); - } - - @Override - public void deleteModelSizeStats(ModelSizeStats modelSizeStats) { - bulkRequestBuilder.add(client.prepareDelete( - JobResultsPersister.getJobIndexName(jobId), ModelSizeStats.TYPE.getPreferredName(), modelSizeStats.getId())); - } - - @Override - public void deleteInterimResults() { - String index = JobResultsPersister.getJobIndexName(jobId); - - QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true); - - SearchResponse searchResponse = client.prepareSearch(index) - .setTypes(Result.RESULT_TYPE.getPreferredName()) - .setQuery(qb) - .setFetchSource(false) - .addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)) - .setScroll(SCROLL_CONTEXT_DURATION) - 
.setSize(SCROLL_SIZE) - .get(); - - String scrollId = searchResponse.getScrollId(); - long totalHits = searchResponse.getHits().totalHits(); - long totalDeletedCount = 0; - while (totalDeletedCount < totalHits) { - for (SearchHit hit : searchResponse.getHits()) { - LOGGER.trace("Search hit for result: {}", hit.getId()); - ++totalDeletedCount; - addDeleteRequest(hit, index); - ++deletedResultCount; - } - - searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get(); - } - } - - /** - * Commits the deletions and if {@code forceMerge} is {@code true}, it - * forces a merge which removes the data from disk. - */ - @Override - public void commit(ActionListener listener) { - if (bulkRequestBuilder.numberOfActions() == 0) { - listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L)); - return; - } - - Level logLevel = quiet ? Level.DEBUG : Level.INFO; - LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents", - deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount); - - try { - bulkRequestBuilder.execute(listener); - } catch (Exception e) { - listener.onFailure(e); - } - } - - /** - * Repeats a scroll search adding the hits a bulk delete request - */ - private class RepeatingSearchScrollListener implements ActionListener { - - private final AtomicLong totalDeletedCount; - private final String index; - private final ActionListener scrollFinishedListener; - - RepeatingSearchScrollListener(String index, ActionListener scrollFinishedListener) { - totalDeletedCount = new AtomicLong(0L); - this.index = index; - this.scrollFinishedListener = scrollFinishedListener; - } - - @Override - public void onResponse(SearchResponse searchResponse) { - addDeleteRequestForSearchHits(searchResponse.getHits(), index); - - totalDeletedCount.addAndGet(searchResponse.getHits().hits().length); - if (totalDeletedCount.get() < searchResponse.getHits().totalHits()) { - client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION) - .execute(this); - } - else { - scrollFinishedListener.onResponse(true); - } - } - - @Override - public void onFailure(Exception e) { - scrollFinishedListener.onFailure(e); - } - }; -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleterFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleterFactory.java deleted file mode 100644 index 3acaf471947..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleterFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job.persistence; - -import org.elasticsearch.client.Client; - -import java.util.function.Function; - -/** - * TODO This is all just silly static typing shenanigans because Guice can't inject - * anonymous lambdas. This can all be removed once Guice goes away. 
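The deleted ElasticsearchBulkDeleter reappears below under the name JobDataDeleter with the same queue-then-commit contract: callers stage delete requests and then send them as a single bulk request. A minimal caller-side sketch of that contract, assuming a client, jobId and modelSnapshot are already available (the helper class and variable names are illustrative only and not part of this change):

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
    import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleter;

    class DeleteSnapshotExample {
        static void deleteSnapshot(Client client, String jobId, ModelSnapshot modelSnapshot) {
            JobDataDeleter deleter = new JobDataDeleter(client, jobId);
            // Queues delete requests for the snapshot document and its model state documents
            deleter.deleteModelSnapshot(modelSnapshot);
            // Sends everything queued so far as one bulk request
            deleter.commit(new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    // response.hasFailures() reports any per-document failures
                }

                @Override
                public void onFailure(Exception e) {
                    // surface the error to the caller
                }
            });
        }
    }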
- */ -public class ElasticsearchBulkDeleterFactory implements Function { - - private final Client client; - - public ElasticsearchBulkDeleterFactory(Client client) { - this.client = client; - } - - @Override - public ElasticsearchBulkDeleter apply(String jobId) { - return new ElasticsearchBulkDeleter(client, jobId); - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java index cbaa5730881..8f86591a5ca 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java @@ -5,13 +5,62 @@ */ package org.elasticsearch.xpack.prelert.job.persistence; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteAction; +import org.elasticsearch.action.delete.DeleteRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; import org.elasticsearch.xpack.prelert.job.ModelSnapshot; +import org.elasticsearch.xpack.prelert.job.ModelState; +import org.elasticsearch.xpack.prelert.job.results.Bucket; import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; +import org.elasticsearch.xpack.prelert.job.results.Result; -public interface JobDataDeleter { +import java.util.Date; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +public class JobDataDeleter { + + private static final Logger LOGGER = Loggers.getLogger(JobDataDeleter.class); + + private static final int SCROLL_SIZE = 1000; + private static final String SCROLL_CONTEXT_DURATION = "5m"; + + private final Client client; + private final String jobId; + private final BulkRequestBuilder bulkRequestBuilder; + private long deletedResultCount; + private long deletedModelSnapshotCount; + private long deletedModelStateCount; + private boolean quiet; + + public JobDataDeleter(Client client, String jobId) { + this(client, jobId, false); + } + + public JobDataDeleter(Client client, String jobId, boolean quiet) { + this.client = Objects.requireNonNull(client); + this.jobId = Objects.requireNonNull(jobId); + bulkRequestBuilder = client.prepareBulk(); + deletedResultCount = 0; + deletedModelSnapshotCount = 0; + deletedModelStateCount = 0; + this.quiet = quiet; + } /** * Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime} @@ -19,36 +68,164 @@ public interface JobDataDeleter { * @param cutoffEpochMs Results at and after this time will be deleted * @param listener Response listener */ - void deleteResultsFromTime(long cutoffEpochMs, ActionListener listener); + public void deleteResultsFromTime(long cutoffEpochMs, ActionListener listener) { + String index = 
JobResultsPersister.getJobIndexName(jobId); + + RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP); + timeRange.gte(cutoffEpochMs); + timeRange.lt(new Date().getTime()); + + RepeatingSearchScrollListener scrollSearchListener = new RepeatingSearchScrollListener(index, listener); + + client.prepareSearch(index) + .setTypes(Result.TYPE.getPreferredName()) + .setFetchSource(false) + .setQuery(timeRange) + .setScroll(SCROLL_CONTEXT_DURATION) + .setSize(SCROLL_SIZE) + .execute(scrollSearchListener); + } + + private void addDeleteRequestForSearchHits(SearchHits hits, String index) { + for (SearchHit hit : hits.hits()) { + LOGGER.trace("Search hit for result: {}", hit.getId()); + addDeleteRequest(hit, index); + } + deletedResultCount = hits.getTotalHits(); + } + + private void addDeleteRequest(SearchHit hit, String index) { + DeleteRequestBuilder deleteRequest = DeleteAction.INSTANCE.newRequestBuilder(client) + .setIndex(index) + .setType(hit.getType()) + .setId(hit.getId()); + bulkRequestBuilder.add(deleteRequest); + } /** * Delete a {@code ModelSnapshot} * * @param modelSnapshot the model snapshot to delete */ - void deleteModelSnapshot(ModelSnapshot modelSnapshot); + public void deleteModelSnapshot(ModelSnapshot modelSnapshot) { + String snapshotId = modelSnapshot.getSnapshotId(); + int docCount = modelSnapshot.getSnapshotDocCount(); + String indexName = JobResultsPersister.getJobIndexName(jobId); + // Deduce the document IDs of the state documents from the information + // in the snapshot document - we cannot query the state itself as it's + // too big and has no mappings + for (int i = 0; i < docCount; ++i) { + String stateId = snapshotId + '_' + i; + bulkRequestBuilder.add(client.prepareDelete(indexName, ModelState.TYPE.getPreferredName(), stateId)); + ++deletedModelStateCount; + } - /** - * Delete a {@code ModelDebugOutput} record - * - * @param modelDebugOutput to delete - */ - void deleteModelDebugOutput(ModelDebugOutput modelDebugOutput); - - /** - * Delete a {@code ModelSizeStats} record - * - * @param modelSizeStats to delete - */ - void deleteModelSizeStats(ModelSizeStats modelSizeStats); + bulkRequestBuilder.add(client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId)); + ++deletedModelSnapshotCount; + } /** * Delete all results marked as interim */ - void deleteInterimResults(); + public void deleteInterimResults() { + String index = JobResultsPersister.getJobIndexName(jobId); + + QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true); + + SearchResponse searchResponse = client.prepareSearch(index) + .setTypes(Result.RESULT_TYPE.getPreferredName()) + .setQuery(qb) + .setFetchSource(false) + .addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)) + .setScroll(SCROLL_CONTEXT_DURATION) + .setSize(SCROLL_SIZE) + .get(); + + String scrollId = searchResponse.getScrollId(); + long totalHits = searchResponse.getHits().totalHits(); + long totalDeletedCount = 0; + while (totalDeletedCount < totalHits) { + for (SearchHit hit : searchResponse.getHits()) { + LOGGER.trace("Search hit for result: {}", hit.getId()); + ++totalDeletedCount; + addDeleteRequest(hit, index); + ++deletedResultCount; + } + + searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get(); + } + } /** * Commit the deletions without enforcing the removal of data from disk */ - void commit(ActionListener listener); + public void commit(ActionListener listener) { + if 
(bulkRequestBuilder.numberOfActions() == 0) { + listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L)); + return; + } + + Level logLevel = quiet ? Level.DEBUG : Level.INFO; + LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents", + deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount); + + try { + bulkRequestBuilder.execute(listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + + /** + * Blocking version of {@linkplain #commit(ActionListener)} + */ + public void commit() { + if (bulkRequestBuilder.numberOfActions() == 0) { + return; + } + + Level logLevel = quiet ? Level.DEBUG : Level.INFO; + LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents", + deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount); + + BulkResponse response = bulkRequestBuilder.get(); + if (response.hasFailures()) { + LOGGER.debug("Bulk request has failures. {}", response.buildFailureMessage()); + } + } + + /** + * Repeats a scroll search adding the hits a bulk delete request + */ + private class RepeatingSearchScrollListener implements ActionListener { + + private final AtomicLong totalDeletedCount; + private final String index; + private final ActionListener scrollFinishedListener; + + RepeatingSearchScrollListener(String index, ActionListener scrollFinishedListener) { + totalDeletedCount = new AtomicLong(0L); + this.index = index; + this.scrollFinishedListener = scrollFinishedListener; + } + + @Override + public void onResponse(SearchResponse searchResponse) { + addDeleteRequestForSearchHits(searchResponse.getHits(), index); + + totalDeletedCount.addAndGet(searchResponse.getHits().hits().length); + if (totalDeletedCount.get() < searchResponse.getHits().totalHits()) { + client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION) + .execute(this); + } + else { + scrollFinishedListener.onResponse(true); + } + } + + @Override + public void onFailure(Exception e) { + scrollFinishedListener.onFailure(e); + } + }; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterFactory.java index 1bf8c9e2ad6..fa140900fa2 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterFactory.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterFactory.java @@ -5,6 +5,24 @@ */ package org.elasticsearch.xpack.prelert.job.persistence; -public interface JobDataDeleterFactory { - JobDataDeleter newDeleter(String jobId); +import org.elasticsearch.client.Client; + +import java.util.function.Function; + +/** + * TODO This is all just silly static typing shenanigans because Guice can't inject + * anonymous lambdas. This can all be removed once Guice goes away. 
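As the TODO above notes, JobDataDeleterFactory is only a named stand-in for a Function<String, JobDataDeleter> because Guice cannot inject anonymous lambdas. Once Guice is removed, the same wiring could be a plain lambda handed to consumers such as OldDataRemover, whose constructor (further down in this diff) takes a Function<String, JobDataDeleter>. A sketch under that assumption:

    import java.util.function.Function;

    import org.elasticsearch.client.Client;
    import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleter;
    import org.elasticsearch.xpack.prelert.job.persistence.OldDataRemover;

    class DeleterWiringExample {
        static OldDataRemover buildRemover(Client client) {
            // Equivalent to new JobDataDeleterFactory(client), but with no extra class
            Function<String, JobDataDeleter> deleterFactory = jobId -> new JobDataDeleter(client, jobId);
            return new OldDataRemover(deleterFactory);
        }
    }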
+ */ +public class JobDataDeleterFactory implements Function { + + private final Client client; + + public JobDataDeleterFactory(Client client) { + this.client = client; + } + + @Override + public JobDataDeleter apply(String jobId) { + return new JobDataDeleter(client, jobId); + } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java index dc60b2e6c63..dba4c6d87a8 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobRenormaliser.java @@ -5,36 +5,30 @@ */ package org.elasticsearch.xpack.prelert.job.persistence; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; import org.elasticsearch.xpack.prelert.job.results.Bucket; import org.elasticsearch.xpack.prelert.job.results.Influencer; import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities; -import org.elasticsearch.xpack.prelert.job.results.Result; -import java.io.IOException; import java.util.List; /** * Interface for classes that update {@linkplain Bucket Buckets} * for a particular job with new normalised anomaly scores and - * unusual scores + * unusual scores. + * + * Renormalised results already have an ID having been indexed at least + * once before that same ID should be used on persistence */ public class JobRenormaliser extends AbstractComponent { - private final Client client; private final JobResultsPersister jobResultsPersister; - public JobRenormaliser(Settings settings, Client client, JobResultsPersister jobResultsPersister) { + public JobRenormaliser(Settings settings, JobResultsPersister jobResultsPersister) { super(settings); - this.client = client; this.jobResultsPersister = jobResultsPersister; } @@ -45,82 +39,44 @@ public class JobRenormaliser extends AbstractComponent { * @param bucket the bucket to update */ public void updateBucket(Bucket bucket) { - String jobId = bucket.getJobId(); - try { - String indexName = JobResultsPersister.getJobIndexName(jobId); - logger.trace("[{}] ES API CALL: update result type {} to index {} with ID {}", jobId, Bucket.RESULT_TYPE_VALUE, indexName, - bucket.getId()); - client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId()) - .setSource(jobResultsPersister.toXContentBuilder(bucket)).execute().actionGet(); - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error updating bucket state", new Object[]{jobId}, e)); - return; - } - - // If the update to the bucket was successful, also update the - // standalone copies of the nested bucket influencers - try { - jobResultsPersister.persistBucketInfluencersStandalone(bucket.getJobId(), bucket.getId(), bucket.getBucketInfluencers(), - bucket.getTimestamp(), bucket.isInterim()); - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error updating standalone bucket influencer state", new Object[]{jobId}, e)); - return; - } + 
jobResultsPersister.bulkPersisterBuilder(bucket.getJobId()).persistBucket(bucket).executeRequest(); } - /** - * Update the anomaly records for a particular bucket and job. - * The anomaly records are updated with the values in the - * records list. + * Update the anomaly records for a particular job. + * The anomaly records are updated with the values in records and + * stored with the ID returned by {@link AnomalyRecord#getId()} * - * @param bucketId Id of the bucket to update - * @param records The new record values + * @param jobId Id of the job to update + * @param records The updated records */ - public void updateRecords(String jobId, String bucketId, List records) { - try { - // Now bulk update the records within the bucket - BulkRequestBuilder bulkRequest = client.prepareBulk(); - boolean addedAny = false; - for (AnomalyRecord record : records) { - String recordId = record.getId(); - String indexName = JobResultsPersister.getJobIndexName(jobId); - logger.trace("[{}] ES BULK ACTION: update ID {} result type {} in index {} using map of new values, for bucket {}", - jobId, recordId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, bucketId); - - bulkRequest.add( - client.prepareIndex(indexName, Result.TYPE.getPreferredName(), recordId) - .setSource(jobResultsPersister.toXContentBuilder(record))); - - addedAny = true; - } - - if (addedAny) { - logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions()); - BulkResponse bulkResponse = bulkRequest.execute().actionGet(); - if (bulkResponse.hasFailures()) { - logger.error("[{}] BulkResponse has errors: {}", jobId, bulkResponse.buildFailureMessage()); - } - } - } catch (IOException | ElasticsearchException e) { - logger.error(new ParameterizedMessage("[{}] Error updating anomaly records", new Object[]{jobId}, e)); - } - } - - public void updatePerPartitionMaxProbabilities(String jobId, List records) { - PerPartitionMaxProbabilities ppMaxProbs = - new PerPartitionMaxProbabilities(records); - - logger.trace("[{}] ES API CALL: update result type {} with ID {}", - jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, ppMaxProbs.getId()); - jobResultsPersister.persistPerPartitionMaxProbabilities(ppMaxProbs); + public void updateRecords(String jobId, List records) { + jobResultsPersister.bulkPersisterBuilder(jobId).persistRecords(records, false).executeRequest(); } /** - * Update the influencer for a particular job + * Create a {@link PerPartitionMaxProbabilities} object from this list of records and persist + * with the given ID. + * + * @param jobId Id of the job to update + * @param documentId The ID the {@link PerPartitionMaxProbabilities} document should be persisted with + * @param records Source of the new {@link PerPartitionMaxProbabilities} object */ - public void updateInfluencer(Influencer influencer) { - jobResultsPersister.persistInfluencer(influencer); + public void updatePerPartitionMaxProbabilities(String jobId, String documentId, List records) { + PerPartitionMaxProbabilities ppMaxProbs = new PerPartitionMaxProbabilities(records); + ppMaxProbs.setId(documentId); + jobResultsPersister.bulkPersisterBuilder(jobId).persistPerPartitionMaxProbabilities(ppMaxProbs, false).executeRequest(); + } + + /** + * Update the influencer for a particular job. 
+ * The Influencer's are stored with the ID in {@link Influencer#getId()} + * + * @param jobId Id of the job to update + * @param influencers The updated influencers + */ + public void updateInfluencer(String jobId, List influencers) { + jobResultsPersister.bulkPersisterBuilder(jobId).persistInfluencers(influencers, false).executeRequest(); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java index 98fbb3e3f71..cf721bf993e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.prelert.job.persistence; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -31,26 +30,24 @@ import org.elasticsearch.xpack.prelert.job.results.Result; import java.io.IOException; import java.util.Date; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** - * Saves result Buckets and Quantiles to Elasticsearch
- *
- * Buckets are written with the following structure:
- *
- * Bucket
- * The results of each job are stored in buckets, this is the
- * top level structure for the results. A bucket contains multiple anomaly
- * records. The anomaly score of the bucket may not match the summed score of
- * all the records as all the records may not have been outputted for the
- * bucket.
- *
- * Anomaly Record
- * Each record was generated by a detector which can be identified via
+ * Persists result types, Quantiles etc to Elasticsearch
+ *
+ * Bucket
+ * Bucket result. The anomaly score of the bucket may not match the summed
+ * score of all the records as all the records may not have been outputted for the
+ * bucket. Contains bucket influencers that are persisted both with the bucket
+ * and separately.
+ * Anomaly Record Each record was generated by a detector which can be identified via
  * the detectorIndex field.
- *
- * Detector
- * The Job has a fixed number of detectors but there may not
- * be output for every detector in each bucket.
+ * Influencers
  * Quantiles may contain model quantiles used in normalisation and are
  * stored in documents of type {@link Quantiles#TYPE}
- *
- * ModelSizeStats
- * This is stored in a flat structure
+ * ModelSizeStats This is stored in a flat structure
+ * ModelSnapShot This is stored in a flat structure
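The rewritten JobResultsPersister that follows replaces the separate persistBucket, persistRecords and persistInfluencers calls with a per-job Builder that accumulates index requests and sends them in one bulk request. A usage sketch of the fluent API, assuming the bucket, records and influencers have already been parsed; passing true lets Elasticsearch auto-generate document IDs (as the result processor does), while the renormaliser passes false so existing documents are overwritten by ID:

    import java.util.List;

    import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
    import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
    import org.elasticsearch.xpack.prelert.job.results.Bucket;
    import org.elasticsearch.xpack.prelert.job.results.Influencer;

    class BulkPersistExample {
        static void persist(JobResultsPersister persister, String jobId,
                            Bucket bucket, List<AnomalyRecord> records, List<Influencer> influencers) {
            persister.bulkPersisterBuilder(jobId)
                    .persistBucket(bucket)                  // bucket plus its bucket influencers, consistent IDs
                    .persistRecords(records, true)          // true = auto-generated IDs
                    .persistInfluencers(influencers, true)
                    .executeRequest();                      // one bulk request for everything queued above
        }
    }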
* * @see org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchMappings */ @@ -58,112 +55,174 @@ public class JobResultsPersister extends AbstractComponent { private final Client client; + public JobResultsPersister(Settings settings, Client client) { super(settings); this.client = client; } - /** - * Persist the result bucket - */ - public void persistBucket(Bucket bucket) { - String jobId = bucket.getJobId(); - try { - XContentBuilder content = toXContentBuilder(bucket); - String indexName = getJobIndexName(jobId); - logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}", jobId, Bucket.RESULT_TYPE_VALUE, indexName, - bucket.getEpoch()); - client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content).execute() - .actionGet(); - persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(), - bucket.isInterim()); - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error persisting bucket", new Object[] {jobId}, e)); - } - + public Builder bulkPersisterBuilder(String jobId) { + return new Builder(jobId); } - /** - * Persist a list of anomaly records - * - * @param records the records to persist - */ - public void persistRecords(List records) { - if (records.isEmpty()) { - return; - } - String jobId = records.get(0).getJobId(); - String indexName = getJobIndexName(jobId); - BulkRequestBuilder addRecordsRequest = client.prepareBulk(); - XContentBuilder content = null; - try { - for (AnomalyRecord record : records) { - content = toXContentBuilder(record); + public class Builder { + private BulkRequestBuilder bulkRequest; + private final String jobId; + private final String indexName; - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID", - jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName); - addRecordsRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content)); + private Builder (String jobId) { + this.jobId = Objects.requireNonNull(jobId); + indexName = getJobIndexName(jobId); + bulkRequest = client.prepareBulk(); + } + + /** + * Persist the result bucket and its bucket influencers + * Buckets are persisted with a consistent ID + * + * @param bucket The bucket to persist + * @return this + */ + public Builder persistBucket(Bucket bucket) { + try { + XContentBuilder content = toXContentBuilder(bucket); + logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}", + jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucket.getEpoch()); + + bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId()).setSource(content)); + + persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(), + bucket.isInterim()); + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}, e)); } - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error persisting records", new Object [] {jobId}, e)); - return; + + return this; } - logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addRecordsRequest.numberOfActions()); - BulkResponse addRecordsResponse = addRecordsRequest.execute().actionGet(); - if (addRecordsResponse.hasFailures()) { - logger.error("[{}] Bulk index of AnomalyRecord has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); - } - } - - /** - * Persist a list of influencers - * - * @param influencers the 
influencers to persist - */ - public void persistInfluencers(List influencers) { - if (influencers.isEmpty()) { - return; - } - String jobId = influencers.get(0).getJobId(); - String indexName = getJobIndexName(jobId); - BulkRequestBuilder addInfluencersRequest = client.prepareBulk(); - XContentBuilder content = null; - try { - for (Influencer influencer : influencers) { - content = toXContentBuilder(influencer); - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID", - jobId, Influencer.RESULT_TYPE_VALUE, indexName); - addInfluencersRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content)); + private void persistBucketInfluencersStandalone(String jobId, String bucketId, List bucketInfluencers, + Date bucketTime, boolean isInterim) throws IOException { + if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) { + for (BucketInfluencer bucketInfluencer : bucketInfluencers) { + XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer, bucketTime, isInterim); + // Need consistent IDs to ensure overwriting on renormalisation + String id = bucketId + bucketInfluencer.getInfluencerFieldName(); + logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", + jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id); + bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), id).setSource(content)); + } } - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error persisting influencers", new Object[] {jobId}, e)); - return; } - logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addInfluencersRequest.numberOfActions()); - BulkResponse addInfluencersResponse = addInfluencersRequest.execute().actionGet(); - if (addInfluencersResponse.hasFailures()) { - logger.error("[{}] Bulk index of Influencers has errors: {}", jobId, addInfluencersResponse.buildFailureMessage()); + /** + * Persist a list of anomaly records + * + * @param records the records to persist + * @param autoGenerateId If true then persist the influencer with an auto generated ID + * else use {@link AnomalyRecord#getId()} + * @return this + */ + public Builder persistRecords(List records, boolean autoGenerateId) { + + try { + for (AnomalyRecord record : records) { + XContentBuilder content = toXContentBuilder(record); + + if (autoGenerateId) { + logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID", + jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName); + bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content)); + } + else { + logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", + jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId()); + bulkRequest.add( + client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content)); + } + } + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}, e)); + } + + return this; + } + + /** + * Persist a list of influencers optionally using each influencer's ID or + * an auto generated ID + * + * @param influencers the influencers to persist + * @param autoGenerateId If true then persist the influencer with an auto generated ID + * else use {@link Influencer#getId()} + * @return this + */ + public Builder persistInfluencers(List influencers, boolean autoGenerateId) { + try { + for (Influencer 
influencer : influencers) { + XContentBuilder content = toXContentBuilder(influencer); + if (autoGenerateId) { + logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID", + jobId, Influencer.RESULT_TYPE_VALUE, indexName); + bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content)); + } + else { + logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", + jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId()); + bulkRequest.add( + client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content)); + } + } + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}, e)); + } + + return this; + } + + /** + * Persist {@link PerPartitionMaxProbabilities} + * + * @param partitionProbabilities The probabilities to persist + * @param autoGenerateId If true then persist the PerPartitionMaxProbabilities with an auto generated ID + * else use {@link PerPartitionMaxProbabilities#getId()} + * @return this + */ + public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities, boolean autoGenerateId) { + try { + XContentBuilder builder = toXContentBuilder(partitionProbabilities); + + if (autoGenerateId) { + logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {}", + jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp()); + bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(builder)); + } + else { + logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}", + jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(), + partitionProbabilities.getId()); + bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId()) + .setSource(builder)); + } + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores", + new Object[]{jobId}, e)); + } + + return this; + } + + /** + * Execute the bulk action + */ + public void executeRequest() { + logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions()); + BulkResponse addRecordsResponse = bulkRequest.execute().actionGet(); + if (addRecordsResponse.hasFailures()) { + logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); + } } } - public void persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities) { - String jobId = partitionProbabilities.getJobId(); - try { - XContentBuilder builder = toXContentBuilder(partitionProbabilities); - - String indexName = getJobIndexName(jobId); - logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {}", - jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp()); - client.prepareIndex(indexName, Result.TYPE.getPreferredName()) - .setSource(builder) - .execute().actionGet(); - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] error updating bucket per partition max normalized scores", - new Object[]{jobId}, e)); - } - } /** * Persist the category definition @@ -172,7 +231,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void 
persistCategoryDefinition(CategoryDefinition category) { Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE::getPreferredName, - () -> String.valueOf(category.getCategoryId()), () -> serialiseCategoryDefinition(category)); + () -> String.valueOf(category.getCategoryId()), () -> toXContentBuilder(category)); persistable.persist(); // Don't commit as we expect masses of these updates and they're not // read again by this process @@ -230,17 +289,6 @@ public class JobResultsPersister extends AbstractComponent { // read again by this process } - /** - * Persist the influencer - */ - public void persistInfluencer(Influencer influencer) { - Persistable persistable = new Persistable(influencer.getJobId(), influencer, Result.TYPE::getPreferredName, - influencer::getId, () -> toXContentBuilder(influencer)); - persistable.persist(); - // Don't commit as we expect masses of these updates and they're not - // read again by this process - } - /** * Persist state sent from the native process */ @@ -259,22 +307,12 @@ public class JobResultsPersister extends AbstractComponent { } /** - * Delete any existing interim results + * Delete any existing interim results synchronously */ public void deleteInterimResults(String jobId) { - ElasticsearchBulkDeleter deleter = new ElasticsearchBulkDeleter(client, jobId, true); + JobDataDeleter deleter = new JobDataDeleter(client, jobId, true); deleter.deleteInterimResults(); - deleter.commit(new ActionListener() { - @Override - public void onResponse(BulkResponse bulkResponse) { - // don't care? - } - - @Override - public void onFailure(Exception e) { - // don't care? - } - }); + deleter.commit(); } /** @@ -299,35 +337,6 @@ public class JobResultsPersister extends AbstractComponent { return builder; } - private XContentBuilder serialiseCategoryDefinition(CategoryDefinition categoryDefinition) throws IOException { - XContentBuilder builder = jsonBuilder(); - categoryDefinition.toXContent(builder, ToXContent.EMPTY_PARAMS); - return builder; - } - - void persistBucketInfluencersStandalone(String jobId, String bucketId, List bucketInfluencers, - Date bucketTime, boolean isInterim) throws IOException { - if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) { - BulkRequestBuilder addBucketInfluencersRequest = client.prepareBulk(); - for (BucketInfluencer bucketInfluencer : bucketInfluencers) { - XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer, bucketTime, isInterim); - // Need consistent IDs to ensure overwriting on renormalisation - String id = bucketId + bucketInfluencer.getInfluencerFieldName(); - String indexName = getJobIndexName(jobId); - logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", jobId, BucketInfluencer.RESULT_TYPE_VALUE, - indexName, id); - addBucketInfluencersRequest.add( - client.prepareIndex(indexName, Result.TYPE.getPreferredName(), id).setSource(content)); - } - logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addBucketInfluencersRequest.numberOfActions()); - BulkResponse addBucketInfluencersResponse = addBucketInfluencersRequest.execute().actionGet(); - if (addBucketInfluencersResponse.hasFailures()) { - logger.error("[{}] Bulk index of Bucket Influencers has errors: {}", jobId, - addBucketInfluencersResponse.buildFailureMessage()); - } - } - } - private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer, Date bucketTime, boolean isInterim) throws IOException { 
BucketInfluencer influencer = new BucketInfluencer(bucketInfluencer); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/OldDataRemover.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/OldDataRemover.java index 8c244a34112..08fe2c38596 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/OldDataRemover.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/OldDataRemover.java @@ -17,9 +17,9 @@ import java.util.function.Function; */ public class OldDataRemover { - private final Function dataDeleterFactory; + private final Function dataDeleterFactory; - public OldDataRemover(Function dataDeleterFactory) { + public OldDataRemover(Function dataDeleterFactory) { this.dataDeleterFactory = Objects.requireNonNull(dataDeleterFactory); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java index d57548c6b55..a6d8ef16731 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -31,12 +31,21 @@ import java.util.concurrent.CountDownLatch; import java.util.stream.Stream; /** - * A runnable class that reads the autodetect process output - * and writes the results via the {@linkplain JobResultsPersister} - * passed in the constructor. + * A runnable class that reads the autodetect process output in the + * {@link #process(String, InputStream, boolean)} method and persists parsed + * results via the {@linkplain JobResultsPersister} passed in the constructor. *
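Taken together with the Javadoc here, the processor's entry point is simple: build it with a Renormaliser, a JobResultsPersister and an AutodetectResultsParser, then hand it the native process output stream. A minimal sketch (the drain helper and variable names are assumptions, not part of this diff):

    import java.io.InputStream;

    import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutoDetectResultProcessor;

    class ResultProcessingExample {
        // The processor is assumed to have been built as described above:
        // new AutoDetectResultProcessor(renormaliser, persister, parser)
        static void drain(AutoDetectResultProcessor processor, String jobId, InputStream autodetectOutput) {
            // Blocks until the native process output stream is exhausted;
            // results are parsed and persisted bucket by bucket.
            processor.process(jobId, autodetectOutput, false);
        }
    }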

* Has methods to register and remove alert observers. * Also has a method to wait for a flush to be complete. + * + * Buckets are the written last after records, influencers etc + * when the end of bucket is reached. Therefore results aren't persisted + * until the bucket is read, this means that interim results for all + * result types can be safely deleted when the bucket is read and before + * the new results are updated. This is specifically for the case where + * a flush command is issued repeatedly in the same bucket to generate + * interim results and the old interim results have to be cleared out + * before the new ones are written. */ public class AutoDetectResultProcessor { @@ -52,10 +61,7 @@ public class AutoDetectResultProcessor { private volatile ModelSizeStats latestModelSizeStats; public AutoDetectResultProcessor(Renormaliser renormaliser, JobResultsPersister persister, AutodetectResultsParser parser) { - this.renormaliser = renormaliser; - this.persister = persister; - this.parser = parser; - this.flushListener = new FlushListener(); + this(renormaliser, persister, parser, new FlushListener()); } AutoDetectResultProcessor(Renormaliser renormaliser, JobResultsPersister persister, AutodetectResultsParser parser, @@ -70,7 +76,7 @@ public class AutoDetectResultProcessor { try (Stream stream = parser.parseResults(in)) { int bucketCount = 0; Iterator iterator = stream.iterator(); - Context context = new Context(jobId, isPerPartitionNormalisation); + Context context = new Context(jobId, isPerPartitionNormalisation, persister.bulkPersisterBuilder(jobId)); while (iterator.hasNext()) { AutodetectResult result = iterator.next(); processResult(context, result); @@ -94,28 +100,30 @@ public class AutoDetectResultProcessor { Bucket bucket = result.getBucket(); if (bucket != null) { if (context.deleteInterimRequired) { - // Delete any existing interim results at the start - // of a job upload: - // these are generated by a Flush command, and will - // be replaced or - // superseded by new results + // Delete any existing interim results generated by a Flush command + // which have not been replaced or superseded by new results. LOGGER.trace("[{}] Deleting interim results", context.jobId); - // TODO: Is this the right place to delete results? 
persister.deleteInterimResults(context.jobId); context.deleteInterimRequired = false; } - persister.persistBucket(bucket); + + // persist after deleting interim results in case the new + // results are also interim + context.bulkResultsPersister.persistBucket(bucket); + + context.bulkResultsPersister = persister.bulkPersisterBuilder(context.jobId); + } List records = result.getRecords(); if (records != null && !records.isEmpty()) { - persister.persistRecords(records); + context.bulkResultsPersister.persistRecords(records, true); if (context.isPerPartitionNormalization) { - persister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records)); + context.bulkResultsPersister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records), true); } } List influencers = result.getInfluencers(); if (influencers != null && !influencers.isEmpty()) { - persister.persistInfluencers(influencers); + context.bulkResultsPersister.persistInfluencers(influencers, true); } CategoryDefinition categoryDefinition = result.getCategoryDefinition(); if (categoryDefinition != null) { @@ -193,13 +201,15 @@ public class AutoDetectResultProcessor { private final String jobId; private final boolean isPerPartitionNormalization; + private JobResultsPersister.Builder bulkResultsPersister; boolean deleteInterimRequired; - Context(String jobId, boolean isPerPartitionNormalization) { + Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) { this.jobId = jobId; this.isPerPartitionNormalization = isPerPartitionNormalization; this.deleteInterimRequired = true; + this.bulkResultsPersister = bulkResultsPersister; } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterTests.java similarity index 96% rename from elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleterTests.java rename to elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterTests.java index 79fc17bc8cd..7d9f416ba77 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchBulkDeleterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleterTests.java @@ -24,7 +24,7 @@ import static org.elasticsearch.mock.orig.Mockito.verify; import static org.mockito.Matchers.any; import static org.mockito.Mockito.when; -public class ElasticsearchBulkDeleterTests extends ESTestCase { +public class JobDataDeleterTests extends ESTestCase { public void testDeleteResultsFromTime() { @@ -39,7 +39,7 @@ public class ElasticsearchBulkDeleterTests extends ESTestCase { .prepareSearchScrollExecuteListener(response) .prepareBulk(bulkResponse).build(); - ElasticsearchBulkDeleter bulkDeleter = new ElasticsearchBulkDeleter(client, "foo"); + JobDataDeleter bulkDeleter = new JobDataDeleter(client, "foo"); // because of the mocking this runs in the current thread bulkDeleter.deleteResultsFromTime(new Date().getTime(), new ActionListener() { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java index 9a6854a95ca..ca1df02e4b7 100644 --- 
a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java
@@ -64,7 +64,7 @@ public class JobResultsPersisterTests extends ESTestCase {
         bucket.setRecords(Arrays.asList(record));
         JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
-        persister.persistBucket(bucket);
+        persister.bulkPersisterBuilder(JOB_ID).persistBucket(bucket).executeRequest();
         List list = captor.getAllValues();
         assertEquals(2, list.size());
@@ -122,7 +122,7 @@ public class JobResultsPersisterTests extends ESTestCase {
         r1.setTypical(typicals);
         JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
-        persister.persistRecords(records);
+        persister.bulkPersisterBuilder(JOB_ID).persistRecords(records, true).executeRequest();
         List captured = captor.getAllValues();
         assertEquals(1, captured.size());
@@ -164,7 +164,7 @@ public class JobResultsPersisterTests extends ESTestCase {
         influencers.add(inf);
         JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
-        persister.persistInfluencers(influencers);
+        persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers, true).executeRequest();
         List captured = captor.getAllValues();
         assertEquals(1, captured.size());
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java
index 9ed332daa17..d882088735c 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.stream.Stream;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -37,6 +38,8 @@ import static org.mockito.Mockito.when;
 
 public class AutoDetectResultProcessorTests extends ESTestCase {
 
+    private static final String JOB_ID = "_id";
+
     public void testProcess() {
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
         @SuppressWarnings("unchecked")
@@ -52,7 +55,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, parser);
-        processor.process("_id", mock(InputStream.class), randomBoolean());
+        processor.process(JOB_ID, mock(InputStream.class), randomBoolean());
         verify(renormaliser, times(1)).shutdown();
         assertEquals(0, processor.completionLatch.getCount());
     }
@@ -60,33 +63,39 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_bucket() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
+        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Bucket bucket = mock(Bucket.class);
         when(result.getBucket()).thenReturn(bucket);
         processor.processResult(context, result);
-        verify(persister, times(1)).persistBucket(bucket);
-        verify(persister, never()).deleteInterimResults("_id");
+        verify(bulkBuilder, times(1)).persistBucket(bucket);
+        verify(persister, times(1)).bulkPersisterBuilder(JOB_ID);
+        verify(persister, never()).deleteInterimResults(JOB_ID);
         verifyNoMoreInteractions(persister);
     }
 
     public void testProcessResult_bucket_deleteInterimRequired() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
+        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         AutodetectResult result = mock(AutodetectResult.class);
         Bucket bucket = mock(Bucket.class);
         when(result.getBucket()).thenReturn(bucket);
         processor.processResult(context, result);
-        verify(persister, times(1)).persistBucket(bucket);
-        verify(persister, times(1)).deleteInterimResults("_id");
+        verify(bulkBuilder, times(1)).persistBucket(bucket);
+        verify(persister, times(1)).deleteInterimResults(JOB_ID);
+        verify(persister, times(1)).bulkPersisterBuilder(JOB_ID);
         verifyNoMoreInteractions(persister);
         assertFalse(context.deleteInterimRequired);
     }
@@ -94,9 +103,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_records() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
+        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         AnomalyRecord record1 = new AnomalyRecord("foo");
@@ -105,16 +116,18 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(result.getRecords()).thenReturn(records);
         processor.processResult(context, result);
-        verify(persister, times(1)).persistRecords(records);
+        verify(bulkBuilder, times(1)).persistRecords(records, true);
         verifyNoMoreInteractions(persister);
     }
 
     public void testProcessResult_records_isPerPartitionNormalization() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
+        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         AnomalyRecord record1 = new AnomalyRecord("foo");
@@ -125,35 +138,38 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(result.getRecords()).thenReturn(records);
         processor.processResult(context, result);
-        verify(persister, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class));
-        verify(persister, times(1)).persistRecords(records);
+        verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class), eq(true));
+        verify(bulkBuilder, times(1)).persistRecords(records, true);
         verifyNoMoreInteractions(persister);
     }
 
     public void testProcessResult_influencers() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
+        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
-        Influencer influencer1 = new Influencer("foo", "infField", "infValue");
-        Influencer influencer2 = new Influencer("foo", "infField2", "infValue2");
+        Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue");
+        Influencer influencer2 = new Influencer(JOB_ID, "infField2", "infValue2");
         List influencers = Arrays.asList(influencer1, influencer2);
         when(result.getInfluencers()).thenReturn(influencers);
         processor.processResult(context, result);
-        verify(persister, times(1)).persistInfluencers(influencers);
+        verify(bulkBuilder, times(1)).persistInfluencers(influencers, true);
         verifyNoMoreInteractions(persister);
     }
 
     public void testProcessResult_categoryDefinition() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
@@ -167,19 +183,20 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_flushAcknowledgement() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         FlushListener flushListener = mock(FlushListener.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
-        when(flushAcknowledgement.getId()).thenReturn("_id");
+        when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
         when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
         processor.processResult(context, result);
-        verify(flushListener, times(1)).acknowledgeFlush("_id");
-        verify(persister, times(1)).commitWrites("_id");
+        verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
+        verify(persister, times(1)).commitWrites(JOB_ID);
         verifyNoMoreInteractions(persister);
         assertTrue(context.deleteInterimRequired);
     }
@@ -187,14 +204,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         FlushListener flushListener = mock(FlushListener.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
-        when(flushAcknowledgement.getId()).thenReturn("_id");
+        when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
         when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
         when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
@@ -203,8 +221,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         processor.processResult(context, result);
         inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
-        inOrder.verify(persister, times(1)).commitWrites("_id");
-        inOrder.verify(flushListener, times(1)).acknowledgeFlush("_id");
+        inOrder.verify(persister, times(1)).commitWrites(JOB_ID);
+        inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
         verifyNoMoreInteractions(persister);
         assertTrue(context.deleteInterimRequired);
     }
@@ -212,9 +230,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_modelDebugOutput() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelDebugOutput modelDebugOutput = mock(ModelDebugOutput.class);
@@ -228,9 +247,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_modelSizeStats() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
@@ -245,9 +265,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_modelSnapshot() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSnapshot modelSnapshot = mock(ModelSnapshot.class);
@@ -261,9 +282,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_quantiles() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Quantiles quantiles = mock(Quantiles.class);
@@ -279,9 +301,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     public void testProcessResult_quantiles_isPerPartitionNormalization() {
         Renormaliser renormaliser = mock(Renormaliser.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
+        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
-        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", true);
+        AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, true, bulkBuilder);
         context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Quantiles quantiles = mock(Quantiles.class);