Update the job status only once when closing (or pausing) the analytical process.

Also changed the the post data api to change the job status to running when the analytical process is started.

Closes elastic/elasticsearch#319

Original commit: elastic/x-pack-elasticsearch@b38d52d849
This commit is contained in:
Martijn van Groningen 2016-11-28 11:46:41 +01:00
parent 2e78706a3f
commit ee132337b5
17 changed files with 191 additions and 60 deletions

View File

@ -161,7 +161,6 @@ public class PostDataCloseAction extends Action<PostDataCloseAction.Request, Pos
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
UpdateJobStatusAction.Request updateStatusRequest = new UpdateJobStatusAction.Request(request.getJobId(), JobStatus.CLOSING);
ActionListener<UpdateJobStatusAction.Response> delegateListener = new ActionListener<UpdateJobStatusAction.Response>() {
@Override

View File

@ -180,7 +180,7 @@ public class UpdateJobSchedulerStatusAction extends Action<UpdateJobSchedulerSta
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.updateSchedulerStatus(request.getJobId(), request.getSchedulerStatus());
jobManager.updateSchedulerStatus(request, listener);
}
@Override

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.prelert.job.data;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
@ -48,6 +49,7 @@ public interface DataProcessor {
* Stop the running job and mark it as finished.<br>
*
* @param jobId The job to stop
* @param nextStatus The final status to set when analytical process has stopped
*/
void closeJob(String jobId);
void closeJob(String jobId, JobStatus nextStatus);
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.manager;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
@ -14,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
@ -54,6 +56,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE =
Setting.intSetting("max_running_jobs", 10, Setting.Property.NodeScope);
private final Client client;
private final int maxRunningJobs;
private final ThreadPool threadPool;
private final JobManager jobManager;
@ -73,6 +76,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser,
AutodetectProcessFactory autodetectProcessFactory) {
super(settings);
this.client = client;
this.threadPool = threadPool;
this.maxRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.parser = parser;
@ -96,7 +100,9 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
}
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> {
return create(id, params.isIgnoreDowntime());
AutodetectCommunicator c = create(id, params.isIgnoreDowntime());
setJobStatus(jobId, JobStatus.RUNNING);
return c;
});
try {
return communicator.writeToJob(input, params);
@ -172,7 +178,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
}
@Override
public void closeJob(String jobId) {
public void closeJob(String jobId, JobStatus nextStatus) {
logger.debug("Closing job {}", jobId);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobId);
if (communicator == null) {
@ -182,9 +188,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
try {
communicator.close();
setJobFinishedTimeAndStatus(jobId, JobStatus.CLOSED);
// TODO check for errors from autodetect
// TODO delete associated files (model config etc)
setJobStatus(jobId, nextStatus);
} catch (Exception e) {
logger.warn("Exception closing stopped process input stream", e);
throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e);
@ -207,10 +211,19 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
return Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now());
}
private void setJobFinishedTimeAndStatus(String jobId, JobStatus status) {
// NORELEASE Implement this.
// Perhaps move the JobStatus and finish time to a separate document stored outside the cluster state
logger.error("Cannot set finished job status and time- Not Implemented");
private void setJobStatus(String jobId, JobStatus status) {
UpdateJobStatusAction.Request request = new UpdateJobStatusAction.Request(jobId, status);
client.execute(UpdateJobStatusAction.INSTANCE, request, new ActionListener<UpdateJobStatusAction.Response>() {
@Override
public void onResponse(UpdateJobStatusAction.Response response) {
logger.info("Successfully set job status to [{}] for job [{}]", status, jobId);
}
@Override
public void onFailure(Exception e) {
logger.error("Could not set job status to [" + status + "] for job [" + jobId +"]", e);
}
});
}
public Optional<ModelSizeStats> getModelSizeStats(String jobId) {

View File

@ -10,13 +10,11 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.prelert.action.DeleteJobAction;
import org.elasticsearch.xpack.prelert.action.PauseJobAction;
import org.elasticsearch.xpack.prelert.action.PutJobAction;
@ -24,6 +22,7 @@ import org.elasticsearch.xpack.prelert.action.ResumeJobAction;
import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.StopJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
@ -356,8 +355,12 @@ public class JobManager extends AbstractComponent {
return Optional.ofNullable(allocation.getSchedulerState());
}
public void updateSchedulerStatus(String jobId, JobSchedulerStatus newStatus) {
clusterService.submitStateUpdateTask("update-scheduler-status-job-" + jobId, new ClusterStateUpdateTask() {
public void updateSchedulerStatus(UpdateJobSchedulerStatusAction.Request request,
ActionListener<UpdateJobSchedulerStatusAction.Response> actionListener) {
String jobId = request.getJobId();
JobSchedulerStatus newStatus = request.getSchedulerStatus();
clusterService.submitStateUpdateTask("update-scheduler-status-job-" + jobId,
new AckedClusterStateUpdateTask<UpdateJobSchedulerStatusAction.Response>(request, actionListener) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
@ -365,8 +368,8 @@ public class JobManager extends AbstractComponent {
}
@Override
public void onFailure(String source, Exception e) {
LOGGER.error("Error updating scheduler status: source=[" + source + "], status=[" + newStatus + "]", e);
protected UpdateJobSchedulerStatusAction.Response newResponse(boolean acknowledged) {
return new UpdateJobSchedulerStatusAction.Response(acknowledged);
}
});
}
@ -527,7 +530,7 @@ public class JobManager extends AbstractComponent {
}
public void setJobStatus(UpdateJobStatusAction.Request request, ActionListener<UpdateJobStatusAction.Response> actionListener) {
clusterService.submitStateUpdateTask("set-paused-status-job-" + request.getJobId(),
clusterService.submitStateUpdateTask("set-job-status-" + request.getStatus() + "-" + request.getJobId(),
new AckedClusterStateUpdateTask<UpdateJobStatusAction.Response>(request, actionListener) {
@Override

View File

@ -137,7 +137,7 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
private String nodeId;
private String jobId;
private JobStatus status = JobStatus.CLOSED;
private JobStatus status;
private SchedulerState schedulerState;
public Builder() {
@ -158,8 +158,32 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
this.jobId = jobId;
}
public void setStatus(JobStatus status) {
this.status = status;
public void setStatus(JobStatus newStatus) {
switch (newStatus) {
case CLOSING:
if (this.status == JobStatus.CLOSED) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closed");
}
if (this.status == JobStatus.CLOSING) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closing");
}
break;
case PAUSING:
if (this.status == JobStatus.CLOSED) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closed");
}
if (this.status == JobStatus.CLOSING) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closing");
}
if (this.status == JobStatus.PAUSING) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is pausing");
}
if (this.status == JobStatus.PAUSED) {
throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is already paused");
}
}
this.status = newStatus;
}
public void setSchedulerState(SchedulerState schedulerState) {
@ -200,6 +224,9 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
}
public Allocation build() {
if (status == null) {
status = JobStatus.CLOSED;
}
return new Allocation(nodeId, jobId, status, schedulerState);
}

View File

@ -99,6 +99,8 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
break;
case PAUSED:
break;
case FAILED:
break;
default:
throw new IllegalStateException("Unknown job status [" + status + "]");
}
@ -109,12 +111,12 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
if (schedulerState != null) {
switch (schedulerState.getStatus()) {
case STARTING:
scheduledJobService.start(job, allocation);
executor.execute(() -> scheduledJobService.start(job, allocation));
break;
case STARTED:
break;
case STOPPING:
scheduledJobService.stop(allocation);
executor.execute(() -> scheduledJobService.stop(allocation));
break;
case STOPPED:
break;
@ -147,25 +149,21 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
private void closeJob(Job job) {
try {
// NORELEASE Ensure this also removes the job auto-close timeout task
dataProcessor.closeJob(job.getId());
dataProcessor.closeJob(job.getId(), JobStatus.CLOSED);
} catch (ElasticsearchException e) {
logger.error("Failed to close job [" + job.getId() + "]", e);
updateJobStatus(job.getId(), JobStatus.FAILED);
return;
}
updateJobStatus(job.getId(), JobStatus.CLOSED);
}
private void pauseJob(Job job) {
try {
// NORELEASE Ensure this also removes the job auto-close timeout task
dataProcessor.closeJob(job.getId());
dataProcessor.closeJob(job.getId(), JobStatus.PAUSED);
} catch (ElasticsearchException e) {
logger.error("Failed to close job [" + job.getId() + "] while pausing", e);
updateJobStatus(job.getId(), JobStatus.FAILED);
return;
}
updateJobStatus(job.getId(), JobStatus.PAUSED);
}
private void updateJobStatus(String jobId, JobStatus status) {

View File

@ -19,12 +19,14 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
@ -235,6 +237,11 @@ public class PrelertMetadata implements MetaData.Custom {
if (previous == null) {
throw new IllegalStateException("Expected that job [" + jobId + "] was already allocated");
}
if (previous.getStatus() != updated.getStatus() && updated.getStatus() == JobStatus.CLOSED) {
Job.Builder job = new Job.Builder(this.jobs.get(jobId));
job.setFinishedTime(new Date());
this.jobs.put(job.getId(), job.build());
}
return this;
}

View File

@ -83,6 +83,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
builder.value(result);
pipedProcessOutStream.write(builder.string().getBytes(StandardCharsets.UTF_8));
pipedProcessOutStream.flush();
pipedProcessOutStream.write(',');
return FLUSH_ID;
}
@ -92,7 +93,10 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
@Override
public void close() throws IOException {
pipedProcessOutStream.write('{');
pipedProcessOutStream.write('}');
pipedProcessOutStream.write(']');
pipedProcessOutStream.flush();
pipedProcessOutStream.close();
pipedPersistStream.close();
}

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.prelert.action;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
@ -13,6 +15,7 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
@ -36,6 +39,7 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
@ -197,6 +201,18 @@ public class ScheduledJobsIT extends ESIntegTestCase {
StopJobSchedulerAction.Response response =
client.execute(StopJobSchedulerAction.INSTANCE, new StopJobSchedulerAction.Request(jobId)).get();
assertTrue(response.isAcknowledged());
assertBusy(() -> {
GetJobsAction.Response r = null;
try {
GetJobsAction.Request request = new GetJobsAction.Request();
request.setJobId(jobId);
request.schedulerStatus(true);
r = client.execute(GetJobsAction.INSTANCE, request).get();
} catch (Exception e) {
fail();
}
assertThat(r.getResponse().results().get(0).getSchedulerState().getStatus(), equalTo(JobSchedulerStatus.STOPPED));
});
} catch (Exception e) {
// ignore
}

View File

@ -244,37 +244,57 @@ public class PrelertJobIT extends ESRestTestCase {
public void testPauseAndResumeJob() throws Exception {
createFarequoteJob();
client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause");
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause"));
assertThat(e.getMessage(), containsString("[farequote][CLOSED] can't pause a job that is closed"));
client().performRequest("post", PrelertPlugin.BASE_PATH + "data/farequote/", Collections.emptyMap(),
new StringEntity("time,airline,responsetime,sourcetype\n" +
"2014-06-23 00:00:00Z,AAL,132.2046,farequote"));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/farequote",
Collections.singletonMap("metric", "status"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"RUNNING\""));
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause");
assertBusy(() -> {
try {
Response response = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/farequote");
String responseEntityToString = responseEntityToString(response);
assertThat(responseEntityToString, containsString("\"ignoreDowntime\":\"ONCE\""));
} catch (Exception e) {
} catch (Exception e1) {
fail();
}
}, 2, TimeUnit.SECONDS);
client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume");
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409));
assertThat(e.getMessage(), containsString("Cannot resume job 'farequote' while its status is CLOSED"));
}
public void testPauseJob_GivenJobIsPaused() throws Exception {
createFarequoteJob();
client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause");
ResponseException e = expectThrows(ResponseException.class,
e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_pause"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409));
assertThat(e.getMessage(), containsString("Cannot pause job 'farequote' while its status is PAUSED"));
client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume");
client().performRequest("post", PrelertPlugin.BASE_PATH + "data/farequote/", Collections.emptyMap(),
new StringEntity("time,airline,responsetime,sourcetype\n" +
"2014-06-23 00:00:00Z,AAL,132.2046,farequote"));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/farequote",
Collections.singletonMap("metric", "status"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"RUNNING\""));
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
e = expectThrows(ResponseException.class,
() -> client().performRequest("post", PrelertPlugin.BASE_PATH + "jobs/farequote/_resume"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(409));
assertThat(e.getMessage(), containsString("Cannot resume job 'farequote' while its status is RUNNING"));
}
public void testResumeJob_GivenJobIsClosed() throws Exception {

View File

@ -82,6 +82,15 @@ public class ScheduledJobIT extends ESRestTestCase {
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
waitForSchedulerStartedState(jobId);
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId,
Collections.singletonMap("metric", "status"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"RUNNING\""));
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId));
@ -92,9 +101,19 @@ public class ScheduledJobIT extends ESRestTestCase {
response = client().performRequest("post", PrelertPlugin.BASE_PATH + "schedulers/" + jobId + "/_stop");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
waitForSchedulerStoppedState(client(), jobId);
client().performRequest("POST", "/_xpack/prelert/data/" + jobId + "/_close");
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get", PrelertPlugin.BASE_PATH + "jobs/" + jobId,
Collections.singletonMap("metric", "status"));
assertThat(responseEntityToString(getJobResponse), containsString("\"status\":\"CLOSED\""));
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
response = client().performRequest("delete", PrelertPlugin.BASE_PATH + "jobs/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));

View File

@ -44,8 +44,8 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
ModelSnapshot modelSnapshot2 = createFullyPopulated();
modelSnapshot2.setTimestamp(modelSnapshot1.getTimestamp());
assertTrue(modelSnapshot1.equals(modelSnapshot2));
assertTrue(modelSnapshot2.equals(modelSnapshot1));
assertEquals(modelSnapshot1, modelSnapshot2);
assertEquals(modelSnapshot2, modelSnapshot1);
assertEquals(modelSnapshot1.hashCode(), modelSnapshot2.hashCode());
}
@ -151,7 +151,9 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
modelSnapshot.setRestorePriority(DEFAULT_PRIORITY);
modelSnapshot.setSnapshotId(DEFAULT_ID);
modelSnapshot.setSnapshotDocCount(DEFAULT_DOC_COUNT);
modelSnapshot.setModelSizeStats(new ModelSizeStats.Builder("foo"));
ModelSizeStats.Builder modelSizeStatsBuilder = new ModelSizeStats.Builder("foo");
modelSizeStatsBuilder.setLogTime(null);
modelSnapshot.setModelSizeStats(modelSizeStatsBuilder);
modelSnapshot.setLatestResultTimeStamp(DEFAULT_LATEST_RESULT_TIMESTAMP);
modelSnapshot.setLatestRecordTimeStamp(DEFAULT_LATEST_RECORD_TIMESTAMP);
modelSnapshot.setQuantiles(new Quantiles("foo", DEFAULT_TIMESTAMP, "state"));

View File

@ -103,7 +103,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
// job is created
assertEquals(1, manager.numberOfRunningJobs());
manager.closeJob("foo");
manager.closeJob("foo", JobStatus.CLOSED);
assertEquals(0, manager.numberOfRunningJobs());
}
@ -208,6 +208,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private void givenAllocationWithStatus(JobStatus status) {
Allocation.Builder allocation = new Allocation.Builder();
allocation.setStatus(JobStatus.RUNNING); // from running we can go to other statuses
allocation.setStatus(status);
when(jobManager.getJobAllocation("foo")).thenReturn(allocation.build());
}

View File

@ -90,6 +90,7 @@ public class JobLifeCycleServiceTests extends ESTestCase {
Allocation.Builder allocation = new Allocation.Builder();
allocation.setJobId("foo");
allocation.setNodeId("_node_id");
allocation.setStatus(JobStatus.RUNNING); // from running we can go to other statuses
allocation.setStatus(JobStatus.PAUSING);
pmBuilder.updateAllocation("foo", allocation.build());
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
@ -101,9 +102,7 @@ public class JobLifeCycleServiceTests extends ESTestCase {
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
verify(dataProcessor).closeJob("foo");
UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("foo", JobStatus.PAUSED);
verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any());
verify(dataProcessor).closeJob("foo", JobStatus.PAUSED);
}
public void testClusterChanged_GivenJobIsPausingAndCloseJobThrows() {
@ -114,6 +113,7 @@ public class JobLifeCycleServiceTests extends ESTestCase {
Allocation.Builder allocation = new Allocation.Builder();
allocation.setJobId("foo");
allocation.setNodeId("_node_id");
allocation.setStatus(JobStatus.RUNNING); // from running we can go to other statuses
allocation.setStatus(JobStatus.PAUSING);
pmBuilder.updateAllocation("foo", allocation.build());
ClusterState cs1 = ClusterState.builder(new ClusterName("_cluster_name")).metaData(MetaData.builder()
@ -122,11 +122,11 @@ public class JobLifeCycleServiceTests extends ESTestCase {
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
.localNodeId("_node_id"))
.build();
doThrow(new ElasticsearchException("")).when(dataProcessor).closeJob("foo");
doThrow(new ElasticsearchException("")).when(dataProcessor).closeJob("foo", JobStatus.PAUSED);
jobLifeCycleService.clusterChanged(new ClusterChangedEvent("_source", cs1, cs1));
verify(dataProcessor).closeJob("foo");
verify(dataProcessor).closeJob("foo", JobStatus.PAUSED);
UpdateJobStatusAction.Request expectedRequest = new UpdateJobStatusAction.Request("foo", JobStatus.FAILED);
verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any());
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.JobTests;
import java.io.ByteArrayInputStream;
@ -25,6 +26,7 @@ import java.io.IOException;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class PrelertMetadataTests extends ESTestCase {
@ -99,4 +101,22 @@ public class PrelertMetadataTests extends ESTestCase {
assertThat(result.getJobs().get("2"), notNullValue());
}
public void testUpdateAllocation_setFinishedTime() {
PrelertMetadata.Builder builder = new PrelertMetadata.Builder();
builder.putJob(buildJobBuilder("_job_id").build(), false);
builder.putAllocation("_node_id", "_job_id");
PrelertMetadata prelertMetadata = builder.build();
builder = new PrelertMetadata.Builder(prelertMetadata);
Allocation.Builder allocation = new Allocation.Builder();
allocation.setJobId("_job_id");
allocation.setNodeId("_node_id");
allocation.setStatus(JobStatus.RUNNING);
builder.updateAllocation("_job_id", allocation.build());
assertThat(builder.build().getJobs().get("_job_id").getFinishedTime(), nullValue());
allocation.setStatus(JobStatus.CLOSED);
builder.updateAllocation("_job_id", allocation.build());
assertThat(builder.build().getJobs().get("_job_id").getFinishedTime(), notNullValue());
}
}

View File

@ -109,7 +109,7 @@ setup:
- is_false: jobs.0.data_counts
- is_false: jobs.0.model_size_stats
- is_false: jobs.0.scheduler_state
- match: { jobs.0.status: CLOSED }
- match: { jobs.0.status: RUNNING }
- do:
xpack.prelert.get_jobs:
@ -118,7 +118,7 @@ setup:
- is_true: jobs.0.config
- is_true: jobs.0.data_counts
- is_false: jobs.0.scheduler_state
- match: { jobs.0.status: CLOSED }
- match: { jobs.0.status: RUNNING }
- do:
xpack.prelert.get_jobs: