[ML] Reintroduced the closing job state

Closing a job may take a while. In the meantime it is possible to start a datafeed, because before this change the job state remained OPENED.
With this change when the executor node receives the close job request, it will first set the status to CLOSING and after that closes the job (closing autodetect process, etc.).

relates elastic/x-pack-elasticsearch#990

Original commit: elastic/x-pack-elasticsearch@d8d89c0756
This commit is contained in:
Martijn van Groningen 2017-04-07 20:49:37 +02:00
parent 6396edc6a7
commit 454d6b3390
3 changed files with 37 additions and 10 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -46,6 +47,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -278,8 +280,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, InternalClient client, ClusterService clusterService, InternalClient client,
Auditor auditor, PersistentTasksService persistentTasksService) { Auditor auditor, PersistentTasksService persistentTasksService) {
// We fork in innerTaskOperation(...), so we can use ThreadPool.Names.SAME here:
super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters, super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT); indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.SAME);
this.client = client; this.client = client;
this.clusterService = clusterService; this.clusterService = clusterService;
this.auditor = auditor; this.auditor = auditor;
@ -332,10 +335,24 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
} }
@Override @Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) { protected void taskOperation(Request request, OpenJobAction.JobTask jobTask, ActionListener<Response> listener) {
task.closeJob("close job (api)"); JobTaskStatus taskStatus = new JobTaskStatus(JobState.CLOSING, jobTask.getAllocationId());
jobTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> {
// we need to fork because we are now on a network threadpool and closeJob method may take a while to complete:
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
jobTask.closeJob("close job (api)");
listener.onResponse(new Response(true)); listener.onResponse(new Response(true));
} }
});
}, listener::onFailure));
}
@Override @Override
protected boolean accumulateExceptions() { protected boolean accumulateExceptions() {
@ -540,7 +557,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
continue; continue;
} }
if (MlMetadata.getJobState(resolvedJobId, tasks) == JobState.CLOSED) { if (MlMetadata.getJobState(resolvedJobId, tasks).isAnyOf(JobState.OPENED, JobState.FAILED) == false) {
continue; continue;
} }
@ -562,7 +579,11 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
PersistentTasksCustomMetaData tasks = state.getMetaData() PersistentTasksCustomMetaData tasks = state.getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE); .custom(PersistentTasksCustomMetaData.TYPE);
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks); PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null) { if (jobTask == null || jobTask.getStatus() == null) {
throw new ElasticsearchStatusException("cannot close job, because job [" + jobId + "] is not open", RestStatus.CONFLICT);
}
JobTaskStatus jobTaskStatus = (JobTaskStatus) jobTask.getStatus();
if (jobTaskStatus.getState().isAnyOf(JobState.OPENED, JobState.FAILED) == false) {
throw new ElasticsearchStatusException("cannot close job, because job [" + jobId + "] is not open", RestStatus.CONFLICT); throw new ElasticsearchStatusException("cannot close job, because job [" + jobId + "] is not open", RestStatus.CONFLICT);
} }

View File

@ -22,7 +22,7 @@ import java.util.Locale;
*/ */
public enum JobState implements ToXContent, Writeable { public enum JobState implements ToXContent, Writeable {
CLOSED, OPENED, FAILED; CLOSING, CLOSED, OPENED, FAILED;
public static JobState fromString(String name) { public static JobState fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT)); return valueOf(name.trim().toUpperCase(Locale.ROOT));

View File

@ -10,33 +10,39 @@ import org.elasticsearch.test.ESTestCase;
public class JobStateTests extends ESTestCase { public class JobStateTests extends ESTestCase {
public void testFromString() { public void testFromString() {
assertEquals(JobState.fromString("closing"), JobState.CLOSING);
assertEquals(JobState.fromString("closed"), JobState.CLOSED); assertEquals(JobState.fromString("closed"), JobState.CLOSED);
assertEquals(JobState.fromString("failed"), JobState.FAILED); assertEquals(JobState.fromString("failed"), JobState.FAILED);
assertEquals(JobState.fromString("opened"), JobState.OPENED); assertEquals(JobState.fromString("opened"), JobState.OPENED);
assertEquals(JobState.fromString("CLOSING"), JobState.CLOSING);
assertEquals(JobState.fromString("CLOSED"), JobState.CLOSED); assertEquals(JobState.fromString("CLOSED"), JobState.CLOSED);
assertEquals(JobState.fromString("FAILED"), JobState.FAILED); assertEquals(JobState.fromString("FAILED"), JobState.FAILED);
assertEquals(JobState.fromString("OPENED"), JobState.OPENED); assertEquals(JobState.fromString("OPENED"), JobState.OPENED);
} }
public void testToString() { public void testToString() {
assertEquals("closing", JobState.CLOSING.toString());
assertEquals("closed", JobState.CLOSED.toString()); assertEquals("closed", JobState.CLOSED.toString());
assertEquals("failed", JobState.FAILED.toString()); assertEquals("failed", JobState.FAILED.toString());
assertEquals("opened", JobState.OPENED.toString()); assertEquals("opened", JobState.OPENED.toString());
} }
public void testValidOrdinals() { public void testValidOrdinals() {
assertEquals(0, JobState.CLOSED.ordinal()); assertEquals(0, JobState.CLOSING.ordinal());
assertEquals(1, JobState.OPENED.ordinal()); assertEquals(1, JobState.CLOSED.ordinal());
assertEquals(2, JobState.FAILED.ordinal()); assertEquals(2, JobState.OPENED.ordinal());
assertEquals(3, JobState.FAILED.ordinal());
} }
public void testIsAnyOf() { public void testIsAnyOf() {
assertFalse(JobState.OPENED.isAnyOf()); assertFalse(JobState.OPENED.isAnyOf());
assertFalse(JobState.OPENED.isAnyOf(JobState.CLOSED, JobState.FAILED)); assertFalse(JobState.OPENED.isAnyOf(JobState.CLOSED, JobState.FAILED));
assertFalse(JobState.CLOSED.isAnyOf(JobState.FAILED, JobState.OPENED)); assertFalse(JobState.CLOSED.isAnyOf(JobState.FAILED, JobState.OPENED));
assertFalse(JobState.CLOSING.isAnyOf(JobState.FAILED, JobState.OPENED));
assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED)); assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED));
assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED, JobState.CLOSED)); assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED, JobState.CLOSED));
assertTrue(JobState.CLOSED.isAnyOf(JobState.CLOSED)); assertTrue(JobState.CLOSED.isAnyOf(JobState.CLOSED));
assertTrue(JobState.CLOSING.isAnyOf(JobState.CLOSING));
} }
} }