[TEST] Use task builder instead of creating persistent tasks directly.

Original commit: elastic/x-pack-elasticsearch@f74792b23b
This commit is contained in:
Martijn van Groningen 2017-04-12 16:02:48 +02:00
parent 666e87c29b
commit f72967eb7f
9 changed files with 130 additions and 168 deletions

View File

@ -486,7 +486,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return lastAllocationId; return lastAllocationId;
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("last_allocation_id", lastAllocationId); builder.field("last_allocation_id", lastAllocationId);
@ -511,10 +510,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private long lastAllocationId; private long lastAllocationId;
private boolean changed; private boolean changed;
public Builder() { private Builder() {
} }
public Builder(PersistentTasksCustomMetaData tasksInProgress) { private Builder(PersistentTasksCustomMetaData tasksInProgress) {
if (tasksInProgress != null) { if (tasksInProgress != null) {
tasks.putAll(tasksInProgress.tasks); tasks.putAll(tasksInProgress.tasks);
lastAllocationId = tasksInProgress.lastAllocationId; lastAllocationId = tasksInProgress.lastAllocationId;
@ -523,6 +522,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} }
} }
public long getLastAllocationId() {
return lastAllocationId;
}
private Builder setLastAllocationId(long currentId) { private Builder setLastAllocationId(long currentId) {
this.lastAllocationId = currentId; this.lastAllocationId = currentId;
return this; return this;

View File

@ -16,18 +16,13 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -50,12 +45,9 @@ public class MlAssignmentNotifierTests extends ESTestCase {
new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))) new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
.build(); .build();
Map<String, PersistentTask<?>> tasks = new HashMap<>(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasks.put("0L", new PersistentTask<PersistentTaskParams>("0L", OpenJobAction.NAME, addJobTask("job_id", "node_id", null, tasksBuilder);
new OpenJobAction.Request("job_id"), 0L, new Assignment("node_id", ""))); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(0L, tasks)).build();
ClusterState state = ClusterState.builder(new ClusterName("_name")) ClusterState state = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData) .metaData(metaData)
.nodes(DiscoveryNodes.builder().add(node)) .nodes(DiscoveryNodes.builder().add(node))
@ -79,12 +71,9 @@ public class MlAssignmentNotifierTests extends ESTestCase {
new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))) new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
.build(); .build();
Map<String, PersistentTask<?>> tasks = new HashMap<>(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasks.put("0L", new PersistentTask<PersistentTaskParams>("0L", OpenJobAction.NAME, addJobTask("job_id", null, null, tasksBuilder);
new OpenJobAction.Request("job_id"), 0L, new Assignment(null, "no nodes"))); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(0L, tasks)).build();
ClusterState state = ClusterState.builder(new ClusterName("_name")) ClusterState state = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData) .metaData(metaData)
.build(); .build();

View File

