[ML] Add a setting (`xpack.ml.node_concurrent_job_allocations`) to control the maximum concurrent job allocations.

A job allocation is either a job task in OPENING state or a job task that has been assigned to an executor node, but the executor node hasn't had the opportunity to set the job task status to OPENING.

In order to keep track of restarted tasks, an `allocationIdOnLastStatusUpdate` field was added to the `PersistentTaskInProgress` class.
This allows persistent task implementors to detect whether the executor node has changed or has been unset since the last status update occurred.

Original commit: elastic/x-pack-elasticsearch@b7b85a8274
This commit is contained in:
Martijn van Groningen 2017-02-17 14:16:33 +01:00
parent 0d1181eabb
commit 4b1ed5b453
13 changed files with 310 additions and 76 deletions

View File

@ -13,7 +13,6 @@ import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Module;
@ -24,7 +23,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.ActionPlugin;
@ -122,6 +120,7 @@ import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentActionCoordinator;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
@ -129,7 +128,6 @@ import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTaskClusterService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
@ -157,6 +155,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
public static final String ALLOCATION_ENABLED_ATTR = "xpack.ml.allocation_enabled";
public static final Setting<Boolean> ALLOCATION_ENABLED = Setting.boolSetting("node.attr." + ALLOCATION_ENABLED_ATTR,
XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope);
/**
 * Setting {@code xpack.ml.node_concurrent_job_allocations}; it is declared node-scoped and dynamic.
 * NOTE(review): the literals {@code 2, 0} look like default 2 / minimum 0 per
 * {@code Setting.intSetting(key, default, min, properties...)} - confirm. With a value of 0 the
 * {@code numberOfAllocatingJobs >= maxConcurrentJobAllocations} check in node selection would presumably
 * reject every node; verify that this is intended.
 */
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
private final Settings settings;
private final Environment env;
@ -179,6 +179,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
return Collections.unmodifiableList(
Arrays.asList(USE_NATIVE_PROCESS_OPTION,
ALLOCATION_ENABLED,
CONCURRENT_JOB_ALLOCATIONS,
ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING,
ProcessCtrl.MAX_ANOMALY_RECORDS_SETTING,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,

View File

@ -344,11 +344,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
PersistentTaskInProgress<?> task = validateAndFindTask(jobId, currentState);
PersistentTasksInProgress currentTasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE);
Map<Long, PersistentTaskInProgress<?>> updatedTasks = new HashMap<>(currentTasks.taskMap());
for (PersistentTaskInProgress<?> taskInProgress : currentTasks.tasks()) {
if (taskInProgress.getId() == task.getId()) {
updatedTasks.put(taskInProgress.getId(), new PersistentTaskInProgress<>(taskInProgress, JobState.CLOSING));
}
}
PersistentTaskInProgress<?> taskToUpdate = currentTasks.getTask(task.getId());
taskToUpdate = new PersistentTaskInProgress<>(taskToUpdate, JobState.CLOSING);
updatedTasks.put(taskToUpdate.getId(), taskToUpdate);
PersistentTasksInProgress newTasks = new PersistentTasksInProgress(currentTasks.getCurrentId(), updatedTasks);
MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE);

View File

@ -56,6 +56,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
@ -247,6 +249,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
private final AutodetectProcessManager autodetectProcessManager;
private XPackLicenseState licenseState;
private volatile int maxConcurrentJobAllocations;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
PersistentActionService persistentActionService, PersistentActionRegistry persistentActionRegistry,
@ -258,6 +262,9 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
this.clusterService = clusterService;
this.autodetectProcessManager = autodetectProcessManager;
this.observer = new JobStateObserver(threadPool, clusterService);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
}
@Override
@ -266,7 +273,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
// If we already know that we can't find an ml node because all ml nodes are running at capacity or
// simply because there are no ml nodes in the cluster then we fail quickly here:
ClusterState clusterState = clusterService.state();
if (selectLeastLoadedMlNode(request.getJobId(), clusterState, logger) == null) {
if (selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger) == null) {
throw new ElasticsearchStatusException("no nodes available to open job [" + request.getJobId() + "]",
RestStatus.TOO_MANY_REQUESTS);
}
@ -291,7 +298,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
/**
 * Chooses the node that should execute the job task of the given request by delegating to
 * {@code selectLeastLoadedMlNode} with the job id taken from the request.
 * The result is whatever that method returns, which is {@code null} when it selects no node.
 */
