From cb3f3d2d0489c6b7cfd68b51d0c7da2cffb91e3e Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 11 Aug 2017 09:00:33 +0100 Subject: [PATCH] [ML] Switch from max_running_jobs to xpack.ml.max_open_jobs (elastic/x-pack-elasticsearch#2232) This change makes 2 improvements to the max_running_jobs setting: 1. Namespaces it by adding the xpack.ml. prefix 2. Renames "running" to "open", because the "running" terminology is not used elsewhere The old max_running_jobs setting is used as a fallback if the new xpack.ml.max_open_jobs setting is not specified. max_running_jobs is deprecated and (to ease backporting in the short term) will be removed from 7.0 in a different PR closer to release of 7.0. Relates elastic/x-pack-elasticsearch#2185 Original commit: elastic/x-pack-elasticsearch@18c539f9bba838c6b63b6dde5abca6f7bf23e6e1 --- docs/en/rest-api/ml/datafeedresource.asciidoc | 2 +- docs/en/rest-api/ml/jobcounts.asciidoc | 2 +- .../xpack/ml/MachineLearning.java | 7 ++-- .../xpack/ml/action/OpenJobAction.java | 6 +-- .../autodetect/AutodetectProcessManager.java | 15 ++++--- .../xpack/ml/action/OpenJobActionTests.java | 2 +- .../xpack/ml/integration/TooManyJobsIT.java | 8 ++-- .../AutodetectProcessManagerTests.java | 41 ++++++++++++++++--- 8 files changed, 60 insertions(+), 23 deletions(-) diff --git a/docs/en/rest-api/ml/datafeedresource.asciidoc b/docs/en/rest-api/ml/datafeedresource.asciidoc index 17c6ee7716b..21712e748e2 100644 --- a/docs/en/rest-api/ml/datafeedresource.asciidoc +++ b/docs/en/rest-api/ml/datafeedresource.asciidoc @@ -104,7 +104,7 @@ progress of a {dfeed}. For example: `ephemeral_id`::: The node ephemeral ID. `transport_address`::: The host and port where transport HTTP connections are accepted. For example, `127.0.0.1:9300`. - `attributes`::: For example, `{"max_running_jobs": "10"}`. + `attributes`::: For example, `{"ml.max_open_jobs": "10"}`. `state`:: (string) The status of the {dfeed}, which can be one of the following values: + diff --git a/docs/en/rest-api/ml/jobcounts.asciidoc b/docs/en/rest-api/ml/jobcounts.asciidoc index 4b354829c2d..48394a126f2 100644 --- a/docs/en/rest-api/ml/jobcounts.asciidoc +++ b/docs/en/rest-api/ml/jobcounts.asciidoc @@ -196,4 +196,4 @@ This information is available only for open jobs. (string) The host and port where transport HTTP connections are accepted. `attributes`:: - (object) For example, {"max_running_jobs": "10"}. + (object) For example, {"ml.max_open_jobs": "10"}. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 362b3afdf5f..1c0c6e793f0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -190,7 +190,8 @@ public class MachineLearning implements ActionPlugin { ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING, DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING, - AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE)); + AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE, + AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE)); } public Settings additionalSettings() { @@ -204,7 +205,7 @@ public class MachineLearning implements ActionPlugin { // TODO: the simple true/false flag will not be required once all supported versions have the number - consider removing in 7.0 additionalSettings.put("node.attr." + ML_ENABLED_NODE_ATTR, "true"); additionalSettings.put("node.attr." + MAX_OPEN_JOBS_NODE_ATTR, - AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings)); + AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings)); } return additionalSettings.build(); } @@ -434,7 +435,7 @@ public class MachineLearning implements ActionPlugin { if (false == enabled || tribeNode || tribeNodeClient || transportClientMode) { return emptyList(); } - int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings); + int maxNumberOfJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings); // 4 threads per job: for cpp logging, result processing, state processing and // AutodetectProcessManager worker thread: FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 6427de84d55..127aaa17b4f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -84,7 +84,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.Predicate; -import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; +import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE; public class OpenJobAction extends Action { @@ -578,7 +578,7 @@ public class OpenJobAction extends Action MAX_RUNNING_JOBS_PER_NODE = - Setting.intSetting("max_running_jobs", 10, 1, 512, Setting.Property.NodeScope); + Setting.intSetting("max_running_jobs", 10, 1, 512, Property.NodeScope, Property.Deprecated); + public static final Setting MAX_OPEN_JOBS_PER_NODE = + Setting.intSetting("xpack.ml.max_open_jobs", MAX_RUNNING_JOBS_PER_NODE, 1, Property.NodeScope); private final Client client; private final ThreadPool threadPool; @@ -116,7 +121,7 @@ public class AutodetectProcessManager extends AbstractComponent { this.client = client; this.threadPool = threadPool; this.xContentRegistry = xContentRegistry; - this.maxAllowedRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings); + this.maxAllowedRunningJobs = MAX_OPEN_JOBS_PER_NODE.get(settings); this.autodetectProcessFactory = autodetectProcessFactory; this.normalizerFactory = normalizerFactory; this.jobManager = jobManager; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index 45c3847ace1..dfebb506517 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -151,7 +151,7 @@ public class OpenJobActionTests extends ESTestCase { Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, maxRunningJobsPerNode, logger); assertNull(result.getExecutorNode()); assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode - + "], max_running_jobs [" + maxRunningJobsPerNode + "]")); + + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); } public void testSelectLeastLoadedMlNode_noMlNodes() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 2655ca11edb..3691cdf903e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -88,7 +88,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { } catch (ElasticsearchStatusException e) { assertTrue(e.getMessage(), e.getMessage().startsWith("Could not open job because no suitable nodes were found, allocation explanation")); assertTrue(e.getMessage(), e.getMessage().endsWith("because this node is full. Number of opened jobs [" + maxNumberOfJobsPerNode + - "], max_running_jobs [" + maxNumberOfJobsPerNode + "]]")); + "], xpack.ml.max_open_jobs [" + maxNumberOfJobsPerNode + "]]")); logger.info("good news everybody --> reached maximum number of allowed opened jobs, after trying to open the {}th job", i); // close the first job and check if the latest job gets opened: @@ -111,12 +111,12 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { } private void startMlCluster(int numNodes, int maxNumberOfJobsPerNode) throws Exception { - // clear all nodes, so that we can set max_running_jobs setting: + // clear all nodes, so that we can set xpack.ml.max_open_jobs setting: internalCluster().ensureAtMostNumDataNodes(0); - logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode); + logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode); for (int i = 0; i < numNodes; i++) { internalCluster().startNode(Settings.builder() - .put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); + .put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); } logger.info("Started [{}] nodes", numNodes); ensureStableCluster(numNodes); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 88125e8fc0b..636f9d2e029 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -122,6 +122,37 @@ public class AutodetectProcessManagerTests extends ESTestCase { }).when(jobProvider).getAutodetectParams(any(), any(), any()); } + public void testMaxOpenJobsSetting_givenDefault() { + int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY); + assertEquals(10, maxOpenJobs); + } + + public void testMaxOpenJobsSetting_givenNewSettingOnly() { + Settings.Builder settings = Settings.builder(); + settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 7); + int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build()); + assertEquals(7, maxOpenJobs); + } + + public void testMaxOpenJobsSetting_givenOldSettingOnly() { + Settings.Builder settings = Settings.builder(); + settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 9); + int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build()); + assertEquals(9, maxOpenJobs); + assertWarnings("[max_running_jobs] setting was deprecated in Elasticsearch and will be removed in a future release! " + + "See the breaking changes documentation for the next major version."); + } + + public void testMaxOpenJobsSetting_givenOldAndNewSettings() { + Settings.Builder settings = Settings.builder(); + settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 7); + settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 9); + int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build()); + assertEquals(7, maxOpenJobs); + assertWarnings("[max_running_jobs] setting was deprecated in Elasticsearch and will be removed in a future release! " + + "See the breaking changes documentation for the next major version."); + } + public void testOpenJob_withoutVersion() { Client client = mock(Client.class); AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); @@ -137,7 +168,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobTask.getJobId()).thenReturn(job.getId()); AtomicReference errorHolder = new AtomicReference<>(); - manager.openJob(jobTask, e -> errorHolder.set(e)); + manager.openJob(jobTask, errorHolder::set); Exception error = errorHolder.get(); assertThat(error, is(notNullValue())); @@ -180,7 +211,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess; Settings.Builder settings = Settings.builder(); - settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); + settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3); AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor)); @@ -245,6 +276,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { InputStream inputStream = createInputStream(""); XContentType xContentType = randomFrom(XContentType.values()); doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[3]; handler.accept(null, new IOException("blah")); return null; @@ -354,6 +386,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { FlushJobParams params = FlushJobParams.builder().build(); doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[1]; handler.accept(null, new IOException("blah")); return null; @@ -455,9 +488,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { InputStream inputStream = createInputStream(""); DataCounts[] dataCounts = new DataCounts[1]; manager.processData(jobTask, inputStream, - randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> { - dataCounts[0] = dataCounts1; - }); + randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> dataCounts[0] = dataCounts1); assertThat(dataCounts[0], equalTo(new DataCounts("foo"))); }