@ -16,7 +16,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests;
@ -27,13 +26,12 @@ import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTests; import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
@ -150,10 +148,11 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getJobs().get("1"), sameInstance(job1)); assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getDatafeeds().get("1"), nullValue()); assertThat(result.getDatafeeds().get("1"), nullValue());
PersistentTask<OpenJobAction.Request> task = createJobTask("1", null, JobState.CLOSED, 0L); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("1", null, JobState.CLOSED, tasksBuilder);
MlMetadata.Builder builder2 = new MlMetadata.Builder(result); MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder2.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.singletonMap("job-1", task)))); () -> builder2.deleteJob("1", tasksBuilder.build()));
assertThat(e.status(), equalTo(RestStatus.CONFLICT)); assertThat(e.status(), equalTo(RestStatus.CONFLICT));
} }
@ -271,11 +270,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
builder.putDatafeed(datafeedConfig1); builder.putDatafeed(datafeedConfig1);
MlMetadata beforeMetadata = builder.build(); MlMetadata beforeMetadata = builder.build();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L);
PersistentTask<StartDatafeedAction.Request> taskInProgress = tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed1"), StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT);
new PersistentTask<>("datafeed-datafeed1", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build();
PersistentTasksCustomMetaData tasksInProgress =
new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
update.setScrollSize(5000); update.setScrollSize(5000);
@ -333,11 +331,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getJobs().get("job_id"), sameInstance(job1)); assertThat(result.getJobs().get("job_id"), sameInstance(job1));
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
PersistentTask<StartDatafeedAction.Request> taskInProgress = tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed1"), StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT);
new PersistentTask<>("datafeed-datafeed1", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasksInProgress = tasksBuilder.build();
PersistentTasksCustomMetaData tasksInProgress =
new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
MlMetadata.Builder builder2 = new MlMetadata.Builder(result); MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,

View File

@ -19,15 +19,13 @@ import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask;
public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCase<Request> { public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
@ -58,14 +56,12 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false); mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false);
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id",
Collections.singletonList("*"))); Collections.singletonList("*")));
Map<String, PersistentTask<?>> tasks = new HashMap<>(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTask<?> jobTask = createJobTask("job_id", null, JobState.OPENED, 1L); addJobTask("job_id", null, JobState.OPENED, tasksBuilder);
tasks.put("job-job_id", jobTask); addTask("datafeed_id", 0L, null, DatafeedState.STARTED, tasksBuilder);
tasks.put("datafeed-datafeed_id", createTask("datafeed_id", 0L, null, DatafeedState.STARTED, 2L));
ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())).build();
new PersistentTasksCustomMetaData(1L, tasks))).build();
ElasticsearchStatusException e = ElasticsearchStatusException e =
expectThrows(ElasticsearchStatusException.class, expectThrows(ElasticsearchStatusException.class,
@ -73,15 +69,14 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
assertEquals(RestStatus.CONFLICT, e.status()); assertEquals(RestStatus.CONFLICT, e.status());
assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage()); assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage());
tasks = new HashMap<>(); tasksBuilder = PersistentTasksCustomMetaData.builder();
tasks.put("job-job_id", jobTask); addJobTask("job_id", null, JobState.OPENED, tasksBuilder);
if (randomBoolean()) { if (randomBoolean()) {
tasks.put("datafeed-datafeed_id", createTask("datafeed_id", 0L, null, DatafeedState.STOPPED, 3L)); addTask("datafeed_id", 0L, null, DatafeedState.STOPPED, tasksBuilder);
} }
ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())).build();
new PersistentTasksCustomMetaData(3L, tasks))).build();
CloseJobAction.validateAndReturnJobTask("job_id", cs2); CloseJobAction.validateAndReturnJobTask("job_id", cs2);
} }
@ -102,38 +97,25 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_3", "job_id_3", mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_3", "job_id_3",
Collections.singletonList("*"))); Collections.singletonList("*")));
Map<String, PersistentTask<?>> tasks = new HashMap<>(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTask<?> jobTask = createJobTask("job_id_1", null, JobState.OPENED, 1L); addJobTask("job_id_1", null, JobState.OPENED, tasksBuilder);
tasks.put("job-job_id_1", jobTask); addJobTask("job_id_2", null, JobState.CLOSED, tasksBuilder);
addJobTask("job_id_3", null, JobState.FAILED, tasksBuilder);
jobTask = createJobTask("job_id_2", null, JobState.CLOSED, 2L);
tasks.put("job-job_id_2", jobTask);
jobTask = createJobTask("job_id_3", null, JobState.FAILED, 3L);
tasks.put("job-job_id_3", jobTask);
ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
new PersistentTasksCustomMetaData(1L, tasks)))
.build(); .build();
assertEquals(Arrays.asList("job_id_1", "job_id_3"), assertEquals(Arrays.asList("job_id_1", "job_id_3"),
CloseJobAction.resolveAndValidateJobId("_all", cs1)); CloseJobAction.resolveAndValidateJobId("_all", cs1));
} }
public static PersistentTask<StartDatafeedAction.Request> createTask(String datafeedId, public static void addTask(String datafeedId, long startTime, String nodeId, DatafeedState state,
long startTime, PersistentTasksCustomMetaData.Builder tasks) {
String nodeId, tasks.addTask(MlMetadata.datafeedTaskId(datafeedId), StartDatafeedAction.NAME,
DatafeedState state, new StartDatafeedAction.Request(datafeedId, startTime), new Assignment(nodeId, "test assignment"));
long allocationId) { tasks.updateTaskStatus(MlMetadata.datafeedTaskId(datafeedId), state);
PersistentTask<StartDatafeedAction.Request> task =
new PersistentTask<>(MlMetadata.datafeedTaskId(datafeedId), StartDatafeedAction.NAME,
new StartDatafeedAction.Request(datafeedId, startTime),
allocationId,
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
task = new PersistentTask<>(task, state);
return task;
} }
} }

