[ML] Changed job and datafeed lifecycle management

* Removed OPENING and CLOSING job states. Instead when persistent task has been created and
   status hasn't been set then this means we haven't yet started, when the executor changes it to STARTED we have.
   The coordinating node will monitor cs for a period of time until that happens and then returns or times out.
* Refactored job close api to go to node running job task and close job there.
* Changed unexpected job and datafeed exception messages to not mention the state and instead mention that job/datafeed haven't yet started/stopped.

Original commit: elastic/x-pack-elasticsearch@37e778b585
This commit is contained in:
Martijn van Groningen 2017-03-28 14:55:50 +02:00
parent 905fa16f81
commit f2654b5872
32 changed files with 420 additions and 472 deletions

View File

@ -37,12 +37,12 @@ import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.CloseJobService;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
@ -324,7 +324,6 @@ public class MachineLearning implements ActionPlugin {
persistentTasksExecutorRegistry,
new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService),
auditor,
new CloseJobService(internalClient, threadPool, clusterService),
invalidLicenseEnforcer
);
}
@ -406,6 +405,7 @@ public class MachineLearning implements ActionPlugin {
new ActionHandler<>(GetRecordsAction.INSTANCE, GetRecordsAction.TransportAction.class),
new ActionHandler<>(PostDataAction.INSTANCE, PostDataAction.TransportAction.class),
new ActionHandler<>(CloseJobAction.INSTANCE, CloseJobAction.TransportAction.class),
new ActionHandler<>(FinalizeJobExecutionAction.INSTANCE, FinalizeJobExecutionAction.TransportAction.class),
new ActionHandler<>(FlushJobAction.INSTANCE, FlushJobAction.TransportAction.class),
new ActionHandler<>(ValidateDetectorAction.INSTANCE, ValidateDetectorAction.TransportAction.class),
new ActionHandler<>(ValidateJobConfigAction.INSTANCE, ValidateJobConfigAction.TransportAction.class),

View File

@ -357,13 +357,12 @@ public class MlMetadata implements MetaData.Custom {
}
Optional<DatafeedConfig> datafeed = getDatafeedByJobId(jobId);
if (datafeed.isPresent()) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed ["
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed ["
+ datafeed.get().getId() + "] refers to it");
}
JobState jobState = getJobState(jobId, tasks);
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" +
JobState.CLOSED + "]");
PersistentTask<?> jobTask = getJobTask(jobId, tasks);
if (jobTask != null) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job hasn't been closed");
}
Job.Builder jobBuilder = new Job.Builder(job);
jobBuilder.setDeleted(true);

View File

