[7.x][ML] Data frame analytics max_num_threads setting (#59254) (#59308)

This adds a setting to data frame analytics jobs called
`max_number_threads`. The setting expects a positive integer.
When used the user specifies the max number of threads that may
be used by the analysis. Note that the actual number of threads
used is limited by the number of processors on the node where
the job is assigned. Also, the process may use a couple more threads
for operational functionality that is not the analysis itself.

This setting may also be updated for a stopped job.

More threads may reduce the time it takes to complete the job at the cost
of using more CPU.

Backport of #59254 and #57274
This commit is contained in:
Dimitris Athanasiou 2020-07-09 19:15:46 +03:00 committed by GitHub
parent 28ef997953
commit b2243337d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 260 additions and 32 deletions

View File

@ -57,6 +57,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
static final ParseField CREATE_TIME = new ParseField("create_time"); static final ParseField CREATE_TIME = new ParseField("create_time");
static final ParseField VERSION = new ParseField("version"); static final ParseField VERSION = new ParseField("version");
static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start");
static final ParseField MAX_NUM_THREADS = new ParseField("max_num_threads");
private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new); private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new);
@ -80,6 +81,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
ValueType.VALUE); ValueType.VALUE);
PARSER.declareString(Builder::setVersion, Version::fromString, VERSION); PARSER.declareString(Builder::setVersion, Version::fromString, VERSION);
PARSER.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START); PARSER.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START);
PARSER.declareInt(Builder::setMaxNumThreads, MAX_NUM_THREADS);
} }
private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOException { private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOException {
@ -100,11 +102,13 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
private final Instant createTime; private final Instant createTime;
private final Version version; private final Version version;
private final Boolean allowLazyStart; private final Boolean allowLazyStart;
private final Integer maxNumThreads;
private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String description, @Nullable DataFrameAnalyticsSource source, private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String description, @Nullable DataFrameAnalyticsSource source,
@Nullable DataFrameAnalyticsDest dest, @Nullable DataFrameAnalysis analysis, @Nullable DataFrameAnalyticsDest dest, @Nullable DataFrameAnalysis analysis,
@Nullable FetchSourceContext analyzedFields, @Nullable ByteSizeValue modelMemoryLimit, @Nullable FetchSourceContext analyzedFields, @Nullable ByteSizeValue modelMemoryLimit,
@Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart) { @Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart,
@Nullable Integer maxNumThreads) {
this.id = id; this.id = id;
this.description = description; this.description = description;
this.source = source; this.source = source;
@ -115,6 +119,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());; this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());;
this.version = version; this.version = version;
this.allowLazyStart = allowLazyStart; this.allowLazyStart = allowLazyStart;
this.maxNumThreads = maxNumThreads;
} }
public String getId() { public String getId() {
@ -157,6 +162,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
return allowLazyStart; return allowLazyStart;
} }
public Integer getMaxNumThreads() {
return maxNumThreads;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
@ -193,6 +202,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
if (allowLazyStart != null) { if (allowLazyStart != null) {
builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart); builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
} }
if (maxNumThreads != null) {
builder.field(MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -212,12 +224,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
&& Objects.equals(modelMemoryLimit, other.modelMemoryLimit) && Objects.equals(modelMemoryLimit, other.modelMemoryLimit)
&& Objects.equals(createTime, other.createTime) && Objects.equals(createTime, other.createTime)
&& Objects.equals(version, other.version) && Objects.equals(version, other.version)
&& Objects.equals(allowLazyStart, other.allowLazyStart); && Objects.equals(allowLazyStart, other.allowLazyStart)
&& Objects.equals(maxNumThreads, other.maxNumThreads);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart); return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart,
maxNumThreads);
} }
@Override @Override
@ -237,6 +251,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
private Instant createTime; private Instant createTime;
private Version version; private Version version;
private Boolean allowLazyStart; private Boolean allowLazyStart;
private Integer maxNumThreads;
private Builder() {} private Builder() {}
@ -290,9 +305,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
return this; return this;
} }
public Builder setMaxNumThreads(Integer maxNumThreads) {
this.maxNumThreads = maxNumThreads;
return this;
}
public DataFrameAnalyticsConfig build() { public DataFrameAnalyticsConfig build() {
return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime,
version, allowLazyStart); version, allowLazyStart, maxNumThreads);
} }
} }
} }

View File