View File

@ -20,12 +20,10 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
@ -36,7 +34,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -55,9 +52,9 @@ public class OpenJobActionTests extends ESTestCase {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false); mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTask<OpenJobAction.Request> task = PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
createJobTask("job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED), 0L); addJobTask("job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED), tasksBuilder);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); PersistentTasksCustomMetaData tasks = tasksBuilder.build();
OpenJobAction.validate("job_id", mlBuilder.build(), tasks); OpenJobAction.validate("job_id", mlBuilder.build(), tasks);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap())); OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()));
@ -92,14 +89,11 @@ public class OpenJobActionTests extends ESTestCase {
nodeAttr, Collections.emptySet(), Version.CURRENT)) nodeAttr, Collections.emptySet(), Version.CURRENT))
.build(); .build();
Map<String, PersistentTask<?>> taskMap = new HashMap<>(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
taskMap.put("0L", new PersistentTask<>("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), 0L, addJobTask("job_id1", "_node_id1", null, tasksBuilder);
new Assignment("_node_id1", "test assignment"))); addJobTask("job_id2", "_node_id1", null, tasksBuilder);
taskMap.put("1L", new PersistentTask<>("1L", OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), 1L, addJobTask("job_id3", "_node_id2", null, tasksBuilder);
new Assignment("_node_id1", "test assignment"))); PersistentTasksCustomMetaData tasks = tasksBuilder.build();
taskMap.put("2L", new PersistentTask<>("2L", OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), 2L,
new Assignment("_node_id2", "test assignment")));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder(); MetaData.Builder metaData = MetaData.builder();
@ -120,19 +114,17 @@ public class OpenJobActionTests extends ESTestCase {
Map<String, String> nodeAttr = new HashMap<>(); Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode)); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
Map<String, PersistentTask<?>> taskMap = new HashMap<>(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
long allocationId = 0;
for (int i = 0; i < numNodes; i++) { for (int i = 0; i < numNodes; i++) {
String nodeId = "_node_id" + i; String nodeId = "_node_id" + i;
TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i); TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i);
nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT)); nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT));
for (int j = 0; j < maxRunningJobsPerNode; j++) { for (int j = 0; j < maxRunningJobsPerNode; j++) {
long id = j + (maxRunningJobsPerNode * i); long id = j + (maxRunningJobsPerNode * i);
String taskId = UUIDs.base64UUID(); addJobTask("job_id" + id, nodeId, JobState.OPENED, tasksBuilder);
taskMap.put(taskId, createJobTask("job_id" + id, nodeId, JobState.OPENED, allocationId++));
} }
} }
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(allocationId, taskMap); PersistentTasksCustomMetaData tasks = tasksBuilder.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder(); MetaData.Builder metaData = MetaData.builder();
@ -156,10 +148,9 @@ public class OpenJobActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
new PersistentTask<>("1L", OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), 1L, addJobTask("job_id1", "_node_id1", null, tasksBuilder);
new Assignment("_node_id1", "test assignment")); PersistentTasksCustomMetaData tasks = tasksBuilder.build();
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task));
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder(); MetaData.Builder metaData = MetaData.builder();
@ -186,13 +177,13 @@ public class OpenJobActionTests extends ESTestCase {
nodeAttr, Collections.emptySet(), Version.CURRENT)) nodeAttr, Collections.emptySet(), Version.CURRENT))
.build(); .build();
Map<String, PersistentTask<?>> taskMap = new HashMap<>(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
taskMap.put("0L", createJobTask("job_id1", "_node_id1", null, 0L)); addJobTask("job_id1", "_node_id1", null, tasksBuilder);
taskMap.put("1L", createJobTask("job_id2", "_node_id1", null, 1L)); addJobTask("job_id2", "_node_id1", null, tasksBuilder);
taskMap.put("2L", createJobTask("job_id3", "_node_id2", null, 2L)); addJobTask("job_id3", "_node_id2", null, tasksBuilder);
taskMap.put("3L", createJobTask("job_id4", "_node_id2", null, 3L)); addJobTask("job_id4", "_node_id2", null, tasksBuilder);
taskMap.put("4L", createJobTask("job_id5", "_node_id3", null, 4L)); addJobTask("job_id5", "_node_id3", null, tasksBuilder);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap); PersistentTasksCustomMetaData tasks = tasksBuilder.build();
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
csBuilder.nodes(nodes); csBuilder.nodes(nodes);
@ -207,9 +198,9 @@ public class OpenJobActionTests extends ESTestCase {
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger); Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger);
assertEquals("_node_id3", result.getExecutorNode()); assertEquals("_node_id3", result.getExecutorNode());
PersistentTask<OpenJobAction.Request> lastTask = createJobTask("job_id6", "_node_id3", null, 6L); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
taskMap.put("5L", lastTask); addJobTask("job_id6", "_node_id3", null, tasksBuilder);
tasks = new PersistentTasksCustomMetaData(6L, taskMap); tasks = tasksBuilder.build();
csBuilder = ClusterState.builder(cs); csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
@ -218,8 +209,9 @@ public class OpenJobActionTests extends ESTestCase {
assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put("5L", new PersistentTask<>(lastTask, 7L, new Assignment("_node_id3", "test assignment"))); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
tasks = new PersistentTasksCustomMetaData(7L, taskMap); tasksBuilder.reassignTask(MlMetadata.jobTaskId("job_id6"), new Assignment("_node_id3", "test assignment"));
tasks = tasksBuilder.build();
csBuilder = ClusterState.builder(cs); csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
@ -228,8 +220,9 @@ public class OpenJobActionTests extends ESTestCase {
assertNull("no node selected, because stale task", result.getExecutorNode()); assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put("5L", new PersistentTask<>(lastTask, (Task.Status) null)); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
tasks = new PersistentTasksCustomMetaData(8L, taskMap); tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("job_id6"), null);
tasks = tasksBuilder.build();
csBuilder = ClusterState.builder(cs); csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
@ -274,15 +267,12 @@ public class OpenJobActionTests extends ESTestCase {
assertEquals(indexToRemove, result.get(0)); assertEquals(indexToRemove, result.get(0));
} }
public static PersistentTask<OpenJobAction.Request> createJobTask(String jobId, String nodeId, JobState jobState, public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
long allocationId) { builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.NAME, new OpenJobAction.Request(jobId),
PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>("job-" + jobId, OpenJobAction.NAME, new OpenJobAction.Request(jobId), allocationId,
new Assignment(nodeId, "test assignment")); new Assignment(nodeId, "test assignment"));
if (jobState != null) { if (jobState != null) {
task = new PersistentTask<>(task, new JobTaskStatus(jobState, allocationId)); builder.updateTaskStatus(MlMetadata.jobTaskId(jobId), new JobTaskStatus(jobState, builder.getLastAllocationId()));
} }
return task;
} }
private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) { private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) {

View File

@ -37,7 +37,6 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -45,7 +44,7 @@ import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT; import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT;
@ -66,9 +65,10 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putJob(job, false); mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED);
PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", jobState, 0L); addJobTask(job.getId(), "node_id", jobState, tasksBuilder);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("job-job_id", task)); PersistentTasksCustomMetaData tasks = tasksBuilder.build();
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@ -88,8 +88,9 @@ public class StartDatafeedActionTests extends ESTestCase {
assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState +
"] while state [opened] is required", result.getExplanation()); "] while state [opened] is required", result.getExplanation());
task = createJobTask(job.getId(), "node_id", JobState.OPENED, 1L); tasksBuilder = PersistentTasksCustomMetaData.builder();
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
tasks = tasksBuilder.build();
cs = ClusterState.builder(new ClusterName("cluster_name")) cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder() .metaData(new MetaData.Builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(MlMetadata.TYPE, mlMetadata.build())
@ -121,8 +122,9 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2); List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED)); states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED));
@ -161,8 +163,9 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2); List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
states.add(new Tuple<>(0, ShardRoutingState.STARTED)); states.add(new Tuple<>(0, ShardRoutingState.STARTED));
@ -200,8 +203,9 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder() .metaData(new MetaData.Builder()
@ -231,8 +235,9 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
String nodeId = randomBoolean() ? "node_id2" : null; String nodeId = randomBoolean() ? "node_id2" : null;
PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), nodeId, JobState.OPENED, 0L); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@ -252,8 +257,9 @@ public class StartDatafeedActionTests extends ESTestCase {
assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node", assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node",
result.getExplanation()); result.getExplanation());
task = createJobTask(job.getId(), "node_id1", JobState.OPENED, 0L); tasksBuilder = PersistentTasksCustomMetaData.builder();
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("job-job_id", task)); addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder);
tasks = tasksBuilder.build();
cs = ClusterState.builder(new ClusterName("cluster_name")) cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder() .metaData(new MetaData.Builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(MlMetadata.TYPE, mlMetadata.build())
@ -280,9 +286,9 @@ public class StartDatafeedActionTests extends ESTestCase {
MlMetadata mlMetadata1 = new MlMetadata.Builder() MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false) .putJob(job1, false)
.build(); .build();
PersistentTask<OpenJobAction.Request> task = PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
new PersistentTask<>("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id"), 0L, INITIAL_ASSIGNMENT); addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), null, tasksBuilder);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task)); PersistentTasksCustomMetaData tasks = tasksBuilder.build();
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
.putDatafeed(datafeedConfig1) .putDatafeed(datafeedConfig1)

