[ML] Use datafeed and job id as task id.

Now that task id are strings instead of longs (elastic/x-pack-elasticsearch#1035), ml can use the job and datafeed as task id.
This removes logic that would otherwise iterate over all tasks and check if the task's request id was equal to the provided id and instead just do lookup in the task map.
Job and datafeed task ids are prefixed with either 'job-' or 'datafeed-', because job and datafeed ids don't have to be unique as they are stored separately from each other.

Original commit: elastic/x-pack-elasticsearch@b48c2b368a
This commit is contained in:
Martijn van Groningen 2017-04-11 20:31:04 +02:00
parent f8f7454130
commit 9d683dfe13
11 changed files with 90 additions and 132 deletions

View File

@ -22,8 +22,6 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
@ -45,7 +43,6 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
public class MlMetadata implements MetaData.Custom { public class MlMetadata implements MetaData.Custom {
@ -324,11 +321,7 @@ public class MlMetadata implements MetaData.Custom {
private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, PersistentTasksCustomMetaData persistentTasks) { private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, PersistentTasksCustomMetaData persistentTasks) {
if (persistentTasks != null) { if (persistentTasks != null) {
Predicate<PersistentTask<?>> predicate = t -> { if (persistentTasks.getTask(datafeedTaskId(datafeedId)) != null) {
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest();
return storedRequest.getDatafeedId().equals(datafeedId);
};
if (persistentTasks.tasksExist(StartDatafeedAction.NAME, predicate)) {
throw ExceptionsHelper.conflictStatusException(msg.get()); throw ExceptionsHelper.conflictStatusException(msg.get());
} }
} }
@ -377,32 +370,36 @@ public class MlMetadata implements MetaData.Custom {
} }
/**
* Namespaces the task ids for jobs.
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
*/
public static String jobTaskId(String jobId) {
return "job-" + jobId;
}
@Nullable @Nullable
public static PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetaData tasks) { public static PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
if (tasks != null) { if (tasks == null) {
Predicate<PersistentTask<?>> p = t -> { return null;
OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest();
return storedRequest.getJobId().equals(jobId);
};
for (PersistentTask<?> task : tasks.findTasks(OpenJobAction.NAME, p)) {
return task;
}
} }
return null; return tasks.getTask(jobTaskId(jobId));
}
/**
* Namespaces the task ids for datafeeds.
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
*/
public static String datafeedTaskId(String datafeedId) {
return "datafeed-" + datafeedId;
} }
@Nullable @Nullable
public static PersistentTask<?> getDatafeedTask(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) { public static PersistentTask<?> getDatafeedTask(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
if (tasks != null) { if (tasks == null) {
Predicate<PersistentTask<?>> p = t -> { return null;
StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest();
return storedRequest.getDatafeedId().equals(datafeedId);
};
for (PersistentTask<?> task : tasks.findTasks(StartDatafeedAction.NAME, p)) {
return task;
}
} }
return null; return tasks.getTask(datafeedTaskId(datafeedId));
} }
public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) { public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
@ -23,7 +24,6 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -315,10 +315,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException("Cannot open job [" + request.getJobId() +
"] because it has already been opened", RestStatus.CONFLICT, e);
}
listener.onFailure(e); listener.onFailure(e);
} }
}; };
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), NAME, request, finalListener); persistentTasksService.startPersistentTask(MlMetadata.jobTaskId(request.jobId), NAME, request, finalListener);
} else { } else {
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING)); listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING));
} }
@ -447,10 +451,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
if (job.isDeleted()) { if (job.isDeleted()) {
throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted"); throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted");
} }
PersistentTask<?> task = MlMetadata.getJobTask(jobId, tasks);
if (task != null) {
throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has already been opened");
}
} }
static Assignment selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations, static Assignment selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,

View File

