[ML] Add method to find the established memory use for a job (elastic/x-pack-elasticsearch#2449)

"Established" memory use will be one of the building blocks for smarter node
allocation.

In order for a job to be considered to have established memory usage it must:
- Have generated at least 20 buckets of results
- Have generated at least one model size stats document
- Have low variability of model bytes in model size stats documents in the
  time period covered by the last 20 buckets, which is defined as having a
  coefficient of variation of no more than 0.1

Relates elastic/x-pack-elasticsearch#546

Original commit: elastic/x-pack-elasticsearch@5032eb01d8
This commit is contained in:
David Roberts 2017-09-12 10:25:53 +01:00 committed by GitHub
parent a979f33252
commit 1500074cb2
3 changed files with 326 additions and 1 deletion

View File

@ -426,7 +426,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
public static class Builder {
static final TimeValue DEFAULT_BUCKET_SPAN = TimeValue.timeValueMinutes(5);
public static final TimeValue DEFAULT_BUCKET_SPAN = TimeValue.timeValueMinutes(5);
private List<Detector> detectors;
private TimeValue bucketSpan = DEFAULT_BUCKET_SPAN;

View File

@ -44,6 +44,9 @@ import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
@ -100,6 +103,8 @@ public class JobProvider {
);
private static final int RECORDS_SIZE_PARAM = 10000;
private static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20;
private static final double ESTABLISHED_MEMORY_CV_THRESHOLD = 0.1;
private final Client client;
private final Settings settings;
@ -943,6 +948,96 @@ public class JobProvider {
.addSort(SortBuilders.fieldSort(ModelSizeStats.LOG_TIME_FIELD.getPreferredName()).order(SortOrder.DESC));
}
/**
 * Get the "established" memory usage of a job, if it has one.
 * In order for a job to be considered to have established memory usage it must:
 * - Have generated at least <code>BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE</code> buckets of results
 * - Have generated at least one model size stats document
 * - Have low variability of model bytes in model size stats documents in the time period covered by the last
 *   <code>BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE</code> buckets, which is defined as having a coefficient of variation
 *   of no more than <code>ESTABLISHED_MEMORY_CV_THRESHOLD</code>
 * @param jobId the id of the job for which established memory usage is required
 * @param handler if the method succeeds, this will be passed the established memory usage (in bytes) of the
 *                specified job, or <code>null</code> if memory usage is not yet established
 * @param errorHandler if a problem occurs, the exception will be passed to this handler
 */
public void getEstablishedMemoryUsage(String jobId, Consumer<Long> handler, Consumer<Exception> errorHandler) {

    String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);

    // Step 2. Find the count, mean and standard deviation of memory usage over the time span of the last N bucket results,
    //         where N is the number of buckets required to consider memory usage "established"
    Consumer<QueryPage<Bucket>> bucketHandler = buckets -> {
        // Step 1 asks for exactly one bucket (the Nth most recent); anything else means the job
        // hasn't produced N buckets yet, so memory usage cannot be established
        if (buckets.results().size() == 1) {
            String searchFromTimeMs = Long.toString(buckets.results().get(0).getTimestamp().getTime());
            SearchRequestBuilder search = client.prepareSearch(indexName)
                    .setSize(0)
                    .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                    .setQuery(new BoolQueryBuilder()
                            .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(searchFromTimeMs))
                            .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE)))
                    .addAggregation(
                            AggregationBuilders.extendedStats("es").field(ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName()));
            search.execute(ActionListener.wrap(
                    response -> {
                        List<Aggregation> aggregations = response.getAggregations().asList();
                        if (aggregations.size() == 1) {
                            ExtendedStats extendedStats = (ExtendedStats) aggregations.get(0);
                            long count = extendedStats.getCount();
                            if (count <= 0) {
                                // model size stats haven't changed in the last N buckets,
                                // so the latest (older) ones are established
                                modelSizeStats(jobId, modelSizeStats -> handleModelBytesOrNull(handler, modelSizeStats),
                                        errorHandler);
                            } else if (count == 1) {
                                // no need to do an extra search in the case of exactly one document being aggregated
                                handler.accept((long) extendedStats.getAvg());
                            } else {
                                // NOTE: if avg were 0 here this would be NaN, which fails the comparison below
                                // and correctly yields "not established"
                                double coefficientOfVariation = extendedStats.getStdDeviation() / extendedStats.getAvg();
                                LOGGER.trace("[{}] Coefficient of variation [{}] when calculating established memory use", jobId,
                                        coefficientOfVariation);
                                // is there sufficient stability in the latest model size stats readings?
                                if (coefficientOfVariation <= ESTABLISHED_MEMORY_CV_THRESHOLD) {
                                    // yes, so return the latest model size as established
                                    modelSizeStats(jobId, modelSizeStats -> handleModelBytesOrNull(handler, modelSizeStats),
                                            errorHandler);
                                } else {
                                    // no - we don't have an established model size
                                    handler.accept(null);
                                }
                            }
                        } else {
                            handler.accept(null);
                        }
                    }, errorHandler
            ));
        } else {
            handler.accept(null);
        }
    };

    // Step 1. Find the time span of the most recent N bucket results, where N is the number of buckets
    //         required to consider memory usage "established"
    BucketsQueryBuilder.BucketsQuery bucketQuery = new BucketsQueryBuilder()
            .sortField(Result.TIMESTAMP.getPreferredName())
            .sortDescending(true).from(BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE - 1).size(1)
            .includeInterim(false)
            .build();
    bucketsViaInternalClient(jobId, bucketQuery, bucketHandler, e -> {
        // a missing results index simply means memory usage is not established, not an error
        if (e instanceof ResourceNotFoundException) {
            handler.accept(null);
        } else {
            errorHandler.accept(e);
        }
    });
}
/**
 * A model size of 0 implies a completely uninitialised model. This method converts 0 to
 * <code>null</code> before calling a handler, so callers see "no established memory"
 * rather than a meaningless zero.
 */