View File

@ -17,15 +17,12 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob;
@ -54,10 +51,11 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
} }
public void testValidate() { public void testValidate() {
PersistentTask<?> task = new PersistentTask<PersistentTaskParams>("datafeed-foo", StartDatafeedAction.NAME, PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
new StartDatafeedAction.Request("foo", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); tasksBuilder.addTask(MlMetadata.datafeedTaskId("foo"), StartDatafeedAction.NAME,
task = new PersistentTask<>(task, DatafeedState.STARTED); new StartDatafeedAction.Request("foo", 0L), new Assignment("node_id", ""));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("datafeed-foo", task)); tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("foo"), DatafeedState.STARTED);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
Job job = createDatafeedJob().build(new Date()); Job job = createDatafeedJob().build(new Date());
MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build();
@ -75,9 +73,10 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
public void testValidate_alreadyStopped() { public void testValidate_alreadyStopped() {
PersistentTasksCustomMetaData tasks; PersistentTasksCustomMetaData tasks;
if (randomBoolean()) { if (randomBoolean()) {
PersistentTask<?> task = new PersistentTask<PersistentTaskParams>("1L", StartDatafeedAction.NAME, PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
new StartDatafeedAction.Request("foo2", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); tasksBuilder.addTask(MlMetadata.datafeedTaskId("foo2"), StartDatafeedAction.NAME,
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); new StartDatafeedAction.Request("foo2", 0L), new Assignment("node_id", ""));
tasks = tasksBuilder.build();
} else { } else {
tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap()); tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap());
} }
@ -94,34 +93,30 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
} }
public void testResolveAll() { public void testResolveAll() {
Map<String, PersistentTask<?>> taskMap = new HashMap<>();
Builder mlMetadataBuilder = new MlMetadata.Builder(); Builder mlMetadataBuilder = new MlMetadata.Builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTask<?> task = new PersistentTask<PersistentTaskParams>("datafeed-datafeed_1", StartDatafeedAction.NAME, tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_1"), StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_1", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("datafeed_1", 0L), new Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED); tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("datafeed_1"), DatafeedState.STARTED);
taskMap.put("datafeed-datafeed_1", task);
Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date());
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
task = new PersistentTask<PersistentTaskParams>("datafeed-datafeed_2", StartDatafeedAction.NAME, tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_2"), StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_2", 0L), 2L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("datafeed_1", 0L), new Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STOPPED); tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("datafeed_2"), DatafeedState.STOPPED);
taskMap.put("datafeed-datafeed_2", task);
job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
task = new PersistentTask<PersistentTaskParams>("3L", StartDatafeedAction.NAME, tasksBuilder.addTask(MlMetadata.datafeedTaskId("datafeed_3"), StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_3", 0L), 3L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("datafeed_3", 0L), new Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED); tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("datafeed_3"), DatafeedState.STARTED);
taskMap.put("datafeed-datafeed_3", task);
job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap); PersistentTasksCustomMetaData tasks = tasksBuilder.build();
MlMetadata mlMetadata = mlMetadataBuilder.build(); MlMetadata mlMetadata = mlMetadataBuilder.build();
assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), StopDatafeedAction.resolve("_all", mlMetadata, tasks)); assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), StopDatafeedAction.resolve("_all", mlMetadata, tasks));

