[ML] Write updated model snapshot to its original index (elastic/x-pack-elasticsearch#1415)

When we update a model snapshot, we need to write it back to
the exact index it was read from. This is necessary for
rollover; otherwise, we would end up with two different
versions of the same snapshot.

Relates elastic/x-pack-elasticsearch#827

Original commit: elastic/x-pack-elasticsearch@b5d1ab38a7
This commit is contained in:
Dimitris Athanasiou 2017-05-15 12:00:15 +01:00 committed by GitHub
parent 463133b7de
commit e36e2c604d
12 changed files with 128 additions and 100 deletions

View File

@ -310,7 +310,7 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
throw new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(), throw new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(),
request.getJobId())); request.getJobId()));
} }
handler.accept(modelSnapshot); handler.accept(modelSnapshot.result);
}, errorHandler); }, errorHandler);
} }

View File

@ -35,6 +35,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException; import java.io.IOException;
@ -275,25 +276,25 @@ public class UpdateModelSnapshotAction extends Action<UpdateModelSnapshotAction.
listener.onFailure(new ResourceNotFoundException(Messages.getMessage( listener.onFailure(new ResourceNotFoundException(Messages.getMessage(
Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(), request.getJobId()))); Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(), request.getJobId())));
} else { } else {
ModelSnapshot updatedSnapshot = applyUpdate(request, modelSnapshot); Result<ModelSnapshot> updatedSnapshot = applyUpdate(request, modelSnapshot);
jobManager.updateModelSnapshot(updatedSnapshot, b -> { jobManager.updateModelSnapshot(updatedSnapshot, b -> {
// The quantiles can be large, and totally dominate the output - // The quantiles can be large, and totally dominate the output -
// it's clearer to remove them // it's clearer to remove them
listener.onResponse(new Response(new ModelSnapshot.Builder(updatedSnapshot).setQuantiles(null).build())); listener.onResponse(new Response(new ModelSnapshot.Builder(updatedSnapshot.result).setQuantiles(null).build()));
}, listener::onFailure); }, listener::onFailure);
} }
}, listener::onFailure); }, listener::onFailure);
} }
private static ModelSnapshot applyUpdate(Request request, ModelSnapshot target) { private static Result<ModelSnapshot> applyUpdate(Request request, Result<ModelSnapshot> target) {
ModelSnapshot.Builder updatedSnapshotBuilder = new ModelSnapshot.Builder(target); ModelSnapshot.Builder updatedSnapshotBuilder = new ModelSnapshot.Builder(target.result);
if (request.getDescription() != null) { if (request.getDescription() != null) {
updatedSnapshotBuilder.setDescription(request.getDescription()); updatedSnapshotBuilder.setDescription(request.getDescription());
} }
if (request.getRetain() != null) { if (request.getRetain() != null) {
updatedSnapshotBuilder.setRetain(request.getRetain()); updatedSnapshotBuilder.setRetain(request.getRetain());
} }
return updatedSnapshotBuilder.build(); return new Result(target.index, updatedSnapshotBuilder.build());
} }
} }
} }

View File