@ -9,33 +9,27 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
@ -43,15 +37,14 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -74,7 +67,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
return new Response();
}
public static class Request extends MasterNodeRequest<Request> implements ToXContent {
public static class Request extends TransportJobTaskAction.JobTaskRequest<Request> implements ToXContent {
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
@ -83,7 +76,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
static {
PARSER.declareString(Request::setJobId, Job.ID);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
request.setCloseTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareBoolean(Request::setForce, FORCE);
}
@ -95,29 +88,24 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
return request;
}
private String jobId;
private TimeValue timeout = TimeValue.timeValueMinutes(20);
private boolean force = false;
Request() {}
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
public String getJobId() {
return jobId;
super(jobId);
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
public TimeValue getTimeout() {
public TimeValue getCloseTimeout() {
return timeout;
}
public void setTimeout(TimeValue timeout) {
public void setCloseTimeout(TimeValue timeout) {
this.timeout = timeout;
}
@ -129,15 +117,9 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
this.force = force;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
timeout = new TimeValue(in);
force = in.readBoolean();
}
@ -145,7 +127,6 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
timeout.writeTo(out);
out.writeBoolean(force);
}
@ -155,13 +136,14 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
builder.field(FORCE.getPreferredName(), force);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(jobId, timeout);
return Objects.hash(jobId, timeout, force);
}
@Override
@ -186,14 +168,19 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
}
public static class Response extends ActionResponse implements ToXContentObject {
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean closed;
Response() {
}
Response(StreamInput in) throws IOException {
readFrom(in);
}
Response(boolean closed) {
super(null, null);
this.closed = closed;
}
@ -235,78 +222,47 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
public static class TransportAction extends TransportJobTaskAction<OpenJobAction.JobTask, Request, Response> {
private final InternalClient client;
private final ClusterService clusterService;
private final CloseJobService closeJobService;
private final Client client;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, CloseJobService closeJobService, Client client) {
super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
ClusterService clusterService, AutodetectProcessManager manager, Client client) {
super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT, manager);
this.client = new InternalClient(settings, threadPool, client);
this.clusterService = clusterService;
this.closeJobService = closeJobService;
this.client = client;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
if (request.isForce()) {
forceCloseJob(client, request.getJobId(), state, listener);
forceCloseJob(request.getJobId(), listener);
} else {
closeJob(request, listener);
ActionListener<Response> finalListener =
ActionListener.wrap(r -> waitForJobClosed(request, r, listener), listener::onFailure);
super.doExecute(task, request, finalListener);
}
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener,
ClusterState state) {
validate(request.getJobId(), state);
task.closeJob("close job (api)");
listener.onResponse(new Response(true));
}
private void closeJob(Request request, ActionListener<Response> listener) {
clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return moveJobToClosingState(request.getJobId(), currentState);
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
closeJobService.closeJob(request, listener);
}
});
}
});
@Override
protected Response readTaskResponse(StreamInput in) throws IOException {
return new Response(in);
}
private void forceCloseJob(Client client, String jobId, ClusterState currentState,
ActionListener<Response> listener) {
private void forceCloseJob(String jobId, ActionListener<Response> listener) {
ClusterState currentState = clusterService.state();
PersistentTask<?> task = MlMetadata.getJobTask(jobId,
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
if (task != null) {
@ -323,58 +279,36 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
}
}
static PersistentTask<?> validateAndFindTask(String jobId, ClusterState state) {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
if (mlMetadata.getJobs().containsKey(jobId) == false) {
throw ExceptionsHelper.missingJobException(jobId);
// Wait for job to be marked as closed in cluster state, which means the job persistent task has been removed
// This api returns when job has been closed, but that doesn't mean the persistent task has been removed from cluster state,
// so wait for that to happen here.
void waitForJobClosed(Request request, Response response, ActionListener<Response> listener) {
JobStateObserver observer = new JobStateObserver(threadPool, clusterService);
observer.waitForState(request.getJobId(), request.getCloseTimeout(), JobState.CLOSED, e -> {
if (e != null) {
listener.onFailure(e);
} else {
FinalizeJobExecutionAction.Request finalizeRequest =
new FinalizeJobExecutionAction.Request(request.getJobId());
client.execute(FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
ActionListener.wrap(r-> listener.onResponse(response), listener::onFailure));
}
});
}
}
static void validate(String jobId, ClusterState state) {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Optional<DatafeedConfig> datafeed = mlMetadata.getDatafeedByJobId(jobId);
if (datafeed.isPresent()) {
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks);
if (datafeedState != DatafeedState.STOPPED) {
throw new ElasticsearchStatusException("cannot close job [{}], datafeed hasn't been stopped",
RestStatus.CONFLICT, jobId);
RestStatus.CONFLICT, jobId);
}
}
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask != null) {
JobState jobState = (JobState) jobTask.getStatus();
if (jobState.isAnyOf(JobState.OPENED, JobState.FAILED) == false) {
throw new ElasticsearchStatusException("cannot close job [{}], expected job state [{}], but got [{}]",
RestStatus.CONFLICT, jobId, JobState.OPENED, jobState);
}
return jobTask;
}
throw new ElasticsearchStatusException("cannot close job [{}], expected job state [{}], but got [{}]",
RestStatus.CONFLICT, jobId, JobState.OPENED, JobState.CLOSED);
}
static ClusterState moveJobToClosingState(String jobId, ClusterState currentState) {
PersistentTask<?> task = validateAndFindTask(jobId, currentState);
PersistentTasksCustomMetaData currentTasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Map<Long, PersistentTask<?>> updatedTasks = new HashMap<>(currentTasks.taskMap());
PersistentTask<?> taskToUpdate = currentTasks.getTask(task.getId());
taskToUpdate = new PersistentTask<>(taskToUpdate, JobState.CLOSING);
updatedTasks.put(taskToUpdate.getId(), taskToUpdate);
PersistentTasksCustomMetaData newTasks = new PersistentTasksCustomMetaData(currentTasks.getCurrentId(), updatedTasks);
MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
jobBuilder.setFinishedTime(new Date());
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
mlMetadataBuilder.putJob(jobBuilder.build(), true);
ClusterState.Builder builder = ClusterState.builder(currentState);
return builder
.metaData(new MetaData.Builder(currentState.metaData())
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, newTasks))
.build();
}
}

View File

@ -1,60 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
/**
* Service that interacts with a client to close jobs remotely.
*/
// Ideally this would sit in CloseJobAction.TransportAction, but we can't inject a client there as
// it would lead to cyclic dependency issue, so we isolate it here.
public class CloseJobService {
private final Client client;
private final JobStateObserver observer;
public CloseJobService(Client client, ThreadPool threadPool, ClusterService clusterService) {
this.client = client;
this.observer = new JobStateObserver(threadPool, clusterService);
}
void closeJob(CloseJobAction.Request request, ActionListener<CloseJobAction.Response> listener) {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(OpenJobAction.NAME + "[c]");
client.admin().cluster().listTasks(listTasksRequest, ActionListener.wrap(listTasksResponse -> {
String expectedDescription = "job-" + request.getJobId();
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (expectedDescription.equals(taskInfo.getDescription())) {
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(taskInfo.getTaskId());
client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> {
observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> {
if (e == null) {
listener.onResponse(new CloseJobAction.Response(true));
} else {
listener.onFailure(e);
}
});
}, listener::onFailure));
return;
}
}
listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]"));
}, listener::onFailure));
}
}

View File