@Override
public DiscoveryNode executorNode(Request request, ClusterState clusterState) {
return selectLeastLoadedMlNode(request.getJobId(), clusterState, logger);
return selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
}
@Override
@ -321,6 +328,11 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
});
}
/**
 * Replaces the limit on concurrent job allocations used when selecting a node, logging the
 * previous and the new value at info level before the new value takes effect.
 *
 * @param maxConcurrentJobAllocations the new limit
 */
void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) {
    final int previousLimit = this.maxConcurrentJobAllocations;
    final String settingKey = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey();
    logger.info("Changing [{}] from [{}] to [{}]", settingKey, previousLimit, maxConcurrentJobAllocations);
    this.maxConcurrentJobAllocations = maxConcurrentJobAllocations;
}
}
/**
@ -356,36 +368,69 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
}
/**
 * Picks the node with the most remaining job capacity for the given job.
 *
 * Every node in the cluster state is inspected and skipped when:
 * - its {@code ALLOCATION_ENABLED_ATTR} attribute is not "true";
 * - the number of job tasks assigned to it that count as still allocating (no status yet, status
 *   {@code OPENING}, or a status that is not current) is at or above {@code maxConcurrentJobAllocations};
 * - its remaining capacity (max running jobs attribute minus assigned job tasks) is exactly 0.
 * Each skip reason is logged at debug level and collected, so that it can be reported when no node qualifies.
 *
 * NOTE(review): this span is a diff hunk; superseded lines appear directly next to their replacements.
 *
 * @param jobId                       the job to place, used for log messages only
 * @param clusterState                source of the nodes and of the persistent tasks metadata
 * @param maxConcurrentJobAllocations per-node limit on job tasks that are still allocating
 * @param logger                      receives the debug and info messages
 * @return the node with the largest remaining capacity, or {@code null} if every node was skipped
 */
