Refactor delete interim results (elastic/elasticsearch#470)

* Collapse ElasticsearchBulkDeleter into JobDataDeleter

* Add blocking delete to JobDataDeleter

* Delete interim results only after all the results are parsed.

* Remove unused deleteModelSizeStats and deleteModelDebugOutput methods.

Document missing javadoc tags

Original commit: elastic/x-pack-elasticsearch@1997541673
David Kyle authored on 2016-12-07 11:23:27 +00:00 (committed by GitHub)
parent 14f43af818
commit ccf8cb7e0d
14 changed files with 513 additions and 564 deletions
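
For orientation, here is a minimal sketch of how calling code uses the renamed classes after this change. It is illustrative only: the client, job ID and modelSnapshot variables are placeholders, and the constructors and methods are taken from the diffs below.

// Assumes an org.elasticsearch.client.Client named "client", a job "my-job" and a
// ModelSnapshot instance "modelSnapshot" are in scope.
JobDataDeleterFactory deleterFactory = new JobDataDeleterFactory(client);
JobDataDeleter deleter = deleterFactory.apply("my-job");

// Queue deletions, then commit the underlying bulk request asynchronously...
deleter.deleteModelSnapshot(modelSnapshot);
deleter.commit(new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        // deletions submitted; check bulkResponse.hasFailures() if needed
    }

    @Override
    public void onFailure(Exception e) {
        // handle the failure
    }
});

// ...or use the new blocking commit, as JobResultsPersister.deleteInterimResults now does.
JobDataDeleter interimDeleter = new JobDataDeleter(client, "my-job", true); // quiet logging
interimDeleter.deleteInterimResults();
interimDeleter.commit();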


@ -57,7 +57,7 @@ import org.elasticsearch.xpack.prelert.job.metadata.JobAllocator;
import org.elasticsearch.xpack.prelert.job.metadata.JobLifeCycleService;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertInitializationService;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
@ -181,7 +181,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
jobManager,
new JobAllocator(settings, clusterService, threadPool),
new JobLifeCycleService(settings, client, clusterService, scheduledJobService, dataProcessor, threadPool.generic()),
new ElasticsearchBulkDeleterFactory(client), //NORELEASE: this should use Delete-by-query
new JobDataDeleterFactory(client), //NORELEASE: this should use Delete-by-query
dataProcessor,
new PrelertInitializationService(settings, threadPool, clusterService, jobProvider),
jobDataCountsPersister


@ -29,9 +29,9 @@ import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleter;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -134,13 +134,13 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
private final JobProvider jobProvider;
private final JobManager jobManager;
private final ClusterService clusterService;
private final ElasticsearchBulkDeleterFactory bulkDeleterFactory;
private final JobDataDeleterFactory bulkDeleterFactory;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ElasticsearchJobProvider jobProvider, JobManager jobManager, ClusterService clusterService,
ElasticsearchBulkDeleterFactory bulkDeleterFactory) {
JobDataDeleterFactory bulkDeleterFactory) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.jobProvider = jobProvider;
this.jobManager = jobManager;
@ -181,7 +181,7 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
}
// Delete the snapshot and any associated state files
ElasticsearchBulkDeleter deleter = bulkDeleterFactory.apply(request.getJobId());
JobDataDeleter deleter = bulkDeleterFactory.apply(request.getJobId());
deleter.deleteModelSnapshot(deleteCandidate);
deleter.commit(new ActionListener<BulkResponse>() {
@Override


@ -43,7 +43,7 @@ import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
@ -311,13 +311,13 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
private final JobManager jobManager;
private final JobProvider jobProvider;
private final ElasticsearchBulkDeleterFactory bulkDeleterFactory;
private final JobDataDeleterFactory bulkDeleterFactory;
private final JobDataCountsPersister jobDataCountsPersister;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager, ElasticsearchJobProvider jobProvider,
ClusterService clusterService, ElasticsearchBulkDeleterFactory bulkDeleterFactory,
ClusterService clusterService, JobDataDeleterFactory bulkDeleterFactory,
JobDataCountsPersister jobDataCountsPersister) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;


@ -1,216 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.ModelState;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.Result;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
public class ElasticsearchBulkDeleter implements JobDataDeleter {
private static final Logger LOGGER = Loggers.getLogger(ElasticsearchBulkDeleter.class);
private static final int SCROLL_SIZE = 1000;
private static final String SCROLL_CONTEXT_DURATION = "5m";
private final Client client;
private final String jobId;
private final BulkRequestBuilder bulkRequestBuilder;
private long deletedResultCount;
private long deletedModelSnapshotCount;
private long deletedModelStateCount;
private boolean quiet;
public ElasticsearchBulkDeleter(Client client, String jobId) {
this(client, jobId, false);
}
public ElasticsearchBulkDeleter(Client client, String jobId, boolean quiet) {
this.client = Objects.requireNonNull(client);
this.jobId = Objects.requireNonNull(jobId);
bulkRequestBuilder = client.prepareBulk();
deletedResultCount = 0;
deletedModelSnapshotCount = 0;
deletedModelStateCount = 0;
this.quiet = quiet;
}
@Override
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
String index = JobResultsPersister.getJobIndexName(jobId);
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP);
timeRange.gte(cutoffEpochMs);
timeRange.lt(new Date().getTime());
RepeatingSearchScrollListener scrollSearchListener = new RepeatingSearchScrollListener(index, listener);
client.prepareSearch(index)
.setTypes(Result.TYPE.getPreferredName())
.setFetchSource(false)
.setQuery(timeRange)
.setScroll(SCROLL_CONTEXT_DURATION)
.setSize(SCROLL_SIZE)
.execute(scrollSearchListener);
}
private void addDeleteRequestForSearchHits(SearchHits hits, String index) {
for (SearchHit hit : hits.hits()) {
LOGGER.trace("Search hit for result: {}", hit.getId());
addDeleteRequest(hit, index);
}
deletedResultCount = hits.getTotalHits();
}
private void addDeleteRequest(SearchHit hit, String index) {
DeleteRequestBuilder deleteRequest = DeleteAction.INSTANCE.newRequestBuilder(client)
.setIndex(index)
.setType(hit.getType())
.setId(hit.getId());
bulkRequestBuilder.add(deleteRequest);
}
@Override
public void deleteModelSnapshot(ModelSnapshot modelSnapshot) {
String snapshotId = modelSnapshot.getSnapshotId();
int docCount = modelSnapshot.getSnapshotDocCount();
String indexName = JobResultsPersister.getJobIndexName(jobId);
// Deduce the document IDs of the state documents from the information
// in the snapshot document - we cannot query the state itself as it's
// too big and has no mappings
for (int i = 0; i < docCount; ++i) {
String stateId = snapshotId + '_' + i;
bulkRequestBuilder.add(client.prepareDelete(indexName, ModelState.TYPE.getPreferredName(), stateId));
++deletedModelStateCount;
}
bulkRequestBuilder.add(client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId));
++deletedModelSnapshotCount;
}
@Override
public void deleteModelDebugOutput(ModelDebugOutput modelDebugOutput) {
String id = modelDebugOutput.getId();
bulkRequestBuilder.add(
client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), ModelDebugOutput.TYPE.getPreferredName(), id));
}
@Override
public void deleteModelSizeStats(ModelSizeStats modelSizeStats) {
bulkRequestBuilder.add(client.prepareDelete(
JobResultsPersister.getJobIndexName(jobId), ModelSizeStats.TYPE.getPreferredName(), modelSizeStats.getId()));
}
@Override
public void deleteInterimResults() {
String index = JobResultsPersister.getJobIndexName(jobId);
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
SearchResponse searchResponse = client.prepareSearch(index)
.setTypes(Result.RESULT_TYPE.getPreferredName())
.setQuery(qb)
.setFetchSource(false)
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))
.setScroll(SCROLL_CONTEXT_DURATION)
.setSize(SCROLL_SIZE)
.get();
String scrollId = searchResponse.getScrollId();
long totalHits = searchResponse.getHits().totalHits();
long totalDeletedCount = 0;
while (totalDeletedCount < totalHits) {
for (SearchHit hit : searchResponse.getHits()) {
LOGGER.trace("Search hit for result: {}", hit.getId());
++totalDeletedCount;
addDeleteRequest(hit, index);
++deletedResultCount;
}
searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get();
}
}
/**
* Commits the deletions and if {@code forceMerge} is {@code true}, it
* forces a merge which removes the data from disk.
*/
@Override
public void commit(ActionListener<BulkResponse> listener) {
if (bulkRequestBuilder.numberOfActions() == 0) {
listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
return;
}
Level logLevel = quiet ? Level.DEBUG : Level.INFO;
LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents",
deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount);
try {
bulkRequestBuilder.execute(listener);
} catch (Exception e) {
listener.onFailure(e);
}
}
/**
* Repeats a scroll search adding the hits a bulk delete request
*/
private class RepeatingSearchScrollListener implements ActionListener<SearchResponse> {
private final AtomicLong totalDeletedCount;
private final String index;
private final ActionListener<Boolean> scrollFinishedListener;
RepeatingSearchScrollListener(String index, ActionListener<Boolean> scrollFinishedListener) {
totalDeletedCount = new AtomicLong(0L);
this.index = index;
this.scrollFinishedListener = scrollFinishedListener;
}
@Override
public void onResponse(SearchResponse searchResponse) {
addDeleteRequestForSearchHits(searchResponse.getHits(), index);
totalDeletedCount.addAndGet(searchResponse.getHits().hits().length);
if (totalDeletedCount.get() < searchResponse.getHits().totalHits()) {
client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION)
.execute(this);
}
else {
scrollFinishedListener.onResponse(true);
}
}
@Override
public void onFailure(Exception e) {
scrollFinishedListener.onFailure(e);
}
};
}


@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.elasticsearch.client.Client;
import java.util.function.Function;
/**
* TODO This is all just silly static typing shenanigans because Guice can't inject
* anonymous lambdas. This can all be removed once Guice goes away.
*/
public class ElasticsearchBulkDeleterFactory implements Function<String, ElasticsearchBulkDeleter> {
private final Client client;
public ElasticsearchBulkDeleterFactory(Client client) {
this.client = client;
}
@Override
public ElasticsearchBulkDeleter apply(String jobId) {
return new ElasticsearchBulkDeleter(client, jobId);
}
}


@ -5,13 +5,62 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.ModelState;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.Result;
public interface JobDataDeleter {
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
public class JobDataDeleter {
private static final Logger LOGGER = Loggers.getLogger(JobDataDeleter.class);
private static final int SCROLL_SIZE = 1000;
private static final String SCROLL_CONTEXT_DURATION = "5m";
private final Client client;
private final String jobId;
private final BulkRequestBuilder bulkRequestBuilder;
private long deletedResultCount;
private long deletedModelSnapshotCount;
private long deletedModelStateCount;
private boolean quiet;
public JobDataDeleter(Client client, String jobId) {
this(client, jobId, false);
}
public JobDataDeleter(Client client, String jobId, boolean quiet) {
this.client = Objects.requireNonNull(client);
this.jobId = Objects.requireNonNull(jobId);
bulkRequestBuilder = client.prepareBulk();
deletedResultCount = 0;
deletedModelSnapshotCount = 0;
deletedModelStateCount = 0;
this.quiet = quiet;
}
/**
* Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}
@ -19,36 +68,164 @@ public interface JobDataDeleter {
* @param cutoffEpochMs Results at and after this time will be deleted
* @param listener Response listener
*/
void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener);
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
String index = JobResultsPersister.getJobIndexName(jobId);
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(ElasticsearchMappings.ES_TIMESTAMP);
timeRange.gte(cutoffEpochMs);
timeRange.lt(new Date().getTime());
RepeatingSearchScrollListener scrollSearchListener = new RepeatingSearchScrollListener(index, listener);
client.prepareSearch(index)
.setTypes(Result.TYPE.getPreferredName())
.setFetchSource(false)
.setQuery(timeRange)
.setScroll(SCROLL_CONTEXT_DURATION)
.setSize(SCROLL_SIZE)
.execute(scrollSearchListener);
}
private void addDeleteRequestForSearchHits(SearchHits hits, String index) {
for (SearchHit hit : hits.hits()) {
LOGGER.trace("Search hit for result: {}", hit.getId());
addDeleteRequest(hit, index);
}
deletedResultCount = hits.getTotalHits();
}
private void addDeleteRequest(SearchHit hit, String index) {
DeleteRequestBuilder deleteRequest = DeleteAction.INSTANCE.newRequestBuilder(client)
.setIndex(index)
.setType(hit.getType())
.setId(hit.getId());
bulkRequestBuilder.add(deleteRequest);
}
/**
* Delete a {@code ModelSnapshot}
*
* @param modelSnapshot the model snapshot to delete
*/
void deleteModelSnapshot(ModelSnapshot modelSnapshot);
public void deleteModelSnapshot(ModelSnapshot modelSnapshot) {
String snapshotId = modelSnapshot.getSnapshotId();
int docCount = modelSnapshot.getSnapshotDocCount();
String indexName = JobResultsPersister.getJobIndexName(jobId);
// Deduce the document IDs of the state documents from the information
// in the snapshot document - we cannot query the state itself as it's
// too big and has no mappings
for (int i = 0; i < docCount; ++i) {
String stateId = snapshotId + '_' + i;
bulkRequestBuilder.add(client.prepareDelete(indexName, ModelState.TYPE.getPreferredName(), stateId));
++deletedModelStateCount;
}
/**
* Delete a {@code ModelDebugOutput} record
*
* @param modelDebugOutput to delete
*/
void deleteModelDebugOutput(ModelDebugOutput modelDebugOutput);
/**
* Delete a {@code ModelSizeStats} record
*
* @param modelSizeStats to delete
*/
void deleteModelSizeStats(ModelSizeStats modelSizeStats);
bulkRequestBuilder.add(client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId));
++deletedModelSnapshotCount;
}
/**
* Delete all results marked as interim
*/
void deleteInterimResults();
public void deleteInterimResults() {
String index = JobResultsPersister.getJobIndexName(jobId);
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
SearchResponse searchResponse = client.prepareSearch(index)
.setTypes(Result.RESULT_TYPE.getPreferredName())
.setQuery(qb)
.setFetchSource(false)
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))
.setScroll(SCROLL_CONTEXT_DURATION)
.setSize(SCROLL_SIZE)
.get();
String scrollId = searchResponse.getScrollId();
long totalHits = searchResponse.getHits().totalHits();
long totalDeletedCount = 0;
while (totalDeletedCount < totalHits) {
for (SearchHit hit : searchResponse.getHits()) {
LOGGER.trace("Search hit for result: {}", hit.getId());
++totalDeletedCount;
addDeleteRequest(hit, index);
++deletedResultCount;
}
searchResponse = client.prepareSearchScroll(scrollId).setScroll(SCROLL_CONTEXT_DURATION).get();
}
}
/**
* Commit the deletions without enforcing the removal of data from disk
*/
void commit(ActionListener<BulkResponse> listener);
public void commit(ActionListener<BulkResponse> listener) {
if (bulkRequestBuilder.numberOfActions() == 0) {
listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
return;
}
Level logLevel = quiet ? Level.DEBUG : Level.INFO;
LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents",
deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount);
try {
bulkRequestBuilder.execute(listener);
} catch (Exception e) {
listener.onFailure(e);
}
}
/**
* Blocking version of {@linkplain #commit(ActionListener)}
*/
public void commit() {
if (bulkRequestBuilder.numberOfActions() == 0) {
return;
}
Level logLevel = quiet ? Level.DEBUG : Level.INFO;
LOGGER.log(logLevel, "Requesting deletion of {} results, {} model snapshots and {} model state documents",
deletedResultCount, deletedModelSnapshotCount, deletedModelStateCount);
BulkResponse response = bulkRequestBuilder.get();
if (response.hasFailures()) {
LOGGER.debug("Bulk request has failures. {}", response.buildFailureMessage());
}
}
/**
* Repeats a scroll search adding the hits to a bulk delete request
*/
private class RepeatingSearchScrollListener implements ActionListener<SearchResponse> {
private final AtomicLong totalDeletedCount;
private final String index;
private final ActionListener<Boolean> scrollFinishedListener;
RepeatingSearchScrollListener(String index, ActionListener<Boolean> scrollFinishedListener) {
totalDeletedCount = new AtomicLong(0L);
this.index = index;
this.scrollFinishedListener = scrollFinishedListener;
}
@Override
public void onResponse(SearchResponse searchResponse) {
addDeleteRequestForSearchHits(searchResponse.getHits(), index);
totalDeletedCount.addAndGet(searchResponse.getHits().hits().length);
if (totalDeletedCount.get() < searchResponse.getHits().totalHits()) {
client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION)
.execute(this);
}
else {
scrollFinishedListener.onResponse(true);
}
}
@Override
public void onFailure(Exception e) {
scrollFinishedListener.onFailure(e);
}
};
}
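
A usage note on the class above: deleteResultsFromTime only queues delete requests as the scroll progresses and reports completion through the Boolean listener, so, as far as this code shows, the caller still has to commit the bulk request afterwards. A hedged sketch, with cutoffMs and bulkListener as placeholder variables:

final JobDataDeleter deleter = new JobDataDeleter(client, "my-job");
deleter.deleteResultsFromTime(cutoffMs, new ActionListener<Boolean>() {
    @Override
    public void onResponse(Boolean scrollComplete) {
        // every matching result hit is now queued in the pending bulk request
        deleter.commit(bulkListener); // bulkListener is an ActionListener<BulkResponse>
    }

    @Override
    public void onFailure(Exception e) {
        bulkListener.onFailure(e);
    }
});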


@ -5,6 +5,24 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;
public interface JobDataDeleterFactory {
JobDataDeleter newDeleter(String jobId);
import org.elasticsearch.client.Client;
import java.util.function.Function;
/**
* TODO This is all just silly static typing shenanigans because Guice can't inject
* anonymous lambdas. This can all be removed once Guice goes away.
*/
public class JobDataDeleterFactory implements Function<String, JobDataDeleter> {
private final Client client;
public JobDataDeleterFactory(Client client) {
this.client = client;
}
@Override
public JobDataDeleter apply(String jobId) {
return new JobDataDeleter(client, jobId);
}
}


@ -5,36 +5,30 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.job.results.Result;
import java.io.IOException;
import java.util.List;
/**
* Interface for classes that update {@linkplain Bucket Buckets}
* for a particular job with new normalised anomaly scores and
* unusual scores
* unusual scores.
*
* Renormalised results already have an ID, having been indexed at least
* once before; that same ID should be used on persistence
*/
public class JobRenormaliser extends AbstractComponent {
private final Client client;
private final JobResultsPersister jobResultsPersister;
public JobRenormaliser(Settings settings, Client client, JobResultsPersister jobResultsPersister) {
public JobRenormaliser(Settings settings, JobResultsPersister jobResultsPersister) {
super(settings);
this.client = client;
this.jobResultsPersister = jobResultsPersister;
}
@ -45,82 +39,44 @@ public class JobRenormaliser extends AbstractComponent {
* @param bucket the bucket to update
*/
public void updateBucket(Bucket bucket) {
String jobId = bucket.getJobId();
try {
String indexName = JobResultsPersister.getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: update result type {} to index {} with ID {}", jobId, Bucket.RESULT_TYPE_VALUE, indexName,
bucket.getId());
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId())
.setSource(jobResultsPersister.toXContentBuilder(bucket)).execute().actionGet();
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error updating bucket state", new Object[]{jobId}, e));
return;
}
// If the update to the bucket was successful, also update the
// standalone copies of the nested bucket influencers
try {
jobResultsPersister.persistBucketInfluencersStandalone(bucket.getJobId(), bucket.getId(), bucket.getBucketInfluencers(),
bucket.getTimestamp(), bucket.isInterim());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error updating standalone bucket influencer state", new Object[]{jobId}, e));
return;
}
jobResultsPersister.bulkPersisterBuilder(bucket.getJobId()).persistBucket(bucket).executeRequest();
}
/**
* Update the anomaly records for a particular bucket and job.
* The anomaly records are updated with the values in the
* <code>records</code> list.
* Update the anomaly records for a particular job.
* The anomaly records are updated with the values in <code>records</code> and
* stored with the ID returned by {@link AnomalyRecord#getId()}
*
* @param bucketId Id of the bucket to update
* @param records The new record values
* @param jobId Id of the job to update
* @param records The updated records
*/
public void updateRecords(String jobId, String bucketId, List<AnomalyRecord> records) {
try {
// Now bulk update the records within the bucket
BulkRequestBuilder bulkRequest = client.prepareBulk();
boolean addedAny = false;
for (AnomalyRecord record : records) {
String recordId = record.getId();
String indexName = JobResultsPersister.getJobIndexName(jobId);
logger.trace("[{}] ES BULK ACTION: update ID {} result type {} in index {} using map of new values, for bucket {}",
jobId, recordId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, bucketId);
bulkRequest.add(
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), recordId)
.setSource(jobResultsPersister.toXContentBuilder(record)));
addedAny = true;
}
if (addedAny) {
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions());
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
logger.error("[{}] BulkResponse has errors: {}", jobId, bulkResponse.buildFailureMessage());
}
}
} catch (IOException | ElasticsearchException e) {
logger.error(new ParameterizedMessage("[{}] Error updating anomaly records", new Object[]{jobId}, e));
}
}
public void updatePerPartitionMaxProbabilities(String jobId, List<AnomalyRecord> records) {
PerPartitionMaxProbabilities ppMaxProbs =
new PerPartitionMaxProbabilities(records);
logger.trace("[{}] ES API CALL: update result type {} with ID {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, ppMaxProbs.getId());
jobResultsPersister.persistPerPartitionMaxProbabilities(ppMaxProbs);
public void updateRecords(String jobId, List<AnomalyRecord> records) {
jobResultsPersister.bulkPersisterBuilder(jobId).persistRecords(records, false).executeRequest();
}
/**
* Update the influencer for a particular job
* Create a {@link PerPartitionMaxProbabilities} object from this list of records and persist
* with the given ID.
*
* @param jobId Id of the job to update
* @param documentId The ID the {@link PerPartitionMaxProbabilities} document should be persisted with
* @param records Source of the new {@link PerPartitionMaxProbabilities} object
*/
public void updateInfluencer(Influencer influencer) {
jobResultsPersister.persistInfluencer(influencer);
public void updatePerPartitionMaxProbabilities(String jobId, String documentId, List<AnomalyRecord> records) {
PerPartitionMaxProbabilities ppMaxProbs = new PerPartitionMaxProbabilities(records);
ppMaxProbs.setId(documentId);
jobResultsPersister.bulkPersisterBuilder(jobId).persistPerPartitionMaxProbabilities(ppMaxProbs, false).executeRequest();
}
/**
* Update the influencer for a particular job.
* The Influencers are stored with the ID in {@link Influencer#getId()}
*
* @param jobId Id of the job to update
* @param influencers The updated influencers
*/
public void updateInfluencer(String jobId, List<Influencer> influencers) {
jobResultsPersister.bulkPersisterBuilder(jobId).persistInfluencers(influencers, false).executeRequest();
}
}
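
To make the slimmed-down renormaliser surface concrete, a short hedged sketch; Settings.EMPTY and the variables are placeholders, and the signatures match the methods shown above.

JobRenormaliser renormaliser = new JobRenormaliser(Settings.EMPTY, jobResultsPersister);

// Buckets keep their existing IDs, so re-indexing overwrites the old scores.
renormaliser.updateBucket(bucket);

// Records and influencers are bulk-updated under the IDs they were first indexed with.
renormaliser.updateRecords("my-job", updatedRecords);
renormaliser.updateInfluencer("my-job", updatedInfluencers);

// Per-partition max probabilities are rebuilt from the records and stored under an explicit ID.
renormaliser.updatePerPartitionMaxProbabilities("my-job", existingDocumentId, updatedRecords);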


@ -6,7 +6,6 @@
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@ -31,26 +30,24 @@ import org.elasticsearch.xpack.prelert.job.results.Result;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* Saves result Buckets and Quantiles to Elasticsearch<br>
* <p>
* <b>Buckets</b> are written with the following structure:
* <h2>Bucket</h2> The results of each job are stored in buckets, this is the
* top level structure for the results. A bucket contains multiple anomaly
* records. The anomaly score of the bucket may not match the summed score of
* all the records as all the records may not have been outputted for the
* bucket.
* <h2>Anomaly Record</h2> Each record was generated by a detector which can be identified via
* Persists result types, Quantiles etc to Elasticsearch<br>
* <h2>Bucket</h2> Bucket result. The anomaly score of the bucket may not match the summed
* score of all the records as all the records may not have been outputted for the
* bucket. Contains bucket influencers that are persisted both with the bucket
* and separately.
* <b>Anomaly Record</b> Each record was generated by a detector which can be identified via
* the detectorIndex field.
* <h2>Detector</h2> The Job has a fixed number of detectors but there may not
* be output for every detector in each bucket. <br>
* <b>Influencers</b>
* <b>Quantiles</b> may contain model quantiles used in normalisation and are
* stored in documents of type {@link Quantiles#TYPE} <br>
* <h2>ModelSizeStats</h2> This is stored in a flat structure <br>
* <b>ModelSizeStats</b> This is stored in a flat structure <br>
* <b>ModelSnapShot</b> This is stored in a flat structure <br>
*
* @see org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchMappings
*/
@ -58,112 +55,174 @@ public class JobResultsPersister extends AbstractComponent {
private final Client client;
public JobResultsPersister(Settings settings, Client client) {
super(settings);
this.client = client;
}
/**
* Persist the result bucket
*/
public void persistBucket(Bucket bucket) {
String jobId = bucket.getJobId();
try {
XContentBuilder content = toXContentBuilder(bucket);
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}", jobId, Bucket.RESULT_TYPE_VALUE, indexName,
bucket.getEpoch());
client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content).execute()
.actionGet();
persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(),
bucket.isInterim());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error persisting bucket", new Object[] {jobId}, e));
}
public Builder bulkPersisterBuilder(String jobId) {
return new Builder(jobId);
}
/**
* Persist a list of anomaly records
*
* @param records the records to persist
*/
public void persistRecords(List<AnomalyRecord> records) {
if (records.isEmpty()) {
return;
}
String jobId = records.get(0).getJobId();
String indexName = getJobIndexName(jobId);
BulkRequestBuilder addRecordsRequest = client.prepareBulk();
XContentBuilder content = null;
try {
for (AnomalyRecord record : records) {
content = toXContentBuilder(record);
public class Builder {
private BulkRequestBuilder bulkRequest;
private final String jobId;
private final String indexName;
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName);
addRecordsRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content));
private Builder (String jobId) {
this.jobId = Objects.requireNonNull(jobId);
indexName = getJobIndexName(jobId);
bulkRequest = client.prepareBulk();
}
/**
* Persist the result bucket and its bucket influencers
* Buckets are persisted with a consistent ID
*
* @param bucket The bucket to persist
* @return this
*/
public Builder persistBucket(Bucket bucket) {
try {
XContentBuilder content = toXContentBuilder(bucket);
logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}",
jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucket.getEpoch());
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId()).setSource(content));
persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(),
bucket.isInterim());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}, e));
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error persisting records", new Object [] {jobId}, e));
return;
return this;
}
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addRecordsRequest.numberOfActions());
BulkResponse addRecordsResponse = addRecordsRequest.execute().actionGet();
if (addRecordsResponse.hasFailures()) {
logger.error("[{}] Bulk index of AnomalyRecord has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
}
}
/**
* Persist a list of influencers
*
* @param influencers the influencers to persist
*/
public void persistInfluencers(List<Influencer> influencers) {
if (influencers.isEmpty()) {
return;
}
String jobId = influencers.get(0).getJobId();
String indexName = getJobIndexName(jobId);
BulkRequestBuilder addInfluencersRequest = client.prepareBulk();
XContentBuilder content = null;
try {
for (Influencer influencer : influencers) {
content = toXContentBuilder(influencer);
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID",
jobId, Influencer.RESULT_TYPE_VALUE, indexName);
addInfluencersRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content));
private void persistBucketInfluencersStandalone(String jobId, String bucketId, List<BucketInfluencer> bucketInfluencers,
Date bucketTime, boolean isInterim) throws IOException {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer, bucketTime, isInterim);
// Need consistent IDs to ensure overwriting on renormalisation
String id = bucketId + bucketInfluencer.getInfluencerFieldName();
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id);
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), id).setSource(content));
}
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error persisting influencers", new Object[] {jobId}, e));
return;
}
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addInfluencersRequest.numberOfActions());
BulkResponse addInfluencersResponse = addInfluencersRequest.execute().actionGet();
if (addInfluencersResponse.hasFailures()) {
logger.error("[{}] Bulk index of Influencers has errors: {}", jobId, addInfluencersResponse.buildFailureMessage());
/**
* Persist a list of anomaly records
*
* @param records the records to persist
* @param autoGenerateId If true then persist the record with an auto generated ID
* else use {@link AnomalyRecord#getId()}
* @return this
*/
public Builder persistRecords(List<AnomalyRecord> records, boolean autoGenerateId) {
try {
for (AnomalyRecord record : records) {
XContentBuilder content = toXContentBuilder(record);
if (autoGenerateId) {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName);
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content));
}
else {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId());
bulkRequest.add(
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content));
}
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}, e));
}
return this;
}
/**
* Persist a list of influencers optionally using each influencer's ID or
* an auto generated ID
*
* @param influencers the influencers to persist
* @param autoGenerateId If true then persist the influencer with an auto generated ID
* else use {@link Influencer#getId()}
* @return this
*/
public Builder persistInfluencers(List<Influencer> influencers, boolean autoGenerateId) {
try {
for (Influencer influencer : influencers) {
XContentBuilder content = toXContentBuilder(influencer);
if (autoGenerateId) {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with auto-generated ID",
jobId, Influencer.RESULT_TYPE_VALUE, indexName);
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content));
}
else {
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId());
bulkRequest.add(
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content));
}
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}, e));
}
return this;
}
/**
* Persist {@link PerPartitionMaxProbabilities}
*
* @param partitionProbabilities The probabilities to persist
* @param autoGenerateId If true then persist the PerPartitionMaxProbabilities with an auto generated ID
* else use {@link PerPartitionMaxProbabilities#getId()}
* @return this
*/
public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities, boolean autoGenerateId) {
try {
XContentBuilder builder = toXContentBuilder(partitionProbabilities);
if (autoGenerateId) {
logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp());
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(builder));
}
else {
logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(),
partitionProbabilities.getId());
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId())
.setSource(builder));
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores",
new Object[]{jobId}, e));
}
return this;
}
/**
* Execute the bulk action
*/
public void executeRequest() {
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions());
BulkResponse addRecordsResponse = bulkRequest.execute().actionGet();
if (addRecordsResponse.hasFailures()) {
logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
}
}
}
public void persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities) {
String jobId = partitionProbabilities.getJobId();
try {
XContentBuilder builder = toXContentBuilder(partitionProbabilities);
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp());
client.prepareIndex(indexName, Result.TYPE.getPreferredName())
.setSource(builder)
.execute().actionGet();
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error updating bucket per partition max normalized scores",
new Object[]{jobId}, e));
}
}
/**
* Persist the category definition
@ -172,7 +231,7 @@ public class JobResultsPersister extends AbstractComponent {
*/
public void persistCategoryDefinition(CategoryDefinition category) {
Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE::getPreferredName,
() -> String.valueOf(category.getCategoryId()), () -> serialiseCategoryDefinition(category));
() -> String.valueOf(category.getCategoryId()), () -> toXContentBuilder(category));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
@ -230,17 +289,6 @@ public class JobResultsPersister extends AbstractComponent {
// read again by this process
}
/**
* Persist the influencer
*/
public void persistInfluencer(Influencer influencer) {
Persistable persistable = new Persistable(influencer.getJobId(), influencer, Result.TYPE::getPreferredName,
influencer::getId, () -> toXContentBuilder(influencer));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
/**
* Persist state sent from the native process
*/
@ -259,22 +307,12 @@ public class JobResultsPersister extends AbstractComponent {
}
/**
* Delete any existing interim results
* Delete any existing interim results synchronously
*/
public void deleteInterimResults(String jobId) {
ElasticsearchBulkDeleter deleter = new ElasticsearchBulkDeleter(client, jobId, true);
JobDataDeleter deleter = new JobDataDeleter(client, jobId, true);
deleter.deleteInterimResults();
deleter.commit(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
// don't care?
}
@Override
public void onFailure(Exception e) {
// don't care?
}
});
deleter.commit();
}
/**
@ -299,35 +337,6 @@ public class JobResultsPersister extends AbstractComponent {
return builder;
}
private XContentBuilder serialiseCategoryDefinition(CategoryDefinition categoryDefinition) throws IOException {
XContentBuilder builder = jsonBuilder();
categoryDefinition.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;
}
void persistBucketInfluencersStandalone(String jobId, String bucketId, List<BucketInfluencer> bucketInfluencers,
Date bucketTime, boolean isInterim) throws IOException {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
BulkRequestBuilder addBucketInfluencersRequest = client.prepareBulk();
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer, bucketTime, isInterim);
// Need consistent IDs to ensure overwriting on renormalisation
String id = bucketId + bucketInfluencer.getInfluencerFieldName();
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}", jobId, BucketInfluencer.RESULT_TYPE_VALUE,
indexName, id);
addBucketInfluencersRequest.add(
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), id).setSource(content));
}
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addBucketInfluencersRequest.numberOfActions());
BulkResponse addBucketInfluencersResponse = addBucketInfluencersRequest.execute().actionGet();
if (addBucketInfluencersResponse.hasFailures()) {
logger.error("[{}] Bulk index of Bucket Influencers has errors: {}", jobId,
addBucketInfluencersResponse.buildFailureMessage());
}
}
}
private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer,
Date bucketTime, boolean isInterim) throws IOException {
BucketInfluencer influencer = new BucketInfluencer(bucketInfluencer);
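
For clarity, a small hedged example of the new fluent builder introduced in this file; variable names are placeholders and the chain simply strings together the methods shown above.

JobResultsPersister persister = new JobResultsPersister(settings, client);

persister.bulkPersisterBuilder("my-job")
        .persistBucket(bucket)                  // persisted with a consistent ID
        .persistRecords(records, true)          // true = auto-generate IDs
        .persistInfluencers(influencers, true)
        .executeRequest();                      // one bulk request for everything queued above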


@ -17,9 +17,9 @@ import java.util.function.Function;
*/
public class OldDataRemover {
private final Function<String, ElasticsearchBulkDeleter> dataDeleterFactory;
private final Function<String, JobDataDeleter> dataDeleterFactory;
public OldDataRemover(Function<String, ElasticsearchBulkDeleter> dataDeleterFactory) {
public OldDataRemover(Function<String, JobDataDeleter> dataDeleterFactory) {
this.dataDeleterFactory = Objects.requireNonNull(dataDeleterFactory);
}


@ -31,12 +31,21 @@ import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
/**
* A runnable class that reads the autodetect process output
* and writes the results via the {@linkplain JobResultsPersister}
* passed in the constructor.
* A runnable class that reads the autodetect process output in the
* {@link #process(String, InputStream, boolean)} method and persists parsed
* results via the {@linkplain JobResultsPersister} passed in the constructor.
* <p>
* Has methods to register and remove alert observers.
* Also has a method to wait for a flush to be complete.
*
Buckets are written last, after records, influencers etc.,
when the end of the bucket is reached. Therefore results aren't persisted
until the bucket is read; this means that interim results for all
* result types can be safely deleted when the bucket is read and before
* the new results are updated. This is specifically for the case where
* a flush command is issued repeatedly in the same bucket to generate
* interim results and the old interim results have to be cleared out
* before the new ones are written.
*/
public class AutoDetectResultProcessor {
@ -52,10 +61,7 @@ public class AutoDetectResultProcessor {
private volatile ModelSizeStats latestModelSizeStats;
public AutoDetectResultProcessor(Renormaliser renormaliser, JobResultsPersister persister, AutodetectResultsParser parser) {
this.renormaliser = renormaliser;
this.persister = persister;
this.parser = parser;
this.flushListener = new FlushListener();
this(renormaliser, persister, parser, new FlushListener());
}
AutoDetectResultProcessor(Renormaliser renormaliser, JobResultsPersister persister, AutodetectResultsParser parser,
@ -70,7 +76,7 @@ public class AutoDetectResultProcessor {
try (Stream<AutodetectResult> stream = parser.parseResults(in)) {
int bucketCount = 0;
Iterator<AutodetectResult> iterator = stream.iterator();
Context context = new Context(jobId, isPerPartitionNormalisation);
Context context = new Context(jobId, isPerPartitionNormalisation, persister.bulkPersisterBuilder(jobId));
while (iterator.hasNext()) {
AutodetectResult result = iterator.next();
processResult(context, result);
@ -94,28 +100,30 @@ public class AutoDetectResultProcessor {
Bucket bucket = result.getBucket();
if (bucket != null) {
if (context.deleteInterimRequired) {
// Delete any existing interim results at the start
// of a job upload:
// these are generated by a Flush command, and will
// be replaced or
// superseded by new results
// Delete any existing interim results generated by a Flush command
// which have not been replaced or superseded by new results.
LOGGER.trace("[{}] Deleting interim results", context.jobId);
// TODO: Is this the right place to delete results?
persister.deleteInterimResults(context.jobId);
context.deleteInterimRequired = false;
}
persister.persistBucket(bucket);
// persist after deleting interim results in case the new
// results are also interim
context.bulkResultsPersister.persistBucket(bucket);
context.bulkResultsPersister = persister.bulkPersisterBuilder(context.jobId);
}
List<AnomalyRecord> records = result.getRecords();
if (records != null && !records.isEmpty()) {
persister.persistRecords(records);
context.bulkResultsPersister.persistRecords(records, true);
if (context.isPerPartitionNormalization) {
persister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records));
context.bulkResultsPersister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records), true);
}
}
List<Influencer> influencers = result.getInfluencers();
if (influencers != null && !influencers.isEmpty()) {
persister.persistInfluencers(influencers);
context.bulkResultsPersister.persistInfluencers(influencers, true);
}
CategoryDefinition categoryDefinition = result.getCategoryDefinition();
if (categoryDefinition != null) {
@ -193,13 +201,15 @@ public class AutoDetectResultProcessor {
private final String jobId;
private final boolean isPerPartitionNormalization;
private JobResultsPersister.Builder bulkResultsPersister;
boolean deleteInterimRequired;
Context(String jobId, boolean isPerPartitionNormalization) {
Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) {
this.jobId = jobId;
this.isPerPartitionNormalization = isPerPartitionNormalization;
this.deleteInterimRequired = true;
this.bulkResultsPersister = bulkResultsPersister;
}
}
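
Finally, a hedged sketch of how the processor is constructed and driven, using only the signatures visible in this diff; the parser, stream and flag variables are placeholders.

AutoDetectResultProcessor processor =
        new AutoDetectResultProcessor(renormaliser, persister, autodetectResultsParser);

// Parses the native process output; interim results left over from an earlier flush are
// deleted when a bucket is read, before that bucket's own results are persisted.
processor.process("my-job", processOutputStream, isPerPartitionNormalisation);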


@ -24,7 +24,7 @@ import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
public class ElasticsearchBulkDeleterTests extends ESTestCase {
public class JobDataDeleterTests extends ESTestCase {
public void testDeleteResultsFromTime() {
@ -39,7 +39,7 @@ public class ElasticsearchBulkDeleterTests extends ESTestCase {
.prepareSearchScrollExecuteListener(response)
.prepareBulk(bulkResponse).build();
ElasticsearchBulkDeleter bulkDeleter = new ElasticsearchBulkDeleter(client, "foo");
JobDataDeleter bulkDeleter = new JobDataDeleter(client, "foo");
// because of the mocking this runs in the current thread
bulkDeleter.deleteResultsFromTime(new Date().getTime(), new ActionListener<Boolean>() {


@ -64,7 +64,7 @@ public class JobResultsPersisterTests extends ESTestCase {
bucket.setRecords(Arrays.asList(record));
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.persistBucket(bucket);
persister.bulkPersisterBuilder(JOB_ID).persistBucket(bucket).executeRequest();
List<XContentBuilder> list = captor.getAllValues();
assertEquals(2, list.size());
@ -122,7 +122,7 @@ public class JobResultsPersisterTests extends ESTestCase {
r1.setTypical(typicals);
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.persistRecords(records);
persister.bulkPersisterBuilder(JOB_ID).persistRecords(records, true).executeRequest();
List<XContentBuilder> captured = captor.getAllValues();
assertEquals(1, captured.size());
@ -164,7 +164,7 @@ public class JobResultsPersisterTests extends ESTestCase {
influencers.add(inf);
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.persistInfluencers(influencers);
persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers, true).executeRequest();
List<XContentBuilder> captured = captor.getAllValues();
assertEquals(1, captured.size());


@ -27,6 +27,7 @@ import java.util.List;
import java.util.stream.Stream;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -37,6 +38,8 @@ import static org.mockito.Mockito.when;
public class AutoDetectResultProcessorTests extends ESTestCase {
private static final String JOB_ID = "_id";
public void testProcess() {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked")
@ -52,7 +55,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, parser);
processor.process("_id", mock(InputStream.class), randomBoolean());
processor.process(JOB_ID, mock(InputStream.class), randomBoolean());
verify(renormaliser, times(1)).shutdown();
assertEquals(0, processor.completionLatch.getCount());
}
@ -60,33 +63,39 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_bucket() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket);
processor.processResult(context, result);
verify(persister, times(1)).persistBucket(bucket);
verify(persister, never()).deleteInterimResults("_id");
verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(persister, times(1)).bulkPersisterBuilder(JOB_ID);
verify(persister, never()).deleteInterimResults(JOB_ID);
verifyNoMoreInteractions(persister);
}
public void testProcessResult_bucket_deleteInterimRequired() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket);
processor.processResult(context, result);
verify(persister, times(1)).persistBucket(bucket);
verify(persister, times(1)).deleteInterimResults("_id");
verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(persister, times(1)).deleteInterimResults(JOB_ID);
verify(persister, times(1)).bulkPersisterBuilder(JOB_ID);
verifyNoMoreInteractions(persister);
assertFalse(context.deleteInterimRequired);
}
@ -94,9 +103,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_records() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo");
@ -105,16 +116,18 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getRecords()).thenReturn(records);
processor.processResult(context, result);
verify(persister, times(1)).persistRecords(records);
verify(bulkBuilder, times(1)).persistRecords(records, true);
verifyNoMoreInteractions(persister);
}
public void testProcessResult_records_isPerPartitionNormalization() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo");
@@ -125,35 +138,38 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getRecords()).thenReturn(records);
processor.processResult(context, result);
verify(persister, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class));
verify(persister, times(1)).persistRecords(records);
verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class), eq(true));
verify(bulkBuilder, times(1)).persistRecords(records, true);
verifyNoMoreInteractions(persister);
}

public void testProcessResult_influencers() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Influencer influencer1 = new Influencer("foo", "infField", "infValue");
Influencer influencer2 = new Influencer("foo", "infField2", "infValue2");
Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue");
Influencer influencer2 = new Influencer(JOB_ID, "infField2", "infValue2");
List<Influencer> influencers = Arrays.asList(influencer1, influencer2);
when(result.getInfluencers()).thenReturn(influencers);
processor.processResult(context, result);
verify(persister, times(1)).persistInfluencers(influencers);
verify(bulkBuilder, times(1)).persistInfluencers(influencers, true);
verifyNoMoreInteractions(persister);
}

public void testProcessResult_categoryDefinition() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
@@ -167,19 +183,20 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_flushAcknowledgement() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(flushAcknowledgement.getId()).thenReturn("_id");
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
processor.processResult(context, result);
verify(flushListener, times(1)).acknowledgeFlush("_id");
verify(persister, times(1)).commitWrites("_id");
verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
verify(persister, times(1)).commitWrites(JOB_ID);
verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired);
}
@@ -187,14 +204,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(flushAcknowledgement.getId()).thenReturn("_id");
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
@@ -203,8 +221,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
inOrder.verify(persister, times(1)).commitWrites("_id");
inOrder.verify(flushListener, times(1)).acknowledgeFlush("_id");
inOrder.verify(persister, times(1)).commitWrites(JOB_ID);
inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired);
}
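
Similarly, the two flush tests above fix the ordering around a flush acknowledgement: other results in the same message are persisted and pending writes are committed before the flush is acknowledged, and only then is the context re-armed to delete interim results. A rough sketch of that handling, again inferred from the assertions rather than taken from the real implementation; context.jobId and the null check are assumptions.

    // Sketch only: a flush acknowledgement is handled after every other result type.
    FlushAcknowledgement flushAcknowledgement = result.getFlushAcknowledgement();
    if (flushAcknowledgement != null) {
        // Make earlier writes searchable before telling the flush listener we are done.
        persister.commitWrites(context.jobId);                  // context.jobId is an assumed field name
        flushListener.acknowledgeFlush(flushAcknowledgement.getId());
        // Interim results written for this flush become deletable once the next
        // batch of results arrives.
        context.deleteInterimRequired = true;
    }
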
@@ -212,9 +230,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_modelDebugOutput() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelDebugOutput modelDebugOutput = mock(ModelDebugOutput.class);
@@ -228,9 +247,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_modelSizeStats() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
@@ -245,9 +265,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_modelSnapshot() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = mock(ModelSnapshot.class);
@@ -261,9 +282,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_quantiles() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
@@ -279,9 +301,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_quantiles_isPerPartitionNormalization() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", true);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, true, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);