@ -0,0 +1,179 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
import java.util.Date;
public class FinalizeJobExecutionAction extends Action<FinalizeJobExecutionAction.Request,
FinalizeJobExecutionAction.Response,FinalizeJobExecutionAction.RequestBuilder> {
public static final FinalizeJobExecutionAction INSTANCE = new FinalizeJobExecutionAction();
public static final String NAME = "cluster:internal/ml/job/finalize_job_execution";
private FinalizeJobExecutionAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, INSTANCE);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeRequest<Request> {
private String jobId;
public Request(String jobId) {
this.jobId = jobId;
}
Request() {
}
public String getJobId() {
return jobId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
}
public static class RequestBuilder
extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, FinalizeJobExecutionAction action) {
super(client, action, new Request());
}
}
public static class Response extends AcknowledgedResponse {
Response(boolean acknowledged) {
super(acknowledged);
}
Response() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<Response> listener) throws Exception {
String jobId = request.getJobId();
String source = "finalize_job_execution [" + jobId + "]";
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
jobBuilder.setFinishedTime(new Date());
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
mlMetadataBuilder.putJob(jobBuilder.build(), true);
ClusterState.Builder builder = ClusterState.builder(currentState);
return builder.metaData(new MetaData.Builder(currentState.metaData())
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
.build();
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState,
ClusterState newState) {
listener.onResponse(new Response(true));
}
});
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@ -255,7 +256,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
@Override
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task,
ActionListener<FlushJobAction.Response> listener) {
ActionListener<Response> listener, ClusterState state) {
InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder();
paramsBuilder.calcInterim(request.getCalcInterim());
if (request.getAdvanceTime() != null) {

View File

@ -18,7 +18,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@ -268,6 +267,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
protected void onCancelled() {
String reason = getReasonCancelled();
closeJob(reason);
}
void closeJob(String reason) {
autodetectProcessManager.closeJob(jobId, false, reason);
}
@ -368,7 +371,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
// simply because there are no ml nodes in the cluster then we fail quickly here:
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
OpenJobAction.validate(request.getJobId(), mlMetadata, tasks, clusterState.nodes());
OpenJobAction.validate(request.getJobId(), mlMetadata, tasks);
Assignment assignment = selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
if (assignment.getExecutorNode() == null) {
throw new ElasticsearchStatusException("cannot open job [" + request.getJobId() + "], no suitable nodes found, " +
@ -383,19 +386,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
protected void nodeOperation(AllocatedPersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) {
JobTask jobTask = (JobTask) task;
jobTask.autodetectProcessManager = autodetectProcessManager;
autodetectProcessManager.setJobState(task.getPersistentTaskId(), JobState.OPENING, e1 -> {
if (e1 != null) {
listener.onFailure(e1);
return;
autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> {
if (e2 == null) {
listener.onResponse(new TransportResponse.Empty());
} else {
listener.onFailure(e2);
}
autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> {
if (e2 == null) {
listener.onResponse(new TransportResponse.Empty());
} else {
listener.onFailure(e2);
}
});
});
}
@ -431,19 +427,17 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
* Fail fast before trying to update the job state on master node if the job doesn't exist or its state
* is not what it should be.
*/
static void validate(String jobId, MlMetadata mlMetadata, @Nullable PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes) {
static void validate(String jobId, MlMetadata mlMetadata, @Nullable PersistentTasksCustomMetaData tasks) {
Job job = mlMetadata.getJobs().get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
}
if (job.isDeleted()) {
throw new ElasticsearchStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted",
RestStatus.CONFLICT);
throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted");
}
JobState jobState = MlMetadata.getJobState(jobId, tasks);
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
throw new ElasticsearchStatusException("[" + jobId + "] expected state [" + JobState.CLOSED
+ "] or [" + JobState.FAILED + "], but got [" + jobState + "]", RestStatus.CONFLICT);
PersistentTasksCustomMetaData.PersistentTask<?> task = MlMetadata.getJobTask(jobId, tasks);
if (task != null) {
throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has already been opened");
}
}
@ -481,7 +475,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
JobState jobTaskState = (JobState) task.getStatus();
return jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
jobTaskState == JobState.OPENING || // executor node is busy starting the cpp process
task.isCurrentStatus() == false; // previous executor node failed and
// current executor node didn't have the chance to set job status to OPENING
}).size();

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@ -251,7 +252,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
}
@Override
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener, ClusterState state) {
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription()));
try {

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
@ -38,7 +37,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
@ -100,7 +98,7 @@ public class StartDatafeedAction
static {
PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareString((request, startTime) -> request.startTime = parseDateOrThrow(
startTime, START_TIME, () -> System.currentTimeMillis()), START_TIME);
startTime, START_TIME, System::currentTimeMillis), START_TIME);
PARSER.declareString(Request::setEndTime, END_TIME);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
@ -140,7 +138,7 @@ public class StartDatafeedAction
}
public Request(String datafeedId, String startTime) {
this(datafeedId, parseDateOrThrow(startTime, START_TIME, () -> System.currentTimeMillis()));
this(datafeedId, parseDateOrThrow(startTime, START_TIME, System::currentTimeMillis));
}
public Request(StreamInput in) throws IOException {
@ -163,7 +161,7 @@ public class StartDatafeedAction
}
public void setEndTime(String endTime) {
setEndTime(parseDateOrThrow(endTime, END_TIME, () -> System.currentTimeMillis()));
setEndTime(parseDateOrThrow(endTime, END_TIME, System::currentTimeMillis));
}
public void setEndTime(Long endTime) {
@ -413,8 +411,7 @@ public class StartDatafeedAction
if (licenseState.isMachineLearningAllowed()) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
DiscoveryNodes nodes = clusterState.getNodes();
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, nodes);
StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks);
} else {
throw LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING);
}
@ -462,7 +459,7 @@ public class StartDatafeedAction
}
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes) {
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(datafeedId);
@ -474,18 +471,17 @@ public class StartDatafeedAction
DatafeedJobValidator.validate(datafeed, job);
JobState jobState = MlMetadata.getJobState(datafeed.getJobId(), tasks);
if (jobState != JobState.OPENED) {
throw new ElasticsearchStatusException("cannot start datafeed, expected job state [{}], but got [{}]",
RestStatus.CONFLICT, JobState.OPENED, jobState);
throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() +
"] hasn't been opened");
}
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
if (datafeedState == DatafeedState.STARTED) {
throw new ElasticsearchStatusException("datafeed [{}] already started, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, datafeedId, DatafeedState.STOPPED, DatafeedState.STARTED);
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
if (datafeedTask != null) {
throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because it has already been started");
}
}
public static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState) {
static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@ -34,7 +33,6 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -48,6 +46,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.List;
@ -220,7 +219,7 @@ public class StopDatafeedAction
public static class TransportAction extends TransportTasksAction<StartDatafeedAction.DatafeedTask, Request, Response, Response> {
private final Client client;
private final InternalClient client;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
@ -228,7 +227,7 @@ public class StopDatafeedAction
ClusterService clusterService, Client client) {
super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, Response::new, MachineLearning.THREAD_POOL_NAME);
this.client = client;
this.client = new InternalClient(settings, threadPool, client);
}
@Override
@ -240,7 +239,7 @@ public class StopDatafeedAction
if (request.force) {
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks);
if (datafeedTask != null) {
forceStopTask(client, datafeedTask.getId(), listener);
forceStopTask(datafeedTask.getId(), listener);
} else {
String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " +
"datafeed's task could not be found.";
@ -271,7 +270,7 @@ public class StopDatafeedAction
});
}
private void forceStopTask(Client client, long taskId, ActionListener<Response> listener) {
private void forceStopTask(long taskId, ActionListener<Response> listener) {
RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId);
client.execute(RemovePersistentTaskAction.INSTANCE, request,
@ -293,7 +292,7 @@ public class StopDatafeedAction
@Override
protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener<Response> listener) {
task.stop("stop_datafeed_api", request.getTimeout());
task.stop("stop_datafeed (api)", request.getTimeout());
listener.onResponse(new Response(true));
}
@ -309,9 +308,9 @@ public class StopDatafeedAction
throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId));
}
PersistentTask<?> task = MlMetadata.getDatafeedTask(datafeedId, tasks);
if (task == null || task.getStatus() != DatafeedState.STARTED) {
throw new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]",
RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED);
if (task == null) {
throw ExceptionsHelper.conflictStatusException("Cannot stop datafeed [" + datafeedId +
"] because it has already been stopped");
}
return task.getExecutorNode();
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
@ -20,7 +19,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -65,8 +63,8 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null || jobTask.isAssigned() == false) {
listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED +
"], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT));
String message = "Cannot perform requested action because job [" + jobId + "] hasn't been opened";
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
} else {
request.setNodes(jobTask.getExecutorNode());
super.doExecute(task, request, listener);
@ -79,15 +77,15 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks);
if (jobState == JobState.OPENED) {
innerTaskOperation(request, task, listener);
innerTaskOperation(request, task, listener, state);
} else {
logger.warn("Unexpected job state based on cluster state version [{}]", state.getVersion());
listener.onFailure(new ElasticsearchStatusException("job [" + request.getJobId() + "] state is [" + jobState +
"], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT));
listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot perform requested action because job [" +
request.getJobId() + "] hasn't been opened"));
}
}
protected abstract void innerTaskOperation(Request request, OperationTask task, ActionListener<Response> listener);
protected abstract void innerTaskOperation(Request request, OperationTask task, ActionListener<Response> listener, ClusterState state);
@Override
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -64,6 +65,7 @@ public class UpdateProcessAction extends
private boolean isUpdated;
private Response() {
super(null, null);
this.isUpdated = true;
}
@ -196,7 +198,7 @@ public class UpdateProcessAction extends
}
@Override
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener, ClusterState state) {
threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> {
try {
processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(),

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
@ -17,7 +16,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
@ -34,10 +32,9 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import java.time.Duration;
import java.util.Collections;
@ -80,16 +77,8 @@ public class DatafeedJobRunner extends AbstractComponent {
public void run(StartDatafeedAction.DatafeedTask task, Consumer<Exception> handler) {
String datafeedId = task.getDatafeedId();
ClusterState state = clusterService.state();
// CS on master node can be ahead on the node where job and datafeed tasks run,
// so check again and fail if in case of unexpected cs. Persist tasks will retry later then.
Assignment assignment = StartDatafeedAction.selectNode(logger, datafeedId, state);
if (assignment.getExecutorNode() == null) {
handler.accept(new ElasticsearchStatusException("cannot start datafeed [{}] yet, local cs [{}], allocation explanation [{}]",
RestStatus.CONFLICT, datafeedId, state.getVersion(), assignment.getExplanation()));
return;
}
logger.info("Attempt to start datafeed based on cluster state version [{}]", state.getVersion());
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
gatherInformation(job.getId(), (buckets, dataCounts) -> {

View File

@ -35,11 +35,9 @@ import java.util.concurrent.TimeUnit;
/**
* This class represents a configured and created Job. The creation time is set
* to the time the object was constructed, state is set to
* {@link JobState#OPENING} and the finished time and last data time fields are
* {@code null} until the job has seen some data or it is finished respectively.
* If the job was created to read data from a list of files FileUrls will be a
* non-empty list else the expects data to be streamed to it.
* to the time the object was constructed and the finished time and last
* data time fields are {@code null} until the job has seen some data or it is
* finished respectively.
*/
public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent {

View File

@ -26,7 +26,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
*/
public enum JobState implements Task.Status {
CLOSING, CLOSED, OPENING, OPENED, FAILED;
CLOSED, OPENED, FAILED;
public static final String NAME = "JobState";

View File

@ -31,7 +31,7 @@ public class RestCloseJobAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = new Request(restRequest.param(Job.ID.getPreferredName()));
if (restRequest.hasParam(Request.TIMEOUT.getPreferredName())) {
request.setTimeout(TimeValue.parseTimeValue(
request.setCloseTimeout(TimeValue.parseTimeValue(
restRequest.param(Request.TIMEOUT.getPreferredName()), Request.TIMEOUT.getPreferredName()));
}
if (restRequest.hasParam(Request.FORCE.getPreferredName())) {

View File

@ -353,7 +353,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* to a new executor node and the status hasn't been updated then the task status is stale.
*/
public boolean isCurrentStatus() {
return allocationIdOnLastStatusUpdate == allocationId;
return allocationIdOnLastStatusUpdate != null && allocationIdOnLastStatusUpdate == allocationId;
}
@Override

View File

@ -382,7 +382,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
// expected to because datafeeds is automatically stopped in case of invalid license,
// a license error should not be returned
Exception e = expectThrows(ElasticsearchStatusException.class, listener::actionGet);
assertEquals("datafeed already stopped, expected datafeed state [started], but got [stopped]", e.getMessage());
assertEquals("Cannot stop datafeed [foobar] because it has already been stopped", e.getMessage());
} else {
listener.actionGet();
}
@ -421,7 +421,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
if (invalidLicense) {
// so the license expired then job closes automatically, so an error is expected:
Exception e = expectThrows(ElasticsearchStatusException.class, listener::actionGet);
assertEquals("cannot close job [foo], expected job state [opened], but got [closed]", e.getMessage());
assertEquals("Cannot perform requested action because job [foo] hasn't been opened", e.getMessage());
} else {
listener.actionGet();
}

View File

@ -5,10 +5,27 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.CloseJobAction.Request;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
@ -18,6 +35,9 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
if (randomBoolean()) {
request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setForce(randomBoolean());
}
return request;
}
@ -30,4 +50,50 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
protected Request parseInstance(XContentParser parser) {
return Request.parseRequest(null, parser);
}
public void testValidate() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(), false);
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id",
Collections.singletonList("*")));
Map<Long, PersistentTask<?>> tasks = new HashMap<>();
PersistentTask<?> jobTask = createJobTask(1L, "job_id", null, JobState.OPENED);
tasks.put(1L, jobTask);
tasks.put(2L, createTask(2L, "datafeed_id", 0L, null, DatafeedState.STARTED));
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(1L, tasks))).build();
ElasticsearchStatusException e =
expectThrows(ElasticsearchStatusException.class,
() -> CloseJobAction.validate("job_id", cs1));
assertEquals(RestStatus.CONFLICT, e.status());
assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage());
tasks = new HashMap<>();
tasks.put(1L, jobTask);
if (randomBoolean()) {
tasks.put(2L, createTask(2L, "datafeed_id", 0L, null, DatafeedState.STOPPED));
}
ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(1L, tasks))).build();
CloseJobAction.validate("job_id", cs2);
}
public static PersistentTask<StartDatafeedAction.Request> createTask(long id,
String datafeedId,
long startTime,
String nodeId,
DatafeedState state) {
PersistentTask<StartDatafeedAction.Request> task =
new PersistentTask<>(id, StartDatafeedAction.NAME,
new StartDatafeedAction.Request(datafeedId, startTime),
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
task = new PersistentTask<>(task, state);
return task;
}
}

View File

@ -1,113 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
public class CloseJobActionTests extends ESTestCase {
public void testMoveJobToClosingState() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTask<OpenJobAction.Request> task =
createJobTask(1L, "job_id", null, randomFrom(JobState.OPENED, JobState.FAILED));
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task))));
ClusterState result = CloseJobAction.moveJobToClosingState("job_id", csBuilder.build());
PersistentTasksCustomMetaData actualTasks = result.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertEquals(JobState.CLOSING, actualTasks.getTask(1L).getStatus());
MlMetadata actualMetadata = result.metaData().custom(MlMetadata.TYPE);
assertNotNull(actualMetadata.getJobs().get("job_id").getFinishedTime());
}
public void testMoveJobToClosingState_jobMissing() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, Collections.emptyMap())));
expectThrows(ResourceNotFoundException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder.build()));
}
public void testMoveJobToClosingState_unexpectedJobState() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTask<OpenJobAction.Request> task = createJobTask(1L, "job_id", null, JobState.OPENING);
ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task))));
ElasticsearchStatusException result =
expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder1.build()));
assertEquals("cannot close job [job_id], expected job state [opened], but got [opening]", result.getMessage());
ClusterState.Builder csBuilder2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, Collections.emptyMap())));
result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder2.build()));
assertEquals("cannot close job [job_id], expected job state [opened], but got [closed]", result.getMessage());
}
public void testCloseJob_datafeedNotStopped() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(), false);
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", Collections.singletonList("*")));
Map<Long, PersistentTask<?>> tasks = new HashMap<>();
PersistentTask<?> jobTask = createJobTask(1L, "job_id", null, JobState.OPENED);
tasks.put(1L, jobTask);
tasks.put(2L, createDatafeedTask(2L, "datafeed_id", 0L, null, DatafeedState.STARTED));
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, tasks))).build();
ElasticsearchStatusException e =
expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.validateAndFindTask("job_id", cs1));
assertEquals(RestStatus.CONFLICT, e.status());
assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage());
tasks = new HashMap<>();
tasks.put(1L, jobTask);
if (randomBoolean()) {
tasks.put(2L, createDatafeedTask(2L, "datafeed_id", 0L, null, DatafeedState.STOPPED));
}
ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, tasks))).build();
assertEquals(jobTask, CloseJobAction.validateAndFindTask("job_id", cs2));
}
public static PersistentTask<StartDatafeedAction.Request> createDatafeedTask(long id, String datafeedId, long startTime,
String nodeId, DatafeedState datafeedState) {
PersistentTask<StartDatafeedAction.Request> task =
new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime),
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
task = new PersistentTask<>(task, datafeedState);
return task;
}
}