static DiscoveryNode selectLeastLoadedMlNode(String jobId, ClusterState clusterState, Logger logger) {
static DiscoveryNode selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
Logger logger) {
long maxAvailable = Long.MIN_VALUE;
DiscoveryNode leastLoadedNode = null;
List<String> reasons = new LinkedList<>();
DiscoveryNode minLoadedNode = null;
PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
Map<String, String> nodeAttributes = node.getAttributes();
String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR);
if ("true".equals(allocationEnabled) == false) {
logger.debug("Not opening job [{}] on node [{}], because this node isn't a ml node.", jobId, node);
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node isn't a ml node.";
logger.debug(reason);
reasons.add(reason);
continue;
}
long numberOfOpenedJobs;
long numberOfAssignedJobs;
int numberOfAllocatingJobs;
// No persistent tasks metadata in the cluster state means nothing is assigned to any node yet.
if (persistentTasksInProgress != null) {
numberOfOpenedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME);
numberOfAssignedJobs = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME);
// Count the job tasks on this node that have not (yet) reached a state beyond OPENING.
numberOfAllocatingJobs = persistentTasksInProgress.findTasks(OpenJobAction.NAME, task -> {
if (node.getId().equals(task.getExecutorNode()) == false) {
return false;
}
JobState jobTaskState = (JobState) task.getStatus();
return jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
jobTaskState == JobState.OPENING || // executor node is busy starting the cpp process
task.isCurrentStatus() == false; // previous executor node failed and
// current executor node didn't have the chance to set job status to OPENING
}).size();
} else {
numberOfOpenedJobs = 0;
numberOfAssignedJobs = 0;
numberOfAllocatingJobs = 0;
}
long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey()));
long available = maxNumberOfOpenJobs - numberOfOpenedJobs;
if (available == 0) {
logger.debug("Not opening job [{}] on node [{}], because this node is full. Number of opened jobs [{}], {} [{}]",
jobId, node, numberOfOpenedJobs, MAX_RUNNING_JOBS_PER_NODE.getKey(), maxNumberOfOpenJobs);
if (numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because node exceeds [" + numberOfAllocatingJobs +
"] the maximum number of jobs [" + maxConcurrentJobAllocations + "] in opening state";
logger.debug(reason);
reasons.add(reason);
continue;
}
// NOTE(review): assumes every node that passed the ml-node check publishes this attribute;
// Long.parseLong would throw on a missing (null) value - confirm.
long maxNumberOfOpenJobs = Long.parseLong(node.getAttributes().get(MAX_RUNNING_JOBS_PER_NODE.getKey()));
long available = maxNumberOfOpenJobs - numberOfAssignedJobs;
// NOTE(review): only an exact 0 counts as full. If the assigned count could ever exceed the maximum,
// available would be negative and the node would still be eligible below - confirm whether <= 0 is intended.
if (available == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +
"Number of opened jobs [" + numberOfAssignedJobs + "], " + MAX_RUNNING_JOBS_PER_NODE.getKey() +
" [" + maxNumberOfOpenJobs + "]";
logger.debug(reason);
reasons.add(reason);
continue;
}
// Strict comparison: among nodes with equal capacity the first one iterated wins.
if (maxAvailable < available) {
maxAvailable = available;
leastLoadedNode = node;
minLoadedNode = node;
}
}
return leastLoadedNode;
if (minLoadedNode != null) {
logger.info("selected node [{}] for job [{}]", minLoadedNode, jobId);
} else {
logger.info("no node selected for job [{}], reasons [{}]", jobId, String.join(",\n", reasons));
}
return minLoadedNode;
}
}

View File

