[7.x] [ML][Data Frame] adding force delete (#44590) (#44696)

* [ML][Data Frame] adding force delete (#44590)

* [ML][Data Frame] adding force delete

* Update delete-transform.asciidoc

* adjusting for backport
This commit is contained in:
Benjamin Trent 2019-07-22 13:13:25 -05:00 committed by GitHub
parent 3714cb63da
commit 06e21f7902
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 155 additions and 25 deletions

View File

@ -37,6 +37,7 @@ import java.io.IOException;
import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
import static org.elasticsearch.client.RequestConverters.createEntity;
import static org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest.FORCE;
import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.ALLOW_NO_MATCH;
final class DataFrameRequestConverters {
@ -71,12 +72,16 @@ final class DataFrameRequestConverters {
return request;
}
static Request deleteDataFrameTransform(DeleteDataFrameTransformRequest request) {
static Request deleteDataFrameTransform(DeleteDataFrameTransformRequest deleteRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_data_frame", "transforms")
.addPathPart(request.getId())
.addPathPart(deleteRequest.getId())
.build();
return new Request(HttpDelete.METHOD_NAME, endpoint);
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
if (deleteRequest.getForce() != null) {
request.addParameter(FORCE, Boolean.toString(deleteRequest.getForce()));
}
return request;
}
static Request startDataFrameTransform(StartDataFrameTransformRequest startRequest) {

View File

@ -31,7 +31,10 @@ import java.util.Optional;
*/
public class DeleteDataFrameTransformRequest implements Validatable {
public static final String FORCE = "force";
private final String id;
private Boolean force;
public DeleteDataFrameTransformRequest(String id) {
this.id = id;
@ -41,6 +44,14 @@ public class DeleteDataFrameTransformRequest implements Validatable {
return id;
}
public Boolean getForce() {
return force;
}
public void setForce(boolean force) {
this.force = force;
}
@Override
public Optional<ValidationException> validate() {
if (id == null) {
@ -54,7 +65,7 @@ public class DeleteDataFrameTransformRequest implements Validatable {
@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, force);
}
@Override
@ -67,6 +78,6 @@ public class DeleteDataFrameTransformRequest implements Validatable {
return false;
}
DeleteDataFrameTransformRequest other = (DeleteDataFrameTransformRequest) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) && Objects.equals(force, other.force);
}
}

View File

@ -50,6 +50,8 @@ import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.AL
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
public class DataFrameRequestConvertersTests extends ESTestCase {
@ -81,6 +83,13 @@ public class DataFrameRequestConvertersTests extends ESTestCase {
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/foo"));
assertThat(request.getParameters(), not(hasKey("force")));
deleteRequest.setForce(true);
request = DataFrameRequestConverters.deleteDataFrameTransform(deleteRequest);
assertThat(request.getParameters(), hasEntry("force", "true"));
}
public void testStartDataFrameTransform() {

View File

@ -374,6 +374,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
// tag::delete-data-frame-transform-request
DeleteDataFrameTransformRequest request =
new DeleteDataFrameTransformRequest("mega-transform"); // <1>
request.setForce(false); // <2>
// end::delete-data-frame-transform-request
// tag::delete-data-frame-transform-execute

View File

@ -16,6 +16,9 @@ A +{request}+ object requires a non-null `id`.
include-tagged::{doc-tests-file}[{api}-request]
---------------------------------------------------
<1> Constructing a new request referencing an existing {dataframe-transform}
<2> Sets the optional argument `force`. When `true`, the {dataframe-transform}
is deleted regardless of its current state. The default value is `false`,
meaning that only `stopped` {dataframe-transforms} can be deleted.
include::../execution.asciidoc[]

View File

@ -34,6 +34,13 @@ see {stack-ov}/security-privileges.html[Security privileges] and
`<data_frame_transform_id>`::
(Required, string) Identifier for the {dataframe-transform}.
[[delete-data-frame-transform-query-parms]]
==== {api-query-parms-title}
`force`::
(Optional, boolean) When `true`, the {dataframe-transform} is deleted regardless of its
current state. The default value is `false`, meaning that the {dataframe-transform} must be
`stopped` before it can be deleted.
[[delete-data-frame-transform-examples]]
==== {api-examples-title}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@ -27,25 +28,39 @@ public class DeleteDataFrameTransformAction extends ActionType<AcknowledgedRespo
}
public static class Request extends MasterNodeRequest<Request> {
private String id;
private final String id;
private final boolean force;
public Request(String id) {
public Request(String id, boolean force) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.force = force;
}
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
force = in.readBoolean();
} else {
force = false;
}
}
public String getId() {
return id;
}
public boolean isForce() {
return force;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeBoolean(force);
}
}
@Override
@ -55,7 +70,7 @@ public class DeleteDataFrameTransformAction extends ActionType<AcknowledgedRespo
@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, force);
}
@Override
@ -68,7 +83,7 @@ public class DeleteDataFrameTransformAction extends ActionType<AcknowledgedRespo
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) && force == other.force;
}
}
}

