[ML] Retrieve data counts via search (elastic/x-pack-elasticsearch#1339)

Relates elastic/x-pack-elasticsearch#827

Original commit: elastic/x-pack-elasticsearch@73a6848526
This commit is contained in:
Dimitris Athanasiou 2017-05-08 13:02:07 +01:00 committed by GitHub
parent 4e25e1a24d
commit 28f2ba3ef8
1 changed files with 25 additions and 8 deletions

View File

@ -236,8 +236,15 @@ public class JobProvider {
*/
public void dataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
get(indexName, DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId), handler, errorHandler,
DataCounts.PARSER, () -> new DataCounts(jobId));
searchSingleResult(jobId, DataCounts.TYPE.getPreferredName(), createLatestDataCountsSearch(indexName, jobId),
DataCounts.PARSER, handler, errorHandler, () -> new DataCounts(jobId));
}
private SearchRequestBuilder createLatestDataCountsSearch(String indexName, String jobId) {
return client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.idsQuery().addIds(DataCounts.documentId(jobId)))
.addSort(SortBuilders.fieldSort(DataCounts.LATEST_RECORD_TIME.getPreferredName()).order(SortOrder.DESC));
}
public void getAutodetectParams(Job job, Consumer<AutodetectParams> consumer, Consumer<Exception> errorHandler) {
@ -247,7 +254,7 @@ public class JobProvider {
String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
String stateIndex = AnomalyDetectorsIndex.jobStateIndexName();
MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
.add(createDocIdSearch(resultsIndex, DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId)))
.add(createLatestDataCountsSearch(resultsIndex, jobId))
.add(createLatestModelSizeStatsSearch(resultsIndex))
.add(createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, job.getModelSnapshotId())))
@ -953,16 +960,26 @@ public class JobProvider {
LOGGER.trace("ES API CALL: search latest {} for job {}", ModelSizeStats.RESULT_TYPE_VALUE, jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
createLatestModelSizeStatsSearch(indexName).execute(ActionListener.wrap(
searchSingleResult(jobId, ModelSizeStats.RESULT_TYPE_VALUE, createLatestModelSizeStatsSearch(indexName),
ModelSizeStats.PARSER,
builder -> handler.accept(builder.build()), errorHandler,
() -> new ModelSizeStats.Builder(jobId));
}
private <U, T> void searchSingleResult(String jobId, String resultDescription, SearchRequestBuilder search,
BiFunction<XContentParser, U, T> objectParser, Consumer<T> handler,
Consumer<Exception> errorHandler, Supplier<T> notFoundSupplier) {
search.execute(ActionListener.wrap(
response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
LOGGER.trace("No {} for job with id {}", ModelSizeStats.RESULT_TYPE_VALUE, jobId);
handler.accept(new ModelSizeStats.Builder(jobId).build());
LOGGER.trace("No {} for job with id {}", resultDescription, jobId);
handler.accept(notFoundSupplier.get());
} else if (hits.length == 1) {
handler.accept(parseSearchHit(hits[0], ModelSizeStats.PARSER, errorHandler).build());
handler.accept(parseSearchHit(hits[0], objectParser, errorHandler));
} else {
errorHandler.accept(new IllegalStateException("Search returned " + hits.length + " hits even though size was 1"));
errorHandler.accept(new IllegalStateException("Search for unique [" + resultDescription + "] returned ["
+ hits.length + "] hits even though size was 1"));
}
}, errorHandler
));