[ML] Make get_job_stats robust to missing results indices (elastic/x-pack-elasticsearch#1662)

Although the job stats for jobs with missing results indices are clearly
ruined, it's better to provide zeroes for the missing values and show the
stats for other jobs than to fail the whole request. This means the UI
can continue to function.

relates elastic/x-pack-elasticsearch#1656

Original commit: elastic/x-pack-elasticsearch@a06fa994a5
This commit is contained in:
David Roberts 2017-06-08 08:33:06 +01:00 committed by GitHub
parent d7658bd9a2
commit 7aa1114eca
2 changed files with 54 additions and 11 deletions

View File

@ -180,15 +180,14 @@ public class JobProvider {
String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(job.getId()); String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(job.getId());
String indexName = job.getResultsIndexName(); String indexName = job.getResultsIndexName();
final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> { final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success ->
client.admin().indices().prepareAliases() client.admin().indices().prepareAliases()
.addAlias(indexName, readAliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId())) .addAlias(indexName, readAliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
.addAlias(indexName, writeAliasName) .addAlias(indexName, writeAliasName)
// we could return 'success && r.isAcknowledged()' instead of 'true', but that makes // we could return 'success && r.isAcknowledged()' instead of 'true', but that makes
// testing not possible as we can't create IndicesAliasesResponse instance or // testing not possible as we can't create IndicesAliasesResponse instance or
// mock IndicesAliasesResponse#isAcknowledged() // mock IndicesAliasesResponse#isAcknowledged()
.execute(ActionListener.wrap(r -> finalListener.onResponse(true), finalListener::onFailure)); .execute(ActionListener.wrap(r -> finalListener.onResponse(true), finalListener::onFailure)),
},
finalListener::onFailure); finalListener::onFailure);
// Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if // Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if
@ -295,6 +294,7 @@ public class JobProvider {
private SearchRequestBuilder createLatestDataCountsSearch(String indexName, String jobId) { private SearchRequestBuilder createLatestDataCountsSearch(String indexName, String jobId) {
return client.prepareSearch(indexName) return client.prepareSearch(indexName)
.setSize(1) .setSize(1)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
// look for both old and new formats // look for both old and new formats
.setQuery(QueryBuilders.idsQuery().addIds(DataCounts.documentId(jobId), DataCounts.v54DocumentId(jobId))) .setQuery(QueryBuilders.idsQuery().addIds(DataCounts.documentId(jobId), DataCounts.v54DocumentId(jobId)))
.addSort(SortBuilders.fieldSort(DataCounts.LATEST_RECORD_TIME.getPreferredName()).order(SortOrder.DESC)); .addSort(SortBuilders.fieldSort(DataCounts.LATEST_RECORD_TIME.getPreferredName()).order(SortOrder.DESC));
@ -482,15 +482,13 @@ public class JobProvider {
} else { } else {
handler.accept(buckets); handler.accept(buckets);
} }
}, e -> { errorHandler.accept(mapAuthFailure(e, jobId, GetBucketsAction.NAME)); })); }, e -> errorHandler.accept(mapAuthFailure(e, jobId, GetBucketsAction.NAME))));
} }
private void expandBuckets(String jobId, BucketsQuery query, QueryPage<Bucket> buckets, Iterator<Bucket> bucketsToExpand, private void expandBuckets(String jobId, BucketsQuery query, QueryPage<Bucket> buckets, Iterator<Bucket> bucketsToExpand,
Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler, Client client) { Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler, Client client) {
if (bucketsToExpand.hasNext()) { if (bucketsToExpand.hasNext()) {
Consumer<Integer> c = i -> { Consumer<Integer> c = i -> expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client);
expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client);
};
expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), c, errorHandler, client); expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), c, errorHandler, client);
} else { } else {
handler.accept(buckets); handler.accept(buckets);
@ -750,7 +748,7 @@ public class JobProvider {
String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, modelSnapshotId)); SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, modelSnapshotId));
searchSingleResult(jobId, ModelSnapshot.TYPE.getPreferredName(), search, ModelSnapshot.PARSER, searchSingleResult(jobId, ModelSnapshot.TYPE.getPreferredName(), search, ModelSnapshot.PARSER,
result -> handler.accept(result.result == null ? null : new Result(result.index, result.result.build())), result -> handler.accept(result.result == null ? null : new Result<ModelSnapshot>(result.index, result.result.build())),
errorHandler, () -> null); errorHandler, () -> null);
} }
@ -953,9 +951,9 @@ public class JobProvider {
SearchHit[] hits = response.getHits().getHits(); SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) { if (hits.length == 0) {
LOGGER.trace("No {} for job with id {}", resultDescription, jobId); LOGGER.trace("No {} for job with id {}", resultDescription, jobId);
handler.accept(new Result(null, notFoundSupplier.get())); handler.accept(new Result<>(null, notFoundSupplier.get()));
} else if (hits.length == 1) { } else if (hits.length == 1) {
handler.accept(new Result(hits[0].getIndex(), parseSearchHit(hits[0], objectParser, errorHandler))); handler.accept(new Result<>(hits[0].getIndex(), parseSearchHit(hits[0], objectParser, errorHandler)));
} else { } else {
errorHandler.accept(new IllegalStateException("Search for unique [" + resultDescription + "] returned [" errorHandler.accept(new IllegalStateException("Search for unique [" + resultDescription + "] returned ["
+ hits.length + "] hits even though size was 1")); + hits.length + "] hits even though size was 1"));
@ -967,6 +965,7 @@ public class JobProvider {
private SearchRequestBuilder createLatestModelSizeStatsSearch(String indexName) { private SearchRequestBuilder createLatestModelSizeStatsSearch(String indexName) {
return client.prepareSearch(indexName) return client.prepareSearch(indexName)
.setSize(1) .setSize(1)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE)) .setQuery(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE))
.addSort(SortBuilders.fieldSort(ModelSizeStats.LOG_TIME_FIELD.getPreferredName()).order(SortOrder.DESC)); .addSort(SortBuilders.fieldSort(ModelSizeStats.LOG_TIME_FIELD.getPreferredName()).order(SortOrder.DESC));
} }