View File

@ -51,24 +51,20 @@ public class OpenJobActionTests extends ESTestCase {
public void testValidate() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name", "_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTask<OpenJobAction.Request> task =
createJobTask(1L, "job_id", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED));
createJobTask(1L, "job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()), nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), tasks);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()));
OpenJobAction.validate("job_id", mlBuilder.build(), null);
}
public void testValidate_jobMissing() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id1").build(), false);
expectThrows(ResourceNotFoundException.class, () -> OpenJobAction.validate("job_id2", mlBuilder.build(), null, null));
expectThrows(ResourceNotFoundException.class, () -> OpenJobAction.validate("job_id2", mlBuilder.build(), null));
}
public void testValidate_jobMarkedAsDeleted() {
@ -77,33 +73,20 @@ public class OpenJobActionTests extends ESTestCase {
jobBuilder.setDeleted(true);
mlBuilder.putJob(jobBuilder.build(), false);
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), null, null));
() -> OpenJobAction.validate("job_id", mlBuilder.build(), null));
assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage());
}
public void testValidate_unexpectedState() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name", "_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING);
PersistentTask<OpenJobAction.Request> task = createJobTask(1L, "job_id", "_node_id", jobState);
PersistentTask<OpenJobAction.Request> task = createJobTask(1L, "job_id", "_node_id", JobState.OPENED);
PersistentTasksCustomMetaData tasks1 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes));
assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage());
jobState = randomFrom(JobState.OPENING, JobState.CLOSING);
task = createJobTask(1L, "job_id", "_other_node_id", jobState);
PersistentTasksCustomMetaData tasks2 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks2, nodes));
assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage());
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1));
assertEquals("Cannot open job [job_id] because it has already been opened", e.getMessage());
}
public void testSelectLeastLoadedMlNode() {
@ -208,11 +191,11 @@ public class OpenJobActionTests extends ESTestCase {
.build();
Map<Long, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", JobState.OPENING));
taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", JobState.OPENING));
taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", JobState.OPENING));
taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", JobState.OPENING));
taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING));
taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", null));
taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", null));
taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", null));
taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", null));
taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", null));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
@ -228,7 +211,7 @@ public class OpenJobActionTests extends ESTestCase {
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger);
assertEquals("_node_id3", result.getExecutorNode());
PersistentTask<OpenJobAction.Request> lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING);
PersistentTask<OpenJobAction.Request> lastTask = createJobTask(5L, "job_id6", "_node_id3", null);
taskMap.put(5L, lastTask);
tasks = new PersistentTasksCustomMetaData(6L, taskMap);
@ -275,7 +258,6 @@ public class OpenJobActionTests extends ESTestCase {
metaData = new MetaData.Builder(cs.metaData());
routingTable = new RoutingTable.Builder(cs.routingTable());
MlMetadata mlMetadata = cs.metaData().custom(MlMetadata.TYPE);
String indexToRemove = randomFrom(OpenJobAction.indicesOfInterest(cs, "job_id"));
if (randomBoolean()) {
routingTable.remove(indexToRemove);
@ -299,7 +281,9 @@ public class OpenJobActionTests extends ESTestCase {
public static PersistentTask<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {
PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), new Assignment(nodeId, "test assignment"));
task = new PersistentTask<>(task, jobState);
if (jobState != null) {
task = new PersistentTask<>(task, jobState);
}
return task;
}

View File

@ -46,7 +46,7 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING);
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED);
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", jobState);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
@ -116,7 +116,7 @@ public class StartDatafeedActionTests extends ESTestCase {
.putJob(job1, false)
.build();
Exception e = expectThrows(ResourceNotFoundException.class,
() -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null, null));
() -> StartDatafeedAction.validate("some-datafeed", mlMetadata1, null));
assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists"));
}
@ -133,8 +133,8 @@ public class StartDatafeedActionTests extends ESTestCase {
.putDatafeed(datafeedConfig1)
.build();
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks, null));
assertThat(e.getMessage(), equalTo("cannot start datafeed, expected job state [opened], but got [closed]"));
() -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks));
assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] hasn't been opened"));
}
public void testValidate_dataFeedAlreadyStarted() {
@ -144,10 +144,6 @@ public class StartDatafeedActionTests extends ESTestCase {
.putJob(job1, false)
.putDatafeed(datafeedConfig)
.build();
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
PersistentTask<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED);
PersistentTask<StartDatafeedAction.Request> datafeedTask =
@ -160,9 +156,8 @@ public class StartDatafeedActionTests extends ESTestCase {
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(2L, taskMap);
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes));
assertThat(e.getMessage(), equalTo("datafeed [datafeed_id] already started, expected datafeed state [stopped], " +
"but got [started]"));
() -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks));
assertThat(e.getMessage(), equalTo("cannot start datafeed [datafeed_id] because it has already been started"));
}
public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,

