HLRC for delete expired data by job Id (#57722) (#57975)

High level rest client changes for #57337
This commit is contained in:
David Kyle 2020-06-12 09:44:17 +01:00 committed by GitHub
parent c8031c6f99
commit 39020f3900
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 64 additions and 26 deletions

View File

@ -171,6 +171,7 @@ final class MLRequestConverters {
String endpoint = new EndpointBuilder() String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ml") .addPathPartAsIs("_ml")
.addPathPartAsIs("_delete_expired_data") .addPathPartAsIs("_delete_expired_data")
.addPathPart(deleteExpiredDataRequest.getJobId())
.build(); .build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint); Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
request.setEntity(createEntity(deleteExpiredDataRequest, REQUEST_BODY_CONTENT_TYPE)); request.setEntity(createEntity(deleteExpiredDataRequest, REQUEST_BODY_CONTENT_TYPE));

View File

@ -34,16 +34,21 @@ public class DeleteExpiredDataRequest extends ActionRequest implements ToXConten
static final String REQUESTS_PER_SECOND = "requests_per_second"; static final String REQUESTS_PER_SECOND = "requests_per_second";
static final String TIMEOUT = "timeout"; static final String TIMEOUT = "timeout";
static final String JOB_ID = "job_id";
private final String jobId;
private final Float requestsPerSecond; private final Float requestsPerSecond;
private final TimeValue timeout; private final TimeValue timeout;
/** /**
* Create a new request to delete expired data * Create a new request to delete expired data
*/ */
public DeleteExpiredDataRequest() { public DeleteExpiredDataRequest() {
this(null, null); this(null, null, null);
} }
public DeleteExpiredDataRequest(Float requestsPerSecond, TimeValue timeout) { public DeleteExpiredDataRequest(String jobId, Float requestsPerSecond, TimeValue timeout) {
this.jobId = jobId;
this.requestsPerSecond = requestsPerSecond; this.requestsPerSecond = requestsPerSecond;
this.timeout = timeout; this.timeout = timeout;
} }
@ -68,13 +73,24 @@ public class DeleteExpiredDataRequest extends ActionRequest implements ToXConten
return timeout; return timeout;
} }
/**
* The optional job id
*
* The default is `null` meaning all jobs.
* @return The job id or null
*/
public String getJobId() {
return jobId;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
DeleteExpiredDataRequest that = (DeleteExpiredDataRequest) o; DeleteExpiredDataRequest that = (DeleteExpiredDataRequest) o;
return Objects.equals(requestsPerSecond, that.requestsPerSecond) && return Objects.equals(requestsPerSecond, that.requestsPerSecond) &&
Objects.equals(timeout, that.timeout); Objects.equals(timeout, that.timeout) &&
Objects.equals(jobId, that.jobId);
} }
@Override @Override
@ -83,12 +99,15 @@ public class DeleteExpiredDataRequest extends ActionRequest implements ToXConten
} }
public int hashCode() { public int hashCode() {
return Objects.hash(requestsPerSecond, timeout); return Objects.hash(requestsPerSecond, timeout, jobId);
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
if (jobId != null) {
builder.field(JOB_ID, jobId);
}
if (requestsPerSecond != null) { if (requestsPerSecond != null) {
builder.field(REQUESTS_PER_SECOND, requestsPerSecond); builder.field(REQUESTS_PER_SECOND, requestsPerSecond);
} }

View File

@ -216,14 +216,23 @@ public class MLRequestConvertersTests extends ESTestCase {
public void testDeleteExpiredData() throws Exception { public void testDeleteExpiredData() throws Exception {
float requestsPerSec = randomBoolean() ? -1.0f : (float)randomDoubleBetween(0.0, 100000.0, false); float requestsPerSec = randomBoolean() ? -1.0f : (float)randomDoubleBetween(0.0, 100000.0, false);
String jobId = randomBoolean() ? null : randomAlphaOfLength(8);
DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest( DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest(
jobId,
requestsPerSec, requestsPerSec,
TimeValue.timeValueHours(1)); TimeValue.timeValueHours(1));
Request request = MLRequestConverters.deleteExpiredData(deleteExpiredDataRequest); Request request = MLRequestConverters.deleteExpiredData(deleteExpiredDataRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_ml/_delete_expired_data", request.getEndpoint());
assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request)); String expectedPath = jobId == null ? "/_ml/_delete_expired_data" : "/_ml/_delete_expired_data/" + jobId;
assertEquals(expectedPath, request.getEndpoint());
if (jobId == null) {
assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request));
} else {
assertEquals("{\"job_id\":\"" + jobId + "\",\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}",
requestEntityToString(request));
}
} }
public void testDeleteJob() { public void testDeleteJob() {
@ -946,7 +955,7 @@ public class MLRequestConvertersTests extends ESTestCase {
} }
} }
public void testGetFilter() throws IOException { public void testGetFilter() {
String id = randomAlphaOfLength(10); String id = randomAlphaOfLength(10);
GetFiltersRequest getFiltersRequest = new GetFiltersRequest(); GetFiltersRequest getFiltersRequest = new GetFiltersRequest();

View File

@ -2037,8 +2037,9 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
{ {
// tag::delete-expired-data-request // tag::delete-expired-data-request
DeleteExpiredDataRequest request = new DeleteExpiredDataRequest( // <1> DeleteExpiredDataRequest request = new DeleteExpiredDataRequest( // <1>
1000.0f, // <2> null, // <2>
TimeValue.timeValueHours(12) // <3> 1000.0f, // <3>
TimeValue.timeValueHours(12) // <4>
); );
// end::delete-expired-data-request // end::delete-expired-data-request

View File

@ -33,9 +33,11 @@ public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase<Dele
private static ConstructingObjectParser<DeleteExpiredDataRequest, Void> PARSER = new ConstructingObjectParser<>( private static ConstructingObjectParser<DeleteExpiredDataRequest, Void> PARSER = new ConstructingObjectParser<>(
"delete_expired_data_request", "delete_expired_data_request",
true, true,
(a) -> new DeleteExpiredDataRequest((Float) a[0], (TimeValue) a[1]) (a) -> new DeleteExpiredDataRequest((String) a[0], (Float) a[1], (TimeValue) a[2])
); );
static { static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(),
new ParseField(DeleteExpiredDataRequest.JOB_ID));
PARSER.declareFloat(ConstructingObjectParser.optionalConstructorArg(), PARSER.declareFloat(ConstructingObjectParser.optionalConstructorArg(),
new ParseField(DeleteExpiredDataRequest.REQUESTS_PER_SECOND)); new ParseField(DeleteExpiredDataRequest.REQUESTS_PER_SECOND));
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
@ -46,7 +48,9 @@ public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase<Dele
@Override @Override
protected DeleteExpiredDataRequest createTestInstance() { protected DeleteExpiredDataRequest createTestInstance() {
return new DeleteExpiredDataRequest(randomBoolean() ? null : randomFloat(), return new DeleteExpiredDataRequest(
randomBoolean() ? null : randomAlphaOfLength(6),
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test")); randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test"));
} }

View File

@ -21,9 +21,10 @@ A `DeleteExpiredDataRequest` object does not require any arguments.
include-tagged::{doc-tests-file}[{api}-request] include-tagged::{doc-tests-file}[{api}-request]
--------------------------------------------------- ---------------------------------------------------
<1> Constructing a new request. <1> Constructing a new request.
<2> Providing requests per second throttling for the <2> Optionally set a job ID. Use `null` for the default wild card all `*`.
<3> Providing requests per second throttling for the
deletion processes. Default is no throttling. deletion processes. Default is no throttling.
<3> Setting how long the deletion processes will be allowed <4> Setting how long the deletion processes will be allowed
to run before they are canceled. Default value is `8h` (8 hours). to run before they are canceled. Default value is `8h` (8 hours).
[id="{upid}-{api}-response"] [id="{upid}-{api}-response"]

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -20,6 +19,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.io.IOException; import java.io.IOException;
@ -51,9 +51,17 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
PARSER.declareString(Request::setJobId, Job.ID); PARSER.declareString(Request::setJobId, Job.ID);
} }
public static Request parseRequest(String jobId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (jobId != null) {
request.jobId = jobId;
}
return request;
}
private Float requestsPerSecond; private Float requestsPerSecond;
private TimeValue timeout; private TimeValue timeout;
private String jobId = Metadata.ALL; private String jobId;
public Request() {} public Request() {}
@ -72,7 +80,7 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
this.timeout = null; this.timeout = null;
} }
if (in.getVersion().onOrAfter(Version.V_7_9_0)) { if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
jobId = in.readString(); jobId = in.readOptionalString();
} }
} }
@ -136,7 +144,7 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
out.writeOptionalTimeValue(timeout); out.writeOptionalTimeValue(timeout);
} }
if (out.getVersion().onOrAfter(Version.V_7_9_0)) { if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeString(jobId); out.writeOptionalString(jobId);
} }
} }
} }