View File

@ -224,3 +224,47 @@ setup:
- match: { jobs.0.job_id : job-stats-v54-bwc-test } - match: { jobs.0.job_id : job-stats-v54-bwc-test }
- match: { jobs.0.data_counts.processed_record_count: 10 } - match: { jobs.0.data_counts.processed_record_count: 10 }
- match: { jobs.0.model_size_stats.total_by_field_count: 101 } - match: { jobs.0.model_size_stats.total_by_field_count: 101 }
---
"Test no exception on get job stats with missing index":
- do:
xpack.ml.post_data:
job_id: job-stats-test
body: >
{"airline":"AAL","responsetime":"132.2046","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","time":"1403481600"}
- do:
xpack.ml.close_job:
job_id: jobs-get-stats-datafeed-job
- match: { closed: true }
- do:
xpack.ml.close_job:
job_id: job-stats-test
- match: { closed: true }
- do:
indices.delete:
index: .ml-anomalies-shared
- do:
xpack.ml.get_job_stats: {}
- match: { count: 2 }
- match: { jobs.0.data_counts.processed_record_count: 0 }
- match: { jobs.0.data_counts.processed_field_count: 0 }
- match: { jobs.0.data_counts.input_field_count: 0 }
- match: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: closed }
- is_false: jobs.0.node
- is_false: jobs.0.open_time
- match: { jobs.1.data_counts.processed_record_count: 0 }
- match: { jobs.1.data_counts.processed_field_count: 0 }
- match: { jobs.1.data_counts.input_field_count: 0 }
- match: { jobs.1.model_size_stats.model_bytes: 0 }
- match: { jobs.1.state: closed }
- is_false: jobs.1.node
- is_false: jobs.1.open_time