[ML] implement _all for closing jobs (elastic/x-pack-elasticsearch#962)
Add a '_all' functionality for closing ML jobs. For cluster shutdown due to maintenance and major upgrades we recommend the user to stop all datafeeds and jobs. This change add the ability to close all jobs at once where previously it was required to iterate over all jobs and do a explicit close. This is part one of elastic/x-pack-elasticsearch#795, part two can be found in elastic/x-pack-elasticsearch#995. relates elastic/x-pack-elasticsearch#795 Original commit: elastic/x-pack-elasticsearch@9b251ed7e1
This commit is contained in:
parent
b41288592c
commit
c7fd1aacff
|
@ -5,15 +5,19 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.action;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.FailedNodeException;
|
||||
import org.elasticsearch.action.TaskOperationFailure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
|
@ -23,11 +27,13 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -36,18 +42,27 @@ import org.elasticsearch.xpack.ml.MlMetadata;
|
|||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobAction.Response, CloseJobAction.RequestBuilder> {
|
||||
|
||||
|
@ -245,19 +260,34 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
/*
|
||||
* Closing of multiple jobs:
|
||||
*
|
||||
* 1. Resolve and validate jobs first: if any job does not meet the
|
||||
* criteria (e.g. open datafeed), fail immediately, do not close any
|
||||
* job
|
||||
*
|
||||
* 2. Internally a task request is created for every job, so there
|
||||
* are n inner tasks for 1 user request
|
||||
*
|
||||
* 3. Collect n inner task results or failures and send 1 outer
|
||||
* result/failure
|
||||
*/
|
||||
|
||||
ClusterState currentState = clusterService.state();
|
||||
if (request.isForce()) {
|
||||
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
String jobId = request.getJobId();
|
||||
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
|
||||
if (jobTask == null) {
|
||||
throw new ElasticsearchStatusException("cannot force close job, because job [" + jobId + "] is not open",
|
||||
RestStatus.CONFLICT);
|
||||
List<String> resolvedJobs = resolveAndValidateJobId(request.getJobId(), currentState);
|
||||
|
||||
if (resolvedJobs.isEmpty()) {
|
||||
listener.onResponse(new Response(true));
|
||||
return;
|
||||
}
|
||||
forceCloseJob(jobTask.getId(), jobId, listener);
|
||||
|
||||
request.setResolvedJobIds(resolvedJobs.toArray(new String[0]));
|
||||
|
||||
if (request.isForce()) {
|
||||
forceCloseJob(currentState, request, listener);
|
||||
} else {
|
||||
PersistentTask<?> jobTask = validateAndReturnJobTask(request.getJobId(), currentState);
|
||||
normalCloseJob(task, jobTask.getId(), request, listener);
|
||||
normalCloseJob(currentState, task, request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -268,46 +298,132 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse(Request request, List<Response> tasks,
|
||||
List<TaskOperationFailure> taskOperationFailures,
|
||||
List<FailedNodeException> failedNodeExceptions) {
|
||||
|
||||
// number of resolved jobs should be equal to the number of tasks,
|
||||
// otherwise something went wrong
|
||||
if (request.getResolvedJobIds().length != tasks.size()) {
|
||||
if (taskOperationFailures.isEmpty() == false) {
|
||||
throw org.elasticsearch.ExceptionsHelper
|
||||
.convertToElastic(taskOperationFailures.get(0).getCause());
|
||||
} else if (failedNodeExceptions.isEmpty() == false) {
|
||||
throw org.elasticsearch.ExceptionsHelper
|
||||
.convertToElastic(failedNodeExceptions.get(0));
|
||||
} else {
|
||||
throw new IllegalStateException(
|
||||
"Expected [" + request.getResolvedJobIds().length
|
||||
+ "] number of tasks but " + "got [" + tasks.size() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
return new Response(tasks.stream().allMatch(Response::isClosed));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response readTaskResponse(StreamInput in) throws IOException {
|
||||
return new Response(in);
|
||||
}
|
||||
|
||||
private void forceCloseJob(long persistentTaskId, String jobId, ActionListener<Response> listener) {
|
||||
private void forceCloseJob(ClusterState currentState, Request request,
|
||||
ActionListener<Response> listener) {
|
||||
|
||||
String[] jobIds = request.getResolvedJobIds();
|
||||
final int numberOfJobs = jobIds.length;
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
final AtomicArray<Exception> failures = new AtomicArray<>(jobIds.length);
|
||||
|
||||
for (String jobId : jobIds) {
|
||||
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
|
||||
persistentTasksService.cancelPersistentTask(persistentTaskId, new ActionListener<PersistentTask<?>>() {
|
||||
PersistentTask<?> jobTask = validateAndReturnJobTask(jobId, currentState);
|
||||
persistentTasksService.cancelPersistentTask(jobTask.getId(),
|
||||
new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> task) {
|
||||
listener.onResponse(new Response(true));
|
||||
if (counter.incrementAndGet() == numberOfJobs) {
|
||||
sendResponseOrFailure(request.getJobId(), listener, failures);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
final int slot = counter.incrementAndGet();
|
||||
failures.set(slot - 1, e);
|
||||
if (slot == numberOfJobs) {
|
||||
sendResponseOrFailure(request.getJobId(), listener, failures);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendResponseOrFailure(String jobId,
|
||||
ActionListener<Response> listener,
|
||||
AtomicArray<Exception> failures) {
|
||||
List<Exception> catchedExceptions = failures.asList();
|
||||
if (catchedExceptions.size() == 0) {
|
||||
listener.onResponse(new Response(true));
|
||||
return;
|
||||
}
|
||||
|
||||
String msg = "Failed to force close job [" + jobId + "] with ["
|
||||
+ catchedExceptions.size()
|
||||
+ "] failures, rethrowing last, all Exceptions: ["
|
||||
+ catchedExceptions.stream().map(Exception::getMessage)
|
||||
.collect(Collectors.joining(", "))
|
||||
+ "]";
|
||||
|
||||
ElasticsearchException e = new ElasticsearchException(msg,
|
||||
catchedExceptions.get(0));
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void normalCloseJob(ClusterState currentState, Task task, Request request,
|
||||
ActionListener<Response> listener) {
|
||||
Map<String, Long> jobIdToPersistentTaskId = new HashMap<>();
|
||||
|
||||
for (String jobId : request.getResolvedJobIds()) {
|
||||
auditor.info(jobId, Messages.JOB_AUDIT_CLOSING);
|
||||
PersistentTask<?> jobTask = validateAndReturnJobTask(jobId, currentState);
|
||||
jobIdToPersistentTaskId.put(jobId, jobTask.getId());
|
||||
}
|
||||
|
||||
private void normalCloseJob(Task task, long persistentTaskId, Request request, ActionListener<Response> listener) {
|
||||
auditor.info(request.getJobId(), Messages.JOB_AUDIT_CLOSING);
|
||||
ActionListener<Response> finalListener =
|
||||
ActionListener.wrap(r -> waitForJobClosed(persistentTaskId, request, r, listener), listener::onFailure);
|
||||
ActionListener.wrap(
|
||||
r -> waitForJobClosed(request, jobIdToPersistentTaskId,
|
||||
r, listener),
|
||||
listener::onFailure);
|
||||
super.doExecute(task, request, finalListener);
|
||||
}
|
||||
|
||||
// Wait for job to be marked as closed in cluster state, which means the job persistent task has been removed
|
||||
// This api returns when job has been closed, but that doesn't mean the persistent task has been removed from cluster state,
|
||||
// so wait for that to happen here.
|
||||
void waitForJobClosed(long persistentTaskId, Request request, Response response, ActionListener<Response> listener) {
|
||||
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.timeout,
|
||||
new WaitForPersistentTaskStatusListener<OpenJobAction.Request>() {
|
||||
void waitForJobClosed(Request request, Map<String, Long> jobIdToPersistentTaskId, Response response,
|
||||
ActionListener<Response> listener) {
|
||||
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService,
|
||||
request.timeout, logger, threadPool.getThreadContext());
|
||||
waitForPersistentTaskStatus(stateObserver, persistentTasksCustomMetaData -> {
|
||||
for (Map.Entry<String, Long> entry : jobIdToPersistentTaskId.entrySet()) {
|
||||
long persistentTaskId = entry.getValue();
|
||||
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}, new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<OpenJobAction.Request> task) {
|
||||
logger.debug("finalizing job [{}]", request.getJobId());
|
||||
FinalizeJobExecutionAction.Request finalizeRequest =
|
||||
new FinalizeJobExecutionAction.Request(request.getJobId());
|
||||
public void onResponse(Boolean result) {
|
||||
Set<String> jobIds = jobIdToPersistentTaskId.keySet();
|
||||
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(
|
||||
jobIds.toArray(new String[jobIds.size()]));
|
||||
client.execute(FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
|
||||
ActionListener.wrap(r-> listener.onResponse(response), listener::onFailure));
|
||||
new ActionListener<FinalizeJobExecutionAction.Response>() {
|
||||
@Override
|
||||
public void onResponse(FinalizeJobExecutionAction.Response r) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -317,6 +433,81 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void waitForPersistentTaskStatus(ClusterStateObserver stateObserver,
|
||||
Predicate<PersistentTasksCustomMetaData> predicate,
|
||||
ActionListener<Boolean> listener) {
|
||||
if (predicate.test(stateObserver.setAndGetObservedState().metaData()
|
||||
.custom(PersistentTasksCustomMetaData.TYPE))) {
|
||||
listener.onResponse(true);
|
||||
} else {
|
||||
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
listener.onResponse(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new IllegalStateException("timed out after " + timeout));
|
||||
}
|
||||
}, clusterState -> predicate
|
||||
.test(clusterState.metaData()
|
||||
.custom(PersistentTasksCustomMetaData.TYPE)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static List<String> resolveAndValidateJobId(String jobId, ClusterState state) {
|
||||
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
|
||||
PersistentTasksCustomMetaData tasks = state.getMetaData()
|
||||
.custom(PersistentTasksCustomMetaData.TYPE);
|
||||
|
||||
if (!Job.ALL.equals(jobId)) {
|
||||
validateAndReturnJobTask(jobId, state);
|
||||
return Collections.singletonList(jobId);
|
||||
}
|
||||
|
||||
if (mlMetadata.getJobs().isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<String> matchedJobs = new ArrayList<>();
|
||||
|
||||
for (Map.Entry<String, Job> jobEntry : mlMetadata.getJobs().entrySet()) {
|
||||
String resolvedJobId = jobEntry.getKey();
|
||||
Job job = jobEntry.getValue();
|
||||
|
||||
if (job.isDeleted()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
|
||||
if (jobTask == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (MlMetadata.getJobState(resolvedJobId, tasks) == JobState.CLOSED) {
|
||||
continue;
|
||||
}
|
||||
|
||||
validateAndReturnJobTask(resolvedJobId, state);
|
||||
|
||||
matchedJobs.add(resolvedJobId);
|
||||
}
|
||||
|
||||
return matchedJobs;
|
||||
}
|
||||
|
||||
static PersistentTask<?> validateAndReturnJobTask(String jobId, ClusterState state) {
|
||||
|
@ -326,7 +517,8 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
throw new ResourceNotFoundException("cannot close job, because job [" + jobId + "] does not exist");
|
||||
}
|
||||
|
||||
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
PersistentTasksCustomMetaData tasks = state.getMetaData()
|
||||
.custom(PersistentTasksCustomMetaData.TYPE);
|
||||
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
|
||||
if (jobTask == null) {
|
||||
throw new ElasticsearchStatusException("cannot close job, because job [" + jobId + "] is not open", RestStatus.CONFLICT);
|
||||
|
@ -334,10 +526,12 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
|
||||
Optional<DatafeedConfig> datafeed = mlMetadata.getDatafeedByJobId(jobId);
|
||||
if (datafeed.isPresent()) {
|
||||
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks);
|
||||
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(),
|
||||
tasks);
|
||||
if (datafeedState != DatafeedState.STOPPED) {
|
||||
throw new ElasticsearchStatusException("cannot close job [{}], datafeed hasn't been stopped",
|
||||
RestStatus.CONFLICT, jobId);
|
||||
throw new ElasticsearchStatusException(
|
||||
"cannot close job [{}], datafeed hasn't been stopped", RestStatus.CONFLICT,
|
||||
jobId);
|
||||
}
|
||||
}
|
||||
return jobTask;
|
||||
|
|
|
@ -55,29 +55,29 @@ public class FinalizeJobExecutionAction extends Action<FinalizeJobExecutionActio
|
|||
|
||||
public static class Request extends MasterNodeRequest<Request> {
|
||||
|
||||
private String jobId;
|
||||
private String[] jobIds;
|
||||
|
||||
public Request(String jobId) {
|
||||
this.jobId = jobId;
|
||||
public Request(String[] jobIds) {
|
||||
this.jobIds = jobIds;
|
||||
}
|
||||
|
||||
Request() {
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
return jobId;
|
||||
public String[] getJobIds() {
|
||||
return jobIds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
jobId = in.readString();
|
||||
jobIds = in.readStringArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(jobId);
|
||||
out.writeStringArray(jobIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -140,18 +140,21 @@ public class FinalizeJobExecutionAction extends Action<FinalizeJobExecutionActio
|
|||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state,
|
||||
ActionListener<Response> listener) throws Exception {
|
||||
String jobId = request.getJobId();
|
||||
String source = "finalize_job_execution [" + jobId + "]";
|
||||
logger.debug("finalizing job [{}]", request.getJobId());
|
||||
String jobIdString = String.join(",", request.getJobIds());
|
||||
String source = "finalize_job_execution [" + jobIdString + "]";
|
||||
logger.debug("finalizing jobs [{}]", jobIdString);
|
||||
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
|
||||
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
|
||||
jobBuilder.setFinishedTime(new Date());
|
||||
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
|
||||
mlMetadataBuilder.putJob(jobBuilder.build(), true);
|
||||
Date finishedTime = new Date();
|
||||
|
||||
for (String jobId : request.getJobIds()) {
|
||||
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
|
||||
jobBuilder.setFinishedTime(finishedTime);
|
||||
mlMetadataBuilder.putJob(jobBuilder.build(), true);
|
||||
}
|
||||
ClusterState.Builder builder = ClusterState.builder(currentState);
|
||||
return builder.metaData(new MetaData.Builder(currentState.metaData())
|
||||
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
|
||||
|
@ -166,7 +169,7 @@ public class FinalizeJobExecutionAction extends Action<FinalizeJobExecutionActio
|
|||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState,
|
||||
ClusterState newState) {
|
||||
logger.debug("finalized job [{}]", request.getJobId());
|
||||
logger.debug("finalized job [{}]", jobIdString);
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
});
|
||||
|
|
|
@ -31,7 +31,9 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
|
@ -39,7 +41,7 @@ import java.util.function.Supplier;
|
|||
*/
|
||||
// TODO: Hacking around here with TransportTasksAction. Ideally we should have another base class in core that
|
||||
// redirects to a single node only
|
||||
public abstract class TransportJobTaskAction<OperationTask extends Task, Request extends TransportJobTaskAction.JobTaskRequest<Request>,
|
||||
public abstract class TransportJobTaskAction<OperationTask extends OpenJobAction.JobTask, Request extends TransportJobTaskAction.JobTaskRequest<Request>,
|
||||
Response extends BaseTasksResponse & Writeable> extends TransportTasksAction<OperationTask, Request, Response, Response> {
|
||||
|
||||
protected final AutodetectProcessManager processManager;
|
||||
|
@ -55,27 +57,36 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
|
|||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
String jobId = request.getJobId();
|
||||
ClusterState state = clusterService.state();
|
||||
// We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the
|
||||
// node running the job task.
|
||||
ClusterState state = clusterService.state();
|
||||
JobManager.getJobOrThrowIfUnknown(state, jobId);
|
||||
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
|
||||
Set<String> executorNodes = new HashSet<>();
|
||||
|
||||
for (String resolvedJobId : request.getResolvedJobIds()) {
|
||||
JobManager.getJobOrThrowIfUnknown(state, resolvedJobId);
|
||||
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData()
|
||||
.custom(PersistentTasksCustomMetaData.TYPE);
|
||||
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata
|
||||
.getJobTask(resolvedJobId, tasks);
|
||||
|
||||
if (jobTask == null || jobTask.isAssigned() == false) {
|
||||
String message = "Cannot perform requested action because job [" + jobId + "] is not open";
|
||||
String message = "Cannot perform requested action because job [" + resolvedJobId
|
||||
+ "] is not open";
|
||||
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
|
||||
} else {
|
||||
request.setNodes(jobTask.getExecutorNode());
|
||||
super.doExecute(task, request, listener);
|
||||
executorNodes.add(jobTask.getExecutorNode());
|
||||
}
|
||||
}
|
||||
|
||||
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void taskOperation(Request request, OperationTask task, ActionListener<Response> listener) {
|
||||
ClusterState state = clusterService.state();
|
||||
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks);
|
||||
JobState jobState = MlMetadata.getJobState(task.getJobId(), tasks);
|
||||
if (jobState == JobState.OPENED) {
|
||||
innerTaskOperation(request, task, listener, state);
|
||||
} else {
|
||||
|
@ -110,7 +121,8 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
|
|||
}
|
||||
} else {
|
||||
if (tasks.size() > 1) {
|
||||
throw new IllegalStateException("Expected one node level response, but got [" + tasks.size() + "]");
|
||||
throw new IllegalStateException(
|
||||
"Expected one node level response, but got [" + tasks.size() + "]");
|
||||
}
|
||||
return tasks.get(0);
|
||||
}
|
||||
|
@ -124,33 +136,53 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
|
|||
public static class JobTaskRequest<R extends JobTaskRequest<R>> extends BaseTasksRequest<R> {
|
||||
|
||||
String jobId;
|
||||
String[] resolvedJobIds;
|
||||
|
||||
JobTaskRequest() {
|
||||
}
|
||||
|
||||
JobTaskRequest(String jobId) {
|
||||
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
|
||||
|
||||
// the default implementation just returns 1 jobId
|
||||
this.resolvedJobIds = new String[] { jobId };
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
protected String[] getResolvedJobIds() {
|
||||
return resolvedJobIds;
|
||||
}
|
||||
|
||||
protected void setResolvedJobIds(String[] resolvedJobIds) {
|
||||
this.resolvedJobIds = resolvedJobIds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
jobId = in.readString();
|
||||
resolvedJobIds = in.readStringArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(jobId);
|
||||
out.writeStringArray(resolvedJobIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean match(Task task) {
|
||||
return OpenJobAction.JobTask.match(task, jobId);
|
||||
for (String id : resolvedJobIds) {
|
||||
if (OpenJobAction.JobTask.match(task, id)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.utils;
|
||||
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
|
@ -56,6 +58,6 @@ public final class MlStrings {
|
|||
}
|
||||
|
||||
public static boolean isValidId(String id) {
|
||||
return id != null && VALID_ID_CHAR_PATTERN.matcher(id).matches();
|
||||
return id != null && VALID_ID_CHAR_PATTERN.matcher(id).matches() && !Job.ALL.equals(id);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
|
@ -84,6 +85,43 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
|
|||
CloseJobAction.validateAndReturnJobTask("job_id", cs2);
|
||||
}
|
||||
|
||||
public void testResolve() {
|
||||
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
|
||||
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()),
|
||||
false);
|
||||
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_1", "job_id_1",
|
||||
Collections.singletonList("*")));
|
||||
|
||||
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()),
|
||||
false);
|
||||
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_2", "job_id_2",
|
||||
Collections.singletonList("*")));
|
||||
|
||||
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()),
|
||||
false);
|
||||
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_3", "job_id_3",
|
||||
Collections.singletonList("*")));
|
||||
|
||||
Map<Long, PersistentTask<?>> tasks = new HashMap<>();
|
||||
PersistentTask<?> jobTask = createJobTask(1L, "job_id_1", null, JobState.OPENED);
|
||||
tasks.put(1L, jobTask);
|
||||
|
||||
jobTask = createJobTask(2L, "job_id_2", null, JobState.CLOSED);
|
||||
tasks.put(2L, jobTask);
|
||||
|
||||
jobTask = createJobTask(3L, "job_id_3", null, JobState.FAILED);
|
||||
tasks.put(3L, jobTask);
|
||||
|
||||
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE,
|
||||
new PersistentTasksCustomMetaData(1L, tasks)))
|
||||
.build();
|
||||
|
||||
assertEquals(Arrays.asList("job_id_1", "job_id_3"),
|
||||
CloseJobAction.resolveAndValidateJobId("_all", cs1));
|
||||
}
|
||||
|
||||
public static PersistentTask<StartDatafeedAction.Request> createTask(long id,
|
||||
String datafeedId,
|
||||
long startTime,
|
||||
|
|
|
@ -15,6 +15,13 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/*
|
||||
* NOTE: a copy if this file resides in :x-pack-elasticsearch:qa:smoke-test-ml-with-security
|
||||
*
|
||||
* Therefore any changes here, have to be transfered there, too.
|
||||
* (Or eventually fix it by introducing a common test infrastructure package)
|
||||
*/
|
||||
|
||||
public class MlRestTestStateCleaner {
|
||||
|
||||
private final Logger logger;
|
||||
|
@ -75,27 +82,27 @@ public class MlRestTestStateCleaner {
|
|||
return;
|
||||
}
|
||||
|
||||
for (Map<String, Object> jobConfig : jobConfigs) {
|
||||
String jobId = (String) jobConfig.get("job_id");
|
||||
try {
|
||||
int statusCode = adminClient.performRequest("POST",
|
||||
"/_xpack/ml/anomaly_detectors/" + jobId + "/_close").getStatusLine().getStatusCode();
|
||||
int statusCode = adminClient
|
||||
.performRequest("POST", "/_xpack/ml/anomaly_detectors/_all/_close")
|
||||
.getStatusLine().getStatusCode();
|
||||
if (statusCode != 200) {
|
||||
logger.error("Got status code " + statusCode + " when closing job " + jobId);
|
||||
logger.error("Got status code " + statusCode + " when closing all jobs");
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
if (e1.getMessage().contains("because job [" + jobId + "] is not open")) {
|
||||
logger.debug("job [" + jobId + "] has already been closed", e1);
|
||||
} else {
|
||||
logger.warn("failed to close job [" + jobId + "]. Forcing closed", e1);
|
||||
logger.warn("failed to close all jobs. Forcing closed", e1);
|
||||
try {
|
||||
adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true");
|
||||
adminClient.performRequest("POST",
|
||||
"/_xpack/ml/anomaly_detectors/_all/_close?force=true");
|
||||
} catch (Exception e2) {
|
||||
logger.warn("Force-closing job [" + jobId + "] failed", e2);
|
||||
}
|
||||
throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1);
|
||||
logger.warn("Force-closing all jobs failed", e2);
|
||||
}
|
||||
throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?",
|
||||
e1);
|
||||
}
|
||||
|
||||
for (Map<String, Object> jobConfig : jobConfigs) {
|
||||
String jobId = (String) jobConfig.get("job_id");
|
||||
int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId).getStatusLine().getStatusCode();
|
||||
if (statusCode != 200) {
|
||||
logger.error("Got status code " + statusCode + " when deleting job " + jobId);
|
||||
|
|
|
@ -45,7 +45,6 @@ import org.junit.Before;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -290,32 +289,31 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
|
|||
public static void deleteAllJobs(Logger logger, Client client) throws Exception {
|
||||
MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData();
|
||||
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
|
||||
for (Map.Entry<String, Job> entry : mlMetadata.getJobs().entrySet()) {
|
||||
String jobId = entry.getKey();
|
||||
|
||||
try {
|
||||
CloseJobAction.Request closeRequest = new CloseJobAction.Request(jobId);
|
||||
CloseJobAction.Request closeRequest = new CloseJobAction.Request(Job.ALL);
|
||||
closeRequest.setCloseTimeout(TimeValue.timeValueSeconds(20L));
|
||||
logger.info("Closing job [{}]", jobId);
|
||||
CloseJobAction.Response response =
|
||||
client.execute(CloseJobAction.INSTANCE, closeRequest).get();
|
||||
logger.info("Closing jobs using [{}]", Job.ALL);
|
||||
CloseJobAction.Response response = client.execute(CloseJobAction.INSTANCE, closeRequest)
|
||||
.get();
|
||||
assertTrue(response.isClosed());
|
||||
} catch (Exception e1) {
|
||||
if (e1.getMessage().contains("because job [" + jobId + "] is not open")) {
|
||||
logger.debug("job [" + jobId + "] has already been closed", e1);
|
||||
} else {
|
||||
try {
|
||||
CloseJobAction.Request closeRequest = new CloseJobAction.Request(jobId);
|
||||
CloseJobAction.Request closeRequest = new CloseJobAction.Request(Job.ALL);
|
||||
closeRequest.setForce(true);
|
||||
closeRequest.setCloseTimeout(TimeValue.timeValueSeconds(20L));
|
||||
CloseJobAction.Response response =
|
||||
client.execute(CloseJobAction.INSTANCE, closeRequest).get();
|
||||
assertTrue(response.isClosed());
|
||||
} catch (Exception e2) {
|
||||
logger.warn("Force-closing datafeed [" + jobId + "] failed.", e2);
|
||||
}
|
||||
throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1);
|
||||
logger.warn("Force-closing jobs failed.", e2);
|
||||
}
|
||||
throw new RuntimeException("Had to resort to force-closing job, something went wrong?",
|
||||
e1);
|
||||
}
|
||||
|
||||
for (Map.Entry<String, Job> entry : mlMetadata.getJobs().entrySet()) {
|
||||
String jobId = entry.getKey();
|
||||
assertBusy(() -> {
|
||||
GetJobsStatsAction.Response statsResponse =
|
||||
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(jobId)).actionGet();
|
||||
|
|
|
@ -29,5 +29,6 @@ public class MlStringsTests extends ESTestCase {
|
|||
assertThat(MlStrings.isValidId("_-.a1"), is(false));
|
||||
assertThat(MlStrings.isValidId("A"), is(false));
|
||||
assertThat(MlStrings.isValidId("!afafd"), is(false));
|
||||
assertThat(MlStrings.isValidId("_all"), is(false));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,13 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/*
|
||||
* NOTE: a copy if this file resides in :x-pack-elasticsearch:plugin
|
||||
*
|
||||
* Therefore any changes here, have to be transfered there, too.
|
||||
* (Or eventually fix it by introducing a common test infrastructure package)
|
||||
*/
|
||||
|
||||
public class MlRestTestStateCleaner {
|
||||
|
||||
private final Logger logger;
|
||||
|
@ -75,27 +82,27 @@ public class MlRestTestStateCleaner {
|
|||
return;
|
||||
}
|
||||
|
||||
for (Map<String, Object> jobConfig : jobConfigs) {
|
||||
String jobId = (String) jobConfig.get("job_id");
|
||||
try {
|
||||
int statusCode = adminClient.performRequest("POST",
|
||||
"/_xpack/ml/anomaly_detectors/" + jobId + "/_close").getStatusLine().getStatusCode();
|
||||
int statusCode = adminClient
|
||||
.performRequest("POST", "/_xpack/ml/anomaly_detectors/_all/_close")
|
||||
.getStatusLine().getStatusCode();
|
||||
if (statusCode != 200) {
|
||||
logger.error("Got status code " + statusCode + " when closing job " + jobId);
|
||||
logger.error("Got status code " + statusCode + " when closing all jobs");
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
if (e1.getMessage().contains("because job [" + jobId + "] is not open")) {
|
||||
logger.debug("job [" + jobId + "] has already been closed", e1);
|
||||
} else {
|
||||
logger.warn("failed to close job [" + jobId + "]. Forcing closed", e1);
|
||||
logger.warn("failed to close all jobs. Forcing closed", e1);
|
||||
try {
|
||||
adminClient.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close?force=true");
|
||||
adminClient.performRequest("POST",
|
||||
"/_xpack/ml/anomaly_detectors/_all/_close?force=true");
|
||||
} catch (Exception e2) {
|
||||
logger.warn("Force-closing job [" + jobId + "] failed", e2);
|
||||
}
|
||||
throw new RuntimeException("Had to resort to force-closing job, something went wrong?", e1);
|
||||
logger.warn("Force-closing all jobs failed", e2);
|
||||
}
|
||||
throw new RuntimeException("Had to resort to force-closing jobs, something went wrong?",
|
||||
e1);
|
||||
}
|
||||
|
||||
for (Map<String, Object> jobConfig : jobConfigs) {
|
||||
String jobId = (String) jobConfig.get("job_id");
|
||||
int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId).getStatusLine().getStatusCode();
|
||||
if (statusCode != 200) {
|
||||
logger.error("Got status code " + statusCode + " when deleting job " + jobId);
|
||||
|
|
Loading…
Reference in New Issue