diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/AnalysisConfig.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/AnalysisConfig.java index 175f3108e35..a138c995ff2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/AnalysisConfig.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/AnalysisConfig.java @@ -426,7 +426,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable { public static class Builder { - static final TimeValue DEFAULT_BUCKET_SPAN = TimeValue.timeValueMinutes(5); + public static final TimeValue DEFAULT_BUCKET_SPAN = TimeValue.timeValueMinutes(5); private List detectors; private TimeValue bucketSpan = DEFAULT_BUCKET_SPAN; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index f0197b7bf39..244a0b153d6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -44,6 +44,9 @@ import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilder; @@ -100,6 +103,8 @@ public class JobProvider { ); private static final int RECORDS_SIZE_PARAM = 10000; + private static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20; + private static final double ESTABLISHED_MEMORY_CV_THRESHOLD = 0.1; private final Client client; private final Settings 
settings; @@ -943,6 +948,96 @@ public class JobProvider { .addSort(SortBuilders.fieldSort(ModelSizeStats.LOG_TIME_FIELD.getPreferredName()).order(SortOrder.DESC)); } + /** + * Get the "established" memory usage of a job, if it has one. + * In order for a job to be considered to have established memory usage it must: + * - Have generated at least BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE buckets of results + * - Have generated at least one model size stats document + * - Have low variability of model bytes in model size stats documents in the time period covered by the last + * BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE buckets, which is defined as having a coefficient of variation + * of no more than ESTABLISHED_MEMORY_CV_THRESHOLD + * @param jobId the id of the job for which established memory usage is required + * @param handler if the method succeeds, this will be passed the established memory usage (in bytes) of the + * specified job, or null if memory usage is not yet established + * @param errorHandler if a problem occurs, the exception will be passed to this handler + */ + public void getEstablishedMemoryUsage(String jobId, Consumer handler, Consumer errorHandler) { + + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + + // Step 2. 
Find the count, mean and standard deviation of memory usage over the time span of the last N bucket results, + // where N is the number of buckets required to consider memory usage "established" + Consumer> bucketHandler = buckets -> { + if (buckets.results().size() == 1) { + String searchFromTimeMs = Long.toString(buckets.results().get(0).getTimestamp().getTime()); + SearchRequestBuilder search = client.prepareSearch(indexName) + .setSize(0) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setQuery(new BoolQueryBuilder() + .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(searchFromTimeMs)) + .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE))) + .addAggregation(AggregationBuilders.extendedStats("es").field(ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName())); + search.execute(ActionListener.wrap( + response -> { + List aggregations = response.getAggregations().asList(); + if (aggregations.size() == 1) { + ExtendedStats extendedStats = (ExtendedStats) aggregations.get(0); + long count = extendedStats.getCount(); + if (count <= 0) { + // model size stats haven't changed in the last N buckets, so the latest (older) ones are established + modelSizeStats(jobId, modelSizeStats -> handleModelBytesOrNull(handler, modelSizeStats), errorHandler); + } else if (count == 1) { + // no need to do an extra search in the case of exactly one document being aggregated + handler.accept((long) extendedStats.getAvg()); + } else { + double coefficientOfVaration = extendedStats.getStdDeviation() / extendedStats.getAvg(); + LOGGER.trace("[{}] Coefficient of variation [{}] when calculating established memory use", jobId, + coefficientOfVaration); + // is there sufficient stability in the latest model size stats readings? 
+ if (coefficientOfVaration <= ESTABLISHED_MEMORY_CV_THRESHOLD) { + // yes, so return the latest model size as established + modelSizeStats(jobId, modelSizeStats -> handleModelBytesOrNull(handler, modelSizeStats), + errorHandler); + } else { + // no - we don't have an established model size + handler.accept(null); + } + } + } else { + handler.accept(null); + } + }, errorHandler + )); + } else { + handler.accept(null); + } + }; + + // Step 1. Find the time span of the most recent N bucket results, where N is the number of buckets + // required to consider memory usage "established" + BucketsQueryBuilder.BucketsQuery bucketQuery = new BucketsQueryBuilder() + .sortField(Result.TIMESTAMP.getPreferredName()) + .sortDescending(true).from(BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE - 1).size(1) + .includeInterim(false) + .build(); + bucketsViaInternalClient(jobId, bucketQuery, bucketHandler, e -> { + if (e instanceof ResourceNotFoundException) { + handler.accept(null); + } else { + errorHandler.accept(e); + } + }); + } + + /** + * A model size of 0 implies a completely uninitialised model. This method converts 0 to null + * before calling a handler. + */ + private static void handleModelBytesOrNull(Consumer handler, ModelSizeStats modelSizeStats) { + long modelBytes = modelSizeStats.getModelBytes(); + handler.accept(modelBytes > 0 ? modelBytes : null); + } + /** * Maps authorization failures when querying ML indexes to job-specific authorization failures attributed to the ML actions. * Works by replacing the action name with another provided by the caller, and appending the job ID. diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java new file mode 100644 index 00000000000..503f31548be --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -0,0 +1,230 @@ +/* + * Copyright Elasticsearch B.V. 
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.junit.Before;

import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;

/**
 * Integration tests for {@link JobProvider#getEstablishedMemoryUsage}, covering the interaction
 * between the number of bucket results a job has produced, the model size stats documents it has
 * persisted, and the coefficient-of-variation stability check.
 */
public class EstablishedMemUsageIT extends BaseMlIntegTestCase {

    // Bucket span (in ms) used to position synthetic buckets and model size stats on the timeline
    private long bucketSpan = AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN.getMillis();

    private JobProvider jobProvider;
    private JobResultsPersister jobResultsPersister;

    @Before
    public void createComponents() {
        Settings settings = nodeSettings(0);
        jobProvider = new JobProvider(client(), settings);
        jobResultsPersister = new JobResultsPersister(settings, client());
    }

    public void testEstablishedMem_givenNoResults() throws Exception {
        String jobId = "no-results-established-mem-job";

        initClusterAndJob(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenNoStatsLongHistory() throws Exception {
        String jobId = "no-stats-long-history-established-mem-job";

        initClusterAndJob(jobId);

        createBuckets(jobId, 25);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenNoStatsShortHistory() throws Exception {
        String jobId = "no-stats-short-history-established-mem-job";

        initClusterAndJob(jobId);

        createBuckets(jobId, 5);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenHistoryTooShort() throws Exception {
        String jobId = "too-short-established-mem-job";

        initClusterAndJob(jobId);

        // 19 buckets is one short of the 20 required for memory usage to be considered established
        createBuckets(jobId, 19);
        createModelSizeStats(jobId, 1, 19000L);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenHistoryJustEnoughLowVariation() throws Exception {
        String jobId = "just-enough-low-cv-established-mem-job";

        initClusterAndJob(jobId);

        // exactly 20 buckets, and 19000 vs 20000 bytes is within the coefficient-of-variation threshold
        createBuckets(jobId, 20);
        createModelSizeStats(jobId, 1, 19000L);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenHistoryJustEnoughHighVariation() throws Exception {
        String jobId = "just-enough-high-cv-established-mem-job";

        initClusterAndJob(jobId);

        // 1000 vs 20000 bytes varies too much for memory usage to be considered established
        createBuckets(jobId, 20);
        createModelSizeStats(jobId, 1, 1000L);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    public void testEstablishedMem_givenLongEstablished() throws Exception {
        String jobId = "long-established-mem-job";

        initClusterAndJob(jobId);

        // model size has not changed at all within the last 20 buckets
        createBuckets(jobId, 25);
        createModelSizeStats(jobId, 1, 10000L);
        createModelSizeStats(jobId, 2, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenOneRecentChange() throws Exception {
        String jobId = "one-recent-change-established-mem-job";

        initClusterAndJob(jobId);

        createBuckets(jobId, 25);
        createModelSizeStats(jobId, 1, 10000L);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenOneRecentChangeOnly() throws Exception {
        String jobId = "one-recent-change-only-established-mem-job";

        initClusterAndJob(jobId);

        // a single model size stats document within the window counts as established
        createBuckets(jobId, 25);
        createModelSizeStats(jobId, 10, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenHistoricHighVariationRecentLowVariation() throws Exception {
        String jobId = "historic-high-cv-recent-low-cv-established-mem-job";

        initClusterAndJob(jobId);

        // only the stats within the last 20 buckets matter; the early volatile ones are ignored
        createBuckets(jobId, 40);
        createModelSizeStats(jobId, 1, 1000L);
        createModelSizeStats(jobId, 3, 2000L);
        createModelSizeStats(jobId, 10, 6000L);
        createModelSizeStats(jobId, 19, 9000L);
        createModelSizeStats(jobId, 30, 19000L);
        createModelSizeStats(jobId, 35, 20000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
    }

    public void testEstablishedMem_givenHistoricLowVariationRecentHighVariation() throws Exception {
        String jobId = "historic-low-cv-recent-high-cv-established-mem-job";

        initClusterAndJob(jobId);

        // the stats within the last 20 buckets vary too much, even though the early ones were stable
        createBuckets(jobId, 40);
        createModelSizeStats(jobId, 1, 19000L);
        createModelSizeStats(jobId, 3, 20000L);
        createModelSizeStats(jobId, 25, 21000L);
        createModelSizeStats(jobId, 27, 39000L);
        createModelSizeStats(jobId, 30, 67000L);
        createModelSizeStats(jobId, 35, 95000L);
        jobResultsPersister.commitResultWrites(jobId);

        assertThat(queryEstablishedMemoryUsage(jobId), nullValue());
    }

    /** Ensures a one-node cluster is up and creates (and acknowledges) a job with the given id. */
    private void initClusterAndJob(String jobId) {
        internalCluster().ensureAtLeastNumDataNodes(1);
        ensureStableCluster(1);

        Job.Builder job = createJob(jobId);
        PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
        PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
        assertTrue(putJobResponse.isAcknowledged());
    }

    /** Persists {@code count} consecutive bucket results, one bucket span apart, starting at time bucketSpan. */
    private void createBuckets(String jobId, int count) {
        JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId);
        for (int i = 1; i <= count; ++i) {
            Bucket bucket = new Bucket(jobId, new Date(bucketSpan * i), bucketSpan);
            builder.persistBucket(bucket);
        }
        builder.executeRequest();
    }

    /** Persists a model size stats document timestamped at the given bucket number with the given model bytes. */
    private void createModelSizeStats(String jobId, int bucketNum, long modelBytes) {
        ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder(jobId);
        modelSizeStats.setTimestamp(new Date(bucketSpan * bucketNum));
        modelSizeStats.setLogTime(new Date(bucketSpan * bucketNum + randomIntBetween(1, 1000)));
        modelSizeStats.setModelBytes(modelBytes);
        jobResultsPersister.persistModelSizeStats(modelSizeStats.build());
    }

    /**
     * Calls the asynchronous {@link JobProvider#getEstablishedMemoryUsage} and blocks until it
     * completes, rethrowing any error it reported.
     *
     * @return the established memory usage in bytes, or {@code null} if not established
     */
    private Long queryEstablishedMemoryUsage(String jobId) throws Exception {
        AtomicReference<Long> establishedModelMemoryUsage = new AtomicReference<>();
        AtomicReference<Exception> exception = new AtomicReference<>();

        CountDownLatch latch = new CountDownLatch(1);

        jobProvider.getEstablishedMemoryUsage(jobId, memUse -> {
            establishedModelMemoryUsage.set(memUse);
            latch.countDown();
        }, e -> {
            exception.set(e);
            latch.countDown();
        });

        latch.await();

        if (exception.get() != null) {
            throw exception.get();
        }

        return establishedModelMemoryUsage.get();
    }
}