create allocation when the job has been created instead of creating it when opening the job.

This avoids the confusing situation that a there is no allocation when a job hasn't been opened yet. Now it complains about the fact that the job status is closed.

Original commit: elastic/x-pack-elasticsearch@3159dc6954
This commit is contained in:
Martijn van Groningen 2016-12-07 17:15:18 +01:00
parent ccf8cb7e0d
commit 7cc2b8c5ce
9 changed files with 55 additions and 68 deletions

View File

@ -206,19 +206,18 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
public void onNewClusterState(ClusterState state) {
String jobId = request.getJobId();
PrelertMetadata metadata = state.getMetaData().custom(PrelertMetadata.TYPE);
if (metadata != null) {
Allocation allocation = metadata.getAllocations().get(jobId);
if (allocation != null) {
if (allocation.getStatus() == JobStatus.OPENED) {
listener.onResponse(new Response(true));
} else {
String message = "[" + jobId + "] expected job status [" + JobStatus.OPENED + "], but got [" +
allocation.getStatus() + "], reason [" + allocation.getStatusReason() + "]";
listener.onFailure(new ElasticsearchStatusException(message, RestStatus.CONFLICT));
}
Allocation allocation = metadata.getAllocations().get(jobId);
if (allocation != null) {
if (allocation.getStatus() == JobStatus.OPENED) {
listener.onResponse(new Response(true));
} else {
String message = "[" + jobId + "] expected job status [" + JobStatus.OPENED + "], but got [" +
allocation.getStatus() + "], reason [" + allocation.getStatusReason() + "]";
listener.onFailure(new ElasticsearchStatusException(message, RestStatus.CONFLICT));
}
} else {
listener.onFailure(new IllegalStateException("no allocation for job [" + jobId + "]"));
}
listener.onFailure(new IllegalStateException("no allocation for job [" + jobId + "]"));
}
@Override

View File

@ -434,7 +434,10 @@ public class JobManager extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PrelertMetadata.Builder builder = new PrelertMetadata.Builder(currentState.metaData().custom(PrelertMetadata.TYPE));
builder.createAllocation(request.getJobId(), request.isIgnoreDowntime());
builder.updateStatus(request.getJobId(), JobStatus.OPENING, null);
if (request.isIgnoreDowntime()) {
builder.setIgnoreDowntime(request.getJobId());
}
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData()).putCustom(PrelertMetadata.TYPE, builder.build()))
.build();

View File

