[ML] Switch from max_running_jobs to xpack.ml.max_open_jobs (elastic/x-pack-elasticsearch#2232)

This change makes 2 improvements to the max_running_jobs setting:

1. Namespaces it by adding the xpack.ml. prefix
2. Renames "running" to "open", because the "running" terminology
   is not used elsewhere

The old max_running_jobs setting is used as a fallback if the new
xpack.ml.max_open_jobs setting is not specified.  max_running_jobs
is deprecated and (to ease backporting in the short term) will be
removed from 7.0 in a different PR closer to release of 7.0.

Relates elastic/x-pack-elasticsearch#2185

Original commit: elastic/x-pack-elasticsearch@18c539f9bb
This commit is contained in:
David Roberts 2017-08-11 09:00:33 +01:00 committed by GitHub
parent 5f30508efd
commit cb3f3d2d04
8 changed files with 60 additions and 23 deletions

View File

@ -104,7 +104,7 @@ progress of a {dfeed}. For example:
`ephemeral_id`::: The node ephemeral ID.
`transport_address`::: The host and port where transport HTTP connections are
accepted. For example, `127.0.0.1:9300`.
`attributes`::: For example, `{"max_running_jobs": "10"}`.
`attributes`::: For example, `{"ml.max_open_jobs": "10"}`.
`state`::
(string) The status of the {dfeed}, which can be one of the following values: +

View File

@ -196,4 +196,4 @@ This information is available only for open jobs.
(string) The host and port where transport HTTP connections are accepted.
`attributes`::
(object) For example, {"max_running_jobs": "10"}.
(object) For example, {"ml.max_open_jobs": "10"}.

View File

@ -190,7 +190,8 @@ public class MachineLearning implements ActionPlugin {
ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE));
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE));
}
public Settings additionalSettings() {
@ -204,7 +205,7 @@ public class MachineLearning implements ActionPlugin {
// TODO: the simple true/false flag will not be required once all supported versions have the number - consider removing in 7.0
additionalSettings.put("node.attr." + ML_ENABLED_NODE_ATTR, "true");
additionalSettings.put("node.attr." + MAX_OPEN_JOBS_NODE_ATTR,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings));
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings));
}
return additionalSettings.build();
}
@ -434,7 +435,7 @@ public class MachineLearning implements ActionPlugin {
if (false == enabled || tribeNode || tribeNodeClient || transportClientMode) {
return emptyList();
}
int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
int maxNumberOfJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings);
// 4 threads per job: for cpp logging, result processing, state processing and
// AutodetectProcessManager worker thread:
FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME,

View File

