[ML] Resolve 7.0.0 TODOs in ML code (#36842)

This change cleans up a number of ugly BWC
workarounds in the ML code.

7.0 cannot run in a mixed-version cluster with
versions prior to 6.7, so code that deals with
those old versions is no longer required.

Closes #29963
David Roberts 2018-12-20 12:49:57 +00:00 committed by GitHub
parent 1236461e3e
commit 0f2f00a20a
18 changed files with 78 additions and 247 deletions
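
For reference, the kind of wire-format BWC guard this change deletes looks roughly like the following. This is a minimal sketch modeled on the AnalysisConfig hunk below; the class and method names are illustrative, not code from this commit.

import java.io.IOException;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

// Illustrative only: the shape of a version-gated serialization guard removed by this change.
// Because 7.0 is only wire-compatible with 6.7 and later, branches gated on pre-6.7 versions
// can never be taken and are dead code.
class BwcGuardSketch {

    void readExtraFields(StreamInput in) throws IOException {
        // Older nodes still send a field that newer code no longer uses; read and discard it.
        if (in.getVersion().before(Version.V_6_6_0)) {
            in.readOptionalBoolean();
        }
    }

    void writeExtraFields(StreamOutput out) throws IOException {
        // Older nodes still expect the field, so send a placeholder value.
        if (out.getVersion().before(Version.V_6_6_0)) {
            out.writeOptionalBoolean(null);
        }
    }
}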


@ -39,7 +39,6 @@ public class OpenJobAction extends Action<AcknowledgedResponse> {
public static final OpenJobAction INSTANCE = new OpenJobAction();
public static final String NAME = "cluster:admin/xpack/ml/job/open";
private OpenJobAction() {
super(NAME);
}
@ -132,15 +131,12 @@ public class OpenJobAction extends Action<AcknowledgedResponse> {
public static class JobParams implements XPackPlugin.XPackPersistentTaskParams {
/** TODO Remove in 7.0.0 */
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField JOB = new ParseField("job");
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(MlTasks.JOB_TASK_NAME, true, JobParams::new);
static {
PARSER.declareString(JobParams::setJobId, Job.ID);
PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareObject(JobParams::setJob, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOB);


@ -130,7 +130,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
public static final ParseField AGGS = new ParseField("aggs");
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField SOURCE = new ParseField("_source");
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ParseField HEADERS = new ParseField("headers");
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
@ -185,9 +184,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return parsedScriptFields;
}, SCRIPT_FIELDS);
parser.declareInt(Builder::setScrollSize, SCROLL_SIZE);
// TODO this is to read former _source field. Remove in v7.0.0
parser.declareBoolean((builder, value) -> {
}, SOURCE);
parser.declareObject(Builder::setChunkingConfig, ignoreUnknownFields ? ChunkingConfig.LENIENT_PARSER : ChunkingConfig.STRICT_PARSER,
CHUNKING_CONFIG);


@ -136,30 +136,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
detectors = Collections.unmodifiableList(in.readList(Detector::new));
influencers = Collections.unmodifiableList(in.readList(StreamInput::readString));
// BWC for result_finalization_window and overlapping_buckets
// TODO Remove in 7.0.0
if (in.getVersion().before(Version.V_6_6_0)) {
in.readOptionalBoolean();
in.readOptionalLong();
}
multivariateByFields = in.readOptionalBoolean();
// BWC for removed multiple_bucket_spans
// TODO Remove in 7.0.0
if (in.getVersion().before(Version.V_6_5_0)) {
if (in.readBoolean()) {
final int arraySize = in.readVInt();
for (int i = 0; i < arraySize; i++) {
in.readTimeValue();
}
}
}
// BWC for removed per-partition normalization
// TODO Remove in 7.0.0
if (in.getVersion().before(Version.V_6_5_0)) {
in.readBoolean();
}
}
@Override
@ -180,25 +157,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
out.writeList(detectors);
out.writeStringList(influencers);
// BWC for result_finalization_window and overlapping_buckets
// TODO Remove in 7.0.0
if (out.getVersion().before(Version.V_6_6_0)) {
out.writeOptionalBoolean(null);
out.writeOptionalLong(null);
}
out.writeOptionalBoolean(multivariateByFields);
// BWC for removed multiple_bucket_spans
// TODO Remove in 7.0.0
if (out.getVersion().before(Version.V_6_5_0)) {
out.writeBoolean(false);
}
// BWC for removed per-partition normalization
// TODO Remove in 7.0.0
if (out.getVersion().before(Version.V_6_5_0)) {
out.writeBoolean(false);
}
}
/**


@ -266,7 +266,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
xContentBuilder.field("job_id", jobId);
xContentBuilder.array("indexes", "airline-data");
xContentBuilder.array("types", "_doc");
xContentBuilder.field("_source", true);
xContentBuilder.endObject();
Request request = new Request("PUT", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId);
request.setJsonEntity(Strings.toString(xContentBuilder));


@ -17,6 +17,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
@ -259,7 +260,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final Setting<Boolean> ML_ENABLED =
Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Property.NodeScope);
public static final String ML_ENABLED_NODE_ATTR = "ml.enabled";
// This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion
private static final String PRE_V7_ML_ENABLED_NODE_ATTR = "ml.enabled";
public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
@ -289,6 +291,14 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }
public static boolean isMlNode(DiscoveryNode node) {
Map<String, String> nodeAttributes = node.getAttributes();
try {
return Integer.parseInt(nodeAttributes.get(MAX_OPEN_JOBS_NODE_ATTR)) > 0;
} catch (NumberFormatException e) {
return false;
}
}
public List<Setting<?>> getSettings() {
return Collections.unmodifiableList(
@ -299,16 +309,14 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
MAX_LAZY_ML_NODES,
MAX_MACHINE_MEMORY_PERCENT,
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP,
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION));
}
public Settings additionalSettings() {
String mlEnabledNodeAttrName = "node.attr." + ML_ENABLED_NODE_ATTR;
String mlEnabledNodeAttrName = "node.attr." + PRE_V7_ML_ENABLED_NODE_ATTR;
String maxOpenJobsPerNodeNodeAttrName = "node.attr." + MAX_OPEN_JOBS_NODE_ATTR;
String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR;
@ -320,12 +328,12 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
Settings.Builder additionalSettings = Settings.builder();
Boolean allocationEnabled = ML_ENABLED.get(settings);
if (allocationEnabled != null && allocationEnabled) {
// TODO: the simple true/false flag will not be required once all supported versions have the number - consider removing in 7.0
addMlNodeAttribute(additionalSettings, mlEnabledNodeAttrName, "true");
addMlNodeAttribute(additionalSettings, maxOpenJobsPerNodeNodeAttrName,
String.valueOf(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings)));
addMlNodeAttribute(additionalSettings, machineMemoryAttrName,
Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats())));
// This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion
disallowMlNodeAttributes(mlEnabledNodeAttrName);
} else {
disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName);
}
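
The isMlNode helper added in this file replaces the old check of the ml.enabled node attribute: a node now counts as an ML node when its ml.max_open_jobs attribute parses to a positive integer. A minimal usage sketch of that rule, modeled on the test setup later in this diff (node names, IDs and addresses are illustrative):

import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.xpack.ml.MachineLearning;

class IsMlNodeSketch {
    public static void main(String[] args) {
        // A node advertising a positive ml.max_open_jobs attribute is treated as an ML node.
        Map<String, String> attrs = new HashMap<>();
        attrs.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20");
        DiscoveryNode mlNode = new DiscoveryNode("_ml_node_name", "_ml_node_id",
                new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
                attrs, Collections.emptySet(), Version.CURRENT);
        assert MachineLearning.isMlNode(mlNode);

        // A node without the attribute (or with a non-numeric value) is not an ML node.
        DiscoveryNode plainNode = new DiscoveryNode("_data_node_name", "_data_node_id",
                new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
                Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
        assert MachineLearning.isMlNode(plainNode) == false;
    }
}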


@ -143,8 +143,7 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
int mlNodeCount = 0;
for (DiscoveryNode node : clusterState.getNodes()) {
String enabled = node.getAttributes().get(MachineLearning.ML_ENABLED_NODE_ATTR);
if (Boolean.parseBoolean(enabled)) {
if (MachineLearning.isMlNode(node)) {
++mlNodeCount;
}
}


@ -399,9 +399,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
// The quantiles type and doc ID changed in v5.5 so delete both the old and new format
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId),
// TODO: remove in 7.0
Quantiles.v54DocumentId(jobId));
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
request.setQuery(query);
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setAbortOnVersionConflict(false);
@ -436,9 +434,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
// The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum),
// TODO: remove in 7.0
CategorizerState.v54DocumentId(jobId, docNum));
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
request.setQuery(query);
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setAbortOnVersionConflict(false);


@ -69,7 +69,6 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@ -104,7 +103,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Client client;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final MlMemoryTracker memoryTracker;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
@ -114,14 +112,12 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
XPackLicenseState licenseState, ClusterService clusterService,
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client,
JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider,
MlMemoryTracker memoryTracker) {
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker) {
super(OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
OpenJobAction.Request::new);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.client = client;
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
this.memoryTracker = memoryTracker;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
@ -151,7 +147,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String jobId, @Nullable Job job,
ClusterState clusterState,
int maxConcurrentJobAllocations,
int fallbackMaxNumberOfOpenJobs,
int maxMachineMemoryPercent,
MlMemoryTracker memoryTracker,
Logger logger) {
@ -189,9 +184,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
DiscoveryNode minLoadedNodeByMemory = null;
PersistentTasksCustomMetaData persistentTasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
Map<String, String> nodeAttributes = node.getAttributes();
String enabled = nodeAttributes.get(MachineLearning.ML_ENABLED_NODE_ATTR);
if (Boolean.valueOf(enabled) == false) {
if (MachineLearning.isMlNode(node) == false) {
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameOrId(node)
+ "], because this node isn't a ml node.";
logger.trace(reason);
@ -281,10 +274,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
continue;
}
Map<String, String> nodeAttributes = node.getAttributes();
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
int maxNumberOfOpenJobs = fallbackMaxNumberOfOpenJobs;
// TODO: remove leniency and reject the node if the attribute is null in 7.0
if (maxNumberOfOpenJobsStr != null) {
int maxNumberOfOpenJobs;
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
@ -294,7 +286,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
reasons.add(reason);
continue;
}
}
long availableCount = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (availableCount == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
@ -311,9 +302,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
long machineMemory = -1;
// TODO: remove leniency and reject the node if the attribute is null in 7.0
if (machineMemoryStr != null) {
long machineMemory;
try {
machineMemory = Long.parseLong(machineMemoryStr);
} catch (NumberFormatException e) {
@ -323,7 +312,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
reasons.add(reason);
continue;
}
}
if (allocateByMemory) {
if (machineMemory > 0) {
@ -735,13 +723,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private final MlMemoryTracker memoryTracker;
private final Client client;
/**
* The maximum number of open jobs can be different on each node. However, nodes on older versions
* won't add their setting to the cluster state, so for backwards compatibility with these nodes we
* assume the older node's setting is the same as that of the node running this code.
* TODO: remove this member in 7.0
*/
private final int fallbackMaxNumberOfOpenJobs;
private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;
private volatile int maxLazyMLNodes;
@ -753,7 +734,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
this.autodetectProcessManager = autodetectProcessManager;
this.memoryTracker = memoryTracker;
this.client = client;
this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
@ -770,14 +750,13 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
params.getJob(),
clusterState,
maxConcurrentJobAllocations,
fallbackMaxNumberOfOpenJobs,
maxMachineMemoryPercent,
memoryTracker,
logger);
if (assignment.getExecutorNode() == null) {
int numMlNodes = 0;
for (DiscoveryNode node : clusterState.getNodes()) {
if (Boolean.valueOf(node.getAttributes().get(MachineLearning.ML_ENABLED_NODE_ATTR))) {
if (MachineLearning.isMlNode(node)) {
numMlNodes++;
}
}


@ -5,21 +5,13 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
public class TransportPersistJobAction extends TransportJobTaskAction<PersistJobAction.Request, PersistJobAction.Response> {
@ -44,33 +36,4 @@ public class TransportPersistJobAction extends TransportJobTaskAction<PersistJob
}
});
}
@Override
protected void doExecute(Task task, PersistJobAction.Request request, ActionListener<PersistJobAction.Response> listener) {
// TODO Remove this overridden method in 7.0.0
DiscoveryNodes nodes = clusterService.state().nodes();
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(request.getJobId(), tasks);
if (jobTask == null || jobTask.getExecutorNode() == null) {
logger.debug("[{}] Cannot persist the job because the job is not open", request.getJobId());
listener.onResponse(new PersistJobAction.Response(false));
return;
}
DiscoveryNode executorNode = nodes.get(jobTask.getExecutorNode());
if (executorNode == null) {
listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot persist job [{}] as" +
"executor node [{}] cannot be found", request.getJobId(), jobTask.getExecutorNode()));
return;
}
Version nodeVersion = executorNode.getVersion();
if (nodeVersion.before(Version.V_6_3_0)) {
listener.onFailure(
new ElasticsearchException("Cannot persist job [" + request.getJobId() + "] on node with version " + nodeVersion));
return;
}
super.doExecute(task, request, listener);
}
}


@ -24,7 +24,6 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import java.util.List;
@ -59,45 +58,6 @@ public class JobDataDeleter {
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
// TODO: remove in 7.0
ActionListener<BulkResponse> docDeleteListener = ActionListener.wrap(
response -> {
// if the doc delete worked then don't bother trying the old types
if (response.hasFailures() == false) {
listener.onResponse(response);
return;
}
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
for (ModelSnapshot modelSnapshot : modelSnapshots) {
for (String stateDocId : modelSnapshot.legacyStateDocumentIds()) {
bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE, stateDocId));
}
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()),
ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.v54DocumentId(modelSnapshot)));
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
bulkRequestBuilder.execute(ActionListener.wrap(
listener::onResponse,
// ignore problems relating to single type indices - if we're running against a single type
// index then it must be type doc, so just return the response from deleting that type
e -> {
if (e instanceof IllegalArgumentException
&& e.getMessage().contains("as the final mapping would have more than 1 type")) {
listener.onResponse(response);
}
listener.onFailure(e);
}
));
} catch (Exception e) {
listener.onFailure(e);
}
},
listener::onFailure
);
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
for (ModelSnapshot modelSnapshot : modelSnapshots) {
for (String stateDocId : modelSnapshot.stateDocumentIds()) {
@ -110,7 +70,7 @@ public class JobDataDeleter {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
executeAsyncWithOrigin(client, ML_ORIGIN, BulkAction.INSTANCE, bulkRequestBuilder.request(), docDeleteListener);
executeAsyncWithOrigin(client, ML_ORIGIN, BulkAction.INSTANCE, bulkRequestBuilder.request(), listener);
} catch (Exception e) {
listener.onFailure(e);
}


@ -82,16 +82,9 @@ public class AutodetectBuilder {
/**
* The maximum number of anomaly records that will be written each bucket
*/
@Deprecated
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING = Setting.intSetting("max.anomaly.records", DEFAULT_MAX_NUM_RECORDS,
Setting.Property.NodeScope, Setting.Property.Deprecated);
// Though this setting is dynamic, it is only set when a new job is opened. So, already runnin jobs will not get the updated value.
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting(
"xpack.ml.max_anomaly_records",
MAX_ANOMALY_RECORDS_SETTING,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic);
// Though this setting is dynamic, it is only set when a new job is opened. So, already running jobs will not get the updated value.
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting("xpack.ml.max_anomaly_records",
DEFAULT_MAX_NUM_RECORDS, Setting.Property.NodeScope, Setting.Property.Dynamic);
/**
* Config setting storing the flag that disables model persistence


@ -95,15 +95,11 @@ public class AutodetectProcessManager {
// available resources on that node: https://github.com/elastic/x-pack-elasticsearch/issues/546
// However, it is useful to also be able to apply a hard limit.
// WARNING: These settings cannot be made DYNAMIC, because they are tied to several threadpools
// WARNING: This setting cannot be made DYNAMIC, because it is tied to several threadpools
// and a threadpool's size can't be changed at runtime.
// See MachineLearning#getExecutorBuilders(...)
// TODO: Remove the deprecated setting in 7.0 and move the default value to the replacement setting
@Deprecated
public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE =
Setting.intSetting("max_running_jobs", 20, 1, 512, Property.NodeScope, Property.Deprecated);
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", MAX_RUNNING_JOBS_PER_NODE, 1, Property.NodeScope);
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, 512, Property.NodeScope);
// Undocumented setting for integration test purposes
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =


@ -342,7 +342,7 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (int i = 0; i < nodeCount; i++) {
Map<String, String> attrs = new HashMap<>();
attrs.put(MachineLearning.ML_ENABLED_NODE_ATTR, Boolean.toString(true));
attrs.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(20));
Set<DiscoveryNode.Role> roles = new HashSet<>();
roles.add(DiscoveryNode.Role.DATA);
roles.add(DiscoveryNode.Role.MASTER);


@ -108,8 +108,9 @@ public class TransportOpenJobActionTests extends ESTestCase {
public void testSelectLeastLoadedMlNode_byCount() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
// MachineLearning.MACHINE_MEMORY_NODE_ATTR not set, so this will fall back to allocating by count
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");
// MachineLearning.MACHINE_MEMORY_NODE_ATTR negative, so this will fall back to allocating by count
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
@ -135,7 +136,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
jobBuilder.setJobVersion(Version.CURRENT);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(),
cs.build(), 2, 10, 30, memoryTracker, logger);
cs.build(), 2, 30, memoryTracker, logger);
assertEquals("", result.getExplanation());
assertEquals("_node_id3", result.getExecutorNode());
}
@ -146,7 +147,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
int maxRunningJobsPerNode = randomIntBetween(1, 100);
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
String[] jobIds = new String[numNodes * maxRunningJobsPerNode];
@ -171,7 +173,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date());
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2,
maxRunningJobsPerNode, 30, memoryTracker, logger);
30, memoryTracker, logger);
assertNull(result.getExecutorNode());
assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
+ "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
@ -197,14 +199,15 @@ public class TransportOpenJobActionTests extends ESTestCase {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, memoryTracker, logger);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker, logger);
assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
assertNull(result.getExecutorNode());
}
public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
@ -231,7 +234,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
ClusterState cs = csBuilder.build();
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, memoryTracker, logger);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker, logger);
assertEquals("_node_id3", result.getExecutorNode());
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
@ -241,7 +244,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
@ -252,7 +255,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
@ -263,14 +266,15 @@ public class TransportOpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
assertNull("no node selected, because null state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
@ -301,7 +305,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
// Allocation won't be possible if the stale failed job is treated as opening
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
assertEquals("_node_id1", result.getExecutorNode());
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
@ -311,14 +315,15 @@ public class TransportOpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, memoryTracker, logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.CURRENT))
@ -342,7 +347,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
cs.nodes(nodes);
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 30,
memoryTracker, logger);
assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
assertNull(result.getExecutorNode());
@ -350,7 +355,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.V_6_2_0))
@ -373,7 +379,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(),
2, 10, 30, memoryTracker, logger);
2, 30, memoryTracker, logger);
assertThat(result.getExplanation(), containsString(
"because the job's model snapshot requires a node of version [6.3.0] or higher"));
assertNull(result.getExecutorNode());
@ -381,7 +387,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
public void testSelectLeastLoadedMlNode_jobWithRulesButNoNodeMeetsRequiredVersion() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.V_6_2_0))
@ -400,7 +407,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
cs.metaData(metaData);
Job job = jobWithRules("job_with_rules");
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
logger);
assertThat(result.getExplanation(), containsString(
"because jobs using custom_rules require a node of version [6.4.0] or higher"));
@ -409,7 +416,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.V_6_2_0))
@ -428,7 +436,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
cs.metaData(metaData);
Job job = jobWithRules("job_with_rules");
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
logger);
assertNotNull(result.getExecutorNode());
}


@ -209,7 +209,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
PersistentTask<?> task = tasks.getTask(MlTasks.jobTaskId(jobId));
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true"));
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
JobTaskState jobTaskState = (JobTaskState) task.getState();
assertNotNull(jobTaskState);
@ -425,7 +424,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertNotNull(task.getExecutorNode());
assertFalse(needsReassignment(task.getAssignment(), clusterState.nodes()));
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true"));
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
JobTaskState jobTaskState = (JobTaskState) task.getState();


@ -153,25 +153,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals(7, maxOpenJobs);
}
public void testMaxOpenJobsSetting_givenOldSettingOnly() {
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 9);
int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build());
assertEquals(9, maxOpenJobs);
assertWarnings("[max_running_jobs] setting was deprecated in Elasticsearch and will be removed in a future release! "
+ "See the breaking changes documentation for the next major version.");
}
public void testMaxOpenJobsSetting_givenOldAndNewSettings() {
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 7);
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 9);
int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build());
assertEquals(7, maxOpenJobs);
assertWarnings("[max_running_jobs] setting was deprecated in Elasticsearch and will be removed in a future release! "
+ "See the breaking changes documentation for the next major version.");
}
public void testOpenJob() {
Client client = mock(Client.class);
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);


@ -160,7 +160,7 @@ setup:
- match: { datafeeds.0.state: "started"}
- is_true: datafeeds.0.node.name
- is_true: datafeeds.0.node.transport_address
- match: { datafeeds.0.node.attributes.ml\.enabled: "true"}
- match: { datafeeds.0.node.attributes.ml\.max_open_jobs: "20"}
---
"Test implicit get all datafeed stats given started datafeeds":


@ -97,7 +97,7 @@ setup:
- match: { jobs.0.state: opened }
- is_true: jobs.0.node.name
- is_true: jobs.0.node.transport_address
- match: { jobs.0.node.attributes.ml\.enabled: "true"}
- match: { jobs.0.node.attributes.ml\.max_open_jobs: "20"}
- is_true: jobs.0.open_time
---