@ -63,7 +63,7 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
}
public Allocation(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.nodeId = in.readOptionalString();
this.jobId = in.readString();
this.ignoreDowntime = in.readBoolean();
this.status = JobStatus.fromStream(in);
@ -107,7 +107,7 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
out.writeOptionalString(nodeId);
out.writeString(jobId);
out.writeBoolean(ignoreDowntime);
status.writeTo(out);
@ -118,7 +118,9 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NODE_ID_FIELD.getPreferredName(), nodeId);
if (nodeId != null) {
builder.field(NODE_ID_FIELD.getPreferredName(), nodeId);
}
builder.field(JOB_ID_FIELD.getPreferredName(), jobId);
builder.field(IGNORE_DOWNTIME_FIELD.getPreferredName(), ignoreDowntime);
builder.field(STATUS.getPreferredName(), status);
@ -209,7 +211,7 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
}
break;
case OPENING:
if (this.status.isAnyOf(JobStatus.CLOSED, JobStatus.FAILED)) {
if (this.status.isAnyOf(JobStatus.CLOSED, JobStatus.FAILED) == false) {
throw new IllegalArgumentException("[" + jobId + "] expected status [" + JobStatus.CLOSED
+ "] or [" + JobStatus.FAILED + "], but got [" + status +"]");
}

View File

@ -205,6 +205,18 @@ public class PrelertMetadata implements MetaData.Custom {
throw ExceptionsHelper.jobAlreadyExists(job.getId());
}
this.jobs.put(job.getId(), job);
Allocation allocation = allocations.get(job.getId());
if (allocation == null) {
Allocation.Builder builder = new Allocation.Builder();
builder.setJobId(job.getId());
boolean addSchedulderState = job.getSchedulerConfig() != null;
if (addSchedulderState) {
builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
}
builder.setStatus(JobStatus.CLOSED);
allocations.put(job.getId(), builder.build());
}
return this;
}
@ -249,35 +261,6 @@ public class PrelertMetadata implements MetaData.Custom {
return new PrelertMetadata(jobs, allocations);
}
public Builder createAllocation(String jobId, boolean ignoreDowntime) {
Job job = jobs.get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
}
Allocation allocation = allocations.get(jobId);
Allocation.Builder builder;
if (allocation == null) {
builder = new Allocation.Builder();
builder.setJobId(jobId);
boolean addSchedulderState = job.getSchedulerConfig() != null;
if (addSchedulderState) {
builder.setSchedulerState(new SchedulerState(JobSchedulerStatus.STOPPED, null, null));
}
} else {
if (allocation.getStatus() != JobStatus.CLOSED) {
throw ExceptionsHelper.conflictStatusException("[" + jobId + "] expected status [" + JobStatus.CLOSED
+ "], but got [" + allocation.getStatus() +"]");
}
builder = new Allocation.Builder(allocation);
}
builder.setStatus(JobStatus.OPENING);
builder.setIgnoreDowntime(ignoreDowntime);
allocations.put(jobId, builder.build());
return this;
}
public Builder assignToNode(String jobId, String nodeId) {
Allocation allocation = allocations.get(jobId);
if (allocation == null) {
@ -307,6 +290,17 @@ public class PrelertMetadata implements MetaData.Custom {
allocations.put(jobId, builder.build());
return this;
}
public Builder setIgnoreDowntime(String jobId) {
Allocation allocation = allocations.get(jobId);
if (allocation == null) {
throw new IllegalStateException("[" + jobId + "] no allocation to ignore downtime");
}
Allocation.Builder builder = new Allocation.Builder(allocation);
builder.setIgnoreDowntime(true);
allocations.put(jobId, builder.build());
return this;
}
}
}

View File

@ -94,7 +94,6 @@ public class JobManagerTests extends ESTestCase {
allocation.setJobId(job.getId());
allocation.setStatus(JobStatus.OPENING);
PrelertMetadata.Builder newMetadata = new PrelertMetadata.Builder(clusterState.metaData().custom(PrelertMetadata.TYPE));
newMetadata.createAllocation(job.getId(), false);
newMetadata.assignToNode(job.getId(), "myNode");
newMetadata.updateAllocation(job.getId(), allocation.build());
@ -137,7 +136,6 @@ public class JobManagerTests extends ESTestCase {
Job job = buildJobBuilder("foo").build();
PrelertMetadata prelertMetadata = new PrelertMetadata.Builder()
.putJob(job, false)
.createAllocation("foo", false)
.assignToNode("foo", "nodeId")
.build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))

View File