@ -8,6 +8,8 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
@ -24,7 +26,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -38,6 +39,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -362,10 +364,14 @@ public class StartDatafeedAction
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException("cannot start datafeed [" + request.getDatafeedId() +
"] because it has already been started", RestStatus.CONFLICT, e);
}
listener.onFailure(e); listener.onFailure(e);
} }
}; };
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), NAME, request, finalListener); persistentTasksService.startPersistentTask(MlMetadata.datafeedTaskId(request.datafeedId), NAME, request, finalListener);
} else { } else {
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING)); listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING));
} }
@ -465,11 +471,6 @@ public class StartDatafeedAction
throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() + throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() +
"] is not open"); "] is not open");
} }
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
if (datafeedTask != null) {
throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because it has already been started");
}
} }
static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState, static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState,

View File

@ -150,10 +150,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getJobs().get("1"), sameInstance(job1)); assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getDatafeeds().get("1"), nullValue()); assertThat(result.getDatafeeds().get("1"), nullValue());
PersistentTask<OpenJobAction.Request> task = createJobTask("0L", "1", null, JobState.CLOSED, 0L); PersistentTask<OpenJobAction.Request> task = createJobTask("1", null, JobState.CLOSED, 0L);
MlMetadata.Builder builder2 = new MlMetadata.Builder(result); MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder2.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task)))); () -> builder2.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.singletonMap("job-1", task))));
assertThat(e.status(), equalTo(RestStatus.CONFLICT)); assertThat(e.status(), equalTo(RestStatus.CONFLICT));
} }
@ -273,7 +273,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L);
PersistentTask<StartDatafeedAction.Request> taskInProgress = PersistentTask<StartDatafeedAction.Request> taskInProgress =
new PersistentTask<>("0", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT); new PersistentTask<>("datafeed-datafeed1", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasksInProgress = PersistentTasksCustomMetaData tasksInProgress =
new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
@ -335,7 +335,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
PersistentTask<StartDatafeedAction.Request> taskInProgress = PersistentTask<StartDatafeedAction.Request> taskInProgress =
new PersistentTask<>("0", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT); new PersistentTask<>("datafeed-datafeed1", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasksInProgress = PersistentTasksCustomMetaData tasksInProgress =
new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));

View File