@ -33,18 +33,17 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.io.IOException; import java.io.IOException;
import java.time.Instant;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -54,8 +53,6 @@ import java.util.Objects;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/** /**
* Allows interactions with jobs. The managed interactions include: * Allows interactions with jobs. The managed interactions include:
* <ul> * <ul>
@ -224,9 +221,9 @@ public class JobManager extends AbstractComponent {
errorHandler.accept(new ResourceNotFoundException(message)); errorHandler.accept(new ResourceNotFoundException(message));
} }
jobProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> { jobProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> {
if (oldModelSnapshot != null && newModelSnapshot.getTimestamp().before(oldModelSnapshot.getTimestamp())) { if (oldModelSnapshot != null && newModelSnapshot.result.getTimestamp().before(oldModelSnapshot.result.getTimestamp())) {
String message = "Job [" + job.getId() + "] has a more recent model snapshot [" + String message = "Job [" + job.getId() + "] has a more recent model snapshot [" +
oldModelSnapshot.getSnapshotId() + "]"; oldModelSnapshot.result.getSnapshotId() + "]";
errorHandler.accept(new IllegalArgumentException(message)); errorHandler.accept(new IllegalArgumentException(message));
} }
handler.accept(true); handler.accept(true);
@ -413,11 +410,11 @@ public class JobManager extends AbstractComponent {
* *
* @param modelSnapshot the updated model snapshot object to be stored * @param modelSnapshot the updated model snapshot object to be stored
*/ */
public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) { public void updateModelSnapshot(Result<ModelSnapshot> modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
String index = AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId()); IndexRequest indexRequest = new IndexRequest(modelSnapshot.index, ModelSnapshot.TYPE.getPreferredName(),
IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)); ModelSnapshot.documentId(modelSnapshot.result));
try (XContentBuilder builder = XContentFactory.jsonBuilder()) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
modelSnapshot.toXContent(builder, ToXContent.EMPTY_PARAMS); modelSnapshot.result.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder); indexRequest.source(builder);
} catch (IOException e) { } catch (IOException e) {
errorHandler.accept(e); errorHandler.accept(e);

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.io.IOException; import java.io.IOException;
@ -23,7 +24,7 @@ class BatchedBucketsIterator extends BatchedResultsIterator<Bucket> {
} }
@Override @Override
protected ResultWithIndex<Bucket> map(SearchHit hit) { protected Result<Bucket> map(SearchHit hit) {
BytesReference source = hit.getSourceRef(); BytesReference source = hit.getSourceRef();
XContentParser parser; XContentParser parser;
try { try {
@ -32,6 +33,6 @@ class BatchedBucketsIterator extends BatchedResultsIterator<Bucket> {
throw new ElasticsearchParseException("failed to parse bucket", e); throw new ElasticsearchParseException("failed to parse bucket", e);
} }
Bucket bucket = Bucket.PARSER.apply(parser, null); Bucket bucket = Bucket.PARSER.apply(parser, null);
return new ResultWithIndex<>(hit.getIndex(), bucket); return new Result<>(hit.getIndex(), bucket);
} }
} }

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.io.IOException; import java.io.IOException;
@ -22,7 +23,7 @@ class BatchedInfluencersIterator extends BatchedResultsIterator<Influencer> {
} }
@Override @Override
protected ResultWithIndex<Influencer> map(SearchHit hit) { protected Result<Influencer> map(SearchHit hit) {
BytesReference source = hit.getSourceRef(); BytesReference source = hit.getSourceRef();
XContentParser parser; XContentParser parser;
try { try {
@ -32,6 +33,6 @@ class BatchedInfluencersIterator extends BatchedResultsIterator<Influencer> {
} }
Influencer influencer = Influencer.PARSER.apply(parser, null); Influencer influencer = Influencer.PARSER.apply(parser, null);
return new ResultWithIndex<>(hit.getIndex(), influencer); return new Result<>(hit.getIndex(), influencer);
} }
} }

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.io.IOException; import java.io.IOException;
@ -23,7 +24,7 @@ class BatchedRecordsIterator extends BatchedResultsIterator<AnomalyRecord> {
} }
@Override @Override
protected ResultWithIndex<AnomalyRecord> map(SearchHit hit) { protected Result<AnomalyRecord> map(SearchHit hit) {
BytesReference source = hit.getSourceRef(); BytesReference source = hit.getSourceRef();
XContentParser parser; XContentParser parser;
try { try {
@ -32,6 +33,6 @@ class BatchedRecordsIterator extends BatchedResultsIterator<AnomalyRecord> {
throw new ElasticsearchParseException("failed to parse record", e); throw new ElasticsearchParseException("failed to parse record", e);
} }
AnomalyRecord record = AnomalyRecord.PARSER.apply(parser, null); AnomalyRecord record = AnomalyRecord.PARSER.apply(parser, null);
return new ResultWithIndex<>(hit.getIndex(), record); return new Result<>(hit.getIndex(), record);
} }
} }

View File