@ -17,7 +17,7 @@ public class AllocationTests extends AbstractSerializingTestCase<Allocation> {
@Override
protected Allocation createTestInstance() {
String nodeId = randomAsciiOfLength(10);
String nodeId = randomBoolean() ? randomAsciiOfLength(10) : null;
String jobId = randomAsciiOfLength(10);
boolean ignoreDowntime = randomBoolean();
JobStatus jobStatus = randomFrom(JobStatus.values());

View File

@ -56,7 +56,6 @@ public class JobAllocatorTests extends ESTestCase {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder(cs.metaData().custom(PrelertMetadata.TYPE));
pmBuilder.putJob((buildJobBuilder("_job_id").build()), false);
pmBuilder.createAllocation("_job_id", false);
cs = ClusterState.builder(cs).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.build();
@ -73,7 +72,6 @@ public class JobAllocatorTests extends ESTestCase {
public void testAssignJobsToNodes() {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
@ -154,7 +152,6 @@ public class JobAllocatorTests extends ESTestCase {
// add an allocated job
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_id").build(), false);
pmBuilder.createAllocation("_id", false);
pmBuilder.assignToNode("_id", "_node_id");
cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
@ -171,7 +168,6 @@ public class JobAllocatorTests extends ESTestCase {
// make job not allocated
pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_id", new LocalTransportAddress("_id"), Version.CURRENT))
@ -198,8 +194,6 @@ public class JobAllocatorTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescriptionBuilder);
pmBuilder.putJob(jobBuilder.build(), false);
pmBuilder.createAllocation("_job_id", false);
ClusterState cs = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()

View File

@ -62,7 +62,6 @@ public class JobLifeCycleServiceTests extends ESTestCase {
public void testClusterChanged_startJob() {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
@ -75,8 +74,7 @@ public class JobLifeCycleServiceTests extends ESTestCase {
pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
pmBuilder.updateStatus("_job_id", JobStatus.OPENED, null);
pmBuilder.updateStatus("_job_id", JobStatus.OPENING, null);
cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
@ -89,7 +87,7 @@ public class JobLifeCycleServiceTests extends ESTestCase {
pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
pmBuilder.updateStatus("_job_id", JobStatus.OPENING, null);
pmBuilder.assignToNode("_job_id", "_node_id");
cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
@ -111,7 +109,6 @@ public class JobLifeCycleServiceTests extends ESTestCase {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()
@ -123,7 +120,7 @@ public class JobLifeCycleServiceTests extends ESTestCase {
pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.createAllocation("_job_id", false);
pmBuilder.updateStatus("_job_id", JobStatus.OPENING, null);
pmBuilder.updateStatus("_job_id", JobStatus.OPENED, null);
pmBuilder.updateStatus("_job_id", JobStatus.CLOSING, null);
pmBuilder.assignToNode("_job_id", "_node_id");
@ -143,6 +140,7 @@ public class JobLifeCycleServiceTests extends ESTestCase {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
pmBuilder.putJob(buildJobBuilder("_job_id").build(), false);
pmBuilder.removeJob("_job_id");
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
.putCustom(PrelertMetadata.TYPE, pmBuilder.build()))
.nodes(DiscoveryNodes.builder()

View File

@ -41,12 +41,11 @@ public class PrelertMetadataTests extends ESTestCase {
builder.putJob(job2, false);
builder.putJob(job3, false);
builder.createAllocation(job1.getId(), false);
builder.assignToNode(job1.getId(), "node1");
builder.createAllocation(job2.getId(), false);
builder.updateStatus(job1.getId(), JobStatus.OPENING, null);
builder.assignToNode(job2.getId(), "node1");
builder.createAllocation(job3.getId(), false);
builder.updateStatus(job2.getId(), JobStatus.OPENING, null);
builder.assignToNode(job3.getId(), "node1");
builder.updateStatus(job3.getId(), JobStatus.OPENING, null);
PrelertMetadata expected = builder.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
@ -68,11 +67,11 @@ public class PrelertMetadataTests extends ESTestCase {
builder.putJob(job2, false);
builder.putJob(job3, false);
builder.createAllocation(job1.getId(), false);
builder.updateStatus(job1.getId(), JobStatus.OPENING, null);
builder.assignToNode(job1.getId(), "node1");
builder.createAllocation(job2.getId(), false);
builder.updateStatus(job2.getId(), JobStatus.OPENING, null);
builder.assignToNode(job2.getId(), "node1");
builder.createAllocation(job3.getId(), false);
builder.updateStatus(job3.getId(), JobStatus.OPENING, null);
builder.assignToNode(job3.getId(), "node1");
PrelertMetadata expected = builder.build();
@ -110,7 +109,7 @@ public class PrelertMetadataTests extends ESTestCase {
public void testUpdateAllocation_setFinishedTime() {
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(buildJobBuilder("_job_id").build(), false);
builder.createAllocation("_job_id", false);
builder.updateStatus("_job_id", JobStatus.OPENING, null);
builder.updateStatus("_job_id", JobStatus.OPENED, null);
PrelertMetadata prelertMetadata = builder.build();