@ -403,11 +403,13 @@ public class MlMetadata implements MetaData.Custom {
/**
 * Resolves the state of a job from the status of its persistent task.
 *
 * @param jobId the job to look up
 * @param tasks the persistent tasks to search, may be {@code null}
 * @return the {@link JobState} stored as the task status, or {@link JobState#CLOSED} when the job has
 *         no task or the task has no status yet
 */
public static JobState getJobState(String jobId, @Nullable PersistentTasksInProgress tasks) {
PersistentTasksInProgress.PersistentTaskInProgress<?> task = getJobTask(jobId, tasks);
if (task != null && task.getStatus() != null) {
return (JobState) task.getStatus();
} else {
// If we haven't opened a job then there will be no persistent task, which is the same as if the job was closed
return JobState.CLOSED;
JobState jobTaskState = (JobState) task.getStatus();
if (jobTaskState != null) {
return jobTaskState;
}
}
// If we haven't opened a job then there will be no persistent task, which is the same as if the job was closed
return JobState.CLOSED;
}
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksInProgress tasks) {

View File

@ -57,4 +57,5 @@ public class PersistentTask extends CancellableTask {
public void setPersistentTaskId(long persistentTaskId) {
this.persistentTaskId = persistentTaskId;
}
}

View File

@ -93,6 +93,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
taskBuilder.setStatus(builder.status);
}, ACTION_PARSER, new ParseField("action"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareStringOrNull(TaskBuilder::setExecutorNode, new ParseField("executor_node"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
new ParseField("allocation_id_on_last_status_update"));
}
/**
@ -194,24 +196,28 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private final Status status;
@Nullable
private final String executorNode;
@Nullable
private final Long allocationIdOnLastStatusUpdate;
/**
 * Creates a brand new task: allocation id 0, no status, and (in the updated delegation) no recorded
 * allocation id of a last status update.
 */
public PersistentTaskInProgress(long id, String action, Request request, boolean stopped, boolean removeOnCompletion,
String executorNode) {
this(id, 0L, action, request, stopped, removeOnCompletion, null, executorNode);
this(id, 0L, action, request, stopped, removeOnCompletion, null, executorNode, null);
}
/**
 * Copies the given task for a (re)assignment: the allocation id is incremented by one and the status is
 * carried over unchanged. The previous allocation id is recorded as the allocation id of the last status
 * update, so it differs from the new allocation id and the carried-over status is not considered current.
 */
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, boolean stopped, String newExecutorNode) {
this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status,
newExecutorNode);
newExecutorNode, task.allocationId);
}
/**
 * Copies the given task with a new status. The allocation id and executor node stay the same, and the
 * current allocation id is recorded as the allocation id of the last status update.
 */
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, Status status) {
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, task.executorNode);
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status,
task.executorNode, task.allocationId);
}
private PersistentTaskInProgress(long id, long allocationId, String action, Request request,
boolean stopped, boolean removeOnCompletion, Status status, String executorNode) {
boolean stopped, boolean removeOnCompletion, Status status,
String executorNode, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.action = action;
@ -220,6 +226,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
this.stopped = stopped;
this.removeOnCompletion = removeOnCompletion;
this.executorNode = executorNode;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", id);
}
@ -234,6 +241,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
removeOnCompletion = in.readBoolean();
status = in.readOptionalNamedWriteable(Task.Status.class);
executorNode = in.readOptionalString();
allocationIdOnLastStatusUpdate = in.readOptionalLong();
}
@Override
@ -246,6 +254,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
out.writeBoolean(removeOnCompletion);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(executorNode);
out.writeOptionalLong(allocationIdOnLastStatusUpdate);
}
@Override
@ -260,12 +269,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
stopped == that.stopped &&
removeOnCompletion == that.removeOnCompletion &&
Objects.equals(status, that.status) &&
Objects.equals(executorNode, that.executorNode);
Objects.equals(executorNode, that.executorNode) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
}
/**
 * Hash over the task's fields; the updated version additionally includes
 * {@code allocationIdOnLastStatusUpdate}, which the updated equals comparison also checks.
 */
@Override
public int hashCode() {
return Objects.hash(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode);
return Objects.hash(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode,
allocationIdOnLastStatusUpdate);
}
@Override
@ -307,6 +318,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return removeOnCompletion;
}
/**
 * @return Whether the task status isn't stale. When a task gets unassigned from the executor node or assigned
 * to a new executor node and the status hasn't been updated then the task status is stale. A task whose
 * status has never been updated (no allocation id recorded for a status update) has no current status either,
 * so {@code false} is returned in that case.
 */
public boolean isCurrentStatus() {
    // allocationIdOnLastStatusUpdate is a nullable Long: comparing it to the primitive allocationId
    // unboxes it, which would throw a NullPointerException for tasks that never had a status update.
    return allocationIdOnLastStatusUpdate != null && allocationIdOnLastStatusUpdate == allocationId;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -329,6 +348,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
builder.field("allocation_id", allocationId);
builder.field("executor_node", executorNode);
if (allocationIdOnLastStatusUpdate != null) {
builder.field("allocation_id_on_last_status_update", allocationIdOnLastStatusUpdate);
}
}
builder.field("stopped", stopped);
builder.field("remove_on_completion", removeOnCompletion);
@ -352,6 +374,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private boolean removeOnCompletion;
private Status status;
private String executorNode;
private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Request> setId(long id) {
this.id = id;
@ -394,8 +417,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return this;
}
/**
 * Sets the allocation id that was recorded with the last status update.
 *
 * @param allocationIdOnLastStatusUpdate the recorded allocation id; the field is a boxed {@code Long}
 * @return this builder
 */
