[ML] Make open job and start datafeed apis master node actions and let close job and stop datafeed apis redirect to elected master node.

This is done for cluster state observation purposes, so that an open followed by a close of a job (or a start followed by a stop of a datafeed) observes the same local cluster state and the sanity validation doesn't fail.
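In code, the redirect applied to the close job and stop datafeed transport actions boils down to the pattern below (a minimal sketch distilled from the CloseJobAction and StopDatafeedAction changes in this commit, not a drop-in class):

if (nodes.isLocalNodeElectedMaster() == false) {
    if (nodes.getMasterNode() == null) {
        // No elected master known locally: fail fast rather than act on possibly stale state.
        listener.onFailure(new MasterNotDiscoveredException("no known master node"));
    } else {
        // Forward the request so the elected master becomes the coordinating node and both
        // halves of an open/close (or start/stop) pair observe the same local cluster state.
        transportService.sendRequest(nodes.getMasterNode(), actionName, request,
                new ActionListenerResponseHandler<>(listener, Response::new));
    }
} else {
    // This node is the elected master: resolve, validate and execute locally.
}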

Original commit: elastic/x-pack-elasticsearch@21a63184b9
Martijn van Groningen 2017-04-14 10:43:21 +02:00
parent e93b447b9c
commit d114a55b99
10 changed files with 212 additions and 177 deletions

View File

@@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
@@ -20,6 +21,7 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
@@ -35,6 +37,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@@ -291,47 +294,58 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
/*
* Closing of multiple jobs:
*
* 1. Resolve and validate jobs first: if any job does not meet the
* criteria (e.g. open datafeed), fail immediately, do not close any
* job
*
* 2. Internally a task request is created for every job, so there
* are n inner tasks for 1 user request
*
* 3. Collect n inner task results or failures and send 1 outer
* result/failure
*/
ClusterState state = clusterService.state();
request.resolvedJobIds = resolveAndValidateJobId(request.getJobId(), state).toArray(new String[0]);
if (request.resolvedJobIds.length == 0) {
listener.onResponse(new Response(true));
return;
}
Set<String> executorNodes = new HashSet<>();
for (String resolvedJobId : request.resolvedJobIds) {
JobManager.getJobOrThrowIfUnknown(state, resolvedJobId);
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
if (jobTask == null || jobTask.isAssigned() == false) {
String message = "Cannot perform requested action because job [" + resolvedJobId
+ "] is not open";
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster() == false) {
// Delegates close job to elected master node, so it becomes the coordinating node.
// See comment in OpenJobAction.Transport class for more information.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master node"));
} else {
executorNodes.add(jobTask.getExecutorNode());
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
new ActionListenerResponseHandler<>(listener, Response::new));
}
}
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
if (request.isForce()) {
forceCloseJob(state, request, listener);
} else {
normalCloseJob(state, task, request, listener);
/*
* Closing of multiple jobs:
*
* 1. Resolve and validate jobs first: if any job does not meet the
* criteria (e.g. open datafeed), fail immediately, do not close any
* job
*
* 2. Internally a task request is created for every job, so there
* are n inner tasks for 1 user request
*
* 3. Collect n inner task results or failures and send 1 outer
* result/failure
*/
request.resolvedJobIds = resolveAndValidateJobId(request.getJobId(), state).toArray(new String[0]);
if (request.resolvedJobIds.length == 0) {
listener.onResponse(new Response(true));
return;
}
Set<String> executorNodes = new HashSet<>();
for (String resolvedJobId : request.resolvedJobIds) {
JobManager.getJobOrThrowIfUnknown(state, resolvedJobId);
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
if (jobTask == null || jobTask.isAssigned() == false) {
String message = "Cannot perform requested action because job [" + resolvedJobId
+ "] is not open";
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
} else {
executorNodes.add(jobTask.getExecutorNode());
}
}
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
if (request.isForce()) {
forceCloseJob(state, request, listener);
} else {
normalCloseJob(state, task, request, listener);
}
}
}

View File

@@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@@ -236,7 +235,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
}
}
public static class TransportAction extends TransportJobTaskAction<OpenJobAction.JobTask, Request, Response> {
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
@@ -255,8 +254,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
}
@Override
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task,
ActionListener<Response> listener, ClusterState state) {
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder();
paramsBuilder.calcInterim(request.getCalcInterim());
if (request.getAdvanceTime() != null) {

View File

@@ -11,14 +11,16 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -88,7 +90,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
return new Response();
}
public static class Request extends ActionRequest implements ToXContent {
public static class Request extends MasterNodeRequest<Request> implements ToXContent {
public static Request fromXContent(XContentParser parser) {
return parseRequest(null, parser);
@@ -356,22 +358,41 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
// This class extends from TransportMasterNodeAction for cluster state observing purposes.
// The close job api also redirects to the elected master node.
// The master node will wait for the job to be opened by checking the persistent task's status and then return.
// To ensure that a subsequent close job call will see that same task status (and sanity validation doesn't fail)
// both open and close job apis redirect to the elected master node.
// In case of instability, persistent task checks may fail and that is ok; in that case all bets are off.
// The open job api is a low throughput api, so the fact that we redirect to the elected master node shouldn't be an issue.
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
ClusterService clusterService, PersistentTasksService persistentTasksService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
protected String executor() {
// This api doesn't do heavy or blocking operations (it just delegates to PersistentTasksService),
// so we can do this on the network thread
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
JobParams jobParams = request.getJobParams();
if (licenseState.isMachineLearningAllowed()) {
ActionListener<PersistentTask<JobParams>> finalListener = new ActionListener<PersistentTask<JobParams>>() {
@@ -395,6 +416,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
// We only delegate here to PersistentTasksService, but if there is a metadata write block,
// then delegating to PersistentTasksService doesn't make a whole lot of sense,
// because PersistentTasksService will then fail.
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
void waitForJobStarted(String taskId, JobParams jobParams, ActionListener<Response> listener) {
JobPredicate predicate = new JobPredicate();
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, jobParams.timeout,
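The call above holds the master's response until the persistent task's status shows the job is open. JobPredicate's body is outside this hunk; a hedged sketch of what such a predicate checks (assuming the task status carries the job's JobState, mirroring the Predicate<PersistentTask<?>> used by StartDatafeedAction below):

Predicate<PersistentTask<?>> openedPredicate = persistentTask -> {
    if (persistentTask == null) {
        return false;
    }
    // Assumption: the persistent task status is the job's current JobState.
    return persistentTask.getStatus() == JobState.OPENED;
};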

View File

@@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
@@ -233,7 +232,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
}
public static class TransportAction extends TransportJobTaskAction<OpenJobAction.JobTask, Request, Response> {
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
@@ -252,7 +251,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
}
@Override
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener, ClusterState state) {
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription()));
try {

View File

@@ -12,19 +12,22 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
@@ -95,7 +98,7 @@ public class StartDatafeedAction
return new Response();
}
public static class Request extends ActionRequest implements ToXContent {
public static class Request extends MasterNodeRequest<Request> implements ToXContent {
public static Request fromXContent(XContentParser parser) {
return parseRequest(null, parser);
@@ -403,22 +406,41 @@ public class StartDatafeedAction
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
// This class extends from TransportMasterNodeAction for cluster state observing purposes.
// The stop datafeed api also redirects to the elected master node.
// The master node will wait for the datafeed to be started by checking the persistent task's status and then return.
// To ensure that a subsequent stop datafeed call will see that same task status (and sanity validation doesn't fail)
// both start and stop datafeed apis redirect to the elected master node.
// In case of instability, persistent task checks may fail and that is ok; in that case all bets are off.
// The start datafeed api is a low throughput api, so the fact that we redirect to the elected master node shouldn't be an issue.
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState,
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
XPackLicenseState licenseState, PersistentTasksService persistentTasksService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
protected String executor() {
// This api doesn't do heavy or blocking operations (it just delegates to PersistentTasksService),
// so we can do this on the network thread
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
DatafeedParams params = request.params;
if (licenseState.isMachineLearningAllowed()) {
ActionListener<PersistentTask<DatafeedParams>> finalListener = new ActionListener<PersistentTask<DatafeedParams>>() {
@@ -442,6 +464,14 @@ public class StartDatafeedAction
}
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
// We only delegate here to PersistentTasksService, but if there is a metadata write block,
// then delegating to PersistentTasksService doesn't make a whole lot of sense,
// because PersistentTasksService will then fail.
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
void waitForDatafeedStarted(String taskId, DatafeedParams params, ActionListener<Response> listener) {
Predicate<PersistentTask<?>> predicate = persistentTask -> {
if (persistentTask == null) {

View File

@@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
@@ -20,6 +21,7 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
@@ -33,6 +35,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -264,66 +267,79 @@ public class StopDatafeedAction
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<String> resolvedDatafeeds = resolve(request.getDatafeedId(), mlMetadata, tasks);
if (resolvedDatafeeds.isEmpty()) {
listener.onResponse(new Response(true));
return;
}
request.setResolvedDatafeedIds(resolvedDatafeeds.toArray(new String[resolvedDatafeeds.size()]));
if (request.force) {
final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(resolvedDatafeeds.size());
for (String datafeedId : resolvedDatafeeds) {
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
if (datafeedTask != null) {
persistentTasksService.cancelPersistentTask(datafeedTask.getId(), new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
if (counter.incrementAndGet() == resolvedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
failures.set(slot - 1, e);
if (slot == resolvedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
});
} else {
String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " +
"datafeed's task could not be found.";
logger.warn(msg);
final int slot = counter.incrementAndGet();
failures.set(slot - 1, new RuntimeException(msg));
if (slot == resolvedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster() == false) {
// Delegates stop datafeed to elected master node, so it becomes the coordinating node.
// See comment in StartDatafeedAction.Transport class for more information.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master node"));
} else {
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
new ActionListenerResponseHandler<>(listener, Response::new));
}
} else {
Set<String> executorNodes = new HashSet<>();
Map<String, String> datafeedIdToPersistentTaskId = new HashMap<>();
MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
for (String datafeedId : resolvedDatafeeds) {
PersistentTask<?> datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks);
executorNodes.add(datafeedTask.getExecutorNode());
datafeedIdToPersistentTaskId.put(datafeedId, datafeedTask.getId());
List<String> resolvedDatafeeds = resolve(request.getDatafeedId(), mlMetadata, tasks);
if (resolvedDatafeeds.isEmpty()) {
listener.onResponse(new Response(true));
return;
}
request.setResolvedDatafeedIds(resolvedDatafeeds.toArray(new String[resolvedDatafeeds.size()]));
ActionListener<Response> finalListener =
ActionListener.wrap(r -> waitForDatafeedStopped(datafeedIdToPersistentTaskId, request, r, listener), listener::onFailure);
if (request.force) {
final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(resolvedDatafeeds.size());
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
super.doExecute(task, request, finalListener);
for (String datafeedId : resolvedDatafeeds) {
PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
if (datafeedTask != null) {
persistentTasksService.cancelPersistentTask(datafeedTask.getId(), new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
if (counter.incrementAndGet() == resolvedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
failures.set(slot - 1, e);
if (slot == resolvedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
});
} else {
String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " +
"datafeed's task could not be found.";
logger.warn(msg);
final int slot = counter.incrementAndGet();
failures.set(slot - 1, new RuntimeException(msg));
if (slot == resolvedDatafeeds.size()) {
sendResponseOrFailure(request.getDatafeedId(), listener, failures);
}
}
}
} else {
Set<String> executorNodes = new HashSet<>();
Map<String, String> datafeedIdToPersistentTaskId = new HashMap<>();
for (String datafeedId : resolvedDatafeeds) {
PersistentTask<?> datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks);
executorNodes.add(datafeedTask.getExecutorNode());
datafeedIdToPersistentTaskId.put(datafeedId, datafeedTask.getId());
}
ActionListener<Response> finalListener =
ActionListener.wrap(r -> waitForDatafeedStopped(datafeedIdToPersistentTaskId, request, r, listener), listener::onFailure);
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
super.doExecute(task, request, finalListener);
}
}
}
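For reference, the force branch above fans out one cancelPersistentTask call per datafeed and folds the n callbacks into a single outer response using a counter plus a fixed-size failure array. The same grouping pattern in isolation (a sketch; taskIdFor and sendResponseOrFailure are hypothetical stand-ins for the lookups and aggregation shown above):

final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(ids.size());
for (String id : ids) {
    persistentTasksService.cancelPersistentTask(taskIdFor(id), ActionListener.wrap(
            persistentTask -> {
                // Success: only the last completed callback sends the grouped response.
                if (counter.incrementAndGet() == ids.size()) {
                    sendResponseOrFailure(listener, failures);
                }
            },
            e -> {
                // Failure: record it in a fixed slot, then check for completion the same way.
                int slot = counter.incrementAndGet();
                failures.set(slot - 1, e);
                if (slot == ids.size()) {
                    sendResponseOrFailure(listener, failures);
                }
            }));
}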

View File

@@ -25,7 +25,6 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
@@ -39,8 +38,8 @@ import java.util.function.Supplier;
*/
// TODO: Hacking around here with TransportTasksAction. Ideally we should have another base class in core that
// redirects to a single node only
public abstract class TransportJobTaskAction<OperationTask extends OpenJobAction.JobTask, Request extends TransportJobTaskAction.JobTaskRequest<Request>,
Response extends BaseTasksResponse & Writeable> extends TransportTasksAction<OperationTask, Request, Response, Response> {
public abstract class TransportJobTaskAction<Request extends TransportJobTaskAction.JobTaskRequest<Request>,
Response extends BaseTasksResponse & Writeable> extends TransportTasksAction<OpenJobAction.JobTask, Request, Response, Response> {
protected final AutodetectProcessManager processManager;
@@ -71,25 +70,6 @@ public abstract class TransportJobTaskAction<OperationTask extends OpenJobAction
}
}
@Override
protected final void taskOperation(Request request, OperationTask task, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlMetadata.getJobState(task.getJobId(), tasks);
if (jobState == JobState.OPENED) {
innerTaskOperation(request, task, listener, state);
} else {
logger.warn("Unexpected job state based on cluster state version [{}]", state.getVersion());
// Note that DatafeedJob relies on this exception being thrown to detect the state
// conflict and stop the datafeed. If this exception type/status changes, DatafeedJob
// also needs to change.
listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot perform requested action because job [" +
request.getJobId() + "] is not open"));
}
}
protected abstract void innerTaskOperation(Request request, OperationTask task, ActionListener<Response> listener, ClusterState state);
@Override
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {

View File

@@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@@ -179,7 +178,7 @@ public class UpdateProcessAction extends
}
}
public static class TransportAction extends TransportJobTaskAction<OpenJobAction.JobTask, Request, Response> {
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
@@ -198,7 +197,7 @@ public class UpdateProcessAction extends
}
@Override
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener, ClusterState state) {
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
try {
processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(),
request.getModelPlotConfig(), e -> {

View File

@@ -156,7 +156,7 @@ public class AutodetectProcessManager extends AbstractComponent {
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job");
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobId + "] is not open");
}
communicator.writeToJob(input, xContentType, params, handler);
}
@@ -174,9 +174,9 @@ public class AutodetectProcessManager extends AbstractComponent {
logger.debug("Flushing job {}", jobId);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
String message = String.format(Locale.ROOT, "[%s] Cannot flush: no active autodetect process for job", jobId);
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobId);
logger.debug(message);
throw new IllegalArgumentException(message);
throw ExceptionsHelper.conflictStatusException(message);
}
communicator.flushJob(params, (aVoid, e) -> {
@@ -194,8 +194,9 @@
Consumer<Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId);
handler.accept(null);
String message = "Cannot process update model debug config because job [" + jobId + "] is not open";
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
}
communicator.writeUpdateProcessMessage(config, updates, (aVoid, e) -> {
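The pattern in this file: every "no active autodetect process" case now surfaces as a 409 rather than a generic runtime exception, because callers key off the conflict status (the comment removed from TransportJobTaskAction above notes that DatafeedJob relies on it to detect the state conflict and stop the datafeed). Assuming ExceptionsHelper.conflictStatusException is a thin wrapper, it presumably amounts to:

static ElasticsearchStatusException conflictStatusException(String msg, Object... args) {
    return new ElasticsearchStatusException(msg, RestStatus.CONFLICT, args);
}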

View File

@@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
@@ -16,10 +15,8 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentType.JSON;
@@ -44,7 +41,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
assertBusy(this::assertSameClusterStateOnAllNodes);
String postData =
"{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" +
@@ -149,7 +145,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("stopped", true), responseEntityToMap(response));
assertBusy(this::assertSameClusterStateOnAllNodes);
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close",
Collections.singletonMap("timeout", "20s"));
@@ -170,7 +165,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
assertBusy(this::assertSameClusterStateOnAllNodes);
String postData =
"{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" +
@@ -210,8 +204,7 @@
Collections.singletonMap("timeout", "20s"));
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
assertBusy(this::assertSameClusterStateOnAllNodes);
// feed some more data points
postData =
"{\"airline\":\"AAL\",\"responsetime\":\"136.2361\",\"sourcetype\":\"farequote\",\"time\":\"1407081600\"}\n" +
@@ -308,28 +301,4 @@
return XContentHelper.convertToMap(JSON.xContent(), response.getEntity().getContent(), false);
}
// When the open job api returns, the cluster state on nodes other than the master node (or the node that acted as
// coordinating node) may not yet contain the update that sets the job state to opened. This can fail subsequent post
// data, flush, or close calls until the node running the job task has applied that cluster state. This method waits
// until all nodes in the cluster have the same cluster state version, so that such failures can be avoided in tests.
// Note that the job has been started on the node running the job task (the autodetect process is running); this is
// just a workaround for the inconsistency between cluster states that may exist for a short amount of time.
private void assertSameClusterStateOnAllNodes() {
assert getClusterHosts().size() > 1;
Set<Integer> versions = new HashSet<>();
for (HttpHost host : getClusterHosts()) {
try {
// Client round robins between cluster hosts:
Response response = client().performRequest("get", "/_cluster/state/version", Collections.singletonMap("local", "true"));
assertEquals(200, response.getStatusLine().getStatusCode());
int version = (Integer) responseEntityToMap(response).get("version");
logger.info("Sampled version [{}]", version);
versions.add(version);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
assertEquals(1, versions.size());
}
}