[ML] Use PersistentTasksService#waitForPersistentTaskStatus(...) to wait for job and datafeed status and
use PersistentTasksService#removeTask(...) to force close job and force stop datafeed. Original commit: elastic/x-pack-elasticsearch@4abcf99f93
This commit is contained in:
parent
5b66c7a7ba
commit
77dff92bef
|
@ -298,12 +298,12 @@ public class MachineLearning implements ActionPlugin {
|
|||
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool,
|
||||
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
|
||||
normalizerFactory, xContentRegistry);
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient);
|
||||
DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, internalClient, clusterService, jobProvider,
|
||||
System::currentTimeMillis, auditor);
|
||||
System::currentTimeMillis, auditor, persistentTasksService);
|
||||
InvalidLicenseEnforcer invalidLicenseEnforcer =
|
||||
new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedJobRunner, autodetectProcessManager);
|
||||
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient);
|
||||
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList(
|
||||
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, clusterService,
|
||||
autodetectProcessManager, auditor),
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.ml.action;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
|
@ -35,14 +36,14 @@ import org.elasticsearch.xpack.ml.MlMetadata;
|
|||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -228,32 +229,37 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
private final InternalClient client;
|
||||
private final ClusterService clusterService;
|
||||
private final Auditor auditor;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
ClusterService clusterService, AutodetectProcessManager manager, InternalClient client,
|
||||
Auditor auditor) {
|
||||
Auditor auditor, PersistentTasksService persistentTasksService) {
|
||||
super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
|
||||
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT, manager);
|
||||
this.client = client;
|
||||
this.clusterService = clusterService;
|
||||
this.auditor = auditor;
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
ClusterState currentState = clusterService.state();
|
||||
if (request.isForce()) {
|
||||
forceCloseJob(request.getJobId(), listener);
|
||||
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
PersistentTask<?> jobTask = MlMetadata.getJobTask(request.getJobId(), tasks);
|
||||
forceCloseJob(jobTask.getId(), request.getJobId(), listener);
|
||||
} else {
|
||||
normalCloseJob(task, request, listener);
|
||||
PersistentTask<?> jobTask = validateAndReturnJobTask(request.getJobId(), currentState);
|
||||
normalCloseJob(task, jobTask.getId(), request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener,
|
||||
ClusterState state) {
|
||||
validate(request.getJobId(), state);
|
||||
task.closeJob("close job (api)");
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
@ -263,55 +269,65 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
return new Response(in);
|
||||
}
|
||||
|
||||
private void forceCloseJob(String jobId, ActionListener<Response> listener) {
|
||||
private void forceCloseJob(long persistentTaskId, String jobId, ActionListener<Response> listener) {
|
||||
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
|
||||
ClusterState currentState = clusterService.state();
|
||||
PersistentTask<?> task = MlMetadata.getJobTask(jobId,
|
||||
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
|
||||
if (task != null) {
|
||||
RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(task.getId());
|
||||
client.execute(RemovePersistentTaskAction.INSTANCE, request,
|
||||
ActionListener.wrap(
|
||||
response -> listener.onResponse(new Response(response.isAcknowledged())),
|
||||
listener::onFailure));
|
||||
} else {
|
||||
String msg = "Requested job [" + jobId + "] be force-closed, but job's task" +
|
||||
"could not be found.";
|
||||
logger.warn(msg);
|
||||
listener.onFailure(new RuntimeException(msg));
|
||||
}
|
||||
persistentTasksService.removeTask(persistentTaskId, new PersistentTaskOperationListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void normalCloseJob(Task task, Request request, ActionListener<Response> listener) {
|
||||
private void normalCloseJob(Task task, long persistentTaskId, Request request, ActionListener<Response> listener) {
|
||||
auditor.info(request.getJobId(), Messages.JOB_AUDIT_CLOSING);
|
||||
ActionListener<Response> finalListener =
|
||||
ActionListener.wrap(r -> waitForJobClosed(request, r, listener), listener::onFailure);
|
||||
ActionListener.wrap(r -> waitForJobClosed(persistentTaskId, request, r, listener), listener::onFailure);
|
||||
super.doExecute(task, request, finalListener);
|
||||
}
|
||||
|
||||
// Wait for job to be marked as closed in cluster state, which means the job persistent task has been removed
|
||||
// This api returns when job has been closed, but that doesn't mean the persistent task has been removed from cluster state,
|
||||
// so wait for that to happen here.
|
||||
void waitForJobClosed(Request request, Response response, ActionListener<Response> listener) {
|
||||
JobStateObserver observer = new JobStateObserver(threadPool, clusterService);
|
||||
observer.waitForState(request.getJobId(), request.getCloseTimeout(), JobState.CLOSED, e -> {
|
||||
if (e != null) {
|
||||
listener.onFailure(e);
|
||||
} else {
|
||||
void waitForJobClosed(long persistentTaskId, Request request, Response response, ActionListener<Response> listener) {
|
||||
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.timeout,
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
logger.debug("finalizing job [{}]", request.getJobId());
|
||||
FinalizeJobExecutionAction.Request finalizeRequest =
|
||||
new FinalizeJobExecutionAction.Request(request.getJobId());
|
||||
client.execute(FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
|
||||
ActionListener.wrap(r-> listener.onResponse(response), listener::onFailure));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static void validate(String jobId, ClusterState state) {
|
||||
static PersistentTask<?> validateAndReturnJobTask(String jobId, ClusterState state) {
|
||||
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
|
||||
Job job = mlMetadata.getJobs().get(jobId);
|
||||
if (job == null) {
|
||||
throw new ResourceNotFoundException("cannot close job, because job [" + jobId + "] does not exist");
|
||||
}
|
||||
|
||||
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
|
||||
if (jobTask == null) {
|
||||
throw new ElasticsearchStatusException("cannot close job, because job [" + jobId + "] is not open", RestStatus.CONFLICT);
|
||||
}
|
||||
|
||||
Optional<DatafeedConfig> datafeed = mlMetadata.getDatafeedByJobId(jobId);
|
||||
if (datafeed.isPresent()) {
|
||||
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks);
|
||||
|
@ -320,6 +336,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
|
|||
RestStatus.CONFLICT, jobId);
|
||||
}
|
||||
}
|
||||
return jobTask;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,14 +49,15 @@ import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
|
|||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
|
||||
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -64,6 +65,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
|
||||
|
||||
|
@ -290,18 +292,16 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
|
||||
public static class TransportAction extends HandledTransportAction<Request, Response> {
|
||||
|
||||
private final JobStateObserver observer;
|
||||
private final XPackLicenseState licenseState;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
|
||||
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService) {
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
|
||||
this.licenseState = licenseState;
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
this.observer = new JobStateObserver(threadPool, clusterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -310,7 +310,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
PersistentTaskOperationListener finalListener = new PersistentTaskOperationListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
waitForJobStarted(request, listener);
|
||||
waitForJobStarted(taskId, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -324,15 +324,47 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
}
|
||||
}
|
||||
|
||||
void waitForJobStarted(Request request, ActionListener<Response> listener) {
|
||||
observer.waitForState(request.getJobId(), request.timeout, JobState.OPENED, e -> {
|
||||
if (e != null) {
|
||||
void waitForJobStarted(long taskId, Request request, ActionListener<Response> listener) {
|
||||
JobPredicate predicate = new JobPredicate();
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout,
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
listener.onResponse(new Response(predicate.opened));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
} else {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private class JobPredicate implements Predicate<PersistentTask<?>> {
|
||||
|
||||
private volatile boolean opened;
|
||||
|
||||
@Override
|
||||
public boolean test(PersistentTask<?> persistentTask) {
|
||||
if (persistentTask == null) {
|
||||
return false;
|
||||
}
|
||||
JobState jobState = (JobState) persistentTask.getStatus();
|
||||
if (jobState == null) {
|
||||
return false;
|
||||
}
|
||||
switch (jobState) {
|
||||
case OPENED:
|
||||
opened = true;
|
||||
return true;
|
||||
case FAILED:
|
||||
return true;
|
||||
default:
|
||||
throw new IllegalStateException("Unexpected job state [" + jobState + "]");
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecutor<Request> {
|
||||
|
@ -435,7 +467,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
if (job.isDeleted()) {
|
||||
throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted");
|
||||
}
|
||||
PersistentTasksCustomMetaData.PersistentTask<?> task = MlMetadata.getJobTask(jobId, tasks);
|
||||
PersistentTask<?> task = MlMetadata.getJobTask(jobId, tasks);
|
||||
if (task != null) {
|
||||
throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has already been opened");
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -52,7 +51,6 @@ import org.elasticsearch.xpack.ml.job.config.Job;
|
|||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
|
||||
|
@ -62,10 +60,12 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
|
|||
import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class StartDatafeedAction
|
||||
extends Action<StartDatafeedAction.Request, StartDatafeedAction.Response, StartDatafeedAction.RequestBuilder> {
|
||||
|
@ -338,18 +338,16 @@ public class StartDatafeedAction
|
|||
|
||||
public static class TransportAction extends HandledTransportAction<Request, Response> {
|
||||
|
||||
private final DatafeedStateObserver observer;
|
||||
private final XPackLicenseState licenseState;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
|
||||
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService) {
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
|
||||
this.licenseState = licenseState;
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
this.observer = new DatafeedStateObserver(threadPool, clusterService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -358,7 +356,7 @@ public class StartDatafeedAction
|
|||
PersistentTaskOperationListener finalListener = new PersistentTaskOperationListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
waitForDatafeedStarted(request, listener);
|
||||
waitForDatafeedStarted(taskId, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -372,13 +370,25 @@ public class StartDatafeedAction
|
|||
}
|
||||
}
|
||||
|
||||
void waitForDatafeedStarted(Request request, ActionListener<Response> listener) {
|
||||
observer.waitForState(request.getDatafeedId(), request.timeout, DatafeedState.STARTED, e -> {
|
||||
if (e != null) {
|
||||
listener.onFailure(e);
|
||||
} else {
|
||||
void waitForDatafeedStarted(long taskId, Request request, ActionListener<Response> listener) {
|
||||
Predicate<PersistentTask<?>> predicate = persistentTask -> {
|
||||
if (persistentTask == null) {
|
||||
return false;
|
||||
}
|
||||
DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus();
|
||||
return datafeedState == DatafeedState.STARTED;
|
||||
};
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout,
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
|||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -37,14 +36,13 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -217,22 +215,22 @@ public class StopDatafeedAction
|
|||
|
||||
public static class TransportAction extends TransportTasksAction<StartDatafeedAction.DatafeedTask, Request, Response, Response> {
|
||||
|
||||
private final InternalClient client;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
ClusterService clusterService, InternalClient client) {
|
||||
ClusterService clusterService, PersistentTasksService persistentTasksService) {
|
||||
super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters,
|
||||
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT);
|
||||
this.client = client;
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
ClusterState state = clusterService.state();
|
||||
MetaData metaData = state.metaData();
|
||||
PersistentTasksCustomMetaData tasks = metaData.custom(PersistentTasksCustomMetaData.TYPE);
|
||||
MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE);
|
||||
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
|
||||
if (request.force) {
|
||||
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks);
|
||||
|
@ -245,11 +243,10 @@ public class StopDatafeedAction
|
|||
listener.onFailure(new RuntimeException(msg));
|
||||
}
|
||||
} else {
|
||||
MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE);
|
||||
String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks);
|
||||
request.setNodes(nodeId);
|
||||
PersistentTask<?> datafeedTask = validateAndReturnDatafeedTask(request.getDatafeedId(), mlMetadata, tasks);
|
||||
request.setNodes(datafeedTask.getExecutorNode());
|
||||
ActionListener<Response> finalListener =
|
||||
ActionListener.wrap(r -> waitForDatafeedStopped(request, r, listener), listener::onFailure);
|
||||
ActionListener.wrap(r -> waitForDatafeedStopped(datafeedTask.getId(), request, r, listener), listener::onFailure);
|
||||
super.doExecute(task, request, finalListener);
|
||||
}
|
||||
}
|
||||
|
@ -257,24 +254,33 @@ public class StopDatafeedAction
|
|||
// Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed
|
||||
// This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state,
|
||||
// so wait for that to happen here.
|
||||
void waitForDatafeedStopped(Request request, Response response, ActionListener<Response> listener) {
|
||||
DatafeedStateObserver observer = new DatafeedStateObserver(threadPool, clusterService);
|
||||
observer.waitForState(request.getDatafeedId(), request.getTimeout(), DatafeedState.STOPPED, e -> {
|
||||
if (e != null) {
|
||||
listener.onFailure(e);
|
||||
} else {
|
||||
void waitForDatafeedStopped(long persistentTaskId, Request request, Response response, ActionListener<Response> listener) {
|
||||
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getTimeout(),
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void forceStopTask(long taskId, ActionListener<Response> listener) {
|
||||
RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId);
|
||||
private void forceStopTask(long persistentTaskId, ActionListener<Response> listener) {
|
||||
persistentTasksService.removeTask(persistentTaskId, new PersistentTaskOperationListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
|
||||
client.execute(RemovePersistentTaskAction.INSTANCE, request,
|
||||
ActionListener.wrap(
|
||||
response -> listener.onResponse(new Response(response.isAcknowledged())),
|
||||
listener::onFailure));
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -300,7 +306,7 @@ public class StopDatafeedAction
|
|||
}
|
||||
}
|
||||
|
||||
static String validateAndReturnNodeId(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {
|
||||
static PersistentTask<?> validateAndReturnDatafeedTask(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {
|
||||
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
|
||||
if (datafeed == null) {
|
||||
throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId));
|
||||
|
@ -310,6 +316,6 @@ public class StopDatafeedAction
|
|||
throw ExceptionsHelper.conflictStatusException("Cannot stop datafeed [" + datafeedId +
|
||||
"] because it has already been stopped");
|
||||
}
|
||||
return task.getExecutorNode();
|
||||
return task;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,8 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
|
|||
import org.elasticsearch.xpack.ml.job.results.Bucket;
|
||||
import org.elasticsearch.xpack.ml.job.results.Result;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
|
||||
import java.time.Duration;
|
||||
|
@ -46,14 +47,18 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
|
||||
public class DatafeedJobRunner extends AbstractComponent {
|
||||
|
||||
private static final String INF_SYMBOL = "\u221E";
|
||||
|
||||
private final Client client;
|
||||
private final ClusterService clusterService;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
private final JobProvider jobProvider;
|
||||
private final ThreadPool threadPool;
|
||||
private final Supplier<Long> currentTimeSupplier;
|
||||
|
@ -61,14 +66,15 @@ public class DatafeedJobRunner extends AbstractComponent {
|
|||
private final ConcurrentMap<String, Holder> runningDatafeeds = new ConcurrentHashMap<>();
|
||||
|
||||
public DatafeedJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider,
|
||||
Supplier<Long> currentTimeSupplier, Auditor auditor) {
|
||||
Supplier<Long> currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) {
|
||||
super(Settings.EMPTY);
|
||||
this.client = Objects.requireNonNull(client);
|
||||
this.clusterService = Objects.requireNonNull(clusterService);
|
||||
this.jobProvider = Objects.requireNonNull(jobProvider);
|
||||
this.threadPool = threadPool;
|
||||
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
|
||||
this.auditor = auditor;
|
||||
this.auditor = Objects.requireNonNull(auditor);
|
||||
this.persistentTasksService = Objects.requireNonNull(persistentTasksService);
|
||||
}
|
||||
|
||||
public void run(StartDatafeedAction.DatafeedTask task, Consumer<Exception> handler) {
|
||||
|
@ -225,7 +231,8 @@ public class DatafeedJobRunner extends AbstractComponent {
|
|||
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job);
|
||||
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
|
||||
dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
|
||||
return new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), handler);
|
||||
return new Holder(task.getPersistentTaskId(), datafeed, datafeedJob, task.isLookbackOnly(),
|
||||
new ProblemTracker(auditor, job.getId()), handler);
|
||||
}
|
||||
|
||||
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeed, Job job) {
|
||||
|
@ -272,6 +279,7 @@ public class DatafeedJobRunner extends AbstractComponent {
|
|||
|
||||
public class Holder {
|
||||
|
||||
private final long taskId;
|
||||
private final DatafeedConfig datafeed;
|
||||
// To ensure that we wait until loopback / realtime search has completed before we stop the datafeed
|
||||
private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
|
||||
|
@ -281,8 +289,9 @@ public class DatafeedJobRunner extends AbstractComponent {
|
|||
private final Consumer<Exception> handler;
|
||||
volatile Future<?> future;
|
||||
|
||||
Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker,
|
||||
Holder(long taskId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker,
|
||||
Consumer<Exception> handler) {
|
||||
this.taskId = taskId;
|
||||
this.datafeed = datafeed;
|
||||
this.datafeedJob = datafeedJob;
|
||||
this.autoCloseJob = autoCloseJob;
|
||||
|
@ -351,9 +360,13 @@ public class DatafeedJobRunner extends AbstractComponent {
|
|||
}
|
||||
|
||||
private void closeJob() {
|
||||
DatafeedStateObserver observer = new DatafeedStateObserver(threadPool, clusterService);
|
||||
observer.waitForState(datafeed.getId(), TimeValue.timeValueSeconds(20), DatafeedState.STOPPED, e1 -> {
|
||||
if (e1 == null) {
|
||||
Predicate<PersistentTask<?>> predicate = persistentTask -> {
|
||||
return persistentTask == null || persistentTask.getStatus() == DatafeedState.STOPPED;
|
||||
};
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, TimeValue.timeValueSeconds(20),
|
||||
new WaitForPersistentTaskStatusListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId());
|
||||
client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener<CloseJobAction.Response>() {
|
||||
|
||||
|
@ -369,8 +382,11 @@ public class DatafeedJobRunner extends AbstractComponent {
|
|||
logger.error("[" + datafeed.getJobId() + "] failed to auto-close job", e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
logger.error("Cannot auto close job [" + datafeed.getJobId() + "]", e1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error("Cannot auto close job [" + datafeed.getJobId() + "]", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,68 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.utils;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class DatafeedStateObserver {
|
||||
|
||||
private static final Logger LOGGER = Loggers.getLogger(DatafeedStateObserver.class);
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
public DatafeedStateObserver(ThreadPool threadPool, ClusterService clusterService) {
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
public void waitForState(String datafeedId, TimeValue waitTimeout, DatafeedState expectedState, Consumer<Exception> handler) {
|
||||
ClusterStateObserver observer =
|
||||
new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext());
|
||||
Predicate<ClusterState> predicate = (newState) -> {
|
||||
PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
|
||||
return datafeedState == expectedState;
|
||||
};
|
||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
handler.accept(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
Exception e = new IllegalArgumentException("Cluster service closed while waiting for datafeed state to change to ["
|
||||
+ expectedState + "]");
|
||||
handler.accept(new IllegalStateException(e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
if (predicate.test(clusterService.state())) {
|
||||
handler.accept(null);
|
||||
} else {
|
||||
Exception e = new IllegalArgumentException("Timeout expired while waiting for datafeed state to change to ["
|
||||
+ expectedState + "]");
|
||||
handler.accept(e);
|
||||
}
|
||||
}
|
||||
}, predicate, waitTimeout);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,105 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.utils;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class JobStateObserver {
|
||||
|
||||
private static final Logger LOGGER = Loggers.getLogger(JobStateObserver.class);
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
public JobStateObserver(ThreadPool threadPool, ClusterService clusterService) {
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
public void waitForState(String jobId, TimeValue waitTimeout, JobState expectedState, Consumer<Exception> handler) {
|
||||
ClusterStateObserver observer =
|
||||
new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext());
|
||||
JobStatePredicate jobStatePredicate = new JobStatePredicate(jobId, expectedState);
|
||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
if (jobStatePredicate.failed) {
|
||||
handler.accept(new ElasticsearchStatusException("[" + jobId + "] expected state [" + JobState.OPENED +
|
||||
"] but got [" + JobState.FAILED +"]", RestStatus.CONFLICT));
|
||||
} else {
|
||||
handler.accept(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
Exception e = new IllegalArgumentException("Cluster service closed while waiting for job state to change to ["
|
||||
+ expectedState + "]");
|
||||
handler.accept(new IllegalStateException(e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
ClusterState state = clusterService.state();
|
||||
if (jobStatePredicate.test(state)) {
|
||||
if (jobStatePredicate.failed) {
|
||||
handler.accept(new ElasticsearchStatusException("[" + jobId + "] expected state [" + JobState.OPENED +
|
||||
"] but got [" + JobState.FAILED +"]", RestStatus.CONFLICT));
|
||||
} else {
|
||||
handler.accept(null);
|
||||
}
|
||||
} else {
|
||||
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
JobState actual = MlMetadata.getJobState(jobId, tasks);
|
||||
Exception e = new IllegalArgumentException("Timeout expired while waiting for job state [" + actual +
|
||||
"] to change to [" + expectedState + "]");
|
||||
handler.accept(e);
|
||||
}
|
||||
}
|
||||
}, jobStatePredicate, waitTimeout);
|
||||
}
|
||||
|
||||
private static class JobStatePredicate implements Predicate<ClusterState> {
|
||||
|
||||
private final String jobId;
|
||||
private final JobState expectedState;
|
||||
|
||||
private volatile boolean failed;
|
||||
|
||||
JobStatePredicate(String jobId, JobState expectedState) {
|
||||
this.jobId = jobId;
|
||||
this.expectedState = expectedState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean test(ClusterState newState) {
|
||||
PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
JobState jobState = MlMetadata.getJobState(jobId, tasks);
|
||||
if (jobState == JobState.FAILED) {
|
||||
failed = true;
|
||||
return true;
|
||||
} else {
|
||||
return jobState == expectedState;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -7,7 +7,6 @@ package org.elasticsearch.xpack.persistent;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -21,6 +20,7 @@ import org.elasticsearch.tasks.Task;
|
|||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
import java.util.function.Predicate;
|
||||
|
||||
|
@ -29,11 +29,11 @@ import java.util.function.Predicate;
|
|||
*/
|
||||
public class PersistentTasksService extends AbstractComponent {
|
||||
|
||||
private final Client client;
|
||||
private final InternalClient client;
|
||||
private final ClusterService clusterService;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
|
||||
public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, InternalClient client) {
|
||||
super(settings);
|
||||
this.client = client;
|
||||
this.clusterService = clusterService;
|
||||
|
|
|
@ -68,7 +68,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
|
|||
|
||||
ElasticsearchStatusException e =
|
||||
expectThrows(ElasticsearchStatusException.class,
|
||||
() -> CloseJobAction.validate("job_id", cs1));
|
||||
() -> CloseJobAction.validateAndReturnJobTask("job_id", cs1));
|
||||
assertEquals(RestStatus.CONFLICT, e.status());
|
||||
assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage());
|
||||
|
||||
|
@ -81,7 +81,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
|
|||
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
|
||||
.putCustom(PersistentTasksCustomMetaData.TYPE,
|
||||
new PersistentTasksCustomMetaData(1L, tasks))).build();
|
||||
CloseJobAction.validate("job_id", cs2);
|
||||
CloseJobAction.validateAndReturnJobTask("job_id", cs2);
|
||||
}
|
||||
|
||||
public static PersistentTask<StartDatafeedAction.Request> createTask(long id,
|
||||
|
|
|
@ -56,14 +56,14 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
|
|||
Job job = createDatafeedJob().build(new Date());
|
||||
MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build();
|
||||
Exception e = expectThrows(ResourceNotFoundException.class,
|
||||
() -> StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata1, tasks));
|
||||
() -> StopDatafeedAction.validateAndReturnDatafeedTask("foo", mlMetadata1, tasks));
|
||||
assertThat(e.getMessage(), equalTo("No datafeed with id [foo] exists"));
|
||||
|
||||
DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build();
|
||||
MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false)
|
||||
.putDatafeed(datafeedConfig)
|
||||
.build();
|
||||
StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata2, tasks);
|
||||
StopDatafeedAction.validateAndReturnDatafeedTask("foo", mlMetadata2, tasks);
|
||||
}
|
||||
|
||||
public void testValidate_alreadyStopped() {
|
||||
|
@ -83,7 +83,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
|
|||
.putDatafeed(datafeedConfig)
|
||||
.build();
|
||||
Exception e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata1, tasks));
|
||||
() -> StopDatafeedAction.validateAndReturnDatafeedTask("foo", mlMetadata1, tasks));
|
||||
assertThat(e.getMessage(), equalTo("Cannot stop datafeed [foo] because it has already been stopped"));
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.elasticsearch.xpack.ml.notifications.AuditMessage;
|
|||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
@ -142,7 +143,9 @@ public class DatafeedJobRunnerTests extends ESTestCase {
|
|||
when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
|
||||
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
|
||||
|
||||
datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor) {
|
||||
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
|
||||
datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor,
|
||||
persistentTasksService) {
|
||||
@Override
|
||||
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) {
|
||||
return dataExtractorFactory;
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -90,7 +91,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry) {
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client);
|
||||
InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client);
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient);
|
||||
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, persistentTasksService,
|
||||
clusterService);
|
||||
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
|
||||
|
|
Loading…
Reference in New Issue