View File

@ -27,7 +27,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.FlushJobAction; import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.DatafeedTask; import org.elasticsearch.xpack.ml.action.StartDatafeedAction.DatafeedTask;
@ -63,7 +62,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer; import java.util.function.Consumer;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.addJobTask;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -98,8 +97,9 @@ public class DatafeedManagerTests extends ESTestCase {
Job job = createDatafeedJob().build(new Date()); Job job = createDatafeedJob().build(new Date());
mlMetadata.putJob(job, false); mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build()); mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build());
PersistentTask<OpenJobAction.Request> task = createJobTask(job.getId(), "node_id", JobState.OPENED, 0L); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))

View File

@ -74,7 +74,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
@Override @Override
protected Custom makeTestChanges(Custom testInstance) { protected Custom makeTestChanges(Custom testInstance) {
Builder builder = new Builder((PersistentTasksCustomMetaData) testInstance); Builder builder = PersistentTasksCustomMetaData.builder((PersistentTasksCustomMetaData) testInstance);
switch (randomInt(3)) { switch (randomInt(3)) {
case 0: case 0:
addRandomTask(builder); addRandomTask(builder);
@ -196,9 +196,9 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
for (int i = 0; i < randomIntBetween(10, 100); i++) { for (int i = 0; i < randomIntBetween(10, 100); i++) {
final Builder builder; final Builder builder;
if (randomBoolean()) { if (randomBoolean()) {
builder = new Builder(); builder = PersistentTasksCustomMetaData.builder();
} else { } else {
builder = new Builder(persistentTasks); builder = PersistentTasksCustomMetaData.builder(persistentTasks);
} }
boolean changed = false; boolean changed = false;
for (int j = 0; j < randomIntBetween(1, 10); j++) { for (int j = 0; j < randomIntBetween(1, 10); j++) {