private static void handleModelBytesOrNull(Consumer<Long> handler, ModelSizeStats modelSizeStats) {
    long bytes = modelSizeStats.getModelBytes();
    if (bytes > 0) {
        handler.accept(bytes);
    } else {
        // nothing has been learnt yet - report "not established"
        handler.accept(null);
    }
}
/**
* Maps authorization failures when querying ML indexes to job-specific authorization failures attributed to the ML actions.
* Works by replacing the action name with another provided by the caller, and appending the job ID.

View File

@ -0,0 +1,230 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.junit.Before;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
/**
 * Integration tests for {@link JobProvider#getEstablishedMemoryUsage}.
 * Each test persists a synthetic history of bucket results and model size stats
 * documents, then checks whether the job's memory usage counts as "established".
 */
public class EstablishedMemUsageIT extends BaseMlIntegTestCase {

    // Buckets and model size stats in these tests are timestamped at multiples of the default bucket span.
    private long bucketSpan = AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN.getMillis();

    private JobProvider jobProvider;
    private JobResultsPersister jobResultsPersister;

    @Before
    public void createComponents() {
        Settings settings = nodeSettings(0);
        jobProvider = new JobProvider(client(), settings);
        jobResultsPersister = new JobResultsPersister(settings, client());
    }

    public void testEstablishedMem_givenNoResults() throws Exception {
        String jobId = "no-results-established-mem-job";

        initClusterAndJob(jobId);

        // No buckets and no stats at all -> not established
        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenNoStatsLongHistory() throws Exception {
        String jobId = "no-stats-long-history-established-mem-job";

        initClusterAndJob(jobId);

        // Plenty of buckets but no model size stats -> not established
        createBuckets(jobId, 25);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenNoStatsShortHistory() throws Exception {
        String jobId = "no-stats-short-history-established-mem-job";

        initClusterAndJob(jobId);

        // Too few buckets and no model size stats -> not established
        createBuckets(jobId, 5);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenHistoryTooShort() throws Exception {
        String jobId = "too-short-established-mem-job";

        initClusterAndJob(jobId);

        // 19 buckets is one short of the 20 required -> not established despite stats existing
        createBuckets(jobId, 19);
        createModelSizeStats(jobId, 1, 19000L);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenHistoryJustEnoughLowVariation() throws Exception {
        String jobId = "just-enough-low-cv-established-mem-job";

        initClusterAndJob(jobId);

        // Exactly 20 buckets and low variability in model bytes -> established at the latest value
        createBuckets(jobId, 20);
        createModelSizeStats(jobId, 1, 19000L);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenHistoryJustEnoughHighVariation() throws Exception {
        String jobId = "just-enough-high-cv-established-mem-job";

        initClusterAndJob(jobId);

        // Exactly 20 buckets but model bytes vary wildly -> not established
        createBuckets(jobId, 20);
        createModelSizeStats(jobId, 1, 1000L);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenLongEstablished() throws Exception {
        String jobId = "long-established-mem-job";

        initClusterAndJob(jobId);

        // All model size changes happened long before the last 20 buckets -> latest value is established
        createBuckets(jobId, 25);
        createModelSizeStats(jobId, 1, 10000L);
        createModelSizeStats(jobId, 2, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenOneRecentChange() throws Exception {
        String jobId = "one-recent-change-established-mem-job";

        initClusterAndJob(jobId);

        // A single stats document inside the recent window still counts as stable
        createBuckets(jobId, 25);
        createModelSizeStats(jobId, 1, 10000L);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenOneRecentChangeOnly() throws Exception {
        String jobId = "one-recent-change-only-established-mem-job";

        initClusterAndJob(jobId);

        // Only one stats document exists, and it falls inside the recent window -> established
        createBuckets(jobId, 25);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenHistoricHighVariationRecentLowVariation() throws Exception {
        String jobId = "historic-high-cv-recent-low-cv-established-mem-job";

        initClusterAndJob(jobId);

        // Early turbulence doesn't matter: only the last 20 buckets' worth of stats is examined
        createBuckets(jobId, 40);
        createModelSizeStats(jobId, 1, 1000L);
        createModelSizeStats(jobId, 3, 2000L);
        createModelSizeStats(jobId, 10, 6000L);
        createModelSizeStats(jobId, 19, 9000L);
        createModelSizeStats(jobId, 30, 19000L);
        createModelSizeStats(jobId, 35, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenHistoricLowVariationRecentHighVariation() throws Exception {
        String jobId = "historic-low-cv-recent-high-cv-established-mem-job";

        initClusterAndJob(jobId);

        // Historic stability doesn't help if the recent stats are still moving -> not established
        createBuckets(jobId, 40);
        createModelSizeStats(jobId, 1, 19000L);
        createModelSizeStats(jobId, 3, 20000L);
        createModelSizeStats(jobId, 25, 21000L);
        createModelSizeStats(jobId, 27, 39000L);
        createModelSizeStats(jobId, 30, 67000L);
        createModelSizeStats(jobId, 35, 95000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    /** Bring up a single-node cluster and create (but don't open) the job under test. */
    private void initClusterAndJob(String jobId) {
        internalCluster().ensureAtLeastNumDataNodes(1);
        ensureStableCluster(1);

        Job.Builder job = createJob(jobId);
        PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
        PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
        assertTrue(putJobResponse.isAcknowledged());
    }

    /** Persist {@code count} consecutive bucket results, timestamped at 1..count bucket spans. */
    private void createBuckets(String jobId, int count) {
        JobResultsPersister.Builder bulk = jobResultsPersister.bulkPersisterBuilder(jobId);
        for (int bucketNum = 1; bucketNum <= count; ++bucketNum) {
            bulk.persistBucket(new Bucket(jobId, new Date(bucketSpan * bucketNum), bucketSpan));
        }
        bulk.executeRequest();
    }

    /** Persist one model size stats document aligned with the given bucket number. */
    private void createModelSizeStats(String jobId, int bucketNum, long modelBytes) {
        ModelSizeStats.Builder stats = new ModelSizeStats.Builder(jobId);
        stats.setTimestamp(new Date(bucketSpan * bucketNum));
        // log time is slightly after the result timestamp, mimicking real persistence
        stats.setLogTime(new Date(bucketSpan * bucketNum + randomIntBetween(1, 1000)));
        stats.setModelBytes(modelBytes);
        jobResultsPersister.persistModelSizeStats(stats.build());
    }

    /** Synchronously drive the async {@link JobProvider#getEstablishedMemoryUsage} API. */
    private Long queryEstablishedMemoryUsage(String jobId) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Long> memoryUsage = new AtomicReference<>();
        AtomicReference<Exception> failure = new AtomicReference<>();

        jobProvider.getEstablishedMemoryUsage(jobId, mem -> {
            memoryUsage.set(mem);
            latch.countDown();
        }, e -> {
            failure.set(e);
            latch.countDown();
        });

        latch.await();
        Exception e = failure.get();
        if (e != null) {
            throw e;
        }
        return memoryUsage.get();
    }
}