diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java index 7cc2000a44e..0f79048261a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java @@ -57,6 +57,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { static final ParseField CREATE_TIME = new ParseField("create_time"); static final ParseField VERSION = new ParseField("version"); static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); + static final ParseField MAX_NUM_THREADS = new ParseField("max_num_threads"); private static final ObjectParser PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new); @@ -80,6 +81,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { ValueType.VALUE); PARSER.declareString(Builder::setVersion, Version::fromString, VERSION); PARSER.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START); + PARSER.declareInt(Builder::setMaxNumThreads, MAX_NUM_THREADS); } private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOException { @@ -100,11 +102,13 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { private final Instant createTime; private final Version version; private final Boolean allowLazyStart; + private final Integer maxNumThreads; private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String description, @Nullable DataFrameAnalyticsSource source, @Nullable DataFrameAnalyticsDest dest, @Nullable DataFrameAnalysis analysis, @Nullable FetchSourceContext analyzedFields, @Nullable ByteSizeValue modelMemoryLimit, - @Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart) { + @Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart, + @Nullable Integer maxNumThreads) { this.id = id; this.description = description; this.source = source; @@ -115,6 +119,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());; this.version = version; this.allowLazyStart = allowLazyStart; + this.maxNumThreads = maxNumThreads; } public String getId() { @@ -157,6 +162,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { return allowLazyStart; } + public Integer getMaxNumThreads() { + return maxNumThreads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -193,6 +202,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { if (allowLazyStart != null) { builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart); } + if (maxNumThreads != null) { + builder.field(MAX_NUM_THREADS.getPreferredName(), maxNumThreads); + } builder.endObject(); return builder; } @@ -212,12 +224,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { && Objects.equals(modelMemoryLimit, other.modelMemoryLimit) && Objects.equals(createTime, other.createTime) && Objects.equals(version, other.version) - && Objects.equals(allowLazyStart, other.allowLazyStart); + && Objects.equals(allowLazyStart, other.allowLazyStart) + && Objects.equals(maxNumThreads, other.maxNumThreads); } @Override public int hashCode() { - return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart); + return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart, + maxNumThreads); } @Override @@ -237,6 +251,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { private Instant createTime; private Version version; private Boolean allowLazyStart; + private Integer maxNumThreads; private Builder() {} @@ -290,9 +305,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { return this; } + public Builder setMaxNumThreads(Integer maxNumThreads) { + this.maxNumThreads = maxNumThreads; + return this; + } + public DataFrameAnalyticsConfig build() { return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, - version, allowLazyStart); + version, allowLazyStart, maxNumThreads); } } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java index 1d5ecb66577..f6bda01bcf3 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java @@ -51,22 +51,25 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT, VALUE); PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START); - + PARSER.declareInt(Builder::setMaxNumThreads, DataFrameAnalyticsConfig.MAX_NUM_THREADS); } private final String id; private final String description; private final ByteSizeValue modelMemoryLimit; private final Boolean allowLazyStart; + private final Integer maxNumThreads; private DataFrameAnalyticsConfigUpdate(String id, @Nullable String description, @Nullable ByteSizeValue modelMemoryLimit, - @Nullable Boolean allowLazyStart) { + @Nullable Boolean allowLazyStart, + @Nullable Integer maxNumThreads) { this.id = id; this.description = description; this.modelMemoryLimit = modelMemoryLimit; this.allowLazyStart = allowLazyStart; + this.maxNumThreads = maxNumThreads; } public String getId() { @@ -85,6 +88,10 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { return allowLazyStart; } + public Integer getMaxNumThreads() { + return maxNumThreads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -98,6 +105,9 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { if (allowLazyStart != null) { builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart); } + if (maxNumThreads != null) { + builder.field(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), maxNumThreads); + } builder.endObject(); return builder; } @@ -117,12 +127,13 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { return Objects.equals(this.id, that.id) && Objects.equals(this.description, that.description) && Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit) - && Objects.equals(this.allowLazyStart, that.allowLazyStart); + && Objects.equals(this.allowLazyStart, that.allowLazyStart) + && Objects.equals(this.maxNumThreads, that.maxNumThreads); } @Override public int hashCode() { - return Objects.hash(id, description, modelMemoryLimit, allowLazyStart); + return Objects.hash(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads); } public static class Builder { @@ -131,6 +142,7 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { private String description; private ByteSizeValue modelMemoryLimit; private Boolean allowLazyStart; + private Integer maxNumThreads; private Builder() {} @@ -158,8 +170,13 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { return this; } + public Builder setMaxNumThreads(Integer maxNumThreads) { + this.maxNumThreads = maxNumThreads; + return this; + } + public DataFrameAnalyticsConfigUpdate build() { - return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart); + return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 59e2304aa0f..d4c13b9bade 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -1338,6 +1338,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase { assertThat(createdConfig.getAnalyzedFields(), equalTo(config.getAnalyzedFields())); assertThat(createdConfig.getModelMemoryLimit(), equalTo(ByteSizeValue.parseBytesSizeValue("1gb", ""))); // default value assertThat(createdConfig.getDescription(), equalTo("some description")); + assertThat(createdConfig.getMaxNumThreads(), equalTo(1)); } public void testPutDataFrameAnalyticsConfig_GivenRegression() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 2c0081fcf33..d771de53fae 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -3040,6 +3040,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { .setAnalyzedFields(analyzedFields) // <5> .setModelMemoryLimit(new ByteSizeValue(5, ByteSizeUnit.MB)) // <6> .setDescription("this is an example description") // <7> + .setMaxNumThreads(1) // <8> .build(); // end::put-data-frame-analytics-config @@ -3096,6 +3097,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { .setId("my-analytics-config") // <1> .setDescription("new description") // <2> .setModelMemoryLimit(new ByteSizeValue(128, ByteSizeUnit.MB)) // <3> + .setMaxNumThreads(4) // <4> .build(); // end::update-data-frame-analytics-config-update diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java index 6b0d1ee7760..0be652c597f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -69,6 +69,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractXContentTestCase The fields to be included in / excluded from the analysis <6> The memory limit for the model created as part of the analysis process <7> Optionally, a human-readable description +<8> The maximum number of threads to be used by the analysis. Defaults to 1. [id="{upid}-{api}-query-config"] diff --git a/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc index b6df7d25d04..a110baa49e6 100644 --- a/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc +++ b/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc @@ -34,6 +34,7 @@ include-tagged::{doc-tests-file}[{api}-config-update] <1> The {dfanalytics-job} ID <2> The human-readable description <3> The memory limit for the model created as part of the analysis process +<4> The maximum number of threads to be used by the analysis [id="{upid}-{api}-query-config"] diff --git a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc index 35c76acf34c..41fd4214291 100644 --- a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc @@ -324,6 +324,14 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa] `dest`:: (Required, object) include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=dest] + +`max_num_threads`:: +(Optional, integer) +The maximum number of threads to be used by the analysis. +The default value is `1`. Using more threads may decrease the time +necessary to complete the analysis at the cost of using more CPU. +Note that the process may use additional threads for operational +functionality other than the analysis itself. `model_memory_limit`:: (Optional, string) @@ -508,7 +516,8 @@ The API returns the following result: "model_memory_limit": "1gb", "create_time" : 1562265491319, "version" : "7.6.0", - "allow_lazy_start" : false + "allow_lazy_start" : false, + "max_num_threads": 1 } ---- // TESTRESPONSE[s/1562265491319/$body.$_path/] diff --git a/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc index 0c3a36e95e4..3cb11d7b32b 100644 --- a/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc @@ -71,6 +71,14 @@ the `starting` state until sufficient {ml} node capacity is available. (Optional, string) include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa] +`max_num_threads`:: +(Optional, integer) +The maximum number of threads to be used by the analysis. +The default value is `1`. Using more threads may decrease the time +necessary to complete the analysis at the cost of using more CPU. +Note that the process may use additional threads for operational +functionality other than the analysis itself. + `model_memory_limit`:: (Optional, string) The approximate maximum amount of memory resources that are permitted for diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java index 2ea69966298..5b46e9d0dc7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java @@ -60,6 +60,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { public static final ParseField CREATE_TIME = new ParseField("create_time"); public static final ParseField VERSION = new ParseField("version"); public static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); + public static final ParseField MAX_NUM_THREADS = new ParseField("max_num_threads"); public static final ObjectParser STRICT_PARSER = createParser(false); public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -80,6 +81,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { parser.declareField(Builder::setModelMemoryLimit, (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MODEL_MEMORY_LIMIT.getPreferredName()), MODEL_MEMORY_LIMIT, VALUE); parser.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START); + parser.declareInt(Builder::setMaxNumThreads, MAX_NUM_THREADS); if (ignoreUnknownFields) { // Headers are not parsed by the strict (config) parser, so headers supplied in the _body_ of a REST request will be rejected. // (For config, headers are explicitly transferred from the auth headers by code in the put data frame actions.) @@ -122,10 +124,12 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { private final Instant createTime; private final Version version; private final boolean allowLazyStart; + private final int maxNumThreads; private DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest, - DataFrameAnalysis analysis, Map headers, ByteSizeValue modelMemoryLimit, - FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart) { + DataFrameAnalysis analysis, Map headers, ByteSizeValue modelMemoryLimit, + FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart, + Integer maxNumThreads) { this.id = ExceptionsHelper.requireNonNull(id, ID); this.description = description; this.source = ExceptionsHelper.requireNonNull(source, SOURCE); @@ -137,6 +141,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli()); this.version = version; this.allowLazyStart = allowLazyStart; + + if (maxNumThreads != null && maxNumThreads < 1) { + throw ExceptionsHelper.badRequestException("[{}] must be a positive integer", MAX_NUM_THREADS.getPreferredName()); + } + this.maxNumThreads = maxNumThreads == null ? 1 : maxNumThreads; } public DataFrameAnalyticsConfig(StreamInput in) throws IOException { @@ -164,6 +173,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { } else { allowLazyStart = false; } + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + maxNumThreads = in.readVInt(); + } else { + maxNumThreads = 1; + } } public String getId() { @@ -210,6 +224,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { return allowLazyStart; } + public Integer getMaxNumThreads() { + return maxNumThreads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -242,6 +260,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { builder.field(VERSION.getPreferredName(), version); } builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart); + builder.field(MAX_NUM_THREADS.getPreferredName(), maxNumThreads); builder.endObject(); return builder; } @@ -270,6 +289,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeBoolean(allowLazyStart); } + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeVInt(maxNumThreads); + } } @Override @@ -288,13 +310,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { && Objects.equals(analyzedFields, other.analyzedFields) && Objects.equals(createTime, other.createTime) && Objects.equals(version, other.version) - && Objects.equals(allowLazyStart, other.allowLazyStart); + && Objects.equals(allowLazyStart, other.allowLazyStart) + && maxNumThreads == other.maxNumThreads; } @Override public int hashCode() { return Objects.hash(id, description, source, dest, analysis, headers, getModelMemoryLimit(), analyzedFields, createTime, version, - allowLazyStart); + allowLazyStart, maxNumThreads); } @Override @@ -329,6 +352,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { private Instant createTime; private Version version; private boolean allowLazyStart; + private Integer maxNumThreads; public Builder() {} @@ -351,6 +375,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { this.createTime = config.createTime; this.version = config.version; this.allowLazyStart = config.allowLazyStart; + this.maxNumThreads = config.maxNumThreads; } public String getId() { @@ -412,13 +437,18 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { return this; } + public Builder setMaxNumThreads(Integer maxNumThreads) { + this.maxNumThreads = maxNumThreads; + return this; + } + /** * Builds {@link DataFrameAnalyticsConfig} object. */ public DataFrameAnalyticsConfig build() { applyMaxModelMemoryLimit(); return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, headers, modelMemoryLimit, analyzedFields, - createTime, version, allowLazyStart); + createTime, version, allowLazyStart, maxNumThreads); } /** @@ -438,7 +468,8 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { analyzedFields, createTime, version, - allowLazyStart); + allowLazyStart, + maxNumThreads); } private void applyMaxModelMemoryLimit() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java index c2fea3932e3..c6206edf767 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -13,6 +14,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; @@ -33,22 +35,30 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT, VALUE); PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START); - + PARSER.declareInt(Builder::setMaxNumThreads, DataFrameAnalyticsConfig.MAX_NUM_THREADS); } private final String id; private final String description; private final ByteSizeValue modelMemoryLimit; private final Boolean allowLazyStart; + private final Integer maxNumThreads; private DataFrameAnalyticsConfigUpdate(String id, @Nullable String description, @Nullable ByteSizeValue modelMemoryLimit, - @Nullable Boolean allowLazyStart) { + @Nullable Boolean allowLazyStart, + @Nullable Integer maxNumThreads) { this.id = id; this.description = description; this.modelMemoryLimit = modelMemoryLimit; this.allowLazyStart = allowLazyStart; + + if (maxNumThreads != null && maxNumThreads < 1) { + throw ExceptionsHelper.badRequestException("[{}] must be a positive integer", + DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName()); + } + this.maxNumThreads = maxNumThreads; } public DataFrameAnalyticsConfigUpdate(StreamInput in) throws IOException { @@ -56,6 +66,11 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje this.description = in.readOptionalString(); this.modelMemoryLimit = in.readOptionalWriteable(ByteSizeValue::new); this.allowLazyStart = in.readOptionalBoolean(); + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + this.maxNumThreads = in.readOptionalVInt(); + } else { + this.maxNumThreads = null; + } } @Override @@ -64,6 +79,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje out.writeOptionalString(description); out.writeOptionalWriteable(modelMemoryLimit); out.writeOptionalBoolean(allowLazyStart); + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeOptionalVInt(maxNumThreads); + } } public String getId() { @@ -82,6 +100,10 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje return allowLazyStart; } + public Integer getMaxNumThreads() { + return maxNumThreads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -95,6 +117,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje if (allowLazyStart != null) { builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart); } + if (maxNumThreads != null) { + builder.field(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), maxNumThreads); + } builder.endObject(); return builder; } @@ -120,6 +145,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje if (allowLazyStart != null) { builder.setAllowLazyStart(allowLazyStart); } + if (maxNumThreads != null) { + builder.setMaxNumThreads(maxNumThreads); + } return builder; } @@ -127,7 +155,8 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje * Whether this update applied to the given source config requires analytics task restart. */ public boolean requiresRestart(DataFrameAnalyticsConfig source) { - return getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false; + return (getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false) + || (getMaxNumThreads() != null && getMaxNumThreads().equals(source.getMaxNumThreads()) == false); } @Override @@ -145,12 +174,13 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje return Objects.equals(this.id, that.id) && Objects.equals(this.description, that.description) && Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit) - && Objects.equals(this.allowLazyStart, that.allowLazyStart); + && Objects.equals(this.allowLazyStart, that.allowLazyStart) + && Objects.equals(this.maxNumThreads, that.maxNumThreads); } @Override public int hashCode() { - return Objects.hash(id, description, modelMemoryLimit, allowLazyStart); + return Objects.hash(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads); } public static class Builder { @@ -159,6 +189,7 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje private String description; private ByteSizeValue modelMemoryLimit; private Boolean allowLazyStart; + private Integer maxNumThreads; public Builder(String id) { this.id = id; @@ -188,8 +219,13 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje return this; } + public Builder setMaxNumThreads(Integer maxNumThreads) { + this.maxNumThreads = maxNumThreads; + return this; + } + public DataFrameAnalyticsConfigUpdate build() { - return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart); + return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 1fa00b65a83..372b9b2e0d5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -311,6 +311,7 @@ public final class ReservedFieldNames { DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName(), DataFrameAnalyticsConfig.CREATE_TIME.getPreferredName(), DataFrameAnalyticsConfig.VERSION.getPreferredName(), + DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), DataFrameAnalyticsDest.INDEX.getPreferredName(), DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), DataFrameAnalyticsSource.INDEX.getPreferredName(), diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index 4c8cda279db..c880aa0c714 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -325,6 +325,9 @@ "job_version" : { "type" : "keyword" }, + "max_num_threads" : { + "type" : "integer" + }, "model_plot_config" : { "properties" : { "enabled" : { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java index c5327b37d2e..cc095adb225 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xpack.core.ml.AbstractBWCSerializationTestCase; @@ -117,6 +118,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC builder.setCreateTime(null); builder.setVersion(null); } + if (version.before(Version.V_7_9_0)) { + builder.setMaxNumThreads(null); + } return builder.build(); } @@ -227,6 +231,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC if (randomBoolean()) { builder.setAllowLazyStart(randomBoolean()); } + if (randomBoolean()) { + builder.setMaxNumThreads(randomIntBetween(1, 20)); + } return builder; } @@ -484,6 +491,32 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue())); } + public void testCtor_GivenMaxNumThreadsIsZero() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder() + .setId("test_config") + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setDest(new DataFrameAnalyticsDest("dest_index", null)) + .setAnalysis(new Regression("foo")) + .setMaxNumThreads(0) + .build()); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer")); + } + + public void testCtor_GivenMaxNumThreadsIsNegative() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder() + .setId("test_config") + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setDest(new DataFrameAnalyticsDest("dest_index", null)) + .setAnalysis(new Regression("foo")) + .setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0)) + .build()); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer")); + } + private static void assertTooSmall(ElasticsearchStatusException e) { assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb.")); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java index ae6bc9952bb..6f974fbdc8f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java @@ -5,9 +5,11 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; @@ -47,6 +49,9 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest if (randomBoolean()) { builder.setAllowLazyStart(randomBoolean()); } + if (randomBoolean()) { + builder.setMaxNumThreads(randomIntBetween(1, 20)); + } return builder.build(); } @@ -81,6 +86,15 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setAllowLazyStart(true).build()))); } + public void testMergeWithConfig_UpdatedMaxNumThreads() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandomBuilder(id).setMaxNumThreads(3).build(); + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setMaxNumThreads(5).build(); + assertThat( + update.mergeWithConfig(config).build(), + is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setMaxNumThreads(5).build()))); + } + public void testMergeWithConfig_UpdatedAllUpdatableProperties() { String id = randomValidId(); DataFrameAnalyticsConfig config = @@ -88,12 +102,14 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest .setDescription("old description") .setModelMemoryLimit(new ByteSizeValue(1024)) .setAllowLazyStart(false) + .setMaxNumThreads(1) .build(); DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id) .setDescription("new description") .setModelMemoryLimit(new ByteSizeValue(2048)) .setAllowLazyStart(true) + .setMaxNumThreads(4) .build(); assertThat( update.mergeWithConfig(config).build(), @@ -102,6 +118,7 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest .setDescription("new description") .setModelMemoryLimit(new ByteSizeValue(2048)) .setAllowLazyStart(true) + .setMaxNumThreads(4) .build()))); } @@ -155,9 +172,35 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest assertThat(update.requiresRestart(config), is(true)); } + public void testRequiresRestart_MaxNumThreadsUpdateRequiresRestart() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = + DataFrameAnalyticsConfigTests.createRandomBuilder(id).setMaxNumThreads(1).build(); + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setMaxNumThreads(8).build(); + + assertThat(update.requiresRestart(config), is(true)); + } + + public void testCtor_GivenMaxNumberThreadsIsZero() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new DataFrameAnalyticsConfigUpdate.Builder("test").setMaxNumThreads(0).build()); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer")); + } + + public void testCtor_GivenMaxNumberThreadsIsNegative() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> new DataFrameAnalyticsConfigUpdate.Builder("test").setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0)).build()); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer")); + } + private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) { return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription())) && (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit())) - && (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart())); + && (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart())) + && (update.getMaxNumThreads() == null || Objects.equals(config.getMaxNumThreads(), update.getMaxNumThreads())); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 950f4b4937f..ff7a82cde02 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; @@ -699,7 +700,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, dataFrameAnalyticsAuditor, trainedModelProvider, modelLoadingService, - resultsPersisterService); + resultsPersisterService, + EsExecutors.allocatedProcessors(settings)); MemoryUsageEstimationProcessManager memoryEstimationProcessManager = new MemoryUsageEstimationProcessManager( threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java index 2a2287c4fbb..dd93357e3e2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessConfig.java @@ -61,6 +61,10 @@ public class AnalyticsProcessConfig implements ToXContentObject { return cols; } + public int threads() { + return threads; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index e0072403c2a..1991f935409 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -70,6 +70,7 @@ public class AnalyticsProcessManager { private final TrainedModelProvider trainedModelProvider; private final ModelLoadingService modelLoadingService; private final ResultsPersisterService resultsPersisterService; + private final int numAllocatedProcessors; public AnalyticsProcessManager(Client client, ThreadPool threadPool, @@ -77,7 +78,8 @@ public class AnalyticsProcessManager { DataFrameAnalyticsAuditor auditor, TrainedModelProvider trainedModelProvider, ModelLoadingService modelLoadingService, - ResultsPersisterService resultsPersisterService) { + ResultsPersisterService resultsPersisterService, + int numAllocatedProcessors) { this( client, threadPool.generic(), @@ -86,7 +88,8 @@ public class AnalyticsProcessManager { auditor, trainedModelProvider, modelLoadingService, - resultsPersisterService); + resultsPersisterService, + numAllocatedProcessors); } // Visible for testing @@ -97,7 +100,8 @@ public class AnalyticsProcessManager { DataFrameAnalyticsAuditor auditor, TrainedModelProvider trainedModelProvider, ModelLoadingService modelLoadingService, - ResultsPersisterService resultsPersisterService) { + ResultsPersisterService resultsPersisterService, + int numAllocatedProcessors) { this.client = Objects.requireNonNull(client); this.executorServiceForJob = Objects.requireNonNull(executorServiceForJob); this.executorServiceForProcess = Objects.requireNonNull(executorServiceForProcess); @@ -106,6 +110,7 @@ public class AnalyticsProcessManager { this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider); this.modelLoadingService = Objects.requireNonNull(modelLoadingService); this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService); + this.numAllocatedProcessors = numAllocatedProcessors; } public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory) { @@ -452,7 +457,7 @@ public class AnalyticsProcessManager { dataExtractor.set(dataExtractorFactory.newExtractor(false)); AnalyticsProcessConfig analyticsProcessConfig = createProcessConfig(dataExtractor.get(), dataExtractorFactory.getExtractedFields()); - LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig)); + LOGGER.debug("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig)); // If we have no rows, that means there is no data so no point in starting the native process // just finish the task if (analyticsProcessConfig.rows() == 0) { @@ -468,12 +473,13 @@ public class AnalyticsProcessManager { ExtractedFields extractedFields) { DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary(); Set categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis()); + int threads = Math.min(config.getMaxNumThreads(), numAllocatedProcessors); return new AnalyticsProcessConfig( config.getId(), dataSummary.rows, dataSummary.cols, config.getModelMemoryLimit(), - 1, + threads, config.getDest().getResultsField(), categoricalFields, config.getAnalysis(), diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 1100b86109e..1f1c2851820 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -113,7 +113,7 @@ public class AnalyticsProcessManagerTests extends ESTestCase { resultsPersisterService = mock(ResultsPersisterService.class); modelLoadingService = mock(ModelLoadingService.class); processManager = new AnalyticsProcessManager(client, executorServiceForJob, executorServiceForProcess, processFactory, auditor, - trainedModelProvider, modelLoadingService, resultsPersisterService); + trainedModelProvider, modelLoadingService, resultsPersisterService, 1); } public void testRunJob_TaskIsStopping() { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index 9922f721255..f75f61f37c2 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -2118,12 +2118,14 @@ setup: "analysis": {"outlier_detection":{}}, "description": "before update", "model_memory_limit": "20mb", - "allow_lazy_start": false + "allow_lazy_start": false, + "max_num_threads": 1 } - match: { id: "update-test-job" } - match: { description: "before update" } - match: { model_memory_limit: "20mb" } - match: { allow_lazy_start: false } + - match: { max_num_threads: 1 } - do: ml.update_data_frame_analytics: @@ -2132,12 +2134,14 @@ setup: { "description": "after update", "model_memory_limit": "30mb", - "allow_lazy_start": true + "allow_lazy_start": true, + "max_num_threads": 2 } - match: { id: "update-test-job" } - match: { description: "after update" } - match: { model_memory_limit: "30mb" } - match: { allow_lazy_start: true } + - match: { max_num_threads: 2 } --- "Test update given missing analytics":