@ -59,9 +59,9 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id",
Collections.singletonList("*"))); Collections.singletonList("*")));
Map<String, PersistentTask<?>> tasks = new HashMap<>(); Map<String, PersistentTask<?>> tasks = new HashMap<>();
PersistentTask<?> jobTask = createJobTask("1L", "job_id", null, JobState.OPENED, 1L); PersistentTask<?> jobTask = createJobTask("job_id", null, JobState.OPENED, 1L);
tasks.put("1L", jobTask); tasks.put("job-job_id", jobTask);
tasks.put("2L", createTask("2L", "datafeed_id", 0L, null, DatafeedState.STARTED, 2L)); tasks.put("datafeed-datafeed_id", createTask("datafeed_id", 0L, null, DatafeedState.STARTED, 2L));
ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, .putCustom(PersistentTasksCustomMetaData.TYPE,
@ -74,9 +74,9 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage()); assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage());
tasks = new HashMap<>(); tasks = new HashMap<>();
tasks.put("1L", jobTask); tasks.put("job-job_id", jobTask);
if (randomBoolean()) { if (randomBoolean()) {
tasks.put("2L", createTask("2L", "datafeed_id", 0L, null, DatafeedState.STOPPED, 3L)); tasks.put("datafeed-datafeed_id", createTask("datafeed_id", 0L, null, DatafeedState.STOPPED, 3L));
} }
ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
@ -103,14 +103,14 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
Collections.singletonList("*"))); Collections.singletonList("*")));
Map<String, PersistentTask<?>> tasks = new HashMap<>(); Map<String, PersistentTask<?>> tasks = new HashMap<>();
PersistentTask<?> jobTask = createJobTask("1L", "job_id_1", null, JobState.OPENED, 1L); PersistentTask<?> jobTask = createJobTask("job_id_1", null, JobState.OPENED, 1L);
tasks.put("1L", jobTask); tasks.put("job-job_id_1", jobTask);
jobTask = createJobTask("2L", "job_id_2", null, JobState.CLOSED, 2L); jobTask = createJobTask("job_id_2", null, JobState.CLOSED, 2L);
tasks.put("2L", jobTask); tasks.put("job-job_id_2", jobTask);
jobTask = createJobTask("3L", "job_id_3", null, JobState.FAILED, 3L); jobTask = createJobTask("job_id_3", null, JobState.FAILED, 3L);
tasks.put("3L", jobTask); tasks.put("job-job_id_3", jobTask);
ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
@ -122,14 +122,13 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
CloseJobAction.resolveAndValidateJobId("_all", cs1)); CloseJobAction.resolveAndValidateJobId("_all", cs1));
} }
public static PersistentTask<StartDatafeedAction.Request> createTask(String id, public static PersistentTask<StartDatafeedAction.Request> createTask(String datafeedId,
String datafeedId,
long startTime, long startTime,
String nodeId, String nodeId,
DatafeedState state, DatafeedState state,
long allocationId) { long allocationId) {
PersistentTask<StartDatafeedAction.Request> task = PersistentTask<StartDatafeedAction.Request> task =
new PersistentTask<>(id, StartDatafeedAction.NAME, new PersistentTask<>(MlMetadata.datafeedTaskId(datafeedId), StartDatafeedAction.NAME,
new StartDatafeedAction.Request(datafeedId, startTime), new StartDatafeedAction.Request(datafeedId, startTime),
allocationId, allocationId,
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));

View File

@ -56,7 +56,7 @@ public class OpenJobActionTests extends ESTestCase {
mlBuilder.putJob(buildJobBuilder("job_id").build(), false); mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTask<OpenJobAction.Request> task = PersistentTask<OpenJobAction.Request> task =
createJobTask("1L", "job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED), 0L); createJobTask("job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED), 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks); OpenJobAction.validate("job_id", mlBuilder.build(), tasks);
@ -80,18 +80,6 @@ public class OpenJobActionTests extends ESTestCase {
assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage()); assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage());
} }
public void testValidate_unexpectedState() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTask<OpenJobAction.Request> task = createJobTask("1L", "job_id", "_node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks1 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task));
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1));
assertEquals("Cannot open job [job_id] because it has already been opened", e.getMessage());
}
public void testSelectLeastLoadedMlNode() { public void testSelectLeastLoadedMlNode() {
Map<String, String> nodeAttr = new HashMap<>(); Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10"); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), "10");
@ -141,7 +129,7 @@ public class OpenJobActionTests extends ESTestCase {
for (int j = 0; j < maxRunningJobsPerNode; j++) { for (int j = 0; j < maxRunningJobsPerNode; j++) {
long id = j + (maxRunningJobsPerNode * i); long id = j + (maxRunningJobsPerNode * i);
String taskId = UUIDs.base64UUID(); String taskId = UUIDs.base64UUID();
taskMap.put(taskId, createJobTask(taskId, "job_id" + id, nodeId, JobState.OPENED, allocationId++)); taskMap.put(taskId, createJobTask("job_id" + id, nodeId, JobState.OPENED, allocationId++));
} }
} }
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(allocationId, taskMap); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(allocationId, taskMap);
@ -199,11 +187,11 @@ public class OpenJobActionTests extends ESTestCase {
.build(); .build();
Map<String, PersistentTask<?>> taskMap = new HashMap<>(); Map<String, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put("0L", createJobTask("0L", "job_id1", "_node_id1", null, 0L)); taskMap.put("0L", createJobTask("job_id1", "_node_id1", null, 0L));
taskMap.put("1L", createJobTask("1L", "job_id2", "_node_id1", null, 1L)); taskMap.put("1L", createJobTask("job_id2", "_node_id1", null, 1L));
taskMap.put("2L", createJobTask("2L", "job_id3", "_node_id2", null, 2L)); taskMap.put("2L", createJobTask("job_id3", "_node_id2", null, 2L));
taskMap.put("3L", createJobTask("3L", "job_id4", "_node_id2", null, 3L)); taskMap.put("3L", createJobTask("job_id4", "_node_id2", null, 3L));
taskMap.put("4L", createJobTask("4L", "job_id5", "_node_id3", null, 4L)); taskMap.put("4L", createJobTask("job_id5", "_node_id3", null, 4L));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
@ -219,7 +207,7 @@ public class OpenJobActionTests extends ESTestCase {
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger); Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger);
assertEquals("_node_id3", result.getExecutorNode()); assertEquals("_node_id3", result.getExecutorNode());
PersistentTask<OpenJobAction.Request> lastTask = createJobTask("5L", "job_id6", "_node_id3", null, 6L); PersistentTask<OpenJobAction.Request> lastTask = createJobTask("job_id6", "_node_id3", null, 6L);
taskMap.put("5L", lastTask); taskMap.put("5L", lastTask);
tasks = new PersistentTasksCustomMetaData(6L, taskMap); tasks = new PersistentTasksCustomMetaData(6L, taskMap);
@ -286,10 +274,10 @@ public class OpenJobActionTests extends ESTestCase {
assertEquals(indexToRemove, result.get(0)); assertEquals(indexToRemove, result.get(0));
} }
public static PersistentTask<OpenJobAction.Request> createJobTask(String id, String jobId, String nodeId, JobState jobState, public static PersistentTask<OpenJobAction.Request> createJobTask(String jobId, String nodeId, JobState jobState,
long allocationId) { long allocationId) {
PersistentTask<OpenJobAction.Request> task = PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), allocationId, new PersistentTask<>("job-" + jobId, OpenJobAction.NAME, new OpenJobAction.Request(jobId), allocationId,
new Assignment(nodeId, "test assignment")); new Assignment(nodeId, "test assignment"));
if (jobState != null) { if (jobState != null) {
task = new PersistentTask<>(task, new JobTaskStatus(jobState, allocationId)); task = new PersistentTask<>(task, new JobTaskStatus(jobState, allocationId));

View File

@ -33,7 +33,6 @@ import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests; import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
@ -44,9 +43,7 @@ import java.net.InetAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
@ -70,8 +67,8 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED);
PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", jobState, 0L); PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", jobState, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("job-job_id", task));
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@ -91,8 +88,8 @@ public class StartDatafeedActionTests extends ESTestCase {
assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState +
"] while state [opened] is required", result.getExplanation()); "] while state [opened] is required", result.getExplanation());
task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 1L); task = createJobTask(job.getId(), "node_id", JobState.OPENED, 1L);
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task));
cs = ClusterState.builder(new ClusterName("cluster_name")) cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder() .metaData(new MetaData.Builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(MlMetadata.TYPE, mlMetadata.build())
@ -124,8 +121,8 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L); PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task));
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2); List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED)); states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED));
@ -164,8 +161,8 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L); PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task));
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2); List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
states.add(new Tuple<>(0, ShardRoutingState.STARTED)); states.add(new Tuple<>(0, ShardRoutingState.STARTED));
@ -203,8 +200,8 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L); PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task));
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder() .metaData(new MetaData.Builder()
@ -234,8 +231,8 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
String nodeId = randomBoolean() ? "node_id2" : null; String nodeId = randomBoolean() ? "node_id2" : null;
PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), nodeId, JobState.OPENED, 0L); PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), nodeId, JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task));
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@ -255,8 +252,8 @@ public class StartDatafeedActionTests extends ESTestCase {
assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node", assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node",
result.getExplanation()); result.getExplanation());
task = createJobTask("0L", job.getId(), "node_id1", JobState.OPENED, 0L); task = createJobTask(job.getId(), "node_id1", JobState.OPENED, 0L);
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task));
cs = ClusterState.builder(new ClusterName("cluster_name")) cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder() .metaData(new MetaData.Builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(MlMetadata.TYPE, mlMetadata.build())
@ -295,29 +292,6 @@ public class StartDatafeedActionTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is not open")); assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is not open"));
} }
public void testValidate_dataFeedAlreadyStarted() {
Job job1 = createScheduledJob("job_id").build(new Date());
DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"));
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.putDatafeed(datafeedConfig)
.build();
PersistentTask<OpenJobAction.Request> jobTask = createJobTask("0L", "job_id", "node_id", JobState.OPENED, 0L);
PersistentTask<StartDatafeedAction.Request> datafeedTask =
new PersistentTask<>("1L", StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
1L, new Assignment("node_id", "test assignment"));
datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED);
Map<String, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put("0L", jobTask);
taskMap.put("1L", datafeedTask);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(2L, taskMap);
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks));
assertThat(e.getMessage(), equalTo("cannot start datafeed [datafeed_id] because it has already been started"));
}
public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,
TaskId parentTaskId, TaskId parentTaskId,
StartDatafeedAction.Request request, StartDatafeedAction.Request request,

