From 49eb5ea1366a5504c98e39d6e18d2616e3f2b8eb Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 10 May 2017 17:26:37 +0100 Subject: [PATCH] [ML] Retrieve model snapshot via search (elastic/x-pack-elasticsearch#1376) This removes the last remaining GET in JobProvider. Relates elastic/x-pack-elasticsearch#827 Original commit: elastic/x-pack-elasticsearch@820344be67befc2bf58e09d0e146b220b2965c02 --- .../xpack/ml/job/persistence/JobProvider.java | 38 +++---------------- .../output/AutoDetectResultProcessor.java | 3 ++ 2 files changed, 9 insertions(+), 32 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 19413af067e..4ebd9bb2365 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -16,7 +16,6 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; -import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequestBuilder; @@ -38,7 +37,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; @@ -341,31 +339,6 @@ public class JobProvider { } } - private void get(String indexName, String type, String id, Consumer handler, Consumer errorHandler, - BiFunction objectParser, Supplier notFoundSupplier) { - GetRequest getRequest = new GetRequest(indexName, type, id); - client.get(getRequest, ActionListener.wrap( - response -> { - if (response.isExists() == false) { - handler.accept(notFoundSupplier.get()); - } else { - BytesReference source = response.getSourceAsBytesRef(); - try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) { - handler.accept(objectParser.apply(parser, null)); - } catch (IOException e) { - throw new ElasticsearchParseException("failed to parse " + type, e); - } - } - }, - error -> { - if (error instanceof IndexNotFoundException == false) { - errorHandler.accept(error); - } else { - handler.accept(notFoundSupplier.get()); - } - })); - } - public static IndicesOptions addIgnoreUnavailable(IndicesOptions indicesOptions) { return IndicesOptions.fromOptions(true, indicesOptions.allowNoIndices(), indicesOptions.expandWildcardsOpen(), indicesOptions.expandWildcardsClosed(), indicesOptions); @@ -759,8 +732,7 @@ public class JobProvider { * @param jobId the id of the job for which influencers are requested * @return an influencer {@link BatchedDocumentsIterator} */ - public BatchedDocumentsIterator> - newBatchedInfluencersIterator(String jobId) { + public BatchedDocumentsIterator> newBatchedInfluencersIterator(String jobId) { return new BatchedInfluencersIterator(client, jobId); } @@ -773,9 +745,11 @@ public class JobProvider { handler.accept(null); return; } - get(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), ModelSnapshot.TYPE.getPreferredName(), - ModelSnapshot.documentId(jobId, modelSnapshotId), handler, errorHandler, - (parser, context) -> ModelSnapshot.PARSER.apply(parser, null).build(), () -> null); + String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(), + ModelSnapshot.documentId(jobId, modelSnapshotId)); + searchSingleResult(jobId, ModelSnapshot.TYPE.getPreferredName(), search, + ModelSnapshot.PARSER, builder -> handler.accept(builder == null ? null : builder.build()), errorHandler, () -> null); } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 80f9e84258a..977cec0e26a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -166,6 +166,9 @@ public class AutoDetectResultProcessor { ModelSnapshot modelSnapshot = result.getModelSnapshot(); if (modelSnapshot != null) { persister.persistModelSnapshot(modelSnapshot); + // We need to refresh the index in order for the snapshot to be available when we'll try to + // update the job with it + persister.commitResultWrites(jobId); updateModelSnapshotIdOnJob(modelSnapshot); } Quantiles quantiles = result.getQuantiles();