[ML] Retrieve model snapshot via search (elastic/x-pack-elasticsearch#1376)

This removes the last remaining GET in JobProvider.

Relates elastic/x-pack-elasticsearch#827

Original commit: elastic/x-pack-elasticsearch@820344be67
This commit is contained in:
Dimitris Athanasiou 2017-05-10 17:26:37 +01:00 committed by GitHub
parent 4e321cc409
commit 49eb5ea136
2 changed files with 9 additions and 32 deletions

View File

@ -16,7 +16,6 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
@ -38,7 +37,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
@ -341,31 +339,6 @@ public class JobProvider {
}
}
private <T, U> void get(String indexName, String type, String id, Consumer<T> handler, Consumer<Exception> errorHandler,
BiFunction<XContentParser, U, T> objectParser, Supplier<T> notFoundSupplier) {
GetRequest getRequest = new GetRequest(indexName, type, id);
client.get(getRequest, ActionListener.wrap(
response -> {
if (response.isExists() == false) {
handler.accept(notFoundSupplier.get());
} else {
BytesReference source = response.getSourceAsBytesRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
handler.accept(objectParser.apply(parser, null));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse " + type, e);
}
}
},
error -> {
if (error instanceof IndexNotFoundException == false) {
errorHandler.accept(error);
} else {
handler.accept(notFoundSupplier.get());
}
}));
}
public static IndicesOptions addIgnoreUnavailable(IndicesOptions indicesOptions) {
return IndicesOptions.fromOptions(true, indicesOptions.allowNoIndices(), indicesOptions.expandWildcardsOpen(),
indicesOptions.expandWildcardsClosed(), indicesOptions);
@ -759,8 +732,7 @@ public class JobProvider {
* @param jobId the id of the job for which influencers are requested
* @return an influencer {@link BatchedDocumentsIterator}
*/
public BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Influencer>>
newBatchedInfluencersIterator(String jobId) {
public BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<Influencer>> newBatchedInfluencersIterator(String jobId) {
return new BatchedInfluencersIterator(client, jobId);
}
@ -773,9 +745,11 @@ public class JobProvider {
handler.accept(null);
return;
}
get(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, modelSnapshotId), handler, errorHandler,
(parser, context) -> ModelSnapshot.PARSER.apply(parser, null).build(), () -> null);
String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, modelSnapshotId));
searchSingleResult(jobId, ModelSnapshot.TYPE.getPreferredName(), search,
ModelSnapshot.PARSER, builder -> handler.accept(builder == null ? null : builder.build()), errorHandler, () -> null);
}
/**

View File

@ -166,6 +166,9 @@ public class AutoDetectResultProcessor {
ModelSnapshot modelSnapshot = result.getModelSnapshot();
if (modelSnapshot != null) {
persister.persistModelSnapshot(modelSnapshot);
// We need to refresh the index in order for the snapshot to be available when we'll try to
// update the job with it
persister.commitResultWrites(jobId);
updateModelSnapshotIdOnJob(modelSnapshot);
}
Quantiles quantiles = result.getQuantiles();