View File

@ -54,10 +54,10 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
} }
public void testValidate() { public void testValidate() {
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("1L", StartDatafeedAction.NAME, PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("datafeed-foo", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("foo", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED); task = new PersistentTask<>(task, DatafeedState.STARTED);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("datafeed-foo", task));
Job job = createDatafeedJob().build(new Date()); Job job = createDatafeedJob().build(new Date());
MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build();
@ -97,26 +97,26 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
Map<String, PersistentTask<?>> taskMap = new HashMap<>(); Map<String, PersistentTask<?>> taskMap = new HashMap<>();
Builder mlMetadataBuilder = new MlMetadata.Builder(); Builder mlMetadataBuilder = new MlMetadata.Builder();
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("1L", StartDatafeedAction.NAME, PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("datafeed-datafeed_1", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_1", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("datafeed_1", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED); task = new PersistentTask<>(task, DatafeedState.STARTED);
taskMap.put("1L", task); taskMap.put("datafeed-datafeed_1", task);
Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date());
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
task = new PersistentTask<PersistentTaskRequest>("2L", StartDatafeedAction.NAME, task = new PersistentTask<PersistentTaskRequest>("datafeed-datafeed_2", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_2", 0L), 2L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("datafeed_2", 0L), 2L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STOPPED); task = new PersistentTask<>(task, DatafeedState.STOPPED);
taskMap.put("2L", task); taskMap.put("datafeed-datafeed_2", task);
job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
task = new PersistentTask<PersistentTaskRequest>("3L", StartDatafeedAction.NAME, task = new PersistentTask<PersistentTaskRequest>("datafeed-datafeed_3", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_3", 0L), 3L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("datafeed_3", 0L), 3L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED); task = new PersistentTask<>(task, DatafeedState.STARTED);
taskMap.put("3L", task); taskMap.put("datafeed-datafeed_3", task);
job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);

View File

@ -98,7 +98,7 @@ public class DatafeedManagerTests extends ESTestCase {
Job job = createDatafeedJob().build(new Date()); Job job = createDatafeedJob().build(new Date());
mlMetadata.putJob(job, false); mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build()); mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build());
PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L); PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task));
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),

View File

@ -169,7 +169,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertBusy(() -> { assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTask task = tasks.taskMap().values().iterator().next(); PersistentTask task = tasks.getTask(MlMetadata.jobTaskId("job_id"));
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>(); Map<String, String> expectedNodeAttr = new HashMap<>();

View File

@ -10,6 +10,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.OpenJobAction;
@ -54,10 +55,8 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertEquals(1, tasks.taskMap().size()); assertEquals(1, tasks.taskMap().size());
// now just double check that the first job is still opened: // now just double check that the first job is still opened:
PersistentTasksCustomMetaData.PersistentTask task = tasks.taskMap().values().iterator().next(); PersistentTasksCustomMetaData.PersistentTask task = tasks.getTask(MlMetadata.jobTaskId("1"));
assertEquals(JobState.OPENED, ((JobTaskStatus) task.getStatus()).getState()); assertEquals(JobState.OPENED, ((JobTaskStatus) task.getStatus()).getState());
OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest();
assertEquals("1", openJobRequest.getJobId());
} }
public void testSingleNode() throws Exception { public void testSingleNode() throws Exception {