public TaskBuilder<Request> setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) {
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
return this;
}
/**
 * Creates the task from the values collected by this builder; the updated version also passes
 * {@code allocationIdOnLastStatusUpdate} through to the task.
 */
public PersistentTaskInProgress<Request> build() {
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode);
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status,
executorNode, allocationIdOnLastStatusUpdate);
}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import java.util.Collections;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
public class CloseJobActionTests extends ESTestCase {
@ -26,9 +27,7 @@ public class CloseJobActionTests extends ESTestCase {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null);
task = new PersistentTaskInProgress<>(task, randomFrom(JobState.OPENED, JobState.FAILED));
createJobTask(1L, "job_id", null, randomFrom(JobState.OPENED, JobState.FAILED));
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task))));
@ -52,10 +51,7 @@ public class CloseJobActionTests extends ESTestCase {
public void testMoveJobToClosingState_unexpectedJobState() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null);
task = new PersistentTaskInProgress<>(task, JobState.OPENING);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(1L, "job_id", null, JobState.OPENING);
ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task))));

View File

@ -41,16 +41,14 @@ public class OpenJobActionTests extends ESTestCase {
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id");
task = new PersistentTaskInProgress<>(task, randomFrom(JobState.CLOSED, JobState.FAILED));
createJobTask(1L, "job_id", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksInProgress(1L, Collections.emptyMap()), nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes);
task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id");
task = new PersistentTaskInProgress<>(task, JobState.OPENED);
task = createJobTask(1L, "job_id", "_other_node_id", JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
}
@ -79,19 +77,16 @@ public class OpenJobActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id");
JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING);
task = new PersistentTaskInProgress<>(task, jobState);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(1L, "job_id", "_node_id", jobState);
PersistentTasksInProgress tasks1 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes));
assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage());
task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id");
jobState = randomFrom(JobState.OPENING, JobState.CLOSING);
task = new PersistentTaskInProgress<>(task, jobState);
task = createJobTask(1L, "job_id", "_other_node_id", jobState);
PersistentTasksInProgress tasks2 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
e = expectThrows(ElasticsearchStatusException.class,
@ -124,7 +119,7 @@ public class OpenJobActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), logger);
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger);
assertEquals("_node_id3", result.getId());
}
@ -152,7 +147,7 @@ public class OpenJobActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger);
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertNull(result);
}
@ -174,8 +169,71 @@ public class OpenJobActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);
cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger);
DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
assertNull(result);
}
/**
 * With a limit of 2 concurrent allocations per node, a node is only selected while fewer than two of its
 * job tasks are allocating; tasks in OPENING state, with a stale status or with no status all count.
 */