View File

@ -69,8 +69,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
PersistentTasksCustomMetaData tasks;
if (randomBoolean()) {
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>(1L, StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STOPPED);
new StartDatafeedAction.Request("foo2", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", ""));
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
} else {
tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap());
@ -84,7 +83,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
.build();
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata1, tasks));
assertThat(e.getMessage(), equalTo("datafeed already stopped, expected datafeed state [started], but got [stopped]"));
assertThat(e.getMessage(), equalTo("Cannot stop datafeed [foo] because it has already been stopped"));
}
}

View File

@ -226,7 +226,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
for (DiscoveryNode node : event.state().nodes()) {
Collection<PersistentTask<?>> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> {
return node.getId().equals(task.getExecutorNode()) &&
(task.getStatus() == null || task.getStatus() == JobState.OPENING || task.isCurrentStatus() == false);
(task.getStatus() == null || task.isCurrentStatus() == false);
});
int count = foundTasks.size();
if (count > maxConcurrentJobAllocations) {

View File

@ -373,7 +373,7 @@ public class DatafeedJobIT extends ESRestTestCase {
() -> client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
response = e.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(409));
assertThat(responseEntityToString(response), containsString("Cannot delete job [" + jobId + "] while datafeed [" + datafeedId
assertThat(responseEntityToString(response), containsString("Cannot delete job [" + jobId + "] because datafeed [" + datafeedId
+ "] refers to it"));
response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop");

View File

@ -52,7 +52,7 @@ public class MlRestTestStateCleaner {
logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId);
}
} catch (Exception e) {
if (e.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) {
if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) {
logger.debug("failed to stop datafeed [" + datafeedId + "]", e);
} else {
logger.warn("failed to stop datafeed [" + datafeedId + "]", e);
@ -84,15 +84,14 @@ public class MlRestTestStateCleaner {
logger.error("Got status code " + statusCode + " when closing job " + jobId);
}
} catch (Exception e1) {
if (e1.getMessage().contains("expected job state [opened], but got [closed]")
|| e1.getMessage().contains("expected job state [opened], but got [closing]")) {
if (e1.getMessage().contains("because job [" + jobId + "] hasn't been opened")) {
logger.debug("job [" + jobId + "] has already been closed", e1);
} else {
logger.warn("failed to close job [" + jobId + "]. Forcing closed.", e1);
logger.warn("failed to close job [" + jobId + "]. Forcing closed", e1);
try {
adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true");
} catch (Exception e2) {
throw new RuntimeException("Force-closing job [" + jobId + "] failed.", e2);
logger.warn("Force-closing job [" + jobId + "] failed", e2);
}
throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1);
}

View File

@ -11,42 +11,32 @@ public class JobStateTests extends ESTestCase {
public void testFromString() {
assertEquals(JobState.fromString("closed"), JobState.CLOSED);
assertEquals(JobState.fromString("closing"), JobState.CLOSING);
assertEquals(JobState.fromString("failed"), JobState.FAILED);
assertEquals(JobState.fromString("opening"), JobState.OPENING);
assertEquals(JobState.fromString("opened"), JobState.OPENED);
assertEquals(JobState.fromString("CLOSED"), JobState.CLOSED);
assertEquals(JobState.fromString("CLOSING"), JobState.CLOSING);
assertEquals(JobState.fromString("FAILED"), JobState.FAILED);
assertEquals(JobState.fromString("OPENING"), JobState.OPENING);
assertEquals(JobState.fromString("OPENED"), JobState.OPENED);
}
public void testToString() {
assertEquals("closed", JobState.CLOSED.toString());
assertEquals("closing", JobState.CLOSING.toString());
assertEquals("failed", JobState.FAILED.toString());
assertEquals("opening", JobState.OPENING.toString());
assertEquals("opened", JobState.OPENED.toString());
}
public void testValidOrdinals() {
assertEquals(0, JobState.CLOSING.ordinal());
assertEquals(1, JobState.CLOSED.ordinal());
assertEquals(2, JobState.OPENING.ordinal());
assertEquals(3, JobState.OPENED.ordinal());
assertEquals(4, JobState.FAILED.ordinal());
assertEquals(0, JobState.CLOSED.ordinal());
assertEquals(1, JobState.OPENED.ordinal());
assertEquals(2, JobState.FAILED.ordinal());
}
public void testIsAnyOf() {
assertFalse(JobState.OPENED.isAnyOf());
assertFalse(JobState.OPENED.isAnyOf(JobState.CLOSED, JobState.CLOSING, JobState.FAILED,
JobState.OPENING));
assertFalse(JobState.CLOSED.isAnyOf(JobState.CLOSING, JobState.FAILED, JobState.OPENING, JobState.OPENED));
assertFalse(JobState.OPENED.isAnyOf(JobState.CLOSED, JobState.FAILED));
assertFalse(JobState.CLOSED.isAnyOf(JobState.FAILED, JobState.OPENED));
assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED));
assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED, JobState.CLOSED));
assertTrue(JobState.CLOSING.isAnyOf(JobState.CLOSED, JobState.CLOSING));
assertTrue(JobState.CLOSED.isAnyOf(JobState.CLOSED, JobState.CLOSING));
assertTrue(JobState.CLOSED.isAnyOf(JobState.CLOSED));
}
}