@ -84,7 +84,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE;
public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.Response, OpenJobAction.RequestBuilder> {
@ -578,7 +578,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
AutodetectProcessManager autodetectProcessManager) {
super(settings, TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.autodetectProcessManager = autodetectProcessManager;
this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
@ -739,7 +739,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
long available = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (available == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +
"Number of opened jobs [" + numberOfAssignedJobs + "], " + MAX_RUNNING_JOBS_PER_NODE.getKey() +
"Number of opened jobs [" + numberOfAssignedJobs + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() +
" [" + maxNumberOfOpenJobs + "]";
logger.trace(reason);
reasons.add(reason);

View File

@ -74,19 +74,24 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.elasticsearch.common.settings.Setting.Property;
public class AutodetectProcessManager extends AbstractComponent {
// TODO: Ideally this setting shouldn't need to exist
// We should be able from the job config to estimate the memory/cpu a job needs to have,
// and if we know that then we can prior to assigning a job to a node fail based on the
// available resources on that node: https://github.com/elastic/x-pack-elasticsearch/issues/546
// Note: on small instances on cloud, this setting will be set to: 1
// However, it is useful to also be able to apply a hard limit.
// WARNING: This setting cannot be made DYNAMIC, because it is tied to several threadpools
// WARNING: These settings cannot be made DYNAMIC, because they are tied to several threadpools
// and a threadpool's size can't be changed at runtime.
// See MachineLearning#getExecutorBuilders(...)
// TODO: Remove the deprecated setting in 7.0 and move the default value to the replacement setting
@Deprecated
public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE =
Setting.intSetting("max_running_jobs", 10, 1, 512, Setting.Property.NodeScope);
Setting.intSetting("max_running_jobs", 10, 1, 512, Property.NodeScope, Property.Deprecated);
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", MAX_RUNNING_JOBS_PER_NODE, 1, Property.NodeScope);
private final Client client;
private final ThreadPool threadPool;
@ -116,7 +121,7 @@ public class AutodetectProcessManager extends AbstractComponent {
this.client = client;
this.threadPool = threadPool;
this.xContentRegistry = xContentRegistry;
this.maxAllowedRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.maxAllowedRunningJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
this.autodetectProcessFactory = autodetectProcessFactory;
this.normalizerFactory = normalizerFactory;
this.jobManager = jobManager;

View File

@ -151,7 +151,7 @@ public class OpenJobActionTests extends ESTestCase {
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, maxRunningJobsPerNode, logger);
assertNull(result.getExecutorNode());
assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
+ "], max_running_jobs [" + maxRunningJobsPerNode + "]"));
+ "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
}
public void testSelectLeastLoadedMlNode_noMlNodes() {

View File

@ -88,7 +88,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
} catch (ElasticsearchStatusException e) {
assertTrue(e.getMessage(), e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation"));
assertTrue(e.getMessage(), e.getMessage().endsWith("because this node is full. Number of opened jobs [" + maxNumberOfJobsPerNode +
"], max_running_jobs [" + maxNumberOfJobsPerNode + "]]"));
"], xpack.ml.max_open_jobs [" + maxNumberOfJobsPerNode + "]]"));
logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i);
// close the first job and check if the latest job gets opened:
@ -111,12 +111,12 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
}
private void startMlCluster(int numNodes, int maxNumberOfJobsPerNode) throws Exception {
// clear all nodes, so that we can set max_running_jobs setting:
// clear all nodes, so that we can set xpack.ml.max_open_jobs setting:
internalCluster().ensureAtMostNumDataNodes(0);
logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode);
logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode);
for (int i = 0; i < numNodes; i++) {
internalCluster().startNode(Settings.builder()
.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
}
logger.info("Started [{}] nodes", numNodes);
ensureStableCluster(numNodes);

View File

@ -122,6 +122,37 @@ public class AutodetectProcessManagerTests extends ESTestCase {
}).when(jobProvider).getAutodetectParams(any(), any(), any());
}
public void testMaxOpenJobsSetting_givenDefault() {
int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY);
assertEquals(10, maxOpenJobs);
}
public void testMaxOpenJobsSetting_givenNewSettingOnly() {
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 7);
int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build());
assertEquals(7, maxOpenJobs);
}
public void testMaxOpenJobsSetting_givenOldSettingOnly() {
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 9);
int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build());
assertEquals(9, maxOpenJobs);
assertWarnings("[max_running_jobs] setting was deprecated in Elasticsearch and will be removed in a future release! "
+ "See the breaking changes documentation for the next major version.");
}
public void testMaxOpenJobsSetting_givenOldAndNewSettings() {
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 7);
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 9);
int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build());
assertEquals(7, maxOpenJobs);
assertWarnings("[max_running_jobs] setting was deprecated in Elasticsearch and will be removed in a future release! "
+ "See the breaking changes documentation for the next major version.");
}
public void testOpenJob_withoutVersion() {
Client client = mock(Client.class);
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
@ -137,7 +168,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn(job.getId());
AtomicReference<Exception> errorHolder = new AtomicReference<>();
manager.openJob(jobTask, e -> errorHolder.set(e));
manager.openJob(jobTask, errorHolder::set);
Exception error = errorHolder.get();
assertThat(error, is(notNullValue()));
@ -180,7 +211,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor));
@ -245,6 +276,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
InputStream inputStream = createInputStream("");
XContentType xContentType = randomFrom(XContentType.values());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
BiConsumer<DataCounts, Exception> handler = (BiConsumer<DataCounts, Exception>) invocationOnMock.getArguments()[3];
handler.accept(null, new IOException("blah"));
return null;
@ -354,6 +386,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
FlushJobParams params = FlushJobParams.builder().build();
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
BiConsumer<Void, Exception> handler = (BiConsumer<Void, Exception>) invocationOnMock.getArguments()[1];
handler.accept(null, new IOException("blah"));
return null;
@ -455,9 +488,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
InputStream inputStream = createInputStream("");
DataCounts[] dataCounts = new DataCounts[1];
manager.processData(jobTask, inputStream,
randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {
dataCounts[0] = dataCounts1;
});
randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> dataCounts[0] = dataCounts1);
assertThat(dataCounts[0], equalTo(new DataCounts("foo")));
}