From b2243337d8c83f83bb00f7fd48d9e1491072e759 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 9 Jul 2020 19:15:46 +0300 Subject: [PATCH] [7.x][ML] Data frame analytics max_num_threads setting (#59254) (#59308) This adds a setting to data frame analytics jobs called `max_number_threads`. The setting expects a positive integer. When used the user specifies the max number of threads that may be used by the analysis. Note that the actual number of threads used is limited by the number of processors on the node where the job is assigned. Also, the process may use a couple more threads for operational functionality that is not the analysis itself. This setting may also be updated for a stopped job. More threads may reduce the time it takes to complete the job at the cost of using more CPU. Backport of #59254 and #57274 --- .../dataframe/DataFrameAnalyticsConfig.java | 28 +++++++++-- .../DataFrameAnalyticsConfigUpdate.java | 27 +++++++++-- .../client/MachineLearningIT.java | 1 + .../MlClientDocumentationIT.java | 2 + .../DataFrameAnalyticsConfigTests.java | 3 ++ .../DataFrameAnalyticsConfigUpdateTests.java | 3 ++ .../ml/put-data-frame-analytics.asciidoc | 1 + .../ml/update-data-frame-analytics.asciidoc | 1 + .../apis/put-dfanalytics.asciidoc | 11 ++++- .../apis/update-dfanalytics.asciidoc | 8 ++++ .../dataframe/DataFrameAnalyticsConfig.java | 43 ++++++++++++++--- .../DataFrameAnalyticsConfigUpdate.java | 48 ++++++++++++++++--- .../ml/job/results/ReservedFieldNames.java | 1 + .../xpack/core/ml/config_index_mappings.json | 3 ++ .../DataFrameAnalyticsConfigTests.java | 33 +++++++++++++ .../DataFrameAnalyticsConfigUpdateTests.java | 45 ++++++++++++++++- .../xpack/ml/MachineLearning.java | 4 +- .../process/AnalyticsProcessConfig.java | 4 ++ .../process/AnalyticsProcessManager.java | 16 +++++-- .../process/AnalyticsProcessManagerTests.java | 2 +- .../test/ml/data_frame_analytics_crud.yml | 8 +++- 21 files changed, 260 insertions(+), 32 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java index 7cc2000a44e..0f79048261a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java @@ -57,6 +57,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { static final ParseField CREATE_TIME = new ParseField("create_time"); static final ParseField VERSION = new ParseField("version"); static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); + static final ParseField MAX_NUM_THREADS = new ParseField("max_num_threads"); private static final ObjectParser PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new); @@ -80,6 +81,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { ValueType.VALUE); PARSER.declareString(Builder::setVersion, Version::fromString, VERSION); PARSER.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START); + PARSER.declareInt(Builder::setMaxNumThreads, MAX_NUM_THREADS); } private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOException { @@ -100,11 +102,13 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { private final Instant createTime; private final Version version; private final Boolean allowLazyStart; + private final Integer maxNumThreads; private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String description, @Nullable DataFrameAnalyticsSource source, @Nullable DataFrameAnalyticsDest dest, @Nullable DataFrameAnalysis analysis, @Nullable FetchSourceContext analyzedFields, @Nullable ByteSizeValue modelMemoryLimit, - @Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart) { + @Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart, + @Nullable Integer maxNumThreads) { this.id = id; this.description = description; this.source = source; @@ -115,6 +119,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());; this.version = version; this.allowLazyStart = allowLazyStart; + this.maxNumThreads = maxNumThreads; } public String getId() { @@ -157,6 +162,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { return allowLazyStart; } + public Integer getMaxNumThreads() { + return maxNumThreads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -193,6 +202,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { if (allowLazyStart != null) { builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart); } + if (maxNumThreads != null) { + builder.field(MAX_NUM_THREADS.getPreferredName(), maxNumThreads); + } builder.endObject(); return builder; } @@ -212,12 +224,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { && Objects.equals(modelMemoryLimit, other.modelMemoryLimit) && Objects.equals(createTime, other.createTime) && Objects.equals(version, other.version) - && Objects.equals(allowLazyStart, other.allowLazyStart); + && Objects.equals(allowLazyStart, other.allowLazyStart) + && Objects.equals(maxNumThreads, other.maxNumThreads); } @Override public int hashCode() { - return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart); + return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart, + maxNumThreads); } @Override @@ -237,6 +251,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { private Instant createTime; private Version version; private Boolean allowLazyStart; + private Integer maxNumThreads; private Builder() {} @@ -290,9 +305,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { return this; } + public Builder setMaxNumThreads(Integer maxNumThreads) { + this.maxNumThreads = maxNumThreads; + return this; + } + public DataFrameAnalyticsConfig build() { return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, - version, allowLazyStart); + version, allowLazyStart, maxNumThreads); } } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java index 1d5ecb66577..f6bda01bcf3 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java @@ -51,22 +51,25 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT, VALUE); PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START); - + PARSER.declareInt(Builder::setMaxNumThreads, DataFrameAnalyticsConfig.MAX_NUM_THREADS); } private final String id; private final String description; private final ByteSizeValue modelMemoryLimit; private final Boolean allowLazyStart; + private final Integer maxNumThreads; private DataFrameAnalyticsConfigUpdate(String id, @Nullable String description, @Nullable ByteSizeValue modelMemoryLimit, - @Nullable Boolean allowLazyStart) { + @Nullable Boolean allowLazyStart, + @Nullable Integer maxNumThreads) { this.id = id; this.description = description; this.modelMemoryLimit = modelMemoryLimit; this.allowLazyStart = allowLazyStart; + this.maxNumThreads = maxNumThreads; } public String getId() { @@ -85,6 +88,10 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { return allowLazyStart; } + public Integer getMaxNumThreads() { + return maxNumThreads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -98,6 +105,9 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { if (allowLazyStart != null) { builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart); } + if (maxNumThreads != null) { + builder.field(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), maxNumThreads); + } builder.endObject(); return builder; } @@ -117,12 +127,13 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { return Objects.equals(this.id, that.id) && Objects.equals(this.description, that.description) && Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit) - && Objects.equals(this.allowLazyStart, that.allowLazyStart); + && Objects.equals(this.allowLazyStart, that.allowLazyStart) + && Objects.equals(this.maxNumThreads, that.maxNumThreads); } @Override public int hashCode() { - return Objects.hash(id, description, modelMemoryLimit, allowLazyStart); + return Objects.hash(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads); } public static class Builder { @@ -131,6 +142,7 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { private String description; private ByteSizeValue modelMemoryLimit; private Boolean allowLazyStart; + private Integer maxNumThreads; private Builder() {} @@ -158,8 +170,13 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { return this; } + public Builder setMaxNumThreads(Integer maxNumThreads) { + this.maxNumThreads = maxNumThreads; + return this; + } + public DataFrameAnalyticsConfigUpdate build() { - return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart); + return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 59e2304aa0f..d4c13b9bade 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -1338,6 +1338,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase { assertThat(createdConfig.getAnalyzedFields(), equalTo(config.getAnalyzedFields())); assertThat(createdConfig.getModelMemoryLimit(), equalTo(ByteSizeValue.parseBytesSizeValue("1gb", ""))); // default value assertThat(createdConfig.getDescription(), equalTo("some description")); + assertThat(createdConfig.getMaxNumThreads(), equalTo(1)); } public void testPutDataFrameAnalyticsConfig_GivenRegression() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 2c0081fcf33..d771de53fae 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -3040,6 +3040,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { .setAnalyzedFields(analyzedFields) // <5> .setModelMemoryLimit(new ByteSizeValue(5, ByteSizeUnit.MB)) // <6> .setDescription("this is an example description") // <7> + .setMaxNumThreads(1) // <8> .build(); // end::put-data-frame-analytics-config @@ -3096,6 +3097,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { .setId("my-analytics-config") // <1> .setDescription("new description") // <2> .setModelMemoryLimit(new ByteSizeValue(128, ByteSizeUnit.MB)) // <3> + .setMaxNumThreads(4) // <4> .build(); // end::update-data-frame-analytics-config-update diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java index 6b0d1ee7760..0be652c597f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -69,6 +69,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractXContentTestCase The fields to be included in / excluded from the analysis <6> The memory limit for the model created as part of the analysis process <7> Optionally, a human-readable description +<8> The maximum number of threads to be used by the analysis. Defaults to 1. [id="{upid}-{api}-query-config"] diff --git a/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc index b6df7d25d04..a110baa49e6 100644 --- a/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc +++ b/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc @@ -34,6 +34,7 @@ include-tagged::{doc-tests-file}[{api}-config-update] <1> The {dfanalytics-job} ID <2> The human-readable description <3> The memory limit for the model created as part of the analysis process +<4> The maximum number of threads to be used by the analysis [id="{upid}-{api}-query-config"] diff --git a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc index 35c76acf34c..41fd4214291 100644 --- a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc @@ -324,6 +324,14 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa] `dest`:: (Required, object) include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=dest] + +`max_num_threads`:: +(Optional, integer) +The maximum number of threads to be used by the analysis. +The default value is `1`. Using more threads may decrease the time +necessary to complete the analysis at the cost of using more CPU. +Note that the process may use additional threads for operational +functionality other than the analysis itself. `model_memory_limit`:: (Optional, string) @@ -508,7 +516,8 @@ The API returns the following result: "model_memory_limit": "1gb", "create_time" : 1562265491319, "version" : "7.6.0", - "allow_lazy_start" : false + "allow_lazy_start" : false, + "max_num_threads": 1 } ---- // TESTRESPONSE[s/1562265491319/$body.$_path/] diff --git a/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc index 0c3a36e95e4..3cb11d7b32b 100644 --- a/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc @@ -71,6 +71,14 @@ the `starting` state until sufficient {ml} node capacity is available. (Optional, string) include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa] +`max_num_threads`:: +(Optional, integer) +The maximum number of threads to be used by the analysis. +The default value is `1`. Using more threads may decrease the time +necessary to complete the analysis at the cost of using more CPU. +Note that the process may use additional threads for operational +functionality other than the analysis itself. + `model_memory_limit`:: (Optional, string) The approximate maximum amount of memory resources that are permitted for diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java index 2ea69966298..5b46e9d0dc7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java @@ -60,6 +60,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { public static final ParseField CREATE_TIME = new ParseField("create_time"); public static final ParseField VERSION = new ParseField("version"); public static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); + public static final ParseField MAX_NUM_THREADS = new ParseField("max_num_threads"); public static final ObjectParser STRICT_PARSER = createParser(false); public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -80,6 +81,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { parser.declareField(Builder::setModelMemoryLimit, (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MODEL_MEMORY_LIMIT.getPreferredName()), MODEL_MEMORY_LIMIT, VALUE); parser.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START); + parser.declareInt(Builder::setMaxNumThreads, MAX_NUM_THREADS); if (ignoreUnknownFields) { // Headers are not parsed by the strict (config) parser, so headers supplied in the _body_ of a REST request will be rejected. // (For config, headers are explicitly transferred from the auth headers by code in the put data frame actions.) @@ -122,10 +124,12 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { private final Instant createTime; private final Version version; private final boolean allowLazyStart; + private final int maxNumThreads; private DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest, - DataFrameAnalysis analysis, Map headers, ByteSizeValue modelMemoryLimit, - FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart) { + DataFrameAnalysis analysis, Map headers, ByteSizeValue modelMemoryLimit, + FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart, + Integer maxNumThreads) { this.id = ExceptionsHelper.requireNonNull(id, ID); this.description = description; this.source = ExceptionsHelper.requireNonNull(source, SOURCE); @@ -137,6 +141,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli()); this.version = version; this.allowLazyStart = allowLazyStart; + + if (maxNumThreads != null && maxNumThreads < 1) { + throw ExceptionsHelper.badRequestException("[{}] must be a positive integer", MAX_NUM_THREADS.getPreferredName()); + } + this.maxNumThreads = maxNumThreads == null ? 1 : maxNumThreads; } public DataFrameAnalyticsConfig(StreamInput in) throws IOException { @@ -164,6 +173,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { } else { allowLazyStart = false; } + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + maxNumThreads = in.readVInt(); + } else { + maxNumThreads = 1; + } } public String getId() { @@ -210,6 +224,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { return allowLazyStart; } + public Integer getMaxNumThreads() { + return maxNumThreads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -242,6 +260,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { builder.field(VERSION.getPreferredName(), version); } builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart); + builder.field(MAX_NUM_THREADS.getPreferredName(), maxNumThreads); builder.endObject(); return builder; } @@ -270,6 +289,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeBoolean(allowLazyStart); } + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeVInt(maxNumThreads); + } } @Override @@ -288,13 +310,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { && Objects.equals(analyzedFields, other.analyzedFields) && Objects.equals(createTime, other.createTime) && Objects.equals(version, other.version) - && Objects.equals(allowLazyStart, other.allowLazyStart); + && Objects.equals(allowLazyStart, other.allowLazyStart) + && maxNumThreads == other.maxNumThreads; } @Override public int hashCode() { return Objects.hash(id, description, source, dest, analysis, headers, getModelMemoryLimit(), analyzedFields, createTime, version, - allowLazyStart); + allowLazyStart, maxNumThreads); } @Override @@ -329,6 +352,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { private Instant createTime; private Version version; private boolean allowLazyStart; + private Integer maxNumThreads; public Builder() {} @@ -351,6 +375,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { this.createTime = config.createTime; this.version = config.version; this.allowLazyStart = config.allowLazyStart; + this.maxNumThreads = config.maxNumThreads; } public String getId() { @@ -412,13 +437,18 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { return this; } + public Builder setMaxNumThreads(Integer maxNumThreads) { + this.maxNumThreads = maxNumThreads; + return this; + } + /** * Builds {@link DataFrameAnalyticsConfig} object. */ public DataFrameAnalyticsConfig build() { applyMaxModelMemoryLimit(); return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, headers, modelMemoryLimit, analyzedFields, - createTime, version, allowLazyStart); + createTime, version, allowLazyStart, maxNumThreads); } /** @@ -438,7 +468,8 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { analyzedFields, createTime, version, - allowLazyStart); + allowLazyStart, + maxNumThreads); } private void applyMaxModelMemoryLimit() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java index c2fea3932e3..c6206edf767 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -13,6 +14,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; @@ -33,22 +35,30 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT, VALUE); PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START); - + PARSER.declareInt(Builder::setMaxNumThreads, DataFrameAnalyticsConfig.MAX_NUM_THREADS); } private final String id; private final String description; private final ByteSizeValue modelMemoryLimit; private final Boolean allowLazyStart; + private final Integer maxNumThreads; private DataFrameAnalyticsConfigUpdate(String id, @Nullable String description, @Nullable ByteSizeValue modelMemoryLimit, - @Nullable Boolean allowLazyStart) { + @Nullable Boolean allowLazyStart, + @Nullable Integer maxNumThreads) { this.id = id; this.description = description; this.modelMemoryLimit = modelMemoryLimit; this.allowLazyStart = allowLazyStart; + + if (maxNumThreads != null && maxNumThreads < 1) { + throw ExceptionsHelper.badRequestException("[{}] must be a positive integer", + DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName()); + } + this.maxNumThreads = maxNumThreads; } public DataFrameAnalyticsConfigUpdate(StreamInput in) throws IOException { @@ -56,6 +66,11 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje this.description = in.readOptionalString(); this.modelMemoryLimit = in.readOptionalWriteable(ByteSizeValue::new); this.allowLazyStart = in.readOptionalBoolean(); + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + this.maxNumThreads = in.readOptionalVInt(); + } else { + this.maxNumThreads = null; + } } @Override @@ -64,6 +79,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje out.writeOptionalString(description); out.writeOptionalWriteable(modelMemoryLimit); out.writeOptionalBoolean(allowLazyStart); + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeOptionalVInt(maxNumThreads); + } } public String getId() { @@ -82,6 +100,10 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje return allowLazyStart; } + public Integer getMaxNumThreads() { + return maxNumThreads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -95,6 +117,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje if (allowLazyStart != null) { builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart); } + if (maxNumThreads != null) { + builder.field(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), maxNumThreads); + } builder.endObject(); return builder; } @@ -120,6 +145,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje if (allowLazyStart != null) { builder.setAllowLazyStart(allowLazyStart); } + if (maxNumThreads != null) { + builder.setMaxNumThreads(maxNumThreads); + } return builder; } @@ -127,7 +155,8 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje * Whether this update applied to the given source config requires analytics task restart. */ public boolean requiresRestart(DataFrameAnalyticsConfig source) { - return getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false; + return (getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false) + || (getMaxNumThreads() != null && getMaxNumThreads().equals(source.getMaxNumThreads()) == false); } @Override @@ -145,12 +174,13 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje return Objects.equals(this.id, that.id) && Objects.equals(this.description, that.description) && Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit) - && Objects.equals(this.allowLazyStart, that.allowLazyStart); + && Objects.equals(this.allowLazyStart, that.allowLazyStart) + && Objects.equals(this.maxNumThreads, that.maxNumThreads); } @Override public int hashCode() { - return Objects.hash(id, description, modelMemoryLimit, allowLazyStart); + return Objects.hash(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads); } public static class Builder { @@ -159,6 +189,7 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje private String description; private ByteSizeValue modelMemoryLimit; private Boolean allowLazyStart; + private Integer maxNumThreads; public Builder(String id) { this.id = id; @@ -188,8 +219,13 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje return this; } + public Builder setMaxNumThreads(Integer maxNumThreads) { + this.maxNumThreads = maxNumThreads; + return this; + } + public DataFrameAnalyticsConfigUpdate build() { - return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart); + return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 1fa00b65a83..372b9b2e0d5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -311,6 +311,7 @@ public final class ReservedFieldNames { DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName(), DataFrameAnalyticsConfig.CREATE_TIME.getPreferredName(), DataFrameAnalyticsConfig.VERSION.getPreferredName(), + DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), DataFrameAnalyticsDest.INDEX.getPreferredName(), DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), DataFrameAnalyticsSource.INDEX.getPreferredName(), diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index 4c8cda279db..c880aa0c714 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -325,6 +325,9 @@ "job_version" : { "type" : "keyword" }, + "max_num_threads" : { + "type" : "integer" + }, "model_plot_config" : { "properties" : { "enabled" : { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java index c5327b37d2e..cc095adb225 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xpack.core.ml.AbstractBWCSerializationTestCase; @@ -117,6 +118,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC builder.setCreateTime(null); builder.setVersion(null); } + if (version.before(Version.V_7_9_0)) { + builder.setMaxNumThreads(null); + } return builder.build(); } @@ -227,6 +231,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC if (randomBoolean()) { builder.setAllowLazyStart(randomBoolean()); } + if (randomBoolean()) { + builder.setMaxNumThreads(randomIntBetween(1, 20)); + } return builder; } @@ -484,6 +491,32 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue())); } + public void testCtor_GivenMaxNumThreadsIsZero() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder() + .setId("test_config") + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setDest(new DataFrameAnalyticsDest("dest_index", null)) + .setAnalysis(new Regression("foo")) + .setMaxNumThreads(0) + .build()); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer")); + } + + public void testCtor_GivenMaxNumThreadsIsNegative() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder() + .setId("test_config") + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setDest(new DataFrameAnalyticsDest("dest_index", null)) + .setAnalysis(new Regression("foo")) + .setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0)) + .build()); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer")); + } + private static void assertTooSmall(ElasticsearchStatusException e) { assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb.")); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java index ae6bc9952bb..6f974fbdc8f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java @@ -5,9 +5,11 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; @@ -47,6 +49,9 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest if (randomBoolean()) { builder.setAllowLazyStart(randomBoolean()); } + if (randomBoolean()) { + builder.setMaxNumThreads(randomIntBetween(1, 20)); + } return builder.build(); } @@ -81,6 +86,15 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setAllowLazyStart(true).build()))); } + public void testMergeWithConfig_UpdatedMaxNumThreads() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandomBuilder(id).setMaxNumThreads(3).build(); + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setMaxNumThreads(5).build(); + assertThat( + update.mergeWithConfig(config).build(), + is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setMaxNumThreads(5).build()))); + } + public void testMergeWithConfig_UpdatedAllUpdatableProperties() { String id = randomValidId(); DataFrameAnalyticsConfig config = @@ -88,12 +102,14 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest .setDescription("old description") .setModelMemoryLimit(new ByteSizeValue(1024)) .setAllowLazyStart(false) + .setMaxNumThreads(1) .build(); DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id) .setDescription("new description") .setModelMemoryLimit(new ByteSizeValue(2048)) .setAllowLazyStart(true) + .setMaxNumThreads(4) .build(); assertThat( update.mergeWithConfig(config).build(), @@ -102,6 +118,7 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest .setDescription("new description") .setModelMemoryLimit(new ByteSizeValue(2048)) .setAllowLazyStart(true) + .setMaxNumThreads(4) .build()))); } @@ -155,9 +172,35 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest assertThat(update.requiresRestart(config), is(true)); } + public void testRequiresRestart_MaxNumThreadsUpdateRequiresRestart() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = + DataFrameAnalyticsConfigTests.createRandomBuilder(id).setMaxNumThreads(1).build(); + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setMaxNumThreads(8).build(); + + assertThat(update.requiresRestart(config), is(true)); + } + + public void testCtor_GivenMaxNumberThreadsIsZero() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new DataFrameAnalyticsConfigUpdate.Builder("test").setMaxNumThreads(0).build()); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer")); + } + + public void testCtor_GivenMaxNumberThreadsIsNegative() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new DataFrameAnalyticsConfigUpdate.Builder("test").setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0)).build()); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer")); + } + private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) { return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription())) && (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit())) - && (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart())); + && (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart())) + && (update.getMaxNumThreads() == null || Objects.equals(config.getMaxNumThreads(), update.getMaxNumThreads())); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 950f4b4937f..ff7a82cde02 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; @@ -699,7 +700,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, dataFrameAnalyticsAuditor, trainedModelProvider, modelLoadingService, - resultsPersisterService); + resultsPersisterService, + EsExecutors.allocatedProcessors(settings)); MemoryUsageEstimationProcessManager memoryEstimationProcessManager = new MemoryUsageEstimationProcessManager( threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java index 2a2287c4fbb..dd93357e3e2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java @@ -61,6 +61,10 @@ public class AnalyticsProcessConfig implements ToXContentObject { return cols; } + public int threads() { + return threads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index e0072403c2a..1991f935409 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -70,6 +70,7 @@ public class AnalyticsProcessManager { private final TrainedModelProvider trainedModelProvider; private final ModelLoadingService modelLoadingService; private final ResultsPersisterService resultsPersisterService; + private final int numAllocatedProcessors; public AnalyticsProcessManager(Client client, ThreadPool threadPool, @@ -77,7 +78,8 @@ public class AnalyticsProcessManager { DataFrameAnalyticsAuditor auditor, TrainedModelProvider trainedModelProvider, ModelLoadingService modelLoadingService, - ResultsPersisterService resultsPersisterService) { + ResultsPersisterService resultsPersisterService, + int numAllocatedProcessors) { this( client, threadPool.generic(), @@ -86,7 +88,8 @@ public class AnalyticsProcessManager { auditor, trainedModelProvider, modelLoadingService, - resultsPersisterService); + resultsPersisterService, + numAllocatedProcessors); } // Visible for testing @@ -97,7 +100,8 @@ public class AnalyticsProcessManager { DataFrameAnalyticsAuditor auditor, TrainedModelProvider trainedModelProvider, ModelLoadingService modelLoadingService, - ResultsPersisterService resultsPersisterService) { + ResultsPersisterService resultsPersisterService, + int numAllocatedProcessors) { this.client = Objects.requireNonNull(client); this.executorServiceForJob = Objects.requireNonNull(executorServiceForJob); this.executorServiceForProcess = Objects.requireNonNull(executorServiceForProcess); @@ -106,6 +110,7 @@ public class AnalyticsProcessManager { this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider); this.modelLoadingService = Objects.requireNonNull(modelLoadingService); this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService); + this.numAllocatedProcessors = numAllocatedProcessors; } public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory) { @@ -452,7 +457,7 @@ public class AnalyticsProcessManager { dataExtractor.set(dataExtractorFactory.newExtractor(false)); AnalyticsProcessConfig analyticsProcessConfig = createProcessConfig(dataExtractor.get(), dataExtractorFactory.getExtractedFields()); - LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig)); + LOGGER.debug("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig)); // If we have no rows, that means there is no data so no point in starting the native process // just finish the task if (analyticsProcessConfig.rows() == 0) { @@ -468,12 +473,13 @@ public class AnalyticsProcessManager { ExtractedFields extractedFields) { DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary(); Set categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis()); + int threads = Math.min(config.getMaxNumThreads(), numAllocatedProcessors); return new AnalyticsProcessConfig( config.getId(), dataSummary.rows, dataSummary.cols, config.getModelMemoryLimit(), - 1, + threads, config.getDest().getResultsField(), categoricalFields, config.getAnalysis(), diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 1100b86109e..1f1c2851820 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -113,7 +113,7 @@ public class AnalyticsProcessManagerTests extends ESTestCase { resultsPersisterService = mock(ResultsPersisterService.class); modelLoadingService = mock(ModelLoadingService.class); processManager = new AnalyticsProcessManager(client, executorServiceForJob, executorServiceForProcess, processFactory, auditor, - trainedModelProvider, modelLoadingService, resultsPersisterService); + trainedModelProvider, modelLoadingService, resultsPersisterService, 1); } public void testRunJob_TaskIsStopping() { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index 9922f721255..f75f61f37c2 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -2118,12 +2118,14 @@ setup: "analysis": {"outlier_detection":{}}, "description": "before update", "model_memory_limit": "20mb", - "allow_lazy_start": false + "allow_lazy_start": false, + "max_num_threads": 1 } - match: { id: "update-test-job" } - match: { description: "before update" } - match: { model_memory_limit: "20mb" } - match: { allow_lazy_start: false } + - match: { max_num_threads: 1 } - do: ml.update_data_frame_analytics: @@ -2132,12 +2134,14 @@ setup: { "description": "after update", "model_memory_limit": "30mb", - "allow_lazy_start": true + "allow_lazy_start": true, + "max_num_threads": 2 } - match: { id: "update-test-job" } - match: { description: "after update" } - match: { model_memory_limit: "30mb" } - match: { allow_lazy_start: true } + - match: { max_num_threads: 2 } --- "Test update given missing analytics":