[7.x][ML] Implement force deleting a data frame analytics job (#50553) (#50589)

Adds a `force` parameter to the delete data frame analytics
request. When `force` is `true`, the action force-stops the
jobs and then proceeds to the deletion. This can be used in
order to delete a non-stopped job with a single request.

Closes #48124

Backport of #50553
This commit is contained in:
Dimitris Athanasiou 2020-01-03 13:46:02 +02:00 committed by GitHub
parent 5655d6a1c1
commit ca0828ba07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 130 additions and 36 deletions

View File

@ -29,8 +29,6 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
import org.elasticsearch.client.core.PageParams;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.DeleteCalendarEventRequest;
import org.elasticsearch.client.ml.DeleteCalendarJobRequest;
import org.elasticsearch.client.ml.DeleteCalendarRequest;
@ -41,7 +39,9 @@ import org.elasticsearch.client.ml.DeleteFilterRequest;
import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteModelSnapshotRequest;
import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
import org.elasticsearch.client.ml.EvaluateDataFrameRequest;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.FindFileStructureRequest;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.ForecastJobRequest;
@ -692,7 +692,16 @@ final class MLRequestConverters {
.addPathPartAsIs("_ml", "data_frame", "analytics")
.addPathPart(deleteRequest.getId())
.build();
return new Request(HttpDelete.METHOD_NAME, endpoint);
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params();
if (deleteRequest.getForce() != null) {
params.putParam("force", Boolean.toString(deleteRequest.getForce()));
}
request.addParameters(params.asMap());
return request;
}
static Request evaluateDataFrame(EvaluateDataFrameRequest evaluateRequest) throws IOException {

View File

@ -31,6 +31,7 @@ import java.util.Optional;
public class DeleteDataFrameAnalyticsRequest implements Validatable {
private final String id;
private Boolean force;
public DeleteDataFrameAnalyticsRequest(String id) {
this.id = id;
@ -40,6 +41,20 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
return id;
}
public Boolean getForce() {
return force;
}
/**
* Used to forcefully delete an job that is not stopped.
* This method is quicker than stopping and deleting the job.
*
* @param force When {@code true} forcefully delete a non stopped job. Defaults to {@code false}
*/
public void setForce(Boolean force) {
this.force = force;
}
@Override
public Optional<ValidationException> validate() {
if (id == null) {
@ -54,11 +69,11 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
if (o == null || getClass() != o.getClass()) return false;
DeleteDataFrameAnalyticsRequest other = (DeleteDataFrameAnalyticsRequest) o;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) && Objects.equals(force, other.force);
}
@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, force);
}
}

View File

