diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java index bd14eadaf9e..40276b5609c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java @@ -486,7 +486,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable> tasks = new HashMap<>(); - tasks.put("0L", new PersistentTask("0L", OpenJobAction.NAME, - new OpenJobAction.Request("job_id"), 0L, new Assignment("node_id", ""))); - - MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, - new PersistentTasksCustomMetaData(0L, tasks)).build(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", null, tasksBuilder); + MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build(); ClusterState state = ClusterState.builder(new ClusterName("_name")) .metaData(metaData) .nodes(DiscoveryNodes.builder().add(node)) @@ -79,12 +71,9 @@ public class MlAssignmentNotifierTests extends ESTestCase { new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))) .build(); - Map> tasks = new HashMap<>(); - tasks.put("0L", new PersistentTask("0L", OpenJobAction.NAME, - new OpenJobAction.Request("job_id"), 0L, new Assignment(null, "no nodes"))); - - MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, - new PersistentTasksCustomMetaData(0L, tasks)).build(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", null, null, tasksBuilder); + MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build(); ClusterState state = ClusterState.builder(new ClusterName("_name")) .metaData(metaData) .build(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 55b9cc1d451..e7fb9ff719b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests; @@ -27,13 +26,12 @@ import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobTests; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Collections; import java.util.Date; -import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; @@ -150,10 +148,11 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getJobs().get("1"), sameInstance(job1)); assertThat(result.getDatafeeds().get("1"), nullValue()); - PersistentTask task = createJobTask("1", null, JobState.CLOSED, 0L); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("1", null, JobState.CLOSED, tasksBuilder); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder2.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.singletonMap("job-1", task)))); + () -> builder2.deleteJob("1", tasksBuilder.build())); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } @@ -271,11 +270,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase { builder.putDatafeed(datafeedConfig1); MlMetadata beforeMetadata = builder.build(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); - PersistentTask taskInProgress = - new PersistentTask<>("datafeed-datafeed1", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT); - PersistentTasksCustomMetaData tasksInProgress = - new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); + tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed1"), StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT); + PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setScrollSize(5000); @@ -333,11 +331,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getJobs().get("job_id"), sameInstance(job1)); assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); - PersistentTask taskInProgress = - new PersistentTask<>("datafeed-datafeed1", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT); - PersistentTasksCustomMetaData tasksInProgress = - new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); + tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed1"), StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT); + PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build(); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java index 07e0d50fb8d..bb41e26e382 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java @@ -19,15 +19,13 @@ import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import java.util.Arrays; import java.util.Collections; import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask; public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCase { @@ -58,14 +56,12 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false); mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"))); - Map> tasks = new HashMap<>(); - PersistentTask jobTask = createJobTask("job_id", null, JobState.OPENED, 1L); - tasks.put("job-job_id", jobTask); - tasks.put("datafeed-datafeed_id", createTask("datafeed_id", 0L, null, DatafeedState.STARTED, 2L)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", null, JobState.OPENED, tasksBuilder); + addTask("datafeed_id", 0L, null, DatafeedState.STARTED, tasksBuilder); ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, - new PersistentTasksCustomMetaData(1L, tasks))).build(); + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())).build(); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, @@ -73,15 +69,14 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa assertEquals(RestStatus.CONFLICT, e.status()); assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage()); - tasks = new HashMap<>(); - tasks.put("job-job_id", jobTask); + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", null, JobState.OPENED, tasksBuilder); if (randomBoolean()) { - tasks.put("datafeed-datafeed_id", createTask("datafeed_id", 0L, null, DatafeedState.STOPPED, 3L)); + addTask("datafeed_id", 0L, null, DatafeedState.STOPPED, tasksBuilder); } ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, - new PersistentTasksCustomMetaData(3L, tasks))).build(); + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())).build(); CloseJobAction.validateAndReturnJobTask("job_id", cs2); } @@ -102,38 +97,25 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_3", "job_id_3", Collections.singletonList("*"))); - Map> tasks = new HashMap<>(); - PersistentTask jobTask = createJobTask("job_id_1", null, JobState.OPENED, 1L); - tasks.put("job-job_id_1", jobTask); - - jobTask = createJobTask("job_id_2", null, JobState.CLOSED, 2L); - tasks.put("job-job_id_2", jobTask); - - jobTask = createJobTask("job_id_3", null, JobState.FAILED, 3L); - tasks.put("job-job_id_3", jobTask); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id_1", null, JobState.OPENED, tasksBuilder); + addJobTask("job_id_2", null, JobState.CLOSED, tasksBuilder); + addJobTask("job_id_3", null, JobState.FAILED, tasksBuilder); ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, - new PersistentTasksCustomMetaData(1L, tasks))) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) .build(); assertEquals(Arrays.asList("job_id_1", "job_id_3"), CloseJobAction.resolveAndValidateJobId("_all", cs1)); } - public static PersistentTask createTask(String datafeedId, - long startTime, - String nodeId, - DatafeedState state, - long allocationId) { - PersistentTask task = - new PersistentTask<>(MlMetadata.datafeedTaskId(datafeedId), StartDatafeedAction.NAME, - new StartDatafeedAction.Request(datafeedId, startTime), - allocationId, - new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); - task = new PersistentTask<>(task, state); - return task; + public static void addTask(String datafeedId, long startTime, String nodeId, DatafeedState state, + PersistentTasksCustomMetaData.Builder tasks) { + tasks.addTask(MlMetadata.datafeedTaskId(datafeedId), StartDatafeedAction.NAME, + new StartDatafeedAction.Request(datafeedId, startTime), new Assignment(nodeId, "test assignment")); + tasks.updateTaskStatus(MlMetadata.datafeedTaskId(datafeedId), state); } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index 236db2796f9..6f528cbcbec 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -20,12 +20,10 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; @@ -36,7 +34,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.net.InetAddress; import java.util.ArrayList; @@ -55,9 +52,9 @@ public class OpenJobActionTests extends ESTestCase { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - PersistentTask task = - createJobTask("job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED), 0L); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED), tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); OpenJobAction.validate("job_id", mlBuilder.build(), tasks); OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap())); @@ -92,14 +89,11 @@ public class OpenJobActionTests extends ESTestCase { nodeAttr, Collections.emptySet(), Version.CURRENT)) .build(); - Map> taskMap = new HashMap<>(); - taskMap.put("0L", new PersistentTask<>("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), 0L, - new Assignment("_node_id1", "test assignment"))); - taskMap.put("1L", new PersistentTask<>("1L", OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), 1L, - new Assignment("_node_id1", "test assignment"))); - taskMap.put("2L", new PersistentTask<>("2L", OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), 2L, - new Assignment("_node_id2", "test assignment"))); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id1", "_node_id1", null, tasksBuilder); + addJobTask("job_id2", "_node_id1", null, tasksBuilder); + addJobTask("job_id3", "_node_id2", null, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); @@ -120,19 +114,17 @@ public class OpenJobActionTests extends ESTestCase { Map nodeAttr = new HashMap<>(); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode)); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); - Map> taskMap = new HashMap<>(); - long allocationId = 0; + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); for (int i = 0; i < numNodes; i++) { String nodeId = "_node_id" + i; TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i); nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT)); for (int j = 0; j < maxRunningJobsPerNode; j++) { long id = j + (maxRunningJobsPerNode * i); - String taskId = UUIDs.base64UUID(); - taskMap.put(taskId, createJobTask("job_id" + id, nodeId, JobState.OPENED, allocationId++)); + addJobTask("job_id" + id, nodeId, JobState.OPENED, tasksBuilder); } } - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(allocationId, taskMap); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); @@ -156,10 +148,9 @@ public class OpenJobActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTask task = - new PersistentTask<>("1L", OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), 1L, - new Assignment("_node_id1", "test assignment")); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id1", "_node_id1", null, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); @@ -186,13 +177,13 @@ public class OpenJobActionTests extends ESTestCase { nodeAttr, Collections.emptySet(), Version.CURRENT)) .build(); - Map> taskMap = new HashMap<>(); - taskMap.put("0L", createJobTask("job_id1", "_node_id1", null, 0L)); - taskMap.put("1L", createJobTask("job_id2", "_node_id1", null, 1L)); - taskMap.put("2L", createJobTask("job_id3", "_node_id2", null, 2L)); - taskMap.put("3L", createJobTask("job_id4", "_node_id2", null, 3L)); - taskMap.put("4L", createJobTask("job_id5", "_node_id3", null, 4L)); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id1", "_node_id1", null, tasksBuilder); + addJobTask("job_id2", "_node_id1", null, tasksBuilder); + addJobTask("job_id3", "_node_id2", null, tasksBuilder); + addJobTask("job_id4", "_node_id2", null, tasksBuilder); + addJobTask("job_id5", "_node_id3", null, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); csBuilder.nodes(nodes); @@ -207,9 +198,9 @@ public class OpenJobActionTests extends ESTestCase { Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger); assertEquals("_node_id3", result.getExecutorNode()); - PersistentTask lastTask = createJobTask("job_id6", "_node_id3", null, 6L); - taskMap.put("5L", lastTask); - tasks = new PersistentTasksCustomMetaData(6L, taskMap); + tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); + addJobTask("job_id6", "_node_id3", null, tasksBuilder); + tasks = tasksBuilder.build(); csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); @@ -218,8 +209,9 @@ public class OpenJobActionTests extends ESTestCase { assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); - taskMap.put("5L", new PersistentTask<>(lastTask, 7L, new Assignment("_node_id3", "test assignment"))); - tasks = new PersistentTasksCustomMetaData(7L, taskMap); + tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); + tasksBuilder.reassignTask(MlMetadata.jobTaskId("job_id6"), new Assignment("_node_id3", "test assignment")); + tasks = tasksBuilder.build(); csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); @@ -228,8 +220,9 @@ public class OpenJobActionTests extends ESTestCase { assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); - taskMap.put("5L", new PersistentTask<>(lastTask, (Task.Status) null)); - tasks = new PersistentTasksCustomMetaData(8L, taskMap); + tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); + tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("job_id6"), null); + tasks = tasksBuilder.build(); csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); @@ -274,15 +267,12 @@ public class OpenJobActionTests extends ESTestCase { assertEquals(indexToRemove, result.get(0)); } - public static PersistentTask createJobTask(String jobId, String nodeId, JobState jobState, - long allocationId) { - PersistentTask task = - new PersistentTask<>("job-" + jobId, OpenJobAction.NAME, new OpenJobAction.Request(jobId), allocationId, - new Assignment(nodeId, "test assignment")); + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) { + builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.NAME, new OpenJobAction.Request(jobId), + new Assignment(nodeId, "test assignment")); if (jobState != null) { - task = new PersistentTask<>(task, new JobTaskStatus(jobState, allocationId)); + builder.updateTaskStatus(MlMetadata.jobTaskId(jobId), new JobTaskStatus(jobState, builder.getLastAllocationId())); } - return task; } private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 17e862b11c9..b5cea6563ee 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -37,7 +37,6 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.net.InetAddress; import java.util.ArrayList; @@ -45,7 +44,7 @@ import java.util.Collections; import java.util.Date; import java.util.List; -import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT; @@ -66,9 +65,10 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putJob(job, false); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); - PersistentTask task = createJobTask(job.getId(), "node_id", jobState, 0L); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("job-job_id", task)); + addJobTask(job.getId(), "node_id", jobState, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), @@ -88,8 +88,9 @@ public class StartDatafeedActionTests extends ESTestCase { assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); - task = createJobTask(job.getId(), "node_id", JobState.OPENED, 1L); - tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder() .putCustom(MlMetadata.TYPE, mlMetadata.build()) @@ -121,8 +122,9 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTask task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); List> states = new ArrayList<>(2); states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED)); @@ -161,8 +163,9 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTask task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); List> states = new ArrayList<>(2); states.add(new Tuple<>(0, ShardRoutingState.STARTED)); @@ -200,8 +203,9 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTask task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder() @@ -231,8 +235,9 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); String nodeId = randomBoolean() ? "node_id2" : null; - PersistentTask task = createJobTask(job.getId(), nodeId, JobState.OPENED, 0L); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), @@ -252,8 +257,9 @@ public class StartDatafeedActionTests extends ESTestCase { assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node", result.getExplanation()); - task = createJobTask(job.getId(), "node_id1", JobState.OPENED, 0L); - tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); + tasks = tasksBuilder.build(); cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder() .putCustom(MlMetadata.TYPE, mlMetadata.build()) @@ -280,9 +286,9 @@ public class StartDatafeedActionTests extends ESTestCase { MlMetadata mlMetadata1 = new MlMetadata.Builder() .putJob(job1, false) .build(); - PersistentTask task = - new PersistentTask<>("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id"), 0L, INITIAL_ASSIGNMENT); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), null, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) .putDatafeed(datafeedConfig1) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index 9375327d823..14f050da780 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -17,15 +17,12 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; -import org.elasticsearch.xpack.persistent.PersistentTaskParams; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import java.util.Arrays; import java.util.Collections; import java.util.Date; -import java.util.HashMap; -import java.util.Map; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; @@ -54,10 +51,11 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe } public void testValidate() { - PersistentTask task = new PersistentTask("datafeed-foo", StartDatafeedAction.NAME, - new StartDatafeedAction.Request("foo", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); - task = new PersistentTask<>(task, DatafeedState.STARTED); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("datafeed-foo", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlMetadata.datafeedTaskId("foo"), StartDatafeedAction.NAME, + new StartDatafeedAction.Request("foo", 0L), new Assignment("node_id", "")); + tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("foo"), DatafeedState.STARTED); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); Job job = createDatafeedJob().build(new Date()); MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); @@ -75,9 +73,10 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe public void testValidate_alreadyStopped() { PersistentTasksCustomMetaData tasks; if (randomBoolean()) { - PersistentTask task = new PersistentTask("1L", StartDatafeedAction.NAME, - new StartDatafeedAction.Request("foo2", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); - tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlMetadata.datafeedTaskId("foo2"), StartDatafeedAction.NAME, + new StartDatafeedAction.Request("foo2", 0L), new Assignment("node_id", "")); + tasks = tasksBuilder.build(); } else { tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap()); } @@ -94,34 +93,30 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe } public void testResolveAll() { - Map> taskMap = new HashMap<>(); Builder mlMetadataBuilder = new MlMetadata.Builder(); - - PersistentTask task = new PersistentTask("datafeed-datafeed_1", StartDatafeedAction.NAME, - new StartDatafeedAction.Request("datafeed_1", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); - task = new PersistentTask<>(task, DatafeedState.STARTED); - taskMap.put("datafeed-datafeed_1", task); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_1"), StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_1", 0L), new Assignment("node_id", "")); + tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("datafeed_1"), DatafeedState.STARTED); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); - task = new PersistentTask("datafeed-datafeed_2", StartDatafeedAction.NAME, - new StartDatafeedAction.Request("datafeed_2", 0L), 2L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); - task = new PersistentTask<>(task, DatafeedState.STOPPED); - taskMap.put("datafeed-datafeed_2", task); + tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_2"), StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_1", 0L), new Assignment("node_id", "")); + tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("datafeed_2"), DatafeedState.STOPPED); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); - task = new PersistentTask("3L", StartDatafeedAction.NAME, - new StartDatafeedAction.Request("datafeed_3", 0L), 3L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); - task = new PersistentTask<>(task, DatafeedState.STARTED); - taskMap.put("datafeed-datafeed_3", task); + tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_3"), StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_3", 0L), new Assignment("node_id", "")); + tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("datafeed_3"), DatafeedState.STARTED); job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); MlMetadata mlMetadata = mlMetadataBuilder.build(); assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), StopDatafeedAction.resolve("_all", mlMetadata, tasks)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index aad62287025..ab52df354c5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.FlushJobAction; -import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction.DatafeedTask; @@ -63,7 +62,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; -import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; +import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; @@ -98,8 +97,9 @@ public class DatafeedManagerTests extends ESTestCase { Job job = createDatafeedJob().build(new Date()); mlMetadata.putJob(job, false); mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build()); - PersistentTask task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java index 8ef4e23c93c..67de0f985e5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java @@ -74,7 +74,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ @Override protected Custom makeTestChanges(Custom testInstance) { - Builder builder = new Builder((PersistentTasksCustomMetaData) testInstance); + Builder builder = PersistentTasksCustomMetaData.builder((PersistentTasksCustomMetaData) testInstance); switch (randomInt(3)) { case 0: addRandomTask(builder); @@ -196,9 +196,9 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ for (int i = 0; i < randomIntBetween(10, 100); i++) { final Builder builder; if (randomBoolean()) { - builder = new Builder(); + builder = PersistentTasksCustomMetaData.builder(); } else { - builder = new Builder(persistentTasks); + builder = PersistentTasksCustomMetaData.builder(persistentTasks); } boolean changed = false; for (int j = 0; j < randomIntBetween(1, 10); j++) {