View File

@ -114,8 +114,6 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
List<MlDataRemover> dataRemovers, List<MlDataRemover> dataRemovers,
ActionListener<DeleteExpiredDataAction.Response> listener, ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier) { Supplier<Boolean> isTimedOutSupplier) {
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
// If there is no throttle provided, default to none // If there is no throttle provided, default to none
float requestsPerSec = request.getRequestsPerSecond() == null ? Float.POSITIVE_INFINITY : request.getRequestsPerSecond(); float requestsPerSec = request.getRequestsPerSecond() == null ? Float.POSITIVE_INFINITY : request.getRequestsPerSecond();

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ml.rest; package org.elasticsearch.xpack.ml.rest;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.rest.action.RestToXContentListener;
@ -44,11 +43,14 @@ public class RestDeleteExpiredDataAction extends BaseRestHandler {
@Override @Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String jobId = restRequest.param(Job.ID.getPreferredName());
DeleteExpiredDataAction.Request request; DeleteExpiredDataAction.Request request;
if (restRequest.hasContent()) { if (restRequest.hasContent()) {
request = DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null); request = DeleteExpiredDataAction.Request.parseRequest(jobId, restRequest.contentParser());
} else { } else {
request = new DeleteExpiredDataAction.Request(); request = new DeleteExpiredDataAction.Request();
request.setJobId(jobId);
String perSecondParam = restRequest.param(DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName()); String perSecondParam = restRequest.param(DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName());
if (perSecondParam != null) { if (perSecondParam != null) {
@ -67,11 +69,6 @@ public class RestDeleteExpiredDataAction extends BaseRestHandler {
} }
} }
String jobId = restRequest.param(Job.ID.getPreferredName());
if (Strings.isNullOrEmpty(jobId) == false) {
request.setJobId(jobId);
}
return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel)); return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel));
} }
} }