@ -25,8 +25,6 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.core.PageParams;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.DeleteCalendarEventRequest;
import org.elasticsearch.client.ml.DeleteCalendarJobRequest;
import org.elasticsearch.client.ml.DeleteCalendarRequest;
@ -37,8 +35,10 @@ import org.elasticsearch.client.ml.DeleteFilterRequest;
import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteModelSnapshotRequest;
import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
import org.elasticsearch.client.ml.EvaluateDataFrameRequest;
import org.elasticsearch.client.ml.EvaluateDataFrameRequestTests;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.FindFileStructureRequest;
import org.elasticsearch.client.ml.FindFileStructureRequestTests;
import org.elasticsearch.client.ml.FlushJobRequest;
@ -778,6 +778,15 @@ public class MLRequestConvertersTests extends ESTestCase {
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
assertNull(request.getEntity());
assertThat(request.getParameters().isEmpty(), is(true));
deleteRequest = new DeleteDataFrameAnalyticsRequest(randomAlphaOfLength(10));
deleteRequest.setForce(true);
request = MLRequestConverters.deleteDataFrameAnalytics(deleteRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
assertNull(request.getEntity());
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
}
public void testEvaluateDataFrame() throws IOException {

View File

@ -1646,8 +1646,11 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
machineLearningClient::getDataFrameAnalytics, machineLearningClient::getDataFrameAnalyticsAsync);
assertThat(getDataFrameAnalyticsResponse.getAnalytics(), hasSize(1));
AcknowledgedResponse deleteDataFrameAnalyticsResponse = execute(
new DeleteDataFrameAnalyticsRequest(configId),
DeleteDataFrameAnalyticsRequest deleteRequest = new DeleteDataFrameAnalyticsRequest(configId);
if (randomBoolean()) {
deleteRequest.setForce(randomBoolean());
}
AcknowledgedResponse deleteDataFrameAnalyticsResponse = execute(deleteRequest,
machineLearningClient::deleteDataFrameAnalytics, machineLearningClient::deleteDataFrameAnalyticsAsync);
assertTrue(deleteDataFrameAnalyticsResponse.isAcknowledged());

View File

@ -36,9 +36,6 @@ import org.elasticsearch.client.core.PageParams;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.CloseJobResponse;
import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsResponse;
import org.elasticsearch.client.ml.DeleteCalendarEventRequest;
import org.elasticsearch.client.ml.DeleteCalendarJobRequest;
import org.elasticsearch.client.ml.DeleteCalendarRequest;
@ -51,8 +48,11 @@ import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.DeleteModelSnapshotRequest;
import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
import org.elasticsearch.client.ml.EvaluateDataFrameRequest;
import org.elasticsearch.client.ml.EvaluateDataFrameResponse;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsResponse;
import org.elasticsearch.client.ml.FindFileStructureRequest;
import org.elasticsearch.client.ml.FindFileStructureResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
@ -3065,6 +3065,10 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
DeleteDataFrameAnalyticsRequest request = new DeleteDataFrameAnalyticsRequest("my-analytics-config"); // <1>
// end::delete-data-frame-analytics-request
//tag::delete-data-frame-analytics-request-force
request.setForce(false); // <1>
//end::delete-data-frame-analytics-request-force
// tag::delete-data-frame-analytics-execute
AcknowledgedResponse response = client.machineLearning().deleteDataFrameAnalytics(request, RequestOptions.DEFAULT);
// end::delete-data-frame-analytics-execute

View File

@ -21,6 +21,17 @@ include-tagged::{doc-tests-file}[{api}-request]
---------------------------------------------------
<1> Constructing a new request referencing an existing {dfanalytics-job}.
==== Optional arguments
The following arguments are optional:
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-force]
---------------------------------------------------
<1> Use to forcefully delete a job that is not stopped. This method is quicker than stopping
and deleting the job. Defaults to `false`.
include::../execution.asciidoc[]
[id="{upid}-{api}-response"]

View File

@ -21,7 +21,7 @@ experimental[]
[[ml-delete-dfanalytics-prereq]]
==== {api-prereq-title}
* You must have `machine_learning_admin` built-in role to use this API. For more
* You must have `machine_learning_admin` built-in role to use this API. For more
information, see <<security-privileges>> and <<built-in-roles>>.
@ -32,6 +32,13 @@ information, see <<security-privileges>> and <<built-in-roles>>.
(Required, string)
include::{docdir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics]
[[ml-delete-dfanalytics-query-params]]
==== {api-query-parms-title}
`force`::
(Optional, boolean) If `true`, it deletes a job that is not stopped; this method is
quicker than stopping and deleting the job.
[[ml-delete-dfanalytics-example]]
==== {api-examples-title}

View File

@ -5,16 +5,16 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@ -30,13 +30,21 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
super(NAME, AcknowledgedResponse::new);
}
public static class Request extends AcknowledgedRequest<Request> implements ToXContentFragment {
public static class Request extends AcknowledgedRequest<Request> {
public static final ParseField FORCE = new ParseField("force");
private String id;
private boolean force;
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
force = in.readBoolean();
} else {
force = false;
}
}
public Request() {}
@ -49,15 +57,17 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
return id;
}
@Override
public ActionRequestValidationException validate() {
return null;
public boolean isForce() {
return force;
}
public void setForce(boolean force) {
this.force = force;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
return builder;
public ActionRequestValidationException validate() {
return null;
}
@Override
@ -65,18 +75,21 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteDataFrameAnalyticsAction.Request request = (DeleteDataFrameAnalyticsAction.Request) o;
return Objects.equals(id, request.id);
return Objects.equals(id, request.id) && force == request.force;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeBoolean(force);
}
}
@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, force);
}
}

View File

@ -6,9 +6,7 @@
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -25,13 +23,6 @@ public class KillProcessAction extends ActionType<KillProcessAction.Response> {
super(NAME, KillProcessAction.Response::new);
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response> {
RequestBuilder(ElasticsearchClient client, KillProcessAction action) {
super(client, action, new Request());
}
}
public static class Request extends JobTaskRequest<Request> {
public Request(String jobId) {

View File

@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
@ -106,6 +107,32 @@ public class TransportDeleteDataFrameAnalyticsAction
protected void masterOperation(Task task, DeleteDataFrameAnalyticsAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
String id = request.getId();
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
if (request.isForce()) {
forceDelete(parentTaskClient, id, listener);
} else {
normalDelete(parentTaskClient, state, id, listener);
}
}
private void forceDelete(ParentTaskAssigningClient parentTaskClient, String id,
ActionListener<AcknowledgedResponse> listener) {
logger.debug("[{}] Force deleting data frame analytics job", id);
ActionListener<StopDataFrameAnalyticsAction.Response> stopListener = ActionListener.wrap(
stopResponse -> normalDelete(parentTaskClient, clusterService.state(), id, listener),
listener::onFailure
);
StopDataFrameAnalyticsAction.Request request = new StopDataFrameAnalyticsAction.Request(id);
request.setForce(true);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, request, stopListener);
}
private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state, String id,
ActionListener<AcknowledgedResponse> listener) {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
DataFrameAnalyticsState taskState = MlTasks.getDataFrameAnalyticsState(id, tasks);
if (taskState != DataFrameAnalyticsState.STOPPED) {
@ -114,9 +141,6 @@ public class TransportDeleteDataFrameAnalyticsAction
return;
}
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
// We clean up the memory tracker on delete because there is no stop; the task stops by itself
memoryTracker.removeDataFrameAnalyticsJob(id);

View File

@ -32,6 +32,7 @@ public class RestDeleteDataFrameAnalyticsAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String id = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName());
DeleteDataFrameAnalyticsAction.Request request = new DeleteDataFrameAnalyticsAction.Request(id);
request.setForce(restRequest.paramAsBoolean(DeleteDataFrameAnalyticsAction.Request.FORCE.getPreferredName(), request.isForce()));
return channel -> client.execute(DeleteDataFrameAnalyticsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -19,6 +19,13 @@
}
}
]
},
"params":{
"force":{
"type":"boolean",
"description":"True if the job should be forcefully deleted",
"default":false
}
}
}
}