@ -51,22 +51,25 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT, DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT,
VALUE); VALUE);
PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START); PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START);
PARSER.declareInt(Builder::setMaxNumThreads, DataFrameAnalyticsConfig.MAX_NUM_THREADS);
} }
private final String id; private final String id;
private final String description; private final String description;
private final ByteSizeValue modelMemoryLimit; private final ByteSizeValue modelMemoryLimit;
private final Boolean allowLazyStart; private final Boolean allowLazyStart;
private final Integer maxNumThreads;
private DataFrameAnalyticsConfigUpdate(String id, private DataFrameAnalyticsConfigUpdate(String id,
@Nullable String description, @Nullable String description,
@Nullable ByteSizeValue modelMemoryLimit, @Nullable ByteSizeValue modelMemoryLimit,
@Nullable Boolean allowLazyStart) { @Nullable Boolean allowLazyStart,
@Nullable Integer maxNumThreads) {
this.id = id; this.id = id;
this.description = description; this.description = description;
this.modelMemoryLimit = modelMemoryLimit; this.modelMemoryLimit = modelMemoryLimit;
this.allowLazyStart = allowLazyStart; this.allowLazyStart = allowLazyStart;
this.maxNumThreads = maxNumThreads;
} }
public String getId() { public String getId() {
@ -85,6 +88,10 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
return allowLazyStart; return allowLazyStart;
} }
public Integer getMaxNumThreads() {
return maxNumThreads;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
@ -98,6 +105,9 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
if (allowLazyStart != null) { if (allowLazyStart != null) {
builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart); builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
} }
if (maxNumThreads != null) {
builder.field(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -117,12 +127,13 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
return Objects.equals(this.id, that.id) return Objects.equals(this.id, that.id)
&& Objects.equals(this.description, that.description) && Objects.equals(this.description, that.description)
&& Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit) && Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit)
&& Objects.equals(this.allowLazyStart, that.allowLazyStart); && Objects.equals(this.allowLazyStart, that.allowLazyStart)
&& Objects.equals(this.maxNumThreads, that.maxNumThreads);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart); return Objects.hash(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
} }
public static class Builder { public static class Builder {
@ -131,6 +142,7 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
private String description; private String description;
private ByteSizeValue modelMemoryLimit; private ByteSizeValue modelMemoryLimit;
private Boolean allowLazyStart; private Boolean allowLazyStart;
private Integer maxNumThreads;
private Builder() {} private Builder() {}
@ -158,8 +170,13 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
return this; return this;
} }
public Builder setMaxNumThreads(Integer maxNumThreads) {
this.maxNumThreads = maxNumThreads;
return this;
}
public DataFrameAnalyticsConfigUpdate build() { public DataFrameAnalyticsConfigUpdate build() {
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart); return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
} }
} }
} }

View File

@ -1338,6 +1338,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
assertThat(createdConfig.getAnalyzedFields(), equalTo(config.getAnalyzedFields())); assertThat(createdConfig.getAnalyzedFields(), equalTo(config.getAnalyzedFields()));
assertThat(createdConfig.getModelMemoryLimit(), equalTo(ByteSizeValue.parseBytesSizeValue("1gb", ""))); // default value assertThat(createdConfig.getModelMemoryLimit(), equalTo(ByteSizeValue.parseBytesSizeValue("1gb", ""))); // default value
assertThat(createdConfig.getDescription(), equalTo("some description")); assertThat(createdConfig.getDescription(), equalTo("some description"));
assertThat(createdConfig.getMaxNumThreads(), equalTo(1));
} }
public void testPutDataFrameAnalyticsConfig_GivenRegression() throws Exception { public void testPutDataFrameAnalyticsConfig_GivenRegression() throws Exception {

View File

@ -3040,6 +3040,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
.setAnalyzedFields(analyzedFields) // <5> .setAnalyzedFields(analyzedFields) // <5>
.setModelMemoryLimit(new ByteSizeValue(5, ByteSizeUnit.MB)) // <6> .setModelMemoryLimit(new ByteSizeValue(5, ByteSizeUnit.MB)) // <6>
.setDescription("this is an example description") // <7> .setDescription("this is an example description") // <7>
.setMaxNumThreads(1) // <8>
.build(); .build();
// end::put-data-frame-analytics-config // end::put-data-frame-analytics-config
@ -3096,6 +3097,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
.setId("my-analytics-config") // <1> .setId("my-analytics-config") // <1>
.setDescription("new description") // <2> .setDescription("new description") // <2>
.setModelMemoryLimit(new ByteSizeValue(128, ByteSizeUnit.MB)) // <3> .setModelMemoryLimit(new ByteSizeValue(128, ByteSizeUnit.MB)) // <3>
.setMaxNumThreads(4) // <4>
.build(); .build();
// end::update-data-frame-analytics-config-update // end::update-data-frame-analytics-config-update

View File

@ -69,6 +69,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractXContentTestCase<Data
if (randomBoolean()) { if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean()); builder.setAllowLazyStart(randomBoolean());
} }
if (randomBoolean()) {
builder.setMaxNumThreads(randomIntBetween(1, 20));
}
return builder.build(); return builder.build();
} }

