[ML] Retrieve model_size_stats via search (elastic/x-pack-elasticsearch#1326)

This is a task towards allowing rollover.

Multiple model_size_stats are stored in order to allow
analytics of the memory usage over time. The job _stats
need to display the latest model_size_stats. Before this
commit, the latest model_size_stats was being stored with
a special ID and it was retrieved using that ID. This
does not lend itself well for rollover as we would end up
with multiple of those special IDs in the rolled indices.

This commit removes the need to store a special model_size_stats
version. Job _stats now retrieve the model_size_stats by searching
for the latest one. It also uses a manual ID for all model_size_stats
in order to maintain a single document per log_time.

Relates elastic/x-pack-elasticsearch#827

Original commit: elastic/x-pack-elasticsearch@b2796e9b08
This commit is contained in:
Dimitris Athanasiou 2017-05-05 17:05:39 +01:00 committed by GitHub
parent 8ab46e800c
commit 3570eb32d3
5 changed files with 50 additions and 31 deletions

View File

@ -248,7 +248,7 @@ public class JobProvider {
String stateIndex = AnomalyDetectorsIndex.jobStateIndexName();
MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
.add(createDocIdSearch(resultsIndex, DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId)))
.add(createDocIdSearch(resultsIndex, Result.TYPE.getPreferredName(), ModelSizeStats.documentId(jobId)))
.add(createLatestModelSizeStatsSearch(resultsIndex))
.add(createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, job.getModelSnapshotId())))
.add(createDocIdSearch(stateIndex, Quantiles.TYPE.getPreferredName(), Quantiles.documentId(jobId)));
@ -277,15 +277,15 @@ public class JobProvider {
+ "] Search request encountered [" + unavailableShards + "] unavailable shards"));
} else {
SearchHits hits = searchResponse.getHits();
long totalHits = hits.getTotalHits();
if (totalHits == 0) {
long hitsCount = hits.getHits().length;
if (hitsCount == 0) {
SearchRequest searchRequest = msearch.request().requests().get(i);
LOGGER.debug("Found 0 hits for [{}/{}]", searchRequest.indices(), searchRequest.types());
} else if (totalHits == 1) {
} else if (hitsCount == 1) {
parseAutodetectParamSearchHit(paramsBuilder, hits.getAt(0), errorHandler);
} else if (totalHits > 1) {
errorHandler.accept(new IllegalStateException("Expected total hits 0 or 1, but got [" + totalHits +
"] total hits"));
} else if (hitsCount > 1) {
errorHandler.accept(new IllegalStateException("Expected hits count to be 0 or 1, but got ["
+ hitsCount + "]"));
}
}
}
@ -950,16 +950,29 @@ public class JobProvider {
* Get the job's model size stats.
*/
public void modelSizeStats(String jobId, Consumer<ModelSizeStats> handler, Consumer<Exception> errorHandler) {
LOGGER.trace("ES API CALL: get result type {} ID {} for job {}",
ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, jobId);
LOGGER.trace("ES API CALL: search latest {} for job {}", ModelSizeStats.RESULT_TYPE_VALUE, jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
get(indexName, Result.TYPE.getPreferredName(), ModelSizeStats.documentId(jobId),
handler, errorHandler, (parser, context) -> ModelSizeStats.PARSER.apply(parser, null).build(),
() -> {
LOGGER.trace("No memory usage details for job with id {}", jobId);
return new ModelSizeStats.Builder(jobId).build();
});
createLatestModelSizeStatsSearch(indexName).execute(ActionListener.wrap(
response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
LOGGER.trace("No {} for job with id {}", ModelSizeStats.RESULT_TYPE_VALUE, jobId);
handler.accept(new ModelSizeStats.Builder(jobId).build());
} else if (hits.length == 1) {
handler.accept(parseSearchHit(hits[0], ModelSizeStats.PARSER, errorHandler).build());
} else {
errorHandler.accept(new IllegalStateException("Search returned " + hits.length + " hits even though size was 1"));
}
}, errorHandler
));
}
private SearchRequestBuilder createLatestModelSizeStatsSearch(String indexName) {
return client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE))
.addSort(SortBuilders.fieldSort(ModelSizeStats.LOG_TIME_FIELD.getPreferredName()).order(SortOrder.DESC));
}
/**

View File

@ -250,9 +250,7 @@ public class JobResultsPersister extends AbstractComponent {
String jobId = modelSizeStats.getJobId();
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(),
ModelSizeStats.documentId(jobId));
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), null);
modelSizeStats.documentId());
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
// Don't commit as we expect masses of these updates and they're only
// for information at the API level

View File

@ -78,10 +78,6 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
PARSER.declareField(Builder::setMemoryStatus, p -> MemoryStatus.fromString(p.text()), MEMORY_STATUS_FIELD, ValueType.STRING);
}
public static String documentId(String jobId) {
return jobId + "-" + RESULT_TYPE_VALUE;
}
/**
* The status of the memory monitored by the ResourceMonitor. OK is default,
* SOFT_LIMIT means that the models have done some aggressive pruning to
@ -262,6 +258,12 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
&& Objects.equals(this.jobId, that.jobId);
}
public String documentId() {
// We choose to create IDs manually here to ensure that we'll have only one
// document for a given log_time (which is in seconds granularity).
return jobId + "-" + RESULT_TYPE_VALUE + "-" + logTime.getTime();
}
public static class Builder {
private final String jobId;

View File

@ -26,7 +26,13 @@ public class ModelSizeStatsTests extends AbstractSerializingTestCase<ModelSizeSt
}
public void testDocumentId() {
assertEquals("foo-model_size_stats", ModelSizeStats.documentId("foo"));
ModelSizeStats.Builder stats1 = new ModelSizeStats.Builder("foo");
stats1.setLogTime(new Date(123456789L));
assertEquals("foo-model_size_stats-123456789", stats1.build().documentId());
ModelSizeStats.Builder stats2 = new ModelSizeStats.Builder("bar");
stats2.setLogTime(new Date(987654321L));
assertEquals("bar-model_size_stats-987654321", stats2.build().documentId());
}
public void testSetMemoryStatus_GivenNull() {

View File

@ -105,7 +105,7 @@
- do:
count:
index: .ml-anomalies-shared
- match: {count: 8}
- match: {count: 6}
- do:
count:
@ -117,7 +117,7 @@
term:
job_id: index-layout-job
- match: {count: 4}
- match: {count: 3}
- do:
count:
@ -129,7 +129,7 @@
term:
job_id: index-layout-job
- match: {count: 4}
- match: {count: 3}
- do:
count:
@ -141,7 +141,7 @@
term:
job_id: index-layout-job2
- match: {count: 4}
- match: {count: 3}
- do:
count:
@ -152,7 +152,7 @@
filter:
term:
job_id: index-layout-job2
- match: {count: 4}
- match: {count: 3}
# Put some categorizer state
- do:
@ -203,7 +203,7 @@
- do:
count:
index: .ml-anomalies-shared
- match: {count: 4}
- match: {count: 3}
- do:
@ -216,7 +216,7 @@
term:
job_id: index-layout-job2
- match: {count: 4}
- match: {count: 3}
- do:
count:
@ -228,7 +228,7 @@
term:
job_id: index-layout-job2
- match: {count: 4}
- match: {count: 3}
- do:
xpack.ml.delete_job: