[ML] Use scaling thread pool and xpack.ml.max_open_jobs cluster-wide dynamic (#39736)

This change does the following:

1. Makes the per-node setting xpack.ml.max_open_jobs
   into a cluster-wide dynamic setting
2. Changes the job node selection to continue to use the
   per-node attributes storing the maximum number of open
   jobs if any node in the cluster is older than 7.1, and
   use the dynamic cluster-wide setting if all nodes are on
   7.1 or later
3. Changes the docs to reflect this
4. Changes the thread pools for native process communication
   from fixed size to scaling, to support the dynamic nature
   of xpack.ml.max_open_jobs
5. Renames the autodetect thread pool to the job comms
   thread pool to make clear that it will be used for other
   types of ML jobs (data frame analytics in particular)

Backport of #39320
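
The node selection rule in item 2 can be sketched without any Elasticsearch dependencies. This is a minimal illustration, not the code in this commit: the `Node` class, the `onOrAfter71` helper and `effectiveMaxOpenJobs` are hypothetical names, and only the `ml.max_open_jobs` attribute key and the 7.1 cut-off come from the change itself.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free sketch; not the Elasticsearch implementation.
class MaxOpenJobsFallbackSketch {

    // Stand-in for a cluster node: a (major, minor) version plus its node attributes.
    static final class Node {
        final int major;
        final int minor;
        final Map<String, String> attributes;

        Node(int major, int minor, Map<String, String> attributes) {
            this.major = major;
            this.minor = minor;
            this.attributes = attributes;
        }

        boolean onOrAfter71() {
            return major > 7 || (major == 7 && minor >= 1);
        }
    }

    // Returns the job limit to apply when considering the candidate node.
    static int effectiveMaxOpenJobs(List<Node> clusterNodes, Node candidate, int dynamicMaxOpenJobs) {
        boolean allNodesHaveDynamicSetting = clusterNodes.stream().allMatch(Node::onOrAfter71);
        if (allNodesHaveDynamicSetting) {
            return dynamicMaxOpenJobs;
        }
        // Mixed-version cluster: fall back to the value the node published at startup.
        // Integer.parseInt throws NumberFormatException for a missing or malformed value.
        return Integer.parseInt(candidate.attributes.get("ml.max_open_jobs"));
    }
}
```

In the commit itself a malformed attribute causes the node to be skipped with a logged reason rather than an exception reaching the caller; the sketch leaves that handling out.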
David Roberts 2019-03-06 12:29:34 +00:00 committed by GitHub
parent 52fd102e23
commit 5f8f91c03b
11 changed files with 152 additions and 116 deletions

View File

@ -134,7 +134,7 @@ update their values:
`ephemeral_id`::: The node ephemeral ID.
`transport_address`::: The host and port where transport HTTP connections are
accepted. For example, `127.0.0.1:9300`.
`attributes`::: For example, `{"ml.max_open_jobs": "10"}`.
`attributes`::: For example, `{"ml.machine_memory": "17179869184"}`.
`state`::
(string) The status of the {dfeed}, which can be one of the following values: +

View File

@ -86,8 +86,7 @@ The API returns the following results:
"transport_address": "127.0.0.1:9300",
"attributes": {
"ml.machine_memory": "17179869184",
"ml.max_open_jobs": "20",
"ml.enabled": "true"
"ml.max_open_jobs": "20"
}
},
"assignment_explanation": ""

View File

@ -229,4 +229,4 @@ This information is available only for open jobs.
(string) The host and port where transport HTTP connections are accepted.
`attributes`::
(object) For example, {"ml.max_open_jobs": "10"}.
(object) For example, {"ml.machine_memory": "17179869184"}.

View File

@ -1,3 +1,4 @@
[role="xpack"]
[[ml-settings]]
=== Machine learning settings in Elasticsearch
@ -44,27 +45,32 @@ IMPORTANT: If you want to use {ml} features in your cluster, you must have
`xpack.ml.enabled` set to `true` on all master-eligible nodes. This is the
default behavior.
`xpack.ml.max_machine_memory_percent`::
`xpack.ml.max_machine_memory_percent` (<<cluster-update-settings,Dynamic>>)::
The maximum percentage of the machine's memory that {ml} may use for running
analytics processes. (These processes are separate to the {es} JVM.) Defaults to
`30` percent. The limit is based on the total memory of the machine, not current
free memory. Jobs will not be allocated to a node if doing so would cause the
estimated memory use of {ml} jobs to exceed the limit.
`xpack.ml.max_model_memory_limit`::
`xpack.ml.max_model_memory_limit` (<<cluster-update-settings,Dynamic>>)::
The maximum `model_memory_limit` property value that can be set for any job on
this node. If you try to create a job with a `model_memory_limit` property value
that is greater than this setting value, an error occurs. Existing jobs are not
affected when you update this setting. For more information about the
`model_memory_limit` property, see <<ml-apilimits>>.
`xpack.ml.max_open_jobs`::
The maximum number of jobs that can run on a node. Defaults to `20`.
The maximum number of jobs is also constrained by memory usage, so fewer
jobs than specified by this setting will run on a node if the estimated
memory use of the jobs would be higher than allowed.
`xpack.ml.max_open_jobs` (<<cluster-update-settings,Dynamic>>)::
The maximum number of jobs that can run simultaneously on a node. Defaults to
`20`. In this context, jobs include both anomaly detector jobs and data frame
analytics jobs. The maximum number of jobs is also constrained by memory usage.
Thus if the estimated memory usage of the jobs would be higher than allowed,
fewer jobs will run on a node. Prior to version 7.1, this setting was a per-node
non-dynamic setting. It became a cluster-wide dynamic
setting in version 7.1. As a result, changes to its value after node startup
are used only after every node in the cluster is running version 7.1 or higher.
The maximum permitted value is `512`.
`xpack.ml.node_concurrent_job_allocations`::
`xpack.ml.node_concurrent_job_allocations` (<<cluster-update-settings,Dynamic>>)::
The maximum number of jobs that can concurrently be in the `opening` state on
each node. Typically, jobs spend a small amount of time in this state before
they move to `open` state. Jobs that must restore large models when they are
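
Because `xpack.ml.max_open_jobs` is now dynamic, it can be changed through the cluster update settings API instead of editing each node's configuration. The following is a rough sketch that only builds the request and does not send it. The class name, method names and the base URL are assumptions for illustration; the `1` to `512` range check mirrors the bounds declared for the setting in this change.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; builds (but does not send) a cluster settings update.
class MaxOpenJobsUpdateSketch {

    // JSON body for a persistent cluster-wide update of xpack.ml.max_open_jobs.
    static String settingsBody(int maxOpenJobs) {
        if (maxOpenJobs < 1 || maxOpenJobs > 512) {
            throw new IllegalArgumentException("xpack.ml.max_open_jobs must be between 1 and 512");
        }
        return "{\"persistent\":{\"xpack.ml.max_open_jobs\":" + maxOpenJobs + "}}";
    }

    // PUT <baseUrl>/_cluster/settings with the body above.
    static HttpRequest updateRequest(String baseUrl, int maxOpenJobs) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_cluster/settings"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(settingsBody(maxOpenJobs)))
            .build();
    }
}
```

As the documentation text notes, such an update only influences job allocation once every node in the cluster is on 7.1 or later.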

View File

@ -51,7 +51,7 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
@ -256,7 +256,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final String BASE_PATH = "/_ml/";
public static final String PRE_V7_BASE_PATH = "/_xpack/ml/";
public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed";
public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect";
public static final String JOB_COMMS_THREAD_POOL_NAME = NAME + "_job_comms";
public static final String UTILITY_THREAD_POOL_NAME = NAME + "_utility";
// This is for performance testing. It's not exposed to the end user.
@ -276,6 +276,17 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final Setting<Integer> MAX_LAZY_ML_NODES =
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
// Before 8.0.0 this needs to match the max allowed value for xpack.ml.max_open_jobs,
// as the current node could be running in a cluster where some nodes are still using
// that setting. From 8.0.0 onwards we have the flexibility to increase it...
private static final int MAX_MAX_OPEN_JOBS_PER_NODE = 512;
// This setting is cluster-wide and can be set dynamically. However, prior to version 7.1 it was
// a non-dynamic per-node setting. In a mixed version cluster containing 6.7 or 7.0 nodes those
// older nodes will not react to the dynamic changes. Therefore, in such mixed version clusters
// allocation will be based on the value first read at node startup rather than the current value.
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);
private static final Logger logger = LogManager.getLogger(XPackPlugin.class);
private final Settings settings;
@ -315,7 +326,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
MAX_MACHINE_MEMORY_PERCENT,
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
MAX_OPEN_JOBS_PER_NODE,
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP,
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION));
}
@ -333,8 +344,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
Settings.Builder additionalSettings = Settings.builder();
Boolean allocationEnabled = ML_ENABLED.get(settings);
if (allocationEnabled != null && allocationEnabled) {
// TODO: stop setting this attribute in 8.0.0 but disallow it (like mlEnabledNodeAttrName below)
// The ML UI will need to be changed to check machineMemoryAttrName instead before this is done
addMlNodeAttribute(additionalSettings, maxOpenJobsPerNodeNodeAttrName,
String.valueOf(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings)));
String.valueOf(MAX_OPEN_JOBS_PER_NODE.get(settings)));
addMlNodeAttribute(additionalSettings, machineMemoryAttrName,
Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats())));
// This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion
@ -608,35 +621,37 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new ActionHandler<>(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class)
);
}
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (false == enabled || transportClientMode) {
return emptyList();
}
int maxNumberOfJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings);
// 4 threads per job: for cpp logging, result processing, state processing and
// AutodetectProcessManager worker thread:
FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME,
maxNumberOfJobs * 4, maxNumberOfJobs * 4, "xpack.ml.autodetect_thread_pool");
// 4 threads per job: processing logging, result and state of the renormalization process.
// Renormalization doesn't run for the entire lifetime of a job, so additionally autodetect process
// based operation (open, close, flush, post data), datafeed based operations (start and stop)
// and deleting expired data use this threadpool too and queue up if all threads are busy.
FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, UTILITY_THREAD_POOL_NAME,
maxNumberOfJobs * 4, 500, "xpack.ml.utility_thread_pool");
// These thread pools scale such that they can accommodate the maximum number of jobs per node
// that is permitted to be configured. It is up to other code to enforce the configured maximum
// number of jobs per node.
// TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and
// autodetect process are created at the same time then these two different TPs can merge.
FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_THREAD_POOL_NAME,
maxNumberOfJobs, 200, "xpack.ml.datafeed_thread_pool");
return Arrays.asList(autoDetect, renormalizer, datafeed);
// 4 threads per job process: for input, c++ logger output, result processing and state processing.
ScalingExecutorBuilder jobComms = new ScalingExecutorBuilder(JOB_COMMS_THREAD_POOL_NAME,
4, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(1), "xpack.ml.job_comms_thread_pool");
// This pool is used by renormalization, plus some other parts of ML that
// need to kick off non-trivial activities that mustn't block other threads.
ScalingExecutorBuilder utility = new ScalingExecutorBuilder(UTILITY_THREAD_POOL_NAME,
1, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(10), "xpack.ml.utility_thread_pool");
ScalingExecutorBuilder datafeed = new ScalingExecutorBuilder(DATAFEED_THREAD_POOL_NAME,
1, MAX_MAX_OPEN_JOBS_PER_NODE, TimeValue.timeValueMinutes(1), "xpack.ml.datafeed_thread_pool");
return Arrays.asList(jobComms, utility, datafeed);
}
@Override
public Map<String, AnalysisProvider<TokenizerFactory>> getTokenizers() {
return Collections.singletonMap(MlClassicTokenizer.NAME, MlClassicTokenizerFactory::new);
}
@Override
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
return templates -> {
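
The move from fixed to scaling pools can be approximated with the JDK alone. The sketch below is an analogy, not how `ScalingExecutorBuilder` is implemented: it uses a plain `ThreadPoolExecutor` with a `SynchronousQueue`, and Elasticsearch's own queueing and rejection behaviour may differ. The core size, maximum size and keep-alive time are the values passed for the job comms pool in this change; the class and method names are hypothetical.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// JDK-only approximation of a scaling pool; names are illustrative.
class ScalingPoolSketch {

    // Upper bound on xpack.ml.max_open_jobs, as in the change above.
    static final int MAX_MAX_OPEN_JOBS_PER_NODE = 512;

    // Keeps 4 threads alive, grows on demand to 4 threads per permitted job,
    // and lets threads above the core size exit after one idle minute.
    static ThreadPoolExecutor jobCommsPool() {
        return new ThreadPoolExecutor(
            4,
            MAX_MAX_OPEN_JOBS_PER_NODE * 4,
            1, TimeUnit.MINUTES,
            new SynchronousQueue<>());
    }
}
```

The point of sizing the maximum from the highest permitted setting value, rather than the configured one, is that the pool never needs resizing when the setting changes; enforcing the configured limit is left to other code.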

View File

@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
@ -68,7 +69,7 @@ import java.util.function.Predicate;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE;
import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
/*
This class extends from TransportMasterNodeAction for cluster state observing purposes.
@ -131,11 +132,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String jobId, Job job,
ClusterState clusterState,
int dynamicMaxOpenJobs,
int maxConcurrentJobAllocations,
int maxMachineMemoryPercent,
MlMemoryTracker memoryTracker,
boolean isMemoryTrackerRecentlyRefreshed,
Logger logger) {
// TODO: remove in 8.0.0
boolean allNodesHaveDynamicMaxWorkers = clusterState.getNodes().getMinNodeVersion().onOrAfter(Version.V_7_1_0);
// Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
// because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
@ -223,16 +227,19 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
Map<String, String> nodeAttributes = node.getAttributes();
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
int maxNumberOfOpenJobs;
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
logger.trace(reason);
reasons.add(reason);
continue;
int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
// TODO: remove this in 8.0.0
if (allNodesHaveDynamicMaxWorkers == false) {
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
logger.trace(reason);
reasons.add(reason);
continue;
}
}
long availableCount = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (availableCount == 0) {
@ -538,6 +545,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;
private volatile int maxLazyMLNodes;
private volatile int maxOpenJobs;
private volatile ClusterState clusterState;
public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
@ -550,12 +558,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
this.clusterService = clusterService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs);
clusterService.addListener(event -> clusterState = event.state());
}
@ -596,6 +606,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(jobId,
params.getJob(),
clusterState,
maxOpenJobs,
maxConcurrentJobAllocations,
maxMachineMemoryPercent,
memoryTracker,
@ -672,22 +683,20 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(),
this.maxConcurrentJobAllocations, maxConcurrentJobAllocations);
this.maxConcurrentJobAllocations = maxConcurrentJobAllocations;
}
void setMaxMachineMemoryPercent(int maxMachineMemoryPercent) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_MACHINE_MEMORY_PERCENT.getKey(),
this.maxMachineMemoryPercent, maxMachineMemoryPercent);
this.maxMachineMemoryPercent = maxMachineMemoryPercent;
}
void setMaxLazyMLNodes(int maxLazyMLNodes) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_LAZY_ML_NODES.getKey(),
this.maxLazyMLNodes, maxLazyMLNodes);
this.maxLazyMLNodes = maxLazyMLNodes;
}
void setMaxOpenJobs(int maxOpenJobs) {
this.maxOpenJobs = maxOpenJobs;
}
}
public static class JobTask extends AllocatedPersistentTask implements OpenJobAction.JobTaskMatcher {
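
The pattern used above, reading the setting once at construction and then registering an update consumer that overwrites a `volatile` field, can be shown in isolation. This sketch is an assumption-laden stand-in: `DynamicSettings` and `applyUpdate` are hypothetical and much simpler than Elasticsearch's `ClusterSettings`; only the method name `addSettingsUpdateConsumer` and the setting key are taken from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntConsumer;

// Hypothetical sketch of the "initial value plus update consumer" pattern.
class SettingsConsumerSketch {

    // Minimal stand-in for a registry of dynamic integer settings.
    static final class DynamicSettings {
        private final Map<String, List<IntConsumer>> consumers = new HashMap<>();

        void addSettingsUpdateConsumer(String key, IntConsumer consumer) {
            consumers.computeIfAbsent(key, k -> new ArrayList<>()).add(consumer);
        }

        // Notifies every consumer registered for the key of the new value.
        void applyUpdate(String key, int newValue) {
            List<IntConsumer> registered = consumers.getOrDefault(key, List.of());
            registered.forEach(c -> c.accept(newValue));
        }
    }

    // Component that tracks the current limit; volatile so other threads see updates.
    static final class Executor {
        private volatile int maxOpenJobs;

        Executor(DynamicSettings settings, int initialMaxOpenJobs) {
            this.maxOpenJobs = initialMaxOpenJobs;
            settings.addSettingsUpdateConsumer("xpack.ml.max_open_jobs", this::setMaxOpenJobs);
        }

        void setMaxOpenJobs(int maxOpenJobs) {
            this.maxOpenJobs = maxOpenJobs;
        }

        int maxOpenJobs() {
            return maxOpenJobs;
        }
    }
}
```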

View File

@ -100,17 +100,6 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class AutodetectProcessManager implements ClusterStateListener {
// We should be able from the job config to estimate the memory/cpu a job needs to have,
// and if we know that then we can prior to assigning a job to a node fail based on the
// available resources on that node: https://github.com/elastic/x-pack-elasticsearch/issues/546
// However, it is useful to also be able to apply a hard limit.
// WARNING: This setting cannot be made DYNAMIC, because it is tied to several threadpools
// and a threadpool's size can't be changed at runtime.
// See MachineLearning#getExecutorBuilders(...)
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, 512, Property.NodeScope);
// Undocumented setting for integration test purposes
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Property.NodeScope);
@ -134,7 +123,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
// a map that manages the allocation of temporary space to jobs
private final ConcurrentMap<String, Path> nativeTmpStorage = new ConcurrentHashMap<>();
private final int maxAllowedRunningJobs;
private volatile int maxAllowedRunningJobs;
private final NamedXContentRegistry xContentRegistry;
@ -151,7 +140,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
this.client = client;
this.threadPool = threadPool;
this.xContentRegistry = xContentRegistry;
this.maxAllowedRunningJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
this.maxAllowedRunningJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings);
this.autodetectProcessFactory = autodetectProcessFactory;
this.normalizerFactory = normalizerFactory;
this.jobManager = jobManager;
@ -161,6 +150,12 @@ public class AutodetectProcessManager implements ClusterStateListener {
this.auditor = auditor;
this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
clusterService.addListener(this);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxAllowedRunningJobs);
}
void setMaxAllowedRunningJobs(int maxAllowedRunningJobs) {
this.maxAllowedRunningJobs = maxAllowedRunningJobs;
}
public void onNodeStartup() {
@ -522,11 +517,14 @@ public class AutodetectProcessManager implements ClusterStateListener {
}
AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodetectParams, BiConsumer<Exception, Boolean> handler) {
// Closing jobs can still be using some or all threads in MachineLearning.AUTODETECT_THREAD_POOL_NAME
// Copy for consistency within a single method call
int localMaxAllowedRunningJobs = maxAllowedRunningJobs;
// Closing jobs can still be using some or all threads in MachineLearning.JOB_COMMS_THREAD_POOL_NAME
// that an open job uses, so include them too when considering if enough threads are available.
int currentRunningJobs = processByAllocation.size();
if (currentRunningJobs > maxAllowedRunningJobs) {
throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
// TODO: in future this will also need to consider jobs that are not anomaly detector jobs
if (currentRunningJobs > localMaxAllowedRunningJobs) {
throw new ElasticsearchStatusException("max running job capacity [" + localMaxAllowedRunningJobs + "] reached",
RestStatus.TOO_MANY_REQUESTS);
}
@ -547,7 +545,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
}
// A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
DataCountsReporter dataCountsReporter = new DataCountsReporter(job, autodetectParams.dataCounts(), jobDataCountsPersister);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
new JobRenormalizedResultsPersister(job.getId(), client), normalizerFactory);
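
The "copy for consistency within a single method call" comment in `create` refers to reading a `volatile` field once so that the comparison and the error message use the same value even if the setting changes concurrently. A minimal sketch of that idea follows; the class name and `checkCapacity` are hypothetical, `IllegalStateException` stands in for the `ElasticsearchStatusException` used in the commit, and the `>` comparison mirrors the diff.

```java
// Hypothetical sketch of reading a volatile limit once per call.
class VolatileLimitSketch {

    private volatile int maxAllowedRunningJobs;

    VolatileLimitSketch(int initialLimit) {
        this.maxAllowedRunningJobs = initialLimit;
    }

    // May be called from another thread when the dynamic setting changes.
    void setMaxAllowedRunningJobs(int maxAllowedRunningJobs) {
        this.maxAllowedRunningJobs = maxAllowedRunningJobs;
    }

    // Throws if the number of running jobs exceeds the limit read at the start of the call.
    void checkCapacity(int currentRunningJobs) {
        // Single read: the check and the message below cannot disagree.
        int localMaxAllowedRunningJobs = maxAllowedRunningJobs;
        if (currentRunningJobs > localMaxAllowedRunningJobs) {
            throw new IllegalStateException(
                "max running job capacity [" + localMaxAllowedRunningJobs + "] reached");
        }
    }
}
```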

View File

@ -19,6 +19,18 @@ import static org.mockito.Mockito.when;
public class MachineLearningTests extends ESTestCase {
public void testMaxOpenWorkersSetting_givenDefault() {
int maxOpenWorkers = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY);
assertEquals(20, maxOpenWorkers);
}
public void testMaxOpenWorkersSetting_givenSetting() {
Settings.Builder settings = Settings.builder();
settings.put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), 7);
int maxOpenWorkers = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings.build());
assertEquals(7, maxOpenWorkers);
}
public void testNoAttributes_givenNoClash() {
Settings.Builder builder = Settings.builder();
if (randomBoolean()) {

View File

@ -75,6 +75,7 @@ import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
// TODO: in 8.0.0 remove all instances of MAX_OPEN_JOBS_NODE_ATTR from this file
public class TransportOpenJobActionTests extends ESTestCase {
private MlMemoryTracker memoryTracker;
@ -142,12 +143,11 @@ public class TransportOpenJobActionTests extends ESTestCase {
jobBuilder.setJobVersion(Version.CURRENT);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(),
cs.build(), 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
cs.build(), 10, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
assertEquals("", result.getExplanation());
assertEquals("_node_id3", result.getExecutorNode());
}
public void testSelectLeastLoadedMlNode_maxCapacity() {
int numNodes = randomIntBetween(1, 10);
int maxRunningJobsPerNode = randomIntBetween(1, 100);
@ -178,11 +178,11 @@ public class TransportOpenJobActionTests extends ESTestCase {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date());
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), maxRunningJobsPerNode, 2,
30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
assertNull(result.getExecutorNode());
assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
+ "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
assertTrue(result.getExplanation(), result.getExplanation().contains("because this node is full. Number of opened jobs ["
+ maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
}
public void testSelectLeastLoadedMlNode_noMlNodes() {
@ -205,7 +205,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 20, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
assertNull(result.getExecutorNode());
@ -241,7 +241,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
ClusterState cs = csBuilder.build();
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 10, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertEquals("_node_id3", result.getExecutorNode());
@ -252,8 +252,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
@ -264,8 +264,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
@ -276,8 +276,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertNull("no node selected, because null state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
@ -316,7 +316,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
// Allocation won't be possible if the stale failed job is treated as opening
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertEquals("_node_id1", result.getExecutorNode());
@ -327,8 +327,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 10, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
@ -360,7 +360,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
cs.nodes(nodes);
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 30,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 10, 2, 30,
memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
assertNull(result.getExecutorNode());
@ -391,7 +391,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
cs.nodes(nodes);
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(),
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(), 10,
2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
assertThat(result.getExplanation(), containsString(
"because the job's model snapshot requires a node of version [6.3.0] or higher"));
@ -420,7 +420,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
cs.metaData(metaData);
Job job = jobWithRules("job_with_rules");
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 10, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertThat(result.getExplanation(), containsString(
"because jobs using custom_rules require a node of version [6.4.0] or higher"));
@ -449,7 +449,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
cs.metaData(metaData);
Job job = jobWithRules("job_with_rules");
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 10, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertNotNull(result.getExecutorNode());
}
@ -539,7 +539,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,
Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
MachineLearning.MAX_LAZY_ML_NODES)
MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE)
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
@@ -556,7 +556,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings = new ClusterSettings(settings,
Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
-MachineLearning.MAX_LAZY_ML_NODES)
+MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE)
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

@@ -6,6 +6,8 @@
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
@@ -23,7 +25,6 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.ml.MachineLearning;
-import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
public class TooManyJobsIT extends BaseMlIntegTestCase {
@@ -66,10 +67,10 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
int maxNumberOfJobsPerNode = 1;
int maxNumberOfLazyNodes = 2;
internalCluster().ensureAtMostNumDataNodes(0);
-logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode);
+logger.info("[{}] is [{}]", MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode);
for (int i = 0; i < numNodes; i++) {
internalCluster().startNode(Settings.builder()
-.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
+.put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
}
logger.info("Started [{}] nodes", numNodes);
ensureStableCluster(numNodes);
@@ -111,7 +112,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
// Add another Node so we can get allocated
internalCluster().startNode(Settings.builder()
-.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
+.put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
ensureStableCluster(numNodes+1);
// We should automatically get allocated and opened to new node
@@ -124,15 +125,15 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
}
public void testSingleNode() throws Exception {
-verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 20));
+verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 20), randomBoolean());
}
public void testMultipleNodes() throws Exception {
-verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 20));
+verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 20), randomBoolean());
}
-private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode) throws Exception {
-startMlCluster(numNodes, maxNumberOfJobsPerNode);
+private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode, boolean testDynamicChange) throws Exception {
+startMlCluster(numNodes, testDynamicChange ? 1 : maxNumberOfJobsPerNode);
long maxMlMemoryPerNode = calculateMaxMlMemory();
ByteSizeValue jobModelMemoryLimit = new ByteSizeValue(2, ByteSizeUnit.MB);
long memoryFootprintPerJob = jobModelMemoryLimit.getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes();
@@ -140,6 +141,11 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
int clusterWideMaxNumberOfJobs = numNodes * maxNumberOfJobsPerNode;
boolean expectMemoryLimitBeforeCountLimit = maxJobsPerNodeDueToMemoryLimit < maxNumberOfJobsPerNode;
for (int i = 1; i <= (clusterWideMaxNumberOfJobs + 1); i++) {
+if (i == 2 && testDynamicChange) {
+ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
+Settings.builder().put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode).build());
+client().execute(ClusterUpdateSettingsAction.INSTANCE, clusterUpdateSettingsRequest).actionGet();
+}
Job.Builder job = createJob("max-number-of-jobs-limit-job-" + Integer.toString(i), jobModelMemoryLimit);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
@@ -192,13 +198,13 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
fail("shouldn't be able to add more than [" + clusterWideMaxNumberOfJobs + "] jobs");
}
-private void startMlCluster(int numNodes, int maxNumberOfJobsPerNode) throws Exception {
+private void startMlCluster(int numNodes, int maxNumberOfWorkersPerNode) throws Exception {
// clear all nodes, so that we can set xpack.ml.max_open_jobs setting:
internalCluster().ensureAtMostNumDataNodes(0);
-logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode);
+logger.info("[{}] is [{}]", MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfWorkersPerNode);
for (int i = 0; i < numNodes; i++) {
internalCluster().startNode(Settings.builder()
-.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
+.put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfWorkersPerNode));
}
logger.info("Started [{}] nodes", numNodes);
ensureStableCluster(numNodes);

@@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -42,6 +43,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
+import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
@@ -144,6 +146,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
normalizerFactory = mock(NormalizerFactory.class);
auditor = mock(Auditor.class);
clusterService = mock(ClusterService.class);
+ClusterSettings clusterSettings =
+new ClusterSettings(Settings.EMPTY, Collections.singleton(MachineLearning.MAX_OPEN_JOBS_PER_NODE));
+when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
MetaData metaData = mock(MetaData.class);
SortedMap<String, AliasOrIndex> aliasOrIndexSortedMap = new TreeMap<>();
aliasOrIndexSortedMap.put(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), mock(AliasOrIndex.Alias.class));
@@ -170,23 +175,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
}
@After
-public void stopThreadPool() throws InterruptedException {
+public void stopThreadPool() {
terminate(threadPool);
}
-public void testMaxOpenJobsSetting_givenDefault() {
-int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY);
-assertEquals(20, maxOpenJobs);
-}
-public void testMaxOpenJobsSetting_givenNewSettingOnly() {
-Settings.Builder settings = Settings.builder();
-settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 7);
-int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build());
-assertEquals(7, maxOpenJobs);
-}
public void testOpenJob() {
Client client = mock(Client.class);
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
@@ -207,7 +199,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L, null)), any());
}
public void testOpenJob_withoutVersion() {
Client client = mock(Client.class);
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
@@ -259,7 +250,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessFactory autodetectProcessFactory =
(j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
Settings.Builder settings = Settings.builder();
-settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3);
+settings.put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(environment, settings.build(), client, threadPool,
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService));
@@ -571,7 +562,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
verify(communicator).killProcess(false, false, true);
}
-public void testKillingAMissingJobFinishesTheTask() throws IOException {
+public void testKillingAMissingJobFinishesTheTask() {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManager(communicator);
JobTask jobTask = mock(JobTask.class);