View File

@ -46,6 +46,9 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractXContentTestCas
if (randomBoolean()) { if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean()); builder.setAllowLazyStart(randomBoolean());
} }
if (randomBoolean()) {
builder.setMaxNumThreads(randomIntBetween(1, 20));
}
return builder.build(); return builder.build();
} }

View File

@ -38,6 +38,7 @@ include-tagged::{doc-tests-file}[{api}-config]
<5> The fields to be included in / excluded from the analysis <5> The fields to be included in / excluded from the analysis
<6> The memory limit for the model created as part of the analysis process <6> The memory limit for the model created as part of the analysis process
<7> Optionally, a human-readable description <7> Optionally, a human-readable description
<8> The maximum number of threads to be used by the analysis. Defaults to 1.
[id="{upid}-{api}-query-config"] [id="{upid}-{api}-query-config"]

View File

@ -34,6 +34,7 @@ include-tagged::{doc-tests-file}[{api}-config-update]
<1> The {dfanalytics-job} ID <1> The {dfanalytics-job} ID
<2> The human-readable description <2> The human-readable description
<3> The memory limit for the model created as part of the analysis process <3> The memory limit for the model created as part of the analysis process
<4> The maximum number of threads to be used by the analysis
[id="{upid}-{api}-query-config"] [id="{upid}-{api}-query-config"]

View File

@ -324,6 +324,14 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa]
`dest`:: `dest`::
(Required, object) (Required, object)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=dest] include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=dest]
`max_num_threads`::
(Optional, integer)
The maximum number of threads to be used by the analysis.
The default value is `1`. Using more threads may decrease the time
necessary to complete the analysis at the cost of using more CPU.
Note that the process may use additional threads for operational
functionality other than the analysis itself.
`model_memory_limit`:: `model_memory_limit`::
(Optional, string) (Optional, string)
@ -508,7 +516,8 @@ The API returns the following result:
"model_memory_limit": "1gb", "model_memory_limit": "1gb",
"create_time" : 1562265491319, "create_time" : 1562265491319,
"version" : "7.6.0", "version" : "7.6.0",
"allow_lazy_start" : false "allow_lazy_start" : false,
"max_num_threads": 1
} }
---- ----
// TESTRESPONSE[s/1562265491319/$body.$_path/] // TESTRESPONSE[s/1562265491319/$body.$_path/]

View File

@ -71,6 +71,14 @@ the `starting` state until sufficient {ml} node capacity is available.
(Optional, string) (Optional, string)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa] include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa]
`max_num_threads`::
(Optional, integer)
The maximum number of threads to be used by the analysis.
The default value is `1`. Using more threads may decrease the time
necessary to complete the analysis at the cost of using more CPU.
Note that the process may use additional threads for operational
functionality other than the analysis itself.
`model_memory_limit`:: `model_memory_limit`::
(Optional, string) (Optional, string)
The approximate maximum amount of memory resources that are permitted for The approximate maximum amount of memory resources that are permitted for

View File

@ -60,6 +60,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
public static final ParseField CREATE_TIME = new ParseField("create_time"); public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField VERSION = new ParseField("version"); public static final ParseField VERSION = new ParseField("version");
public static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); public static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start");
public static final ParseField MAX_NUM_THREADS = new ParseField("max_num_threads");
public static final ObjectParser<Builder, Void> STRICT_PARSER = createParser(false); public static final ObjectParser<Builder, Void> STRICT_PARSER = createParser(false);
public static final ObjectParser<Builder, Void> LENIENT_PARSER = createParser(true); public static final ObjectParser<Builder, Void> LENIENT_PARSER = createParser(true);
@ -80,6 +81,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
parser.declareField(Builder::setModelMemoryLimit, parser.declareField(Builder::setModelMemoryLimit,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MODEL_MEMORY_LIMIT.getPreferredName()), MODEL_MEMORY_LIMIT, VALUE); (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MODEL_MEMORY_LIMIT.getPreferredName()), MODEL_MEMORY_LIMIT, VALUE);
parser.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START); parser.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START);
parser.declareInt(Builder::setMaxNumThreads, MAX_NUM_THREADS);
if (ignoreUnknownFields) { if (ignoreUnknownFields) {
// Headers are not parsed by the strict (config) parser, so headers supplied in the _body_ of a REST request will be rejected. // Headers are not parsed by the strict (config) parser, so headers supplied in the _body_ of a REST request will be rejected.
// (For config, headers are explicitly transferred from the auth headers by code in the put data frame actions.) // (For config, headers are explicitly transferred from the auth headers by code in the put data frame actions.)
@ -122,10 +124,12 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
private final Instant createTime; private final Instant createTime;
private final Version version; private final Version version;
private final boolean allowLazyStart; private final boolean allowLazyStart;
private final int maxNumThreads;
private DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest, private DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest,
DataFrameAnalysis analysis, Map<String, String> headers, ByteSizeValue modelMemoryLimit, DataFrameAnalysis analysis, Map<String, String> headers, ByteSizeValue modelMemoryLimit,
FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart) { FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart,
Integer maxNumThreads) {
this.id = ExceptionsHelper.requireNonNull(id, ID); this.id = ExceptionsHelper.requireNonNull(id, ID);
this.description = description; this.description = description;
this.source = ExceptionsHelper.requireNonNull(source, SOURCE); this.source = ExceptionsHelper.requireNonNull(source, SOURCE);
@ -137,6 +141,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli()); this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
this.version = version; this.version = version;
this.allowLazyStart = allowLazyStart; this.allowLazyStart = allowLazyStart;
if (maxNumThreads != null && maxNumThreads < 1) {
throw ExceptionsHelper.badRequestException("[{}] must be a positive integer", MAX_NUM_THREADS.getPreferredName());
}
this.maxNumThreads = maxNumThreads == null ? 1 : maxNumThreads;
} }
public DataFrameAnalyticsConfig(StreamInput in) throws IOException { public DataFrameAnalyticsConfig(StreamInput in) throws IOException {
@ -164,6 +173,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
} else { } else {
allowLazyStart = false; allowLazyStart = false;
} }
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
maxNumThreads = in.readVInt();
} else {
maxNumThreads = 1;
}
} }
public String getId() { public String getId() {
@ -210,6 +224,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
return allowLazyStart; return allowLazyStart;
} }
public Integer getMaxNumThreads() {
return maxNumThreads;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
@ -242,6 +260,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
builder.field(VERSION.getPreferredName(), version); builder.field(VERSION.getPreferredName(), version);
} }
builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart); builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
builder.field(MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -270,6 +289,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
if (out.getVersion().onOrAfter(Version.V_7_5_0)) { if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(allowLazyStart); out.writeBoolean(allowLazyStart);
} }
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeVInt(maxNumThreads);
}
} }
@Override @Override
@ -288,13 +310,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
&& Objects.equals(analyzedFields, other.analyzedFields) && Objects.equals(analyzedFields, other.analyzedFields)
&& Objects.equals(createTime, other.createTime) && Objects.equals(createTime, other.createTime)
&& Objects.equals(version, other.version) && Objects.equals(version, other.version)
&& Objects.equals(allowLazyStart, other.allowLazyStart); && Objects.equals(allowLazyStart, other.allowLazyStart)
&& maxNumThreads == other.maxNumThreads;
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(id, description, source, dest, analysis, headers, getModelMemoryLimit(), analyzedFields, createTime, version, return Objects.hash(id, description, source, dest, analysis, headers, getModelMemoryLimit(), analyzedFields, createTime, version,
allowLazyStart); allowLazyStart, maxNumThreads);
} }
@Override @Override
@ -329,6 +352,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
private Instant createTime; private Instant createTime;
private Version version; private Version version;
private boolean allowLazyStart; private boolean allowLazyStart;
private Integer maxNumThreads;
public Builder() {} public Builder() {}
@ -351,6 +375,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
this.createTime = config.createTime; this.createTime = config.createTime;
this.version = config.version; this.version = config.version;
this.allowLazyStart = config.allowLazyStart; this.allowLazyStart = config.allowLazyStart;
this.maxNumThreads = config.maxNumThreads;
} }
public String getId() { public String getId() {
@ -412,13 +437,18 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
return this; return this;
} }
public Builder setMaxNumThreads(Integer maxNumThreads) {
this.maxNumThreads = maxNumThreads;
return this;
}
/** /**
* Builds {@link DataFrameAnalyticsConfig} object. * Builds {@link DataFrameAnalyticsConfig} object.
*/ */
public DataFrameAnalyticsConfig build() { public DataFrameAnalyticsConfig build() {
applyMaxModelMemoryLimit(); applyMaxModelMemoryLimit();
return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, headers, modelMemoryLimit, analyzedFields, return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, headers, modelMemoryLimit, analyzedFields,
createTime, version, allowLazyStart); createTime, version, allowLazyStart, maxNumThreads);
} }
/** /**
@ -438,7 +468,8 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
analyzedFields, analyzedFields,
createTime, createTime,
version, version,
allowLazyStart); allowLazyStart,
maxNumThreads);
} }
private void applyMaxModelMemoryLimit() { private void applyMaxModelMemoryLimit() {

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.core.ml.dataframe; package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -13,6 +14,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
@ -33,22 +35,30 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT, DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT,
VALUE); VALUE);
PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START); PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START);
PARSER.declareInt(Builder::setMaxNumThreads, DataFrameAnalyticsConfig.MAX_NUM_THREADS);
} }
private final String id; private final String id;
private final String description; private final String description;
private final ByteSizeValue modelMemoryLimit; private final ByteSizeValue modelMemoryLimit;
private final Boolean allowLazyStart; private final Boolean allowLazyStart;
private final Integer maxNumThreads;
private DataFrameAnalyticsConfigUpdate(String id, private DataFrameAnalyticsConfigUpdate(String id,
@Nullable String description, @Nullable String description,
@Nullable ByteSizeValue modelMemoryLimit, @Nullable ByteSizeValue modelMemoryLimit,
@Nullable Boolean allowLazyStart) { @Nullable Boolean allowLazyStart,
@Nullable Integer maxNumThreads) {
this.id = id; this.id = id;
this.description = description; this.description = description;
this.modelMemoryLimit = modelMemoryLimit; this.modelMemoryLimit = modelMemoryLimit;
this.allowLazyStart = allowLazyStart; this.allowLazyStart = allowLazyStart;
if (maxNumThreads != null && maxNumThreads < 1) {
throw ExceptionsHelper.badRequestException("[{}] must be a positive integer",
DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName());
}
this.maxNumThreads = maxNumThreads;
} }
public DataFrameAnalyticsConfigUpdate(StreamInput in) throws IOException { public DataFrameAnalyticsConfigUpdate(StreamInput in) throws IOException {
@ -56,6 +66,11 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
this.description = in.readOptionalString(); this.description = in.readOptionalString();
this.modelMemoryLimit = in.readOptionalWriteable(ByteSizeValue::new); this.modelMemoryLimit = in.readOptionalWriteable(ByteSizeValue::new);
this.allowLazyStart = in.readOptionalBoolean(); this.allowLazyStart = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
this.maxNumThreads = in.readOptionalVInt();
} else {
this.maxNumThreads = null;
}
} }
@Override @Override
@ -64,6 +79,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
out.writeOptionalString(description); out.writeOptionalString(description);
out.writeOptionalWriteable(modelMemoryLimit); out.writeOptionalWriteable(modelMemoryLimit);
out.writeOptionalBoolean(allowLazyStart); out.writeOptionalBoolean(allowLazyStart);
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeOptionalVInt(maxNumThreads);
}
} }
public String getId() { public String getId() {
@ -82,6 +100,10 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
return allowLazyStart; return allowLazyStart;
} }
public Integer getMaxNumThreads() {
return maxNumThreads;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
@ -95,6 +117,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
if (allowLazyStart != null) { if (allowLazyStart != null) {
builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart); builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
} }
if (maxNumThreads != null) {
builder.field(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -120,6 +145,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
if (allowLazyStart != null) { if (allowLazyStart != null) {
builder.setAllowLazyStart(allowLazyStart); builder.setAllowLazyStart(allowLazyStart);
} }
if (maxNumThreads != null) {
builder.setMaxNumThreads(maxNumThreads);
}
return builder; return builder;
} }
@ -127,7 +155,8 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
* Whether this update applied to the given source config requires analytics task restart. * Whether this update applied to the given source config requires analytics task restart.
*/ */
public boolean requiresRestart(DataFrameAnalyticsConfig source) { public boolean requiresRestart(DataFrameAnalyticsConfig source) {
return getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false; return (getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false)
|| (getMaxNumThreads() != null && getMaxNumThreads().equals(source.getMaxNumThreads()) == false);
} }
@Override @Override
@ -145,12 +174,13 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
return Objects.equals(this.id, that.id) return Objects.equals(this.id, that.id)
&& Objects.equals(this.description, that.description) && Objects.equals(this.description, that.description)
&& Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit) && Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit)
&& Objects.equals(this.allowLazyStart, that.allowLazyStart); && Objects.equals(this.allowLazyStart, that.allowLazyStart)
&& Objects.equals(this.maxNumThreads, that.maxNumThreads);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart); return Objects.hash(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
} }
public static class Builder { public static class Builder {
@ -159,6 +189,7 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
private String description; private String description;
private ByteSizeValue modelMemoryLimit; private ByteSizeValue modelMemoryLimit;
private Boolean allowLazyStart; private Boolean allowLazyStart;
private Integer maxNumThreads;
public Builder(String id) { public Builder(String id) {
this.id = id; this.id = id;
@ -188,8 +219,13 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
return this; return this;
} }
public Builder setMaxNumThreads(Integer maxNumThreads) {
this.maxNumThreads = maxNumThreads;
return this;
}
public DataFrameAnalyticsConfigUpdate build() { public DataFrameAnalyticsConfigUpdate build() {
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart); return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
} }
} }
} }

View File

@ -311,6 +311,7 @@ public final class ReservedFieldNames {
DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName(), DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName(),
DataFrameAnalyticsConfig.CREATE_TIME.getPreferredName(), DataFrameAnalyticsConfig.CREATE_TIME.getPreferredName(),
DataFrameAnalyticsConfig.VERSION.getPreferredName(), DataFrameAnalyticsConfig.VERSION.getPreferredName(),
DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(),
DataFrameAnalyticsDest.INDEX.getPreferredName(), DataFrameAnalyticsDest.INDEX.getPreferredName(),
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(),
DataFrameAnalyticsSource.INDEX.getPreferredName(), DataFrameAnalyticsSource.INDEX.getPreferredName(),

View File

@ -325,6 +325,9 @@
"job_version" : { "job_version" : {
"type" : "keyword" "type" : "keyword"
}, },
"max_num_threads" : {
"type" : "integer"
},
"model_plot_config" : { "model_plot_config" : {
"properties" : { "properties" : {
"enabled" : { "enabled" : {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ml.AbstractBWCSerializationTestCase; import org.elasticsearch.xpack.core.ml.AbstractBWCSerializationTestCase;
@ -117,6 +118,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
builder.setCreateTime(null); builder.setCreateTime(null);
builder.setVersion(null); builder.setVersion(null);
} }
if (version.before(Version.V_7_9_0)) {
builder.setMaxNumThreads(null);
}
return builder.build(); return builder.build();
} }
@ -227,6 +231,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
if (randomBoolean()) { if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean()); builder.setAllowLazyStart(randomBoolean());
} }
if (randomBoolean()) {
builder.setMaxNumThreads(randomIntBetween(1, 20));
}
return builder; return builder;
} }
@ -484,6 +491,32 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue())); assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue()));
} }
public void testCtor_GivenMaxNumThreadsIsZero() {
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder()
.setId("test_config")
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null))
.setDest(new DataFrameAnalyticsDest("dest_index", null))
.setAnalysis(new Regression("foo"))
.setMaxNumThreads(0)
.build());
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
}
public void testCtor_GivenMaxNumThreadsIsNegative() {
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder()
.setId("test_config")
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null))
.setDest(new DataFrameAnalyticsDest("dest_index", null))
.setAnalysis(new Regression("foo"))
.setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0))
.build());
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
}
private static void assertTooSmall(ElasticsearchStatusException e) { private static void assertTooSmall(ElasticsearchStatusException e) {
assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb.")); assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb."));
} }

View File

@ -5,9 +5,11 @@
*/ */
package org.elasticsearch.xpack.core.ml.dataframe; package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException; import java.io.IOException;
@ -47,6 +49,9 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
if (randomBoolean()) { if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean()); builder.setAllowLazyStart(randomBoolean());
} }
if (randomBoolean()) {
builder.setMaxNumThreads(randomIntBetween(1, 20));
}
return builder.build(); return builder.build();
} }
@ -81,6 +86,15 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setAllowLazyStart(true).build()))); is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setAllowLazyStart(true).build())));
} }
public void testMergeWithConfig_UpdatedMaxNumThreads() {
String id = randomValidId();
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandomBuilder(id).setMaxNumThreads(3).build();
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setMaxNumThreads(5).build();
assertThat(
update.mergeWithConfig(config).build(),
is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setMaxNumThreads(5).build())));
}
public void testMergeWithConfig_UpdatedAllUpdatableProperties() { public void testMergeWithConfig_UpdatedAllUpdatableProperties() {
String id = randomValidId(); String id = randomValidId();
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfig config =
@ -88,12 +102,14 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
.setDescription("old description") .setDescription("old description")
.setModelMemoryLimit(new ByteSizeValue(1024)) .setModelMemoryLimit(new ByteSizeValue(1024))
.setAllowLazyStart(false) .setAllowLazyStart(false)
.setMaxNumThreads(1)
.build(); .build();
DataFrameAnalyticsConfigUpdate update = DataFrameAnalyticsConfigUpdate update =
new DataFrameAnalyticsConfigUpdate.Builder(id) new DataFrameAnalyticsConfigUpdate.Builder(id)
.setDescription("new description") .setDescription("new description")
.setModelMemoryLimit(new ByteSizeValue(2048)) .setModelMemoryLimit(new ByteSizeValue(2048))
.setAllowLazyStart(true) .setAllowLazyStart(true)
.setMaxNumThreads(4)
.build(); .build();
assertThat( assertThat(
update.mergeWithConfig(config).build(), update.mergeWithConfig(config).build(),
@ -102,6 +118,7 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
.setDescription("new description") .setDescription("new description")
.setModelMemoryLimit(new ByteSizeValue(2048)) .setModelMemoryLimit(new ByteSizeValue(2048))
.setAllowLazyStart(true) .setAllowLazyStart(true)
.setMaxNumThreads(4)
.build()))); .build())));
} }
@ -155,9 +172,35 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
assertThat(update.requiresRestart(config), is(true)); assertThat(update.requiresRestart(config), is(true));
} }
public void testRequiresRestart_MaxNumThreadsUpdateRequiresRestart() {
String id = randomValidId();
DataFrameAnalyticsConfig config =
DataFrameAnalyticsConfigTests.createRandomBuilder(id).setMaxNumThreads(1).build();
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setMaxNumThreads(8).build();
assertThat(update.requiresRestart(config), is(true));
}
public void testCtor_GivenMaxNumberThreadsIsZero() {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DataFrameAnalyticsConfigUpdate.Builder("test").setMaxNumThreads(0).build());
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
}
public void testCtor_GivenMaxNumberThreadsIsNegative() {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DataFrameAnalyticsConfigUpdate.Builder("test").setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0)).build());
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
}
private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) { private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) {
return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription())) return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription()))
&& (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit())) && (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit()))
&& (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart())); && (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart()))
&& (update.getMaxNumThreads() == null || Objects.equals(config.getMaxNumThreads(), update.getMaxNumThreads()));
} }
} }

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
@ -699,7 +700,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
dataFrameAnalyticsAuditor, dataFrameAnalyticsAuditor,
trainedModelProvider, trainedModelProvider,
modelLoadingService, modelLoadingService,
resultsPersisterService); resultsPersisterService,
EsExecutors.allocatedProcessors(settings));
MemoryUsageEstimationProcessManager memoryEstimationProcessManager = MemoryUsageEstimationProcessManager memoryEstimationProcessManager =
new MemoryUsageEstimationProcessManager( new MemoryUsageEstimationProcessManager(
threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory); threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory);

View File

@ -61,6 +61,10 @@ public class AnalyticsProcessConfig implements ToXContentObject {
return cols; return cols;
} }
public int threads() {
return threads;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();

View File

@ -70,6 +70,7 @@ public class AnalyticsProcessManager {
private final TrainedModelProvider trainedModelProvider; private final TrainedModelProvider trainedModelProvider;
private final ModelLoadingService modelLoadingService; private final ModelLoadingService modelLoadingService;
private final ResultsPersisterService resultsPersisterService; private final ResultsPersisterService resultsPersisterService;
private final int numAllocatedProcessors;
public AnalyticsProcessManager(Client client, public AnalyticsProcessManager(Client client,
ThreadPool threadPool, ThreadPool threadPool,
@ -77,7 +78,8 @@ public class AnalyticsProcessManager {
DataFrameAnalyticsAuditor auditor, DataFrameAnalyticsAuditor auditor,
TrainedModelProvider trainedModelProvider, TrainedModelProvider trainedModelProvider,
ModelLoadingService modelLoadingService, ModelLoadingService modelLoadingService,
ResultsPersisterService resultsPersisterService) { ResultsPersisterService resultsPersisterService,
int numAllocatedProcessors) {
this( this(
client, client,
threadPool.generic(), threadPool.generic(),
@ -86,7 +88,8 @@ public class AnalyticsProcessManager {
auditor, auditor,
trainedModelProvider, trainedModelProvider,
modelLoadingService, modelLoadingService,
resultsPersisterService); resultsPersisterService,
numAllocatedProcessors);
} }
// Visible for testing // Visible for testing
@ -97,7 +100,8 @@ public class AnalyticsProcessManager {
DataFrameAnalyticsAuditor auditor, DataFrameAnalyticsAuditor auditor,
TrainedModelProvider trainedModelProvider, TrainedModelProvider trainedModelProvider,
ModelLoadingService modelLoadingService, ModelLoadingService modelLoadingService,
ResultsPersisterService resultsPersisterService) { ResultsPersisterService resultsPersisterService,
int numAllocatedProcessors) {
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.executorServiceForJob = Objects.requireNonNull(executorServiceForJob); this.executorServiceForJob = Objects.requireNonNull(executorServiceForJob);
this.executorServiceForProcess = Objects.requireNonNull(executorServiceForProcess); this.executorServiceForProcess = Objects.requireNonNull(executorServiceForProcess);
@ -106,6 +110,7 @@ public class AnalyticsProcessManager {
this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider); this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider);
this.modelLoadingService = Objects.requireNonNull(modelLoadingService); this.modelLoadingService = Objects.requireNonNull(modelLoadingService);
this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService); this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
this.numAllocatedProcessors = numAllocatedProcessors;
} }
public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory) { public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory) {
@ -452,7 +457,7 @@ public class AnalyticsProcessManager {
dataExtractor.set(dataExtractorFactory.newExtractor(false)); dataExtractor.set(dataExtractorFactory.newExtractor(false));
AnalyticsProcessConfig analyticsProcessConfig = AnalyticsProcessConfig analyticsProcessConfig =
createProcessConfig(dataExtractor.get(), dataExtractorFactory.getExtractedFields()); createProcessConfig(dataExtractor.get(), dataExtractorFactory.getExtractedFields());
LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig)); LOGGER.debug("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig));
// If we have no rows, that means there is no data so no point in starting the native process // If we have no rows, that means there is no data so no point in starting the native process
// just finish the task // just finish the task
if (analyticsProcessConfig.rows() == 0) { if (analyticsProcessConfig.rows() == 0) {
@ -468,12 +473,13 @@ public class AnalyticsProcessManager {
ExtractedFields extractedFields) { ExtractedFields extractedFields) {
DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary(); DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
Set<String> categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis()); Set<String> categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis());
int threads = Math.min(config.getMaxNumThreads(), numAllocatedProcessors);
return new AnalyticsProcessConfig( return new AnalyticsProcessConfig(
config.getId(), config.getId(),
dataSummary.rows, dataSummary.rows,
dataSummary.cols, dataSummary.cols,
config.getModelMemoryLimit(), config.getModelMemoryLimit(),
1, threads,
config.getDest().getResultsField(), config.getDest().getResultsField(),
categoricalFields, categoricalFields,
config.getAnalysis(), config.getAnalysis(),

View File

@ -113,7 +113,7 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
resultsPersisterService = mock(ResultsPersisterService.class); resultsPersisterService = mock(ResultsPersisterService.class);
modelLoadingService = mock(ModelLoadingService.class); modelLoadingService = mock(ModelLoadingService.class);
processManager = new AnalyticsProcessManager(client, executorServiceForJob, executorServiceForProcess, processFactory, auditor, processManager = new AnalyticsProcessManager(client, executorServiceForJob, executorServiceForProcess, processFactory, auditor,
trainedModelProvider, modelLoadingService, resultsPersisterService); trainedModelProvider, modelLoadingService, resultsPersisterService, 1);
} }
public void testRunJob_TaskIsStopping() { public void testRunJob_TaskIsStopping() {

View File

@ -2118,12 +2118,14 @@ setup:
"analysis": {"outlier_detection":{}}, "analysis": {"outlier_detection":{}},
"description": "before update", "description": "before update",
"model_memory_limit": "20mb", "model_memory_limit": "20mb",
"allow_lazy_start": false "allow_lazy_start": false,
"max_num_threads": 1
} }
- match: { id: "update-test-job" } - match: { id: "update-test-job" }
- match: { description: "before update" } - match: { description: "before update" }
- match: { model_memory_limit: "20mb" } - match: { model_memory_limit: "20mb" }
- match: { allow_lazy_start: false } - match: { allow_lazy_start: false }
- match: { max_num_threads: 1 }
- do: - do:
ml.update_data_frame_analytics: ml.update_data_frame_analytics:
@ -2132,12 +2134,14 @@ setup:
{ {
"description": "after update", "description": "after update",
"model_memory_limit": "30mb", "model_memory_limit": "30mb",
"allow_lazy_start": true "allow_lazy_start": true,
"max_num_threads": 2
} }
- match: { id: "update-test-job" } - match: { id: "update-test-job" }
- match: { description: "after update" } - match: { description: "after update" }
- match: { model_memory_limit: "30mb" } - match: { model_memory_limit: "30mb" }
- match: { allow_lazy_start: true } - match: { allow_lazy_start: true }
- match: { max_num_threads: 2 }
--- ---
"Test update given missing analytics": "Test update given missing analytics":