View File

@ -13,7 +13,7 @@ import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAct
public class DeleteDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20));
return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean());
}
@Override

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -23,24 +24,31 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import java.io.IOException;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameAuditor auditor;
private final Client client;
@Inject
public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameTransformsConfigManager transformsConfigManager, DataFrameAuditor auditor) {
DataFrameTransformsConfigManager transformsConfigManager, DataFrameAuditor auditor,
Client client) {
super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
Request::new, indexNameExpressionResolver);
this.transformsConfigManager = transformsConfigManager;
this.auditor = auditor;
this.client = client;
}
@Override
@ -55,19 +63,34 @@ public class TransportDeleteDataFrameTransformAction extends TransportMasterNode
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
ActionListener<AcknowledgedResponse> listener) {
final PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null && request.isForce() == false) {
listener.onFailure(new ElasticsearchStatusException("Cannot delete data frame [" + request.getId() +
"] as the task is running. Stop the task first", RestStatus.CONFLICT));
} else {
// Task is not running, delete the configuration document
transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(
r -> {
auditor.info(request.getId(), "Deleted data frame transform.");
listener.onResponse(new AcknowledgedResponse(r));
},
listener::onFailure));
ActionListener<Void> stopTransformActionListener = ActionListener.wrap(
stopResponse -> transformsConfigManager.deleteTransform(request.getId(),
ActionListener.wrap(
r -> {
auditor.info(request.getId(), "Deleted data frame transform.");
listener.onResponse(new AcknowledgedResponse(r));
},
listener::onFailure)),
listener::onFailure
);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
executeAsyncWithOrigin(client,
DATA_FRAME_ORIGIN,
StopDataFrameTransformAction.INSTANCE,
new StopDataFrameTransformAction.Request(request.getId(), true, true, null, true),
ActionListener.wrap(
r -> stopTransformActionListener.onResponse(null),
stopTransformActionListener::onFailure));
} else {
stopTransformActionListener.onResponse(null);
}
}
}

View File

@ -15,8 +15,6 @@ import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
import java.io.IOException;
public class RestDeleteDataFrameTransformAction extends BaseRestHandler {
public RestDeleteDataFrameTransformAction(Settings settings, RestController controller) {
@ -25,13 +23,14 @@ public class RestDeleteDataFrameTransformAction extends BaseRestHandler {
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
if (restRequest.hasContent()) {
throw new IllegalArgumentException("delete data frame transforms requests can not have a request body");
}
String id = restRequest.param(DataFrameField.ID.getPreferredName());
DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id);
boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false);
DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id, force);
return channel -> client.execute(DeleteDataFrameTransformAction.INSTANCE, request,
new RestToXContentListener<>(channel));

View File

@ -13,6 +13,13 @@
"required": true,
"description": "The id of the transform to delete"
}
},
"params": {
"force": {
"type": "boolean",
"required": false,
"description": "When `true`, the transform is deleted regardless of its current state. The default value is `false`, meaning that the transform must be `stopped` before it can be deleted."
}
}
},
"body": null

View File

@ -558,3 +558,47 @@ setup:
"description": "yaml test transform on airline-data",
"version": "7.3.0"
}
---
"Test force deleting a running transform":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-start-delete"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-start-delete" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"sync": {
"time": {
"field": "time",
"delay": "90m"
}
}
}
- match: { acknowledged: true }
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-delete"
- match: { acknowledged: true }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-delete"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-delete" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }
- do:
catch: /Cannot delete data frame \[airline-transform-start-delete\] as the task is running/
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-delete"
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-delete"
force: true
- match: { acknowledged: true }

View File

@ -141,6 +141,12 @@ teardown:
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"sync": {
"time": {
"field": "time",
"delay": "90m"
}
}
}