[ML] Add force delete datafeed action (elastic/x-pack-elasticsearch#1623)
When a user or client intends to delete a datafeed and its job, there is no benefit in ensuring the datafeed has gracefully stopped (i.e. without data loss); on the contrary, the desired behaviour is to stop and delete the datafeed as quickly as possible. This change adds a `force` option to the delete datafeed action. When the delete is forced, the datafeed is isolated, its task is removed and, finally, the datafeed itself is removed from the metadata.

Relates elastic/x-pack-elasticsearch#1533
Original commit: elastic/x-pack-elasticsearch@5ae0168bf2
Parent: c2575288d8
Commit: 3f6e640f90
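As a quick usage sketch (not part of the commit): the endpoint and the `?force=true` parameter follow the integration test added in this change, exercised here through the low-level Java REST client; the host, port and datafeed id are illustrative.

import org.apache.http.HttpHost;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class ForceDeleteDatafeedExample {
    public static void main(String[] args) throws Exception {
        // Illustrative host/port; assumes a node with X-Pack ML enabled.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {
            // Without force, deleting a started datafeed fails with 409 Conflict.
            // With force=true the datafeed is isolated, its persistent task is
            // cancelled, and its configuration is removed from the cluster metadata.
            Response response = client.performRequest(
                    "DELETE", "/_xpack/ml/datafeeds/my-datafeed?force=true");
            System.out.println(response.getStatusLine()); // expect HTTP/1.1 200 OK
        }
    }
}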
@@ -12,7 +12,7 @@ The delete {dfeed} API enables you to delete an existing {dfeed}.
 ===== Description
 
-NOTE: You must stop the {dfeed} before you can delete it.
+NOTE: Unless the `force` parameter is used, the {dfeed} must be stopped before it can be deleted.
 
 
 ===== Path Parameters
@@ -21,6 +21,12 @@ NOTE: You must stop the {dfeed} before you can delete it.
 (string) Identifier for the {dfeed}
 
+===== Query Parameters
+
+`force`::
+  (boolean) Use to forcefully delete a started {dfeed}; this method is quicker than
+  stopping and deleting the {dfeed}.
+
 ===== Authorization
 
 You must have `manage_ml`, or `manage` cluster privileges to use this API.

@@ -53,6 +53,7 @@ import org.elasticsearch.xpack.ml.action.GetJobsAction;
 import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
 import org.elasticsearch.xpack.ml.action.GetRecordsAction;
+import org.elasticsearch.xpack.ml.action.IsolateDatafeedAction;
 import org.elasticsearch.xpack.ml.action.KillProcessAction;
 import org.elasticsearch.xpack.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.ml.action.PostDataAction;
@@ -420,6 +421,7 @@ public class MachineLearning implements ActionPlugin {
                 new ActionHandler<>(PreviewDatafeedAction.INSTANCE, PreviewDatafeedAction.TransportAction.class),
                 new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class),
                 new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class),
+                new ActionHandler<>(IsolateDatafeedAction.INSTANCE, IsolateDatafeedAction.TransportAction.class),
                 new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class),
                 new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
                 new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),

@@ -5,6 +5,9 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
@@ -20,19 +23,24 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.ml.MlMetadata;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.xpack.persistent.PersistentTasksService;
+import org.elasticsearch.xpack.security.InternalClient;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -59,7 +67,10 @@ public class DeleteDatafeedAction extends Action<DeleteDatafeedAction.Request, D
 
     public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
 
+        public static final ParseField FORCE = new ParseField("force");
+
         private String datafeedId;
+        private boolean force;
 
         public Request(String datafeedId) {
             this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
@@ -72,6 +83,14 @@ public class DeleteDatafeedAction extends Action<DeleteDatafeedAction.Request, D
             return datafeedId;
         }
 
+        public boolean isForce() {
+            return force;
+        }
+
+        public void setForce(boolean force) {
+            this.force = force;
+        }
+
         @Override
         public ActionRequestValidationException validate() {
             return null;
@@ -81,12 +100,18 @@ public class DeleteDatafeedAction extends Action<DeleteDatafeedAction.Request, D
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
             datafeedId = in.readString();
+            if (in.getVersion().onOrAfter(Version.V_5_5_0)) {
+                force = in.readBoolean();
+            }
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(datafeedId);
+            if (out.getVersion().onOrAfter(Version.V_5_5_0)) {
+                out.writeBoolean(force);
+            }
         }
 
         @Override
@@ -99,13 +124,13 @@ public class DeleteDatafeedAction extends Action<DeleteDatafeedAction.Request, D
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
-            Request request = (Request) o;
-            return Objects.equals(datafeedId, request.datafeedId);
+            Request other = (Request) o;
+            return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(force, other.force);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(datafeedId);
+            return Objects.hash(datafeedId, force);
         }
     }
 
@@ -140,11 +165,17 @@ public class DeleteDatafeedAction extends Action<DeleteDatafeedAction.Request, D
 
     public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
 
+        private InternalClient client;
+        private PersistentTasksService persistentTasksService;
+
        @Inject
         public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
-                               ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+                               ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
+                               InternalClient internalClient, PersistentTasksService persistentTasksService) {
             super(settings, DeleteDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
                     indexNameExpressionResolver, Request::new);
+            this.client = internalClient;
+            this.persistentTasksService = persistentTasksService;
         }
 
         @Override
@@ -159,6 +190,55 @@ public class DeleteDatafeedAction extends Action<DeleteDatafeedAction.Request, D
 
         @Override
         protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
+            if (request.isForce()) {
+                forceDeleteDatafeed(request, state, listener);
+            } else {
+                deleteDatafeedFromMetadata(request, listener);
+            }
+        }
+
+        private void forceDeleteDatafeed(Request request, ClusterState state, ActionListener<Response> listener) {
+            ActionListener<Boolean> finalListener = ActionListener.wrap(
+                    response -> deleteDatafeedFromMetadata(request, listener),
+                    listener::onFailure
+            );
+
+            ActionListener<IsolateDatafeedAction.Response> isolateDatafeedHandler = ActionListener.wrap(
+                    response -> removeDatafeedTask(request, state, finalListener),
+                    listener::onFailure
+            );
+
+            IsolateDatafeedAction.Request isolateDatafeedRequest = new IsolateDatafeedAction.Request(request.getDatafeedId());
+            client.execute(IsolateDatafeedAction.INSTANCE, isolateDatafeedRequest, isolateDatafeedHandler);
+        }
+
+        private void removeDatafeedTask(Request request, ClusterState state, ActionListener<Boolean> listener) {
+            PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+            PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks);
+            if (datafeedTask == null) {
+                listener.onResponse(true);
+            } else {
+                persistentTasksService.cancelPersistentTask(datafeedTask.getId(),
+                        new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
+                            @Override
+                            public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
+                                listener.onResponse(Boolean.TRUE);
+                            }
+
+                            @Override
+                            public void onFailure(Exception e) {
+                                if (e instanceof ResourceNotFoundException) {
+                                    // the task has been removed in between
+                                    listener.onResponse(true);
+                                } else {
+                                    listener.onFailure(e);
+                                }
+                            }
+                        });
+            }
+        }
+
+        private void deleteDatafeedFromMetadata(Request request, ActionListener<Response> listener) {
             clusterService.submitStateUpdateTask("delete-datafeed-" + request.getDatafeedId(),
                     new AckedClusterStateUpdateTask<Response>(request, listener) {
@@ -179,7 +259,6 @@ public class DeleteDatafeedAction extends Action<DeleteDatafeedAction.Request, D
                                 .build();
                     }
                 });
-
         }
 
         @Override

@@ -0,0 +1,256 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.action;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequestBuilder;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.tasks.BaseTasksRequest;
+import org.elasticsearch.action.support.tasks.BaseTasksResponse;
+import org.elasticsearch.action.support.tasks.TransportTasksAction;
+import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.MlMetadata;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * An internal action that isolates a datafeed.
+ * Datafeed isolation is effectively disconnecting a running datafeed task
+ * from its job, i.e. even though the datafeed performs a search, the retrieved
+ * data is not sent to the job, etc. As stopping a datafeed cannot always happen
+ * instantaneously (e.g. cannot cancel an ongoing search), isolating a datafeed
+ * task ensures the current datafeed task can complete inconsequentially while
+ * the datafeed persistent task may be stopped or reassigned on another node.
+ */
+public class IsolateDatafeedAction
+        extends Action<IsolateDatafeedAction.Request, IsolateDatafeedAction.Response, IsolateDatafeedAction.RequestBuilder> {
+
+    public static final IsolateDatafeedAction INSTANCE = new IsolateDatafeedAction();
+    public static final String NAME = "cluster:internal/xpack/ml/datafeed/isolate";
+
+    private IsolateDatafeedAction() {
+        super(NAME);
+    }
+
+    @Override
+    public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
+        return new RequestBuilder(client, this);
+    }
+
+    @Override
+    public Response newResponse() {
+        return new Response();
+    }
+
+    public static class Request extends BaseTasksRequest<Request> implements ToXContent {
+
+        public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
+
+        static {
+            PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
+        }
+
+        public static Request fromXContent(XContentParser parser) {
+            return parseRequest(null, parser);
+        }
+
+        public static Request parseRequest(String datafeedId, XContentParser parser) {
+            Request request = PARSER.apply(parser, null);
+            if (datafeedId != null) {
+                request.datafeedId = datafeedId;
+            }
+            return request;
+        }
+
+        private String datafeedId;
+
+        public Request(String datafeedId) {
+            this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
+        }
+
+        Request() {
+        }
+
+        private String getDatafeedId() {
+            return datafeedId;
+        }
+
+        @Override
+        public boolean match(Task task) {
+            String expectedDescription = MlMetadata.datafeedTaskId(datafeedId);
+            if (task instanceof StartDatafeedAction.DatafeedTask && expectedDescription.equals(task.getDescription())) {
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            datafeedId = in.readString();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(datafeedId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(datafeedId);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            Request other = (Request) obj;
+            return Objects.equals(datafeedId, other.datafeedId);
+        }
+    }
+
+    public static class Response extends BaseTasksResponse implements Writeable {
+
+        private boolean isolated;
+
+        public Response(boolean isolated) {
+            super(null, null);
+            this.isolated = isolated;
+        }
+
+        public Response(StreamInput in) throws IOException {
+            readFrom(in);
+        }
+
+        public Response() {
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            isolated = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeBoolean(isolated);
+        }
+    }
+
+    static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
+
+        RequestBuilder(ElasticsearchClient client, IsolateDatafeedAction action) {
+            super(client, action, new Request());
+        }
+    }
+
+    public static class TransportAction extends TransportTasksAction<StartDatafeedAction.DatafeedTask, Request, Response, Response> {
+
+        @Inject
+        public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
+                               ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
+                               ClusterService clusterService) {
+            super(settings, IsolateDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters,
+                    indexNameExpressionResolver, Request::new, Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
+        }
+
+        @Override
+        protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
+            final ClusterState state = clusterService.state();
+            PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+            PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks);
+
+            if (datafeedTask == null || datafeedTask.getExecutorNode() == null) {
+                // No running datafeed task to isolate
+                listener.onResponse(new Response());
+                return;
+            }
+
+            String executorNode = datafeedTask.getExecutorNode();
+            DiscoveryNodes nodes = state.nodes();
+            if (nodes.resolveNode(executorNode).getVersion().before(Version.V_5_5_0)) {
+                listener.onFailure(new ElasticsearchException("Force delete datafeed is not supported because the datafeed task " +
+                        "is running on a node [" + executorNode + "] with a version prior to " + Version.V_5_5_0));
+                return;
+            }
+
+            request.setNodes(datafeedTask.getExecutorNode());
+            super.doExecute(task, request, listener);
+        }
+
+        @Override
+        protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
+                                       List<FailedNodeException> failedNodeExceptions) {
+            if (taskOperationFailures.isEmpty() == false) {
+                throw org.elasticsearch.ExceptionsHelper
+                        .convertToElastic(taskOperationFailures.get(0).getCause());
+            } else if (failedNodeExceptions.isEmpty() == false) {
+                throw org.elasticsearch.ExceptionsHelper
+                        .convertToElastic(failedNodeExceptions.get(0));
+            } else {
+                return new Response();
+            }
+        }
+
+        @Override
+        protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask datafeedTask, ActionListener<Response> listener) {
+            datafeedTask.isolate();
+            listener.onResponse(new Response());
+        }
+
+        @Override
+        protected Response readTaskResponse(StreamInput in) throws IOException {
+            return new Response(in);
+        }
+    }
+}

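The isolation mechanism described in the Javadoc above boils down to a volatile flag that the datafeed's run loop re-checks before each side-effecting step. A minimal, hypothetical sketch of that pattern (invented names, not the actual x-pack classes; the real checks appear in the DatafeedJob and DatafeedManager hunks below):

// Stripped-down illustration of the isolation pattern; class and method
// names are invented for this sketch.
class IsolatableLoop {
    private volatile boolean isolated;

    void isolate() {
        isolated = true; // the running loop notices this on its next check
    }

    void runOnce(Iterable<String> batches) {
        for (String batch : batches) {
            if (isolated) {
                return; // complete inconsequentially: no further data reaches the job
            }
            process(batch);
        }
        if (!isolated) {
            flushJob(); // side effects are skipped once isolated, as in DatafeedJob
        }
    }

    private void process(String batch) { /* send data to the job */ }
    private void flushJob() { /* flush interim results */ }
}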
@@ -398,7 +398,15 @@ public class StartDatafeedAction
         }
 
         public void stop(String reason, TimeValue timeout) {
-            datafeedManager.stopDatafeed(this, reason, timeout);
+            if (datafeedManager != null) {
+                datafeedManager.stopDatafeed(this, reason, timeout);
+            }
+        }
+
+        public void isolate() {
+            if (datafeedManager != null) {
+                datafeedManager.isolateDatafeed(getAllocationId());
+            }
         }
     }
 

@@ -70,7 +70,10 @@ class DatafeedJob {
 
     void isolate() {
         isIsolated = true;
         stop();
     }
 
+    boolean isIsolated() {
+        return isIsolated;
+    }
+
     Long runLookBack(long startTime, Long endTime) throws Exception {
@@ -96,7 +99,7 @@ class DatafeedJob {
         request.setCalcInterim(true);
         run(lookbackStartTimeMs, lookbackEnd, request);
 
-        if (isRunning()) {
+        if (isRunning() && !isIsolated) {
             auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_COMPLETED));
             LOGGER.info("[{}] Lookback has finished", jobId);
             if (isLookbackOnly) {
@@ -154,7 +157,7 @@ class DatafeedJob {
         long recordCount = 0;
         DataExtractor dataExtractor = dataExtractorFactory.newExtractor(start, end);
         while (dataExtractor.hasNext()) {
-            if (!isRunning() && !dataExtractor.isCancelled()) {
+            if ((isIsolated || !isRunning()) && !dataExtractor.isCancelled()) {
                 dataExtractor.cancel();
             }
             if (isIsolated) {
@@ -232,7 +235,7 @@ class DatafeedJob {
         // If the datafeed was stopped, then it is possible that by the time
         // we call flush the job is closed. Thus, we don't flush unless the
         // datafeed is still running.
-        if (isRunning()) {
+        if (isRunning() && !isIsolated) {
             flushJob(flushRequest);
         }
     }

@@ -152,11 +152,17 @@ public class DatafeedManager extends AbstractComponent {
         isolated = true;
         Iterator<Holder> iter = runningDatafeedsOnThisNode.values().iterator();
         while (iter.hasNext()) {
-            iter.next().isolateDatafeed();
+            Holder next = iter.next();
+            next.isolateDatafeed();
+            next.setRelocating();
+            iter.remove();
         }
     }
 
+    public void isolateDatafeed(long allocationId) {
+        runningDatafeedsOnThisNode.get(allocationId).isolateDatafeed();
+    }
+
     // Important: Holder must be created and assigned to DatafeedTask before setting state to started,
     // otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel
     // the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running.
@@ -219,7 +225,7 @@ public class DatafeedManager extends AbstractComponent {
     }
 
     void doDatafeedRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) {
-        if (holder.isRunning()) {
+        if (holder.isRunning() && !holder.isIsolated()) {
             TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
             logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId);
             holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_THREAD_POOL_NAME, new AbstractRunnable() {
@@ -343,6 +349,7 @@ public class DatafeedManager extends AbstractComponent {
         private final ProblemTracker problemTracker;
         private final Consumer<Exception> handler;
         volatile Future<?> future;
+        private volatile boolean isRelocating;
 
         Holder(String taskId, long allocationId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob,
                ProblemTracker problemTracker, Consumer<Exception> handler) {
@@ -363,7 +370,15 @@ public class DatafeedManager extends AbstractComponent {
             return datafeedJob.isRunning();
         }
 
+        boolean isIsolated() {
+            return datafeedJob.isIsolated();
+        }
+
         public void stop(String source, TimeValue timeout, Exception e) {
+            if (isRelocating) {
+                return;
+            }
+
             logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId());
             if (datafeedJob.stop()) {
                 boolean acquired = false;
@@ -403,10 +418,14 @@ public class DatafeedManager extends AbstractComponent {
             datafeedJob.isolate();
         }
 
+        public void setRelocating() {
+            isRelocating = true;
+        }
+
         private Long executeLoopBack(long startTime, Long endTime) throws Exception {
             datafeedJobLock.lock();
             try {
-                if (isRunning()) {
+                if (isRunning() && !isIsolated()) {
                     return datafeedJob.runLookBack(startTime, endTime);
                 } else {
                     return null;
@@ -419,7 +438,7 @@ public class DatafeedManager extends AbstractComponent {
         private long executeRealTime() throws Exception {
             datafeedJobLock.lock();
             try {
-                if (isRunning()) {
+                if (isRunning() && !isIsolated()) {
                     return datafeedJob.runRealtime();
                 } else {
                     return -1L;

@@ -12,6 +12,7 @@ import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.AcknowledgedRestListener;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
 
@@ -33,8 +34,10 @@ public class RestDeleteDatafeedAction extends BaseRestHandler {
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
         String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName());
-        DeleteDatafeedAction.Request deleteDatafeedRequest = new DeleteDatafeedAction.Request(datafeedId);
-        return channel -> client.execute(DeleteDatafeedAction.INSTANCE, deleteDatafeedRequest, new AcknowledgedRestListener<>(channel));
+        DeleteDatafeedAction.Request request = new DeleteDatafeedAction.Request(datafeedId);
+        if (restRequest.hasParam(DeleteDatafeedAction.Request.FORCE.getPreferredName())) {
+            request.setForce(restRequest.paramAsBoolean(CloseJobAction.Request.FORCE.getPreferredName(), request.isForce()));
+        }
+        return channel -> client.execute(DeleteDatafeedAction.INSTANCE, request, new AcknowledgedRestListener<>(channel));
     }
 
 }

@@ -102,9 +102,7 @@ public class DatafeedJobTests extends ESTestCase {
         assertNull(datafeedJob.runLookBack(0L, null));
 
         verify(dataExtractorFactory).newExtractor(0L, 1500L);
-        FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
-        flushRequest.setCalcInterim(true);
-        verify(client, never()).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
+        verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
     }
 
     public void testLookBackRunWithNoEndTime() throws Exception {

@@ -403,6 +403,34 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
         assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
     }
 
+    public void testForceDeleteWhileDatafeedIsRunning() throws Exception {
+        String jobId = "job-realtime-2";
+        createJob(jobId);
+        String datafeedId = jobId + "-datafeed";
+        new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").build();
+        openJob(client(), jobId);
+
+        Response response = client().performRequest("post",
+                MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start?start=2016-06-01T00:00:00Z");
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+        assertThat(responseEntityToString(response), equalTo("{\"started\":true}"));
+
+        ResponseException e = expectThrows(ResponseException.class,
+                () -> client().performRequest("delete", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId));
+        response = e.getResponse();
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(409));
+        assertThat(responseEntityToString(response), containsString("Cannot delete datafeed [" + datafeedId
+                + "] while its status is started"));
+
+        response = client().performRequest("delete",
+                MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "?force=true");
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+        assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
+
+        expectThrows(ResponseException.class,
+                () -> client().performRequest("get", "/_xpack/ml/datafeeds/" + datafeedId));
+    }
+
     private class LookbackOnlyTestHelper {
         private String jobId;
         private String dataIndex;

@@ -119,6 +119,7 @@ cluster:monitor/xpack/ml/job/results/buckets/get
 cluster:monitor/xpack/ml/job/model_snapshots/get
 cluster:monitor/xpack/ml/job/results/records/get
 cluster:monitor/xpack/ml/job/results/influencers/get
+cluster:internal/xpack/ml/datafeed/isolate
 cluster:admin/xpack/ml/datafeeds/preview
 cluster:admin/xpack/ml/datafeeds/put
 cluster:admin/xpack/ml/datafeeds/update

@@ -10,6 +10,13 @@
         "required": true,
         "description": "The ID of the datafeed to delete"
       }
-    }
+    },
+    "params": {
+      "force": {
+        "type": "boolean",
+        "required": false,
+        "description": "True if the datafeed should be forcefully deleted"
+      }
+    }
   },
   "body": null

@@ -300,3 +300,22 @@ setup:
       xpack.ml.delete_datafeed:
         datafeed_id: test-datafeed-1
   - match: { acknowledged: true }
+
+---
+"Test force delete datafeed":
+  - do:
+      xpack.ml.put_datafeed:
+        datafeed_id: test-datafeed-1
+        body: >
+          {
+            "job_id":"datafeeds-crud-1",
+            "indexes":["index-foo"],
+            "types":["type-bar"]
+          }
+  - match: { datafeed_id: "test-datafeed-1" }
+
+  - do:
+      xpack.ml.delete_datafeed:
+        datafeed_id: test-datafeed-1
+        force: true
+  - match: { acknowledged: true }