@ -9,8 +9,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.job.results.Result;
public abstract class BatchedResultsIterator<T> public abstract class BatchedResultsIterator<T> extends BatchedDocumentsIterator<Result<T>> {
extends BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<T>> {
public BatchedResultsIterator(Client client, String jobId, String resultType) { public BatchedResultsIterator(Client client, String jobId, String resultType) {
super(client, AnomalyDetectorsIndex.jobResultsAliasedName(jobId), super(client, AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
@ -21,14 +20,4 @@ public abstract class BatchedResultsIterator<T>
protected String getType() { protected String getType() {
return Result.TYPE.getPreferredName(); return Result.TYPE.getPreferredName();
} }
public static class ResultWithIndex<T> {
public final String indexName;
public final T result;
public ResultWithIndex(String indexName, T result) {
this.indexName = indexName;
this.result = result;
}
}
} }

View File

@ -235,7 +235,7 @@ public class JobProvider {
public void dataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) { public void dataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
searchSingleResult(jobId, DataCounts.TYPE.getPreferredName(), createLatestDataCountsSearch(indexName, jobId), searchSingleResult(jobId, DataCounts.TYPE.getPreferredName(), createLatestDataCountsSearch(indexName, jobId),
DataCounts.PARSER, handler, errorHandler, () -> new DataCounts(jobId)); DataCounts.PARSER, result -> handler.accept(result.result), errorHandler, () -> new DataCounts(jobId));
} }
private SearchRequestBuilder createLatestDataCountsSearch(String indexName, String jobId) { private SearchRequestBuilder createLatestDataCountsSearch(String indexName, String jobId) {
@ -496,7 +496,7 @@ public class JobProvider {
* @param jobId the id of the job for which buckets are requested * @param jobId the id of the job for which buckets are requested
* @return a bucket {@link BatchedDocumentsIterator} * @return a bucket {@link BatchedDocumentsIterator}
*/ */
public BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Bucket>> newBatchedBucketsIterator(String jobId) { public BatchedDocumentsIterator<Result<Bucket>> newBatchedBucketsIterator(String jobId) {
return new BatchedBucketsIterator(client, jobId); return new BatchedBucketsIterator(client, jobId);
} }
@ -508,7 +508,7 @@ public class JobProvider {
* @param jobId the id of the job for which buckets are requested * @param jobId the id of the job for which buckets are requested
* @return a record {@link BatchedDocumentsIterator} * @return a record {@link BatchedDocumentsIterator}
*/ */
public BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> public BatchedDocumentsIterator<Result<AnomalyRecord>>
newBatchedRecordsIterator(String jobId) { newBatchedRecordsIterator(String jobId) {
return new BatchedRecordsIterator(client, jobId); return new BatchedRecordsIterator(client, jobId);
} }
@ -732,14 +732,14 @@ public class JobProvider {
* @param jobId the id of the job for which influencers are requested * @param jobId the id of the job for which influencers are requested
* @return an influencer {@link BatchedDocumentsIterator} * @return an influencer {@link BatchedDocumentsIterator}
*/ */
public BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Influencer>> newBatchedInfluencersIterator(String jobId) { public BatchedDocumentsIterator<Result<Influencer>> newBatchedInfluencersIterator(String jobId) {
return new BatchedInfluencersIterator(client, jobId); return new BatchedInfluencersIterator(client, jobId);
} }
/** /**
* Get a job's model snapshot by its id * Get a job's model snapshot by its id
*/ */
public void getModelSnapshot(String jobId, @Nullable String modelSnapshotId, Consumer<ModelSnapshot> handler, public void getModelSnapshot(String jobId, @Nullable String modelSnapshotId, Consumer<Result<ModelSnapshot>> handler,
Consumer<Exception> errorHandler) { Consumer<Exception> errorHandler) {
if (modelSnapshotId == null) { if (modelSnapshotId == null) {
handler.accept(null); handler.accept(null);
@ -748,8 +748,9 @@ public class JobProvider {
String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(), SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, modelSnapshotId)); ModelSnapshot.documentId(jobId, modelSnapshotId));
searchSingleResult(jobId, ModelSnapshot.TYPE.getPreferredName(), search, searchSingleResult(jobId, ModelSnapshot.TYPE.getPreferredName(), search, ModelSnapshot.PARSER,
ModelSnapshot.PARSER, builder -> handler.accept(builder == null ? null : builder.build()), errorHandler, () -> null); result -> handler.accept(result.result == null ? null : new Result(result.index, result.result.build())),
errorHandler, () -> null);
} }
/** /**
@ -936,21 +937,21 @@ public class JobProvider {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
searchSingleResult(jobId, ModelSizeStats.RESULT_TYPE_VALUE, createLatestModelSizeStatsSearch(indexName), searchSingleResult(jobId, ModelSizeStats.RESULT_TYPE_VALUE, createLatestModelSizeStatsSearch(indexName),
ModelSizeStats.PARSER, ModelSizeStats.PARSER,
builder -> handler.accept(builder.build()), errorHandler, result -> handler.accept(result.result.build()), errorHandler,
() -> new ModelSizeStats.Builder(jobId)); () -> new ModelSizeStats.Builder(jobId));
} }
private <U, T> void searchSingleResult(String jobId, String resultDescription, SearchRequestBuilder search, private <U, T> void searchSingleResult(String jobId, String resultDescription, SearchRequestBuilder search,
BiFunction<XContentParser, U, T> objectParser, Consumer<T> handler, BiFunction<XContentParser, U, T> objectParser, Consumer<Result<T>> handler,
Consumer<Exception> errorHandler, Supplier<T> notFoundSupplier) { Consumer<Exception> errorHandler, Supplier<T> notFoundSupplier) {
search.execute(ActionListener.wrap( search.execute(ActionListener.wrap(
response -> { response -> {
SearchHit[] hits = response.getHits().getHits(); SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) { if (hits.length == 0) {
LOGGER.trace("No {} for job with id {}", resultDescription, jobId); LOGGER.trace("No {} for job with id {}", resultDescription, jobId);
handler.accept(notFoundSupplier.get()); handler.accept(new Result(null, notFoundSupplier.get()));
} else if (hits.length == 1) { } else if (hits.length == 1) {
handler.accept(parseSearchHit(hits[0], objectParser, errorHandler)); handler.accept(new Result(hits[0].getIndex(), parseSearchHit(hits[0], objectParser, errorHandler)));
} else { } else {
errorHandler.accept(new IllegalStateException("Search for unique [" + resultDescription + "] returned [" errorHandler.accept(new IllegalStateException("Search for unique [" + resultDescription + "] returned ["
+ hits.length + "] hits even though size was 1")); + hits.length + "] hits even though size was 1"));

View File

@ -10,13 +10,13 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator; import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.ml.job.persistence.BatchedResultsIterator;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Deque; import java.util.Deque;
@ -92,7 +92,7 @@ public class ScoresUpdater {
private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs, private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Bucket>> bucketsIterator = BatchedDocumentsIterator<Result<Bucket>> bucketsIterator =
jobProvider.newBatchedBucketsIterator(job.getId()) jobProvider.newBatchedBucketsIterator(job.getId())
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
.includeInterim(false); .includeInterim(false);
@ -111,16 +111,16 @@ public class ScoresUpdater {
while (bucketsIterator.hasNext()) { while (bucketsIterator.hasNext()) {
// Get a batch of buckets without their records to calculate // Get a batch of buckets without their records to calculate
// how many buckets can be sensibly retrieved // how many buckets can be sensibly retrieved
Deque<BatchedResultsIterator.ResultWithIndex<Bucket>> buckets = bucketsIterator.next(); Deque<Result<Bucket>> buckets = bucketsIterator.next();
if (buckets.isEmpty()) { if (buckets.isEmpty()) {
break; break;
} }
while (!buckets.isEmpty()) { while (!buckets.isEmpty()) {
BatchedResultsIterator.ResultWithIndex<Bucket> current = buckets.removeFirst(); Result<Bucket> current = buckets.removeFirst();
Bucket currentBucket = current.result; Bucket currentBucket = current.result;
if (currentBucket.isNormalizable()) { if (currentBucket.isNormalizable()) {
BucketNormalizable bucketNormalizable = new BucketNormalizable(current.result, current.indexName); BucketNormalizable bucketNormalizable = new BucketNormalizable(current.result, current.index);
List<RecordNormalizable> recordNormalizables = List<RecordNormalizable> recordNormalizables =
bucketRecordsAsNormalizables(currentBucket.getTimestamp().getTime()); bucketRecordsAsNormalizables(currentBucket.getTimestamp().getTime());
batchRecordCount += recordNormalizables.size(); batchRecordCount += recordNormalizables.size();
@ -148,15 +148,14 @@ public class ScoresUpdater {
} }
private List<RecordNormalizable> bucketRecordsAsNormalizables(long bucketTimeStamp) { private List<RecordNormalizable> bucketRecordsAsNormalizables(long bucketTimeStamp) {
BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobProvider.newBatchedRecordsIterator(job.getId())
recordsIterator = jobProvider.newBatchedRecordsIterator(job.getId())
.timeRange(bucketTimeStamp, bucketTimeStamp + 1) .timeRange(bucketTimeStamp, bucketTimeStamp + 1)
.includeInterim(false); .includeInterim(false);
List<RecordNormalizable> recordNormalizables = new ArrayList<>(); List<RecordNormalizable> recordNormalizables = new ArrayList<>();
while (recordsIterator.hasNext()) { while (recordsIterator.hasNext()) {
for (BatchedResultsIterator.ResultWithIndex<AnomalyRecord> record : recordsIterator.next() ) { for (Result<AnomalyRecord> record : recordsIterator.next() ) {
recordNormalizables.add(new RecordNormalizable(record.result, record.indexName)); recordNormalizables.add(new RecordNormalizable(record.result, record.index));
} }
} }
@ -204,13 +203,12 @@ public class ScoresUpdater {
private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs, private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts) { long windowExtensionMs, int[] counts) {
BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Influencer>> influencersIterator = BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobProvider.newBatchedInfluencersIterator(job.getId())
jobProvider.newBatchedInfluencersIterator(job.getId())
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
.includeInterim(false); .includeInterim(false);
while (influencersIterator.hasNext()) { while (influencersIterator.hasNext()) {
Deque<BatchedResultsIterator.ResultWithIndex<Influencer>> influencers = influencersIterator.next(); Deque<Result<Influencer>> influencers = influencersIterator.next();
if (influencers.isEmpty()) { if (influencers.isEmpty()) {
LOGGER.debug("[{}] No influencers to renormalize for job", job.getId()); LOGGER.debug("[{}] No influencers to renormalize for job", job.getId());
break; break;
@ -219,7 +217,7 @@ public class ScoresUpdater {
LOGGER.debug("[{}] Will renormalize a batch of {} influencers", job.getId(), influencers.size()); LOGGER.debug("[{}] Will renormalize a batch of {} influencers", job.getId(), influencers.size());
List<Normalizable> asNormalizables = influencers.stream() List<Normalizable> asNormalizables = influencers.stream()
.map(influencerResultIndex -> .map(influencerResultIndex ->
new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.indexName)) new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.index))
.collect(Collectors.toList()); .collect(Collectors.toList());
normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState); normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);

View File

@ -5,12 +5,14 @@
*/ */
package org.elasticsearch.xpack.ml.job.results; package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
/** /**
* Common attributes of the result types * A wrapper for concrete result objects plus meta information.
* Also contains common attributes for results.
*/ */
public class Result { public class Result<T> {
/** /**
* Serialisation fields * Serialisation fields
@ -19,4 +21,14 @@ public class Result {
public static final ParseField RESULT_TYPE = new ParseField("result_type"); public static final ParseField RESULT_TYPE = new ParseField("result_type");
public static final ParseField TIMESTAMP = new ParseField("timestamp"); public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField IS_INTERIM = new ParseField("is_interim"); public static final ParseField IS_INTERIM = new ParseField("is_interim");
@Nullable
public final String index;
@Nullable
public final T result;
public Result(String index, T result) {
this.index = index;
this.result = result;
}
} }

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
@ -14,6 +15,9 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
@ -24,12 +28,15 @@ import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshotTests;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Matchers; import org.mockito.Matchers;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
@ -43,12 +50,14 @@ import static org.mockito.Mockito.mock;
public class JobManagerTests extends ESTestCase { public class JobManagerTests extends ESTestCase {
private Client client;
private ClusterService clusterService; private ClusterService clusterService;
private JobProvider jobProvider; private JobProvider jobProvider;
private Auditor auditor; private Auditor auditor;
@Before @Before
public void setupMocks() { public void setupMocks() {
client = mock(Client.class);
clusterService = mock(ClusterService.class); clusterService = mock(ClusterService.class);
jobProvider = mock(JobProvider.class); jobProvider = mock(JobProvider.class);
auditor = mock(Auditor.class); auditor = mock(Auditor.class);
@ -154,6 +163,28 @@ public class JobManagerTests extends ESTestCase {
}); });
} }
/**
 * Verifies that updating a model snapshot issues an index request against the index
 * carried by the {@code Result} wrapper, and that the request source round-trips back
 * to the original snapshot.
 */
public void testUpdateModelSnapshot() {
    // Capture the request handed to the mocked client instead of executing it
    ArgumentCaptor<IndexRequest> indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
    doAnswer(invocationOnMock -> null).when(client).index(indexRequestCaptor.capture(), any());

    ModelSnapshot modelSnapshot = ModelSnapshotTests.createRandomized();
    JobManager jobManager = createJobManager();

    // Diamond operator avoids the raw Result type; a reported error is surfaced with its
    // cause rather than swallowed, so a failure is not masked by the captor lookup below
    jobManager.updateModelSnapshot(new Result<>("snapshot-index", modelSnapshot), response -> {},
            error -> { throw new AssertionError("updateModelSnapshot reported an unexpected error", error); });

    IndexRequest indexRequest = indexRequestCaptor.getValue();
    assertThat(indexRequest.index(), equalTo("snapshot-index"));

    // Assert snapshot was correctly serialised in the request by parsing it back and comparing to original
    try (XContentParser parser = XContentFactory.xContent(indexRequest.source()).createParser(NamedXContentRegistry.EMPTY,
            indexRequest.source())) {
        ModelSnapshot requestSnapshot = ModelSnapshot.PARSER.apply(parser, null).build();
        assertThat(requestSnapshot, equalTo(modelSnapshot));
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
private Job.Builder createJob() { private Job.Builder createJob() {
Detector.Builder d1 = new Detector.Builder("info_content", "domain"); Detector.Builder d1 = new Detector.Builder("info_content", "domain");
d1.setOverFieldName("client"); d1.setOverFieldName("client");
@ -168,8 +199,6 @@ public class JobManagerTests extends ESTestCase {
/**
 * Builds the JobManager instance used by the tests.
 * The settings set only PATH_HOME_SETTING, pointing it at a directory obtained from createTempDir();
 * the notifier is a Mockito mock created here, and jobProvider, clusterService, auditor and client
 * are passed through to the JobManager constructor alongside it.
 */
private JobManager createJobManager() { private JobManager createJobManager() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);
Client client = mock(Client.class);
UpdateJobProcessNotifier notifier = mock(UpdateJobProcessNotifier.class); UpdateJobProcessNotifier notifier = mock(UpdateJobProcessNotifier.class);
return new JobManager(settings, jobProvider, clusterService, auditor, client, notifier); return new JobManager(settings, jobProvider, clusterService, auditor, client, notifier);
} }

View File

@ -5,6 +5,25 @@
*/ */
package org.elasticsearch.xpack.ml.job.process.normalizer; package org.elasticsearch.xpack.ml.job.process.normalizer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.MockBatchedDocumentsIterator;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.junit.Before;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
@ -13,26 +32,6 @@ import java.util.Date;
import java.util.Deque; import java.util.Deque;
import java.util.List; import java.util.List;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.ml.job.persistence.BatchedResultsIterator;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.MockBatchedDocumentsIterator;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.junit.Before;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
@ -191,10 +190,10 @@ public class ScoresUpdaterTests extends ESTestCase {
Bucket bucket1 = generateBucket(new Date(0)); Bucket bucket1 = generateBucket(new Date(0));
bucket1.setAnomalyScore(42.0); bucket1.setAnomalyScore(42.0);
bucket1.addBucketInfluencer(createTimeBucketInfluencer(bucket1.getTimestamp(), 0.04, 42.0)); bucket1.addBucketInfluencer(createTimeBucketInfluencer(bucket1.getTimestamp(), 0.04, 42.0));
List<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> records = new ArrayList<>(); List<Result<AnomalyRecord>> records = new ArrayList<>();
Date date = new Date(); Date date = new Date();
for (int i=0; i<100000; i++) { for (int i=0; i<100000; i++) {
records.add(new BatchedResultsIterator.ResultWithIndex<>("foo", new AnomalyRecord("foo", date, 1, i))); records.add(new Result<>("foo", new AnomalyRecord("foo", date, 1, i)));
} }
Bucket bucket2 = generateBucket(new Date(10000 * 1000)); Bucket bucket2 = generateBucket(new Date(10000 * 1000));
@ -207,9 +206,9 @@ public class ScoresUpdaterTests extends ESTestCase {
givenProviderReturnsBuckets(batch); givenProviderReturnsBuckets(batch);
List<Deque<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> recordBatches = new ArrayList<>(); List<Deque<Result<AnomalyRecord>>> recordBatches = new ArrayList<>();
recordBatches.add(new ArrayDeque<>(records)); recordBatches.add(new ArrayDeque<>(records));
MockBatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter = MockBatchedDocumentsIterator<Result<AnomalyRecord>> recordIter =
new MockBatchedDocumentsIterator<>(recordBatches); new MockBatchedDocumentsIterator<>(recordBatches);
recordIter.requireIncludeInterim(false); recordIter.requireIncludeInterim(false);
when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
@ -337,30 +336,30 @@ public class ScoresUpdaterTests extends ESTestCase {
} }
/**
 * Stubs jobProvider.newBatchedBucketsIterator(JOB_ID) to return a MockBatchedDocumentsIterator over
 * the supplied batches. Each bucket is first wrapped together with the index name "foo", keeping the
 * batch structure (one wrapped deque per input deque). requireIncludeInterim(false) is set on the
 * iterator before it is handed to the stub.
 */
private void givenBuckets(List<Deque<Bucket>> batches) { private void givenBuckets(List<Deque<Bucket>> batches) {
List<Deque<BatchedResultsIterator.ResultWithIndex<Bucket>>> batchesWithIndex = new ArrayList<>(); List<Deque<Result<Bucket>>> batchesWithIndex = new ArrayList<>();
for (Deque<Bucket> deque : batches) { for (Deque<Bucket> deque : batches) {
Deque<BatchedResultsIterator.ResultWithIndex<Bucket>> queueWithIndex = new ArrayDeque<>(); Deque<Result<Bucket>> queueWithIndex = new ArrayDeque<>();
for (Bucket bucket : deque) { for (Bucket bucket : deque) {
queueWithIndex.add(new BatchedResultsIterator.ResultWithIndex<>("foo", bucket)); queueWithIndex.add(new Result<>("foo", bucket));
} }
batchesWithIndex.add(queueWithIndex); batchesWithIndex.add(queueWithIndex);
} }
MockBatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Bucket>> bucketIter = MockBatchedDocumentsIterator<Result<Bucket>> bucketIter =
new MockBatchedDocumentsIterator<>(batchesWithIndex); new MockBatchedDocumentsIterator<>(batchesWithIndex);
bucketIter.requireIncludeInterim(false); bucketIter.requireIncludeInterim(false);
when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter); when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter);
} }
private void givenProviderReturnsRecords(Deque<AnomalyRecord> records) { private void givenProviderReturnsRecords(Deque<AnomalyRecord> records) {
Deque<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> batch = new ArrayDeque<>(); Deque<Result<AnomalyRecord>> batch = new ArrayDeque<>();
List<Deque<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>>> batches = new ArrayList<>(); List<Deque<Result<AnomalyRecord>>> batches = new ArrayList<>();
for (AnomalyRecord record : records) { for (AnomalyRecord record : records) {
batch.add(new BatchedResultsIterator.ResultWithIndex<>("foo", record)); batch.add(new Result<>("foo", record));
} }
batches.add(batch); batches.add(batch);
MockBatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<AnomalyRecord>> recordIter = MockBatchedDocumentsIterator<Result<AnomalyRecord>> recordIter =
new MockBatchedDocumentsIterator<>(batches); new MockBatchedDocumentsIterator<>(batches);
recordIter.requireIncludeInterim(false); recordIter.requireIncludeInterim(false);
when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
@ -371,14 +370,13 @@ public class ScoresUpdaterTests extends ESTestCase {
} }
/**
 * Stubs jobProvider.newBatchedInfluencersIterator(JOB_ID) to return a MockBatchedDocumentsIterator
 * holding a single batch. That batch contains every supplied influencer, each wrapped together with
 * the index name "foo". requireIncludeInterim(false) is set on the iterator before it is handed to
 * the stub.
 */
private void givenProviderReturnsInfluencers(Deque<Influencer> influencers) { private void givenProviderReturnsInfluencers(Deque<Influencer> influencers) {
List<Deque<BatchedResultsIterator.ResultWithIndex<Influencer>>> batches = new ArrayList<>(); List<Deque<Result<Influencer>>> batches = new ArrayList<>();
Deque<BatchedResultsIterator.ResultWithIndex<Influencer>> queue = new ArrayDeque<>(); Deque<Result<Influencer>> queue = new ArrayDeque<>();
for (Influencer inf : influencers) { for (Influencer inf : influencers) {
queue.add(new BatchedResultsIterator.ResultWithIndex<>("foo", inf)); queue.add(new Result<>("foo", inf));
} }
batches.add(queue); batches.add(queue);
MockBatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Influencer>> iterator = MockBatchedDocumentsIterator<Result<Influencer>> iterator = new MockBatchedDocumentsIterator<>(batches);
new MockBatchedDocumentsIterator<>(batches);
iterator.requireIncludeInterim(false); iterator.requireIncludeInterim(false);
when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator); when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator);
} }