diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/OpenJobAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/OpenJobAction.java index 6239d223b18..59e475659c1 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/OpenJobAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/OpenJobAction.java @@ -206,19 +206,18 @@ public class OpenJobAction extends Action implements ToXConte } public Allocation(StreamInput in) throws IOException { - this.nodeId = in.readString(); + this.nodeId = in.readOptionalString(); this.jobId = in.readString(); this.ignoreDowntime = in.readBoolean(); this.status = JobStatus.fromStream(in); @@ -107,7 +107,7 @@ public class Allocation extends AbstractDiffable implements ToXConte @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(nodeId); + out.writeOptionalString(nodeId); out.writeString(jobId); out.writeBoolean(ignoreDowntime); status.writeTo(out); @@ -118,7 +118,9 @@ public class Allocation extends AbstractDiffable implements ToXConte @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(NODE_ID_FIELD.getPreferredName(), nodeId); + if (nodeId != null) { + builder.field(NODE_ID_FIELD.getPreferredName(), nodeId); + } builder.field(JOB_ID_FIELD.getPreferredName(), jobId); builder.field(IGNORE_DOWNTIME_FIELD.getPreferredName(), ignoreDowntime); builder.field(STATUS.getPreferredName(), status); @@ -209,7 +211,7 @@ public class Allocation extends AbstractDiffable implements ToXConte } break; case OPENING: - if (this.status.isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) { + if (this.status.isAnyOf(JobStatus.CLOSED, JobStatus.FAILED) == false) { throw new IllegalArgumentException("[" + jobId + "] expected status [" + JobStatus.CLOSED + "] or [" + JobStatus.FAILED + "], but got [" + status +"]"); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java index e77bac9c1fa..d8c9ee8039f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadata.java @@ -205,6 +205,18 @@ public class PrelertMetadata implements MetaData.Custom { throw ExceptionsHelper.jobAlreadyExists(job.getId()); } this.jobs.put(job.getId(), job); + + Allocation allocation = allocations.get(job.getId()); + if (allocation == null) { + Allocation.Builder builder = new Allocation.Builder(); + builder.setJobId(job.getId()); + boolean addSchedulderState = job.getSchedulerConfig() != null; + if (addSchedulderState) { + builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); + } + builder.setStatus(JobStatus.CLOSED); + allocations.put(job.getId(), builder.build()); + } return this; } @@ -249,35 +261,6 @@ public class PrelertMetadata implements MetaData.Custom { return new PrelertMetadata(jobs, allocations); } - public Builder createAllocation(String jobId, boolean ignoreDowntime) { - Job job = jobs.get(jobId); - if (job == null) { - throw ExceptionsHelper.missingJobException(jobId); - } - - Allocation allocation = allocations.get(jobId); - Allocation.Builder builder; - if (allocation == null) { - builder = new Allocation.Builder(); - builder.setJobId(jobId); - boolean addSchedulderState = job.getSchedulerConfig() != null; - if (addSchedulderState) { - builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null)); - } - } else { - if (allocation.getStatus() != JobStatus.CLOSED) { - throw ExceptionsHelper.conflictStatusException("[" + jobId + "] expected status [" + JobStatus.CLOSED - + "], but got [" + allocation.getStatus() +"]"); - } - builder = new Allocation.Builder(allocation); - } - - builder.setStatus(JobStatus.OPENING); - builder.setIgnoreDowntime(ignoreDowntime); - allocations.put(jobId, builder.build()); - return this; - } - public Builder assignToNode(String jobId, String nodeId) { Allocation allocation = allocations.get(jobId); if (allocation == null) { @@ -307,6 +290,17 @@ public class PrelertMetadata implements MetaData.Custom { allocations.put(jobId, builder.build()); return this; } + + public Builder setIgnoreDowntime(String jobId) { + Allocation allocation = allocations.get(jobId); + if (allocation == null) { + throw new IllegalStateException("[" + jobId + "] no allocation to ignore downtime"); + } + Allocation.Builder builder = new Allocation.Builder(allocation); + builder.setIgnoreDowntime(true); + allocations.put(jobId, builder.build()); + return this; + } } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java index d7786ae6fd1..d303f559444 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java @@ -94,7 +94,6 @@ public class JobManagerTests extends ESTestCase { allocation.setJobId(job.getId()); allocation.setStatus(JobStatus.OPENING); PrelertMetadata.Builder newMetadata = new PrelertMetadata.Builder(clusterState.metaData().custom(PrelertMetadata.TYPE)); - newMetadata.createAllocation(job.getId(), false); newMetadata.assignToNode(job.getId(), "myNode"); newMetadata.updateAllocation(job.getId(), allocation.build()); @@ -137,7 +136,6 @@ public class JobManagerTests extends ESTestCase { Job job = buildJobBuilder("foo").build(); PrelertMetadata prelertMetadata = new PrelertMetadata.Builder() .putJob(job, false) - .createAllocation("foo", false) .assignToNode("foo", "nodeId") .build(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java index 8fb8c22e034..9309d5a9dad 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/AllocationTests.java @@ -17,7 +17,7 @@ public class AllocationTests extends AbstractSerializingTestCase { @Override protected Allocation createTestInstance() { - String nodeId = randomAsciiOfLength(10); + String nodeId = randomBoolean() ? randomAsciiOfLength(10) : null; String jobId = randomAsciiOfLength(10); boolean ignoreDowntime = randomBoolean(); JobStatus jobStatus = randomFrom(JobStatus.values()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java index 6632893d890..a91a33e8c50 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocatorTests.java @@ -56,7 +56,6 @@ public class JobAllocatorTests extends ESTestCase { PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(cs.metaData().custom(PrelertMetadata.TYPE)); pmBuilder.putJob((buildJobBuilder("_job_id").build()), false); - pmBuilder.createAllocation("_job_id", false); cs = ClusterState.builder(cs).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .build(); @@ -73,7 +72,6 @@ public class JobAllocatorTests extends ESTestCase { public void testAssignJobsToNodes() { PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); - pmBuilder.createAllocation("_job_id", false); ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() @@ -154,7 +152,6 @@ public class JobAllocatorTests extends ESTestCase { // add an allocated job PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_id").build(), false); - pmBuilder.createAllocation("_id", false); pmBuilder.assignToNode("_id", "_node_id"); cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -171,7 +168,6 @@ public class JobAllocatorTests extends ESTestCase { // make job not allocated pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); - pmBuilder.createAllocation("_job_id", false); cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() .add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT)) @@ -198,8 +194,6 @@ public class JobAllocatorTests extends ESTestCase { jobBuilder.setDataDescription(dataDescriptionBuilder); pmBuilder.putJob(jobBuilder.build(), false); - pmBuilder.createAllocation("_job_id", false); - ClusterState cs = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java index 5a3c8424d03..b7f89ac785e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java @@ -62,7 +62,6 @@ public class JobLifeCycleServiceTests extends ESTestCase { public void testClusterChanged_startJob() { PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); - pmBuilder.createAllocation("_job_id", false); ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() @@ -75,8 +74,7 @@ public class JobLifeCycleServiceTests extends ESTestCase { pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); - pmBuilder.createAllocation("_job_id", false); - pmBuilder.updateStatus("_job_id", JobStatus.OPENED, null); + pmBuilder.updateStatus("_job_id", JobStatus.OPENING, null); cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() @@ -89,7 +87,7 @@ public class JobLifeCycleServiceTests extends ESTestCase { pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); - pmBuilder.createAllocation("_job_id", false); + pmBuilder.updateStatus("_job_id", JobStatus.OPENING, null); pmBuilder.assignToNode("_job_id", "_node_id"); cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) @@ -111,7 +109,6 @@ public class JobLifeCycleServiceTests extends ESTestCase { PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); - pmBuilder.createAllocation("_job_id", false); ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() @@ -123,7 +120,7 @@ public class JobLifeCycleServiceTests extends ESTestCase { pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); - pmBuilder.createAllocation("_job_id", false); + pmBuilder.updateStatus("_job_id", JobStatus.OPENING, null); pmBuilder.updateStatus("_job_id", JobStatus.OPENED, null); pmBuilder.updateStatus("_job_id", JobStatus.CLOSING, null); pmBuilder.assignToNode("_job_id", "_node_id"); @@ -143,6 +140,7 @@ public class JobLifeCycleServiceTests extends ESTestCase { PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(); pmBuilder.putJob(buildJobBuilder("_job_id").build(), false); + pmBuilder.removeJob("_job_id"); ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder() .putCustom(PrelertMetadata.TYPE, pmBuilder.build())) .nodes(DiscoveryNodes.builder() diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java index d564b3bac71..605ff54152b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertMetadataTests.java @@ -41,12 +41,11 @@ public class PrelertMetadataTests extends ESTestCase { builder.putJob(job2, false); builder.putJob(job3, false); - builder.createAllocation(job1.getId(), false); - builder.assignToNode(job1.getId(), "node1"); - builder.createAllocation(job2.getId(), false); + builder.updateStatus(job1.getId(), JobStatus.OPENING, null); builder.assignToNode(job2.getId(), "node1"); - builder.createAllocation(job3.getId(), false); + builder.updateStatus(job2.getId(), JobStatus.OPENING, null); builder.assignToNode(job3.getId(), "node1"); + builder.updateStatus(job3.getId(), JobStatus.OPENING, null); PrelertMetadata expected = builder.build(); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -68,11 +67,11 @@ public class PrelertMetadataTests extends ESTestCase { builder.putJob(job2, false); builder.putJob(job3, false); - builder.createAllocation(job1.getId(), false); + builder.updateStatus(job1.getId(), JobStatus.OPENING, null); builder.assignToNode(job1.getId(), "node1"); - builder.createAllocation(job2.getId(), false); + builder.updateStatus(job2.getId(), JobStatus.OPENING, null); builder.assignToNode(job2.getId(), "node1"); - builder.createAllocation(job3.getId(), false); + builder.updateStatus(job3.getId(), JobStatus.OPENING, null); builder.assignToNode(job3.getId(), "node1"); PrelertMetadata expected = builder.build(); @@ -110,7 +109,7 @@ public class PrelertMetadataTests extends ESTestCase { public void testUpdateAllocation_setFinishedTime() { PrelertMetadata.Builder builder = new PrelertMetadata.Builder(); builder.putJob(buildJobBuilder("_job_id").build(), false); - builder.createAllocation("_job_id", false); + builder.updateStatus("_job_id", JobStatus.OPENING, null); builder.updateStatus("_job_id", JobStatus.OPENED, null); PrelertMetadata prelertMetadata = builder.build();