View File

@ -252,7 +252,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get();
assertTrue(stopResponse.isStopped());
} catch (ExecutionException e1) {
if (e1.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) {
if (e1.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) {
logger.debug("failed to stop datafeed [" + datafeedId + "], already stopped", e1);
} else {
try {
@ -261,7 +261,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
StopDatafeedAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, request).get();
assertTrue(stopResponse.isStopped());
} catch (Exception e2) {
throw new RuntimeException("Force-stopping datafeed [" + datafeedId + "] failed.", e2);
logger.warn("Force-stopping datafeed [" + datafeedId + "] failed.", e2);
}
throw new RuntimeException("Had to resort to force-stopping datafeed, something went wrong?", e1);
}
@ -293,8 +293,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
client.execute(CloseJobAction.INSTANCE, closeRequest).get();
assertTrue(response.isClosed());
} catch (Exception e1) {
if (e1.getMessage().contains("expected job state [opened], but got [closed]")
|| e1.getMessage().contains("expected job state [opened], but got [closing]")) {
if (e1.getMessage().contains("because job [" + jobId + "] hasn't been opened")) {
logger.debug("job [" + jobId + "] has already been closed", e1);
} else {
try {
@ -305,7 +304,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
client.execute(CloseJobAction.INSTANCE, closeRequest).get();
assertTrue(response.isClosed());
} catch (Exception e2) {
throw new RuntimeException("Force-closing datafeed [" + jobId + "] failed.", e2);
logger.warn("Force-closing datafeed [" + jobId + "] failed.", e2);
}
throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1);
}

View File

@ -142,3 +142,4 @@ cluster:admin/persistent/start
cluster:admin/persistent/completion
cluster:admin/persistent/update_status
cluster:admin/persistent/remove
cluster:internal/ml/job/finalize_job_execution

View File

@ -292,7 +292,7 @@
- match: { datafeed_id: "test-datafeed-1" }
- do:
catch: /Cannot delete job \[datafeed-job\] while datafeed \[test-datafeed-1\] refers to it/
catch: /Cannot delete job \[datafeed-job\] because datafeed \[test-datafeed-1\] refers to it/
xpack.ml.delete_job:
job_id: datafeed-job
---

View File

@ -122,7 +122,7 @@ setup:
datafeed_id: "datafeed-1"
start: 0
- do:
catch: /cannot start datafeed, expected job state \[opened\], but got \[closed\]/
catch: /cannot start datafeed \[datafeed-1\] because job \[datafeed-job\] hasn't been opened/
xpack.ml.start_datafeed:
datafeed_id: "datafeed-1"
start: 0
@ -143,7 +143,7 @@ setup:
start: 0
- do:
catch: /datafeed \[datafeed\-1\] already started, expected datafeed state \[stopped\], but got \[started\]/
catch: /cannot start datafeed \[datafeed-1\] because it has already been started/
xpack.ml.start_datafeed:
datafeed_id: "datafeed-1"
start: 0
@ -162,7 +162,7 @@ setup:
xpack.ml.stop_datafeed:
datafeed_id: "datafeed-1"
- do:
catch: /datafeed already stopped, expected datafeed state \[started\], but got \[stopped\]/
catch: /Cannot stop datafeed \[datafeed-1\] because it has already been stopped/
xpack.ml.stop_datafeed:
datafeed_id: "datafeed-1"

View File

@ -52,7 +52,7 @@ public class MlRestTestStateCleaner {
logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId);
}
} catch (Exception e) {
if (e.getMessage().contains("datafeed already stopped, expected datafeed state [started], but got [stopped]")) {
if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) {
logger.debug("failed to stop datafeed [" + datafeedId + "]", e);
} else {
logger.warn("failed to stop datafeed [" + datafeedId + "]", e);
@ -84,15 +84,14 @@ public class MlRestTestStateCleaner {
logger.error("Got status code " + statusCode + " when closing job " + jobId);
}
} catch (Exception e1) {
if (e1.getMessage().contains("expected job state [opened], but got [closed]")
|| e1.getMessage().contains("expected job state [opened], but got [closing]")) {
if (e1.getMessage().contains("because job [" + jobId + "] hasn't been opened")) {
logger.debug("job [" + jobId + "] has already been closed", e1);
} else {
logger.warn("failed to close job [" + jobId + "]. Forcing closed.", e1);
logger.warn("failed to close job [" + jobId + "]. Forcing closed", e1);
try {
adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true");
} catch (Exception e2) {
throw new RuntimeException("Force-closing job [" + jobId + "] failed.", e2);
logger.warn("Force-closing job [" + jobId + "] failed", e2);
}
throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1);
}