public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
    Map<String, String> mlNodeAttributes = new HashMap<>();
    mlNodeAttributes.put(MachineLearning.ALLOCATION_ENABLED_ATTR, "true");
    mlNodeAttributes.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
    DiscoveryNodes nodes = DiscoveryNodes.builder()
            .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
                    mlNodeAttributes, Collections.emptySet(), Version.CURRENT))
            .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
                    mlNodeAttributes, Collections.emptySet(), Version.CURRENT))
            .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
                    mlNodeAttributes, Collections.emptySet(), Version.CURRENT))
            .build();

    // Nodes 1 and 2 each have two OPENING jobs, node 3 has only one.
    Map<Long, PersistentTaskInProgress<?>> jobTasks = new HashMap<>();
    jobTasks.put(0L, createJobTask(0L, "job_id1", "_node_id1", JobState.OPENING));
    jobTasks.put(1L, createJobTask(1L, "job_id2", "_node_id1", JobState.OPENING));
    jobTasks.put(2L, createJobTask(2L, "job_id3", "_node_id2", JobState.OPENING));
    jobTasks.put(3L, createJobTask(3L, "job_id4", "_node_id2", JobState.OPENING));
    jobTasks.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING));

    DiscoveryNode selected = OpenJobAction.selectLeastLoadedMlNode("job_id6",
            clusterStateForAllocationTest(nodes, new PersistentTasksInProgress(5L, jobTasks)), 2, logger);
    assertEquals("_node_id3", selected.getId());

    // A second OPENING job on node 3 brings every node to the limit.
    PersistentTaskInProgress<OpenJobAction.Request> sixthTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING);
    jobTasks.put(5L, sixthTask);
    selected = OpenJobAction.selectLeastLoadedMlNode("job_id7",
            clusterStateForAllocationTest(nodes, new PersistentTasksInProgress(6L, jobTasks)), 2, logger);
    assertNull("no node selected, because OPENING state", selected);

    // The same task after a re-assignment: its status was set under the previous allocation id.
    jobTasks.put(5L, new PersistentTaskInProgress<>(sixthTask, false, "_node_id3"));
    selected = OpenJobAction.selectLeastLoadedMlNode("job_id7",
            clusterStateForAllocationTest(nodes, new PersistentTasksInProgress(6L, jobTasks)), 2, logger);
    assertNull("no node selected, because stale task", selected);

    // The same task without any status.
    jobTasks.put(5L, new PersistentTaskInProgress<>(sixthTask, null));
    selected = OpenJobAction.selectLeastLoadedMlNode("job_id7",
            clusterStateForAllocationTest(nodes, new PersistentTasksInProgress(6L, jobTasks)), 2, logger);
    assertNull("no node selected, because null state", selected);
}

/** Builds a cluster state named "_name" that holds the given nodes and persistent tasks. */
private static ClusterState clusterStateForAllocationTest(DiscoveryNodes nodes, PersistentTasksInProgress tasks) {
    ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("_name"));
    stateBuilder.nodes(nodes);
    stateBuilder.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks));
    return stateBuilder.build();
}
/**
 * Test helper that creates an open-job persistent task (not stopped, removed on completion) assigned to
 * the given node and then applies the given job state to it as a status update.
 *
 * @param id       the persistent task id
 * @param jobId    the job the task opens
 * @param nodeId   the executor node id, may be {@code null}
 * @param jobState the status to set on the task
 * @return the task carrying the given status
 */
public static PersistentTaskInProgress<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {
    OpenJobAction.Request openRequest = new OpenJobAction.Request(jobId);
    PersistentTaskInProgress<OpenJobAction.Request> withoutStatus =
            new PersistentTaskInProgress<>(id, OpenJobAction.NAME, openRequest, false, true, nodeId);
    return new PersistentTaskInProgress<>(withoutStatus, jobState);
}
}

View File

@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.hamcrest.Matchers.equalTo;
@ -41,10 +42,8 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), false, true, "node_id");
task = new PersistentTaskInProgress<>(task, randomFrom(JobState.FAILED, JobState.CLOSED,
JobState.CLOSING, JobState.OPENING));
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", jobState);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
DiscoveryNodes nodes = DiscoveryNodes.builder()
@ -61,7 +60,7 @@ public class StartDatafeedActionTests extends ESTestCase {
DiscoveryNode node = StartDatafeedAction.selectNode(logger, request, cs.build());
assertNull(node);
task = new PersistentTaskInProgress<>(task, JobState.OPENED);
task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task));
cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
@ -110,10 +109,7 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> jobTask =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"),
false, true, "node_id");
jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED);
PersistentTaskInProgress<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, "node_id");
@ -140,10 +136,7 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTaskInProgress<OpenJobAction.Request> jobTask =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"),
false, true, "node_id2");
jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED);
PersistentTaskInProgress<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, "node_id1");

View File

@ -22,10 +22,16 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
@ -155,7 +161,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
@ -172,7 +178,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
// job should get and remain in a failed state:
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertNull(task.getExecutorNode());
// The status remains opened, because ML did not get the chance to set the status to failed:
@ -186,7 +192,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
// job should be re-opened:
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
PersistentTaskInProgress task = tasks.taskMap().values().iterator().next();
assertNotNull(task.getExecutorNode());
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
@ -199,4 +205,111 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
cleanupWorkaround(3);
}
/**
 * Checks that the {@code MachineLearning.CONCURRENT_JOB_ALLOCATIONS} setting is honoured while jobs
 * are opened for the first time and while they are re-assigned after all ml nodes were restarted.
 *
 * A listener on the non ml node inspects every cluster state it receives and records a violation
 * whenever a single node is the executor of more job tasks that are still being allocated than the
 * configured maximum. The test passes only if no violation was recorded.
 *
 * @throws Exception if a request fails or one of the busy assertions does not succeed in time
 */
public void testMaxConcurrentJobAllocations() throws Exception {
    final int numMlNodes = 2;
    internalCluster().ensureAtMostNumDataNodes(0);

    // The non ml node is started first; it is the node that will hold the indices.
    logger.info("Start non ml node:");
    Settings.Builder nonMlNodeSettings = Settings.builder()
        .put(MachineLearning.ALLOCATION_ENABLED.getKey(), false);
    final String nonMlNode = internalCluster().startNode(nonMlNodeSettings);

    logger.info("Starting ml nodes");
    // The same settings are used again further down when the ml nodes are restarted.
    final Settings mlNodeSettings = Settings.builder()
        .put("node.data", false)
        .put("node.master", false)
        .put(MachineLearning.ALLOCATION_ENABLED.getKey(), true)
        .build();
    internalCluster().startNodes(numMlNodes, mlNodeSettings);
    ensureStableCluster(numMlNodes + 1);

    final int maxConcurrentJobAllocations = randomIntBetween(1, 4);
    Settings.Builder allocationLimit = Settings.builder()
        .put(MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), maxConcurrentJobAllocations);
    client().admin().cluster().prepareUpdateSettings().setTransientSettings(allocationLimit).get();

    // Inspect every cluster state update and remember each occasion on which a node is the executor
    // of more than `maxConcurrentJobAllocations` jobs that are still being allocated.
    final List<String> violations = new CopyOnWriteArrayList<>();
    internalCluster().clusterService(nonMlNode).addListener(event -> {
        ClusterState observedState = event.state();
        PersistentTasksInProgress persistentTasks = observedState.metaData().custom(PersistentTasksInProgress.TYPE);
        if (persistentTasks == null) {
            return;
        }

        for (DiscoveryNode node : observedState.nodes()) {
            Collection<PersistentTaskInProgress<?>> allocatingTasks = persistentTasks.findTasks(OpenJobAction.NAME, task -> {
                if (node.getId().equals(task.getExecutorNode()) == false) {
                    return false;
                }
                // No status yet, still OPENING, or a status that is not current all count as an allocation.
                if (task.getStatus() == null || task.getStatus() == JobState.OPENING) {
                    return true;
                }
                return task.isCurrentStatus() == false;
            });
            int count = allocatingTasks.size();
            if (count > maxConcurrentJobAllocations) {
                violations.add("Observed node [" + node.getName() + "] with [" + count + "] opening jobs on cluster state version [" +
                    observedState.version() + "]");
            }
        }
    });

    // Create and open ten jobs per ml node.
    final int numJobs = numMlNodes * 10;
    for (int jobIndex = 0; jobIndex < numJobs; jobIndex++) {
        Job.Builder jobBuilder = createJob(Integer.toString(jobIndex));
        PutJobAction.Request putRequest = new PutJobAction.Request(jobBuilder.build(true, jobBuilder.getId()));
        PutJobAction.Response putResponse = client().execute(PutJobAction.INSTANCE, putRequest).get();
        assertTrue(putResponse.isAcknowledged());

        OpenJobAction.Request openRequest = new OpenJobAction.Request(jobBuilder.getId());
        client().execute(OpenJobAction.INSTANCE, openRequest).get();
    }

    // Every job must end up assigned to a node and in the OPENED state.
    assertBusy(() -> {
        ClusterState currentState = client().admin().cluster().prepareState().get().getState();
        PersistentTasksInProgress persistentTasks = currentState.metaData().custom(PersistentTasksInProgress.TYPE);
        assertEquals(numJobs, persistentTasks.taskMap().size());
        for (PersistentTaskInProgress<?> jobTask : persistentTasks.taskMap().values()) {
            assertNotNull(jobTask.getExecutorNode());
            assertEquals(JobState.OPENED, jobTask.getStatus());
        }
    });

    logger.info("stopping ml nodes");
    // Each stop runs on its own thread so that stopping all ml nodes proceeds quicker.
    final Runnable stopOneMlNode = () -> {
        try {
            internalCluster()
                .stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), false));
        } catch (IOException e) {
            logger.error("error stopping node", e);
        }
    };
    for (int stopped = 0; stopped < numMlNodes; stopped++) {
        new Thread(stopOneMlNode).start();
    }
    ensureStableCluster(1, nonMlNode);

    // With all ml nodes gone, the tasks must still exist but none may have an executor node.
    assertBusy(() -> {
        ClusterState currentState = client(nonMlNode).admin().cluster().prepareState().get().getState();
        PersistentTasksInProgress persistentTasks = currentState.metaData().custom(PersistentTasksInProgress.TYPE);
        assertEquals(numJobs, persistentTasks.taskMap().size());
        for (PersistentTaskInProgress<?> jobTask : persistentTasks.taskMap().values()) {
            assertNull(jobTask.getExecutorNode());
        }
    });

    logger.info("re-starting ml nodes");
    internalCluster().startNodes(numMlNodes, mlNodeSettings);
    ensureStableCluster(1 + numMlNodes);

    // After the restart every job must be assigned and OPENED again; allow up to 30 seconds for that.
    assertBusy(() -> {
        ClusterState currentState = client().admin().cluster().prepareState().get().getState();
        PersistentTasksInProgress persistentTasks = currentState.metaData().custom(PersistentTasksInProgress.TYPE);
        assertEquals(numJobs, persistentTasks.taskMap().size());
        for (PersistentTaskInProgress<?> jobTask : persistentTasks.taskMap().values()) {
            assertNotNull(jobTask.getExecutorNode());
            assertEquals(JobState.OPENED, jobTask.getStatus());
        }
    }, 30, TimeUnit.SECONDS);

    assertEquals("Expected no violations, but got [" + violations + "]", 0, violations.size());
    cleanupWorkaround(numMlNodes + 1);
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import java.io.IOException;
import java.util.Collections;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
@ -146,11 +147,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getDatafeeds().get("1"), nullValue());
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("1"), false, true, null),
JobState.CLOSED
);
PersistentTaskInProgress<OpenJobAction.Request> task = createJobTask(0L, "1", null, JobState.CLOSED);
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder2.deleteJob("1", new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task))));

View File

@ -131,7 +131,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
manager.openJob("foo", 1L, false, e -> {});
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
UpdatePersistentTaskStatusAction.Request expectedRequest = new UpdatePersistentTaskStatusAction.Request(1L, JobState.OPENED);
UpdatePersistentTaskStatusAction.Request expectedRequest =
new UpdatePersistentTaskStatusAction.Request(1L, JobState.OPENED);
verify(client).execute(eq(UpdatePersistentTaskStatusAction.INSTANCE), eq(expectedRequest), any());
}

View File

@ -115,7 +115,7 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
deleteAllDatafeeds(client());
deleteAllJobs(client());
for (int i = 0; i < numNodes; i++) {
internalCluster().stopRandomDataNode();
internalCluster().stopRandomNode(settings -> true);
}
internalCluster().startNode(Settings.builder().put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false));
ensureStableCluster(1);