[ML] Data Frame HLRC start & stop APIs (#40197)

This commit is contained in:
David Kyle 2019-03-19 13:30:01 +00:00 committed by GitHub
parent 8dc6862b17
commit 387648065d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1067 additions and 4 deletions

View File

@ -23,6 +23,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
import java.io.IOException;
import java.util.Collections;
@ -115,4 +119,85 @@ public final class DataFrameClient {
listener,
Collections.emptySet());
}
/**
* Start a data frame transform
* <p>
* For additional info
* see <a href="https://www.TODO.com">Start Data Frame transform documentation</a>
*
* @param request The start data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return A response object indicating request success
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public StartDataFrameTransformResponse startDataFrameTransform(StartDataFrameTransformRequest request, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
DataFrameRequestConverters::startDataFrameTransform,
options,
StartDataFrameTransformResponse::fromXContent,
Collections.emptySet());
}
/**
* Start a data frame transform asynchronously and notifies listener on completion
* <p>
* For additional info
* see <a href="https://www.TODO.com">Start Data Frame transform documentation</a>
*
* @param request The start data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void startDataFrameTransformAsync(StartDataFrameTransformRequest request, RequestOptions options,
ActionListener<StartDataFrameTransformResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
DataFrameRequestConverters::startDataFrameTransform,
options,
StartDataFrameTransformResponse::fromXContent,
listener,
Collections.emptySet());
}
/**
* Stop a data frame transform
* <p>
* For additional info
* see <a href="https://www.TODO.com">Stop Data Frame transform documentation</a>
*
* @param request The stop data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return A response object indicating request success
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public StopDataFrameTransformResponse stopDataFrameTransform(StopDataFrameTransformRequest request, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
DataFrameRequestConverters::stopDataFrameTransform,
options,
StopDataFrameTransformResponse::fromXContent,
Collections.emptySet());
}
/**
* Stop a data frame transform asynchronously and notifies listener on completion
* <p>
* For additional info
* see <a href="https://www.TODO.com">Stop Data Frame transform documentation</a>
*
* @param request The stop data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void stopDataFrameTransformAsync(StopDataFrameTransformRequest request, RequestOptions options,
ActionListener<StopDataFrameTransformResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
DataFrameRequestConverters::stopDataFrameTransform,
options,
StopDataFrameTransformResponse::fromXContent,
listener,
Collections.emptySet());
}
}

View File

@ -20,9 +20,12 @@
package org.elasticsearch.client;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import java.io.IOException;
@ -50,4 +53,35 @@ final class DataFrameRequestConverters {
.build();
return new Request(HttpDelete.METHOD_NAME, endpoint);
}
static Request startDataFrameTransform(StartDataFrameTransformRequest startRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_data_frame", "transforms")
.addPathPart(startRequest.getId())
.addPathPartAsIs("_start")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request);
if (startRequest.getTimeout() != null) {
params.withTimeout(startRequest.getTimeout());
}
return request;
}
static Request stopDataFrameTransform(StopDataFrameTransformRequest stopRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_data_frame", "transforms")
.addPathPart(stopRequest.getId())
.addPathPartAsIs("_stop")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request);
if (stopRequest.getWaitForCompletion() != null) {
params.withWaitForCompletion(stopRequest.getWaitForCompletion());
}
if (stopRequest.getTimeout() != null) {
params.withTimeout(stopRequest.getTimeout());
}
return request;
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.core;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class AcknowledgedTasksResponse {
protected static final ParseField TASK_FAILURES = new ParseField("task_failures");
protected static final ParseField NODE_FAILURES = new ParseField("node_failures");
@SuppressWarnings("unchecked")
protected static <T extends AcknowledgedTasksResponse> ConstructingObjectParser<T, Void> generateParser(
String name,
TriFunction<Boolean, List<TaskOperationFailure>, List<? extends ElasticsearchException>, T> ctor,
String ackFieldName) {
ConstructingObjectParser<T, Void> parser = new ConstructingObjectParser<>(name, true,
args -> ctor.apply((boolean) args[0], (List<TaskOperationFailure>) args[1], (List<ElasticsearchException>) args[2]));
parser.declareBoolean(constructorArg(), new ParseField(ackFieldName));
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p), TASK_FAILURES);
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), NODE_FAILURES);
return parser;
}
private boolean acknowledged;
private List<TaskOperationFailure> taskFailures;
private List<ElasticsearchException> nodeFailures;
public AcknowledgedTasksResponse(boolean acknowledged, @Nullable List<TaskOperationFailure> taskFailures,
@Nullable List<? extends ElasticsearchException> nodeFailures) {
this.acknowledged = acknowledged;
this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(taskFailures));
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures));
}
public boolean isAcknowledged() {
return acknowledged;
}
public List<TaskOperationFailure> getTaskFailures() {
return taskFailures;
}
public List<ElasticsearchException> getNodeFailures() {
return nodeFailures;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
AcknowledgedTasksResponse other = (AcknowledgedTasksResponse) obj;
return acknowledged == other.acknowledged
&& taskFailures.equals(other.taskFailures)
&& nodeFailures.equals(other.nodeFailures);
}
@Override
public int hashCode() {
return Objects.hash(acknowledged, taskFailures, nodeFailures);
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Objects;
import java.util.Optional;
public class StartDataFrameTransformRequest implements Validatable {
private final String id;
private TimeValue timeout;
public StartDataFrameTransformRequest(String id) {
this.id = id;
}
public StartDataFrameTransformRequest(String id, TimeValue timeout) {
this.id = id;
this.timeout = timeout;
}
public String getId() {
return id;
}
public TimeValue getTimeout() {
return timeout;
}
public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}
@Override
public Optional<ValidationException> validate() {
if (id == null) {
ValidationException validationException = new ValidationException();
validationException.addValidationError("data frame transform id must not be null");
return Optional.of(validationException);
} else {
return Optional.empty();
}
}
@Override
public int hashCode() {
return Objects.hash(id, timeout);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
StartDataFrameTransformRequest other = (StartDataFrameTransformRequest) obj;
return Objects.equals(this.id, other.id)
&& Objects.equals(this.timeout, other.timeout);
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.client.core.AcknowledgedTasksResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.List;
public class StartDataFrameTransformResponse extends AcknowledgedTasksResponse {
private static final String STARTED = "started";
private static final ConstructingObjectParser<StartDataFrameTransformResponse, Void> PARSER =
AcknowledgedTasksResponse.generateParser("start_data_frame_transform_response", StartDataFrameTransformResponse::new, STARTED);
public static StartDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
public StartDataFrameTransformResponse(boolean started, @Nullable List<TaskOperationFailure> taskFailures,
@Nullable List<? extends ElasticsearchException> nodeFailures) {
super(started, taskFailures, nodeFailures);
}
public boolean isStarted() {
return isAcknowledged();
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Objects;
import java.util.Optional;
public class StopDataFrameTransformRequest implements Validatable {
private final String id;
private Boolean waitForCompletion;
private TimeValue timeout;
public StopDataFrameTransformRequest(String id) {
this.id = id;
waitForCompletion = null;
timeout = null;
}
public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) {
this.id = id;
this.waitForCompletion = waitForCompletion;
this.timeout = timeout;
}
public String getId() {
return id;
}
public void setWaitForCompletion(Boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}
public Boolean getWaitForCompletion() {
return waitForCompletion;
}
public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}
public TimeValue getTimeout() {
return timeout;
}
@Override
public Optional<ValidationException> validate() {
if (id == null) {
ValidationException validationException = new ValidationException();
validationException.addValidationError("data frame transform id must not be null");
return Optional.of(validationException);
} else {
return Optional.empty();
}
}
@Override
public int hashCode() {
return Objects.hash(id, waitForCompletion, timeout);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
StopDataFrameTransformRequest other = (StopDataFrameTransformRequest) obj;
return Objects.equals(this.id, other.id)
&& Objects.equals(this.waitForCompletion, other.waitForCompletion)
&& Objects.equals(this.timeout, other.timeout);
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.client.core.AcknowledgedTasksResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.List;
public class StopDataFrameTransformResponse extends AcknowledgedTasksResponse {
private static final String STOPPED = "stopped";
private static final ConstructingObjectParser<StopDataFrameTransformResponse, Void> PARSER =
AcknowledgedTasksResponse.generateParser("stop_data_frame_transform_response", StopDataFrameTransformResponse::new, STOPPED);
public static StopDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
public StopDataFrameTransformResponse(boolean stopped, @Nullable List<TaskOperationFailure> taskFailures,
@Nullable List<? extends ElasticsearchException> nodeFailures) {
super(stopped, taskFailures, nodeFailures);
}
public boolean isStopped() {
return isAcknowledged();
}
}

View File

@ -20,12 +20,16 @@
package org.elasticsearch.client;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -66,4 +70,56 @@ public class DataFrameRequestConvertersTests extends ESTestCase {
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/foo"));
}
public void testStartDataFrameTransform() {
String id = randomAlphaOfLength(10);
TimeValue timeValue = null;
if (randomBoolean()) {
timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
}
StartDataFrameTransformRequest startRequest = new StartDataFrameTransformRequest(id, timeValue);
Request request = DataFrameRequestConverters.startDataFrameTransform(startRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/" + startRequest.getId() + "/_start"));
if (timeValue != null) {
assertTrue(request.getParameters().containsKey("timeout"));
assertEquals(startRequest.getTimeout(), TimeValue.parseTimeValue(request.getParameters().get("timeout"), "timeout"));
} else {
assertFalse(request.getParameters().containsKey("timeout"));
}
}
public void testStopDataFrameTransform() {
String id = randomAlphaOfLength(10);
Boolean waitForCompletion = null;
if (randomBoolean()) {
waitForCompletion = randomBoolean();
}
TimeValue timeValue = null;
if (randomBoolean()) {
timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
}
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue);
Request request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/" + stopRequest.getId() + "/_stop"));
if (waitForCompletion != null) {
assertTrue(request.getParameters().containsKey("wait_for_completion"));
assertEquals(stopRequest.getWaitForCompletion(), Boolean.parseBoolean(request.getParameters().get("wait_for_completion")));
} else {
assertFalse(request.getParameters().containsKey("wait_for_completion"));
}
if (timeValue != null) {
assertTrue(request.getParameters().containsKey("timeout"));
assertEquals(stopRequest.getTimeout(), TimeValue.parseTimeValue(request.getParameters().get("timeout"), "timeout"));
} else {
assertFalse(request.getParameters().containsKey("timeout"));
}
}
}

View File

@ -23,6 +23,10 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.QueryConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
@ -41,6 +45,7 @@ import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
@ -96,5 +101,41 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
client::deleteDataFrameTransformAsync));
assertThat(deleteError.getMessage(), containsString("Transform with id [test-crud] could not be found"));
}
public void testStartStop() throws IOException {
String sourceIndex = "transform-source";
createIndex(sourceIndex);
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
String id = "test-stop-start";
DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig);
DataFrameClient client = highLevelClient().dataFrame();
AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
client::putDataFrameTransformAsync);
assertTrue(ack.isAcknowledged());
StartDataFrameTransformRequest startRequest = new StartDataFrameTransformRequest(id);
StartDataFrameTransformResponse startResponse =
execute(startRequest, client::startDataFrameTransform, client::startDataFrameTransformAsync);
assertTrue(startResponse.isStarted());
assertThat(startResponse.getNodeFailures(), empty());
assertThat(startResponse.getTaskFailures(), empty());
// TODO once get df stats is implemented assert the df has started
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id);
StopDataFrameTransformResponse stopResponse =
execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
assertTrue(stopResponse.isStopped());
assertThat(stopResponse.getNodeFailures(), empty());
assertThat(stopResponse.getTaskFailures(), empty());
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.core;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
import static org.hamcrest.Matchers.containsString;
public class AcknowledgedTasksResponseTests extends ESTestCase {
public void testFromXContent() throws IOException {
xContentTester(this::createParser,
this::createTestInstance,
AcknowledgedTasksResponseTests::toXContent,
AcknowledgedTasksResponseTests::fromXContent)
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(false)
.supportsUnknownFields(false)
.test();
}
// Serialisation of TaskOperationFailure and ElasticsearchException changes
// the object so use a custom compare method rather than Object.equals
private void assertEqualInstances(AcknowledgedTasksResponse expected, AcknowledgedTasksResponse actual) {
assertNotSame(expected, actual);
assertEquals(expected.isAcknowledged(), actual.isAcknowledged());
List<TaskOperationFailure> expectedTaskFailures = expected.getTaskFailures();
List<TaskOperationFailure> actualTaskFailures = actual.getTaskFailures();
assertListEquals(expectedTaskFailures, actualTaskFailures, (a, b) ->
Objects.equals(a.getNodeId(), b.getNodeId())
&& Objects.equals(a.getTaskId(), b.getTaskId())
&& Objects.equals(a.getStatus(), b.getStatus())
);
List<ElasticsearchException> expectedExceptions = expected.getNodeFailures();
List<ElasticsearchException> actualExceptions = actual.getNodeFailures();
// actualException is a wrapped copy of expectedException so the
// error messages won't be the same but actualException should contain
// the error message from expectedException
assertListEquals(expectedExceptions, actualExceptions, (expectedException, actualException) -> {
assertThat(actualException.getDetailedMessage(), containsString(expectedException.getMessage()));
return true;
});
}
private <T> void assertListEquals(List<T> expected, List<T> actual, BiPredicate<T, T> comparator) {
if (expected == null) {
assertNull(actual);
return;
} else {
assertNotNull(actual);
}
assertEquals(expected.size(), actual.size());
for (int i=0; i<expected.size(); i++) {
assertTrue(comparator.test(expected.get(i), actual.get(i)));
}
}
private static AcknowledgedTasksResponse fromXContent(XContentParser parser) {
return AcknowledgedTasksResponse.generateParser("ack_tasks_response",
AcknowledgedTasksResponse::new, "acknowleged")
.apply(parser, null);
}
private AcknowledgedTasksResponse createTestInstance() {
List<TaskOperationFailure> taskFailures = null;
if (randomBoolean()) {
taskFailures = new ArrayList<>();
int numTaskFailures = randomIntBetween(1, 4);
for (int i=0; i<numTaskFailures; i++) {
taskFailures.add(new TaskOperationFailure(randomAlphaOfLength(4), randomNonNegativeLong(), new IllegalStateException()));
}
}
List<ElasticsearchException> nodeFailures = null;
if (randomBoolean()) {
nodeFailures = new ArrayList<>();
int numNodeFailures = randomIntBetween(1, 4);
for (int i=0; i<numNodeFailures; i++) {
nodeFailures.add(new ElasticsearchException("AcknowledgedTasksResponseTest"));
}
}
return new AcknowledgedTasksResponse(randomBoolean(), taskFailures, nodeFailures);
}
public static void toXContent(AcknowledgedTasksResponse response, XContentBuilder builder) throws IOException {
builder.startObject();
{
builder.field("acknowleged", response.isAcknowledged());
List<TaskOperationFailure> taskFailures = response.getTaskFailures();
if (taskFailures != null && taskFailures.isEmpty() == false) {
builder.startArray(AcknowledgedTasksResponse.TASK_FAILURES.getPreferredName());
for (TaskOperationFailure failure : taskFailures) {
builder.startObject();
failure.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
}
builder.endArray();
}
List<ElasticsearchException> nodeFailures = response.getNodeFailures();
if (nodeFailures != null && nodeFailures.isEmpty() == false) {
builder.startArray(AcknowledgedTasksResponse.NODE_FAILURES.getPreferredName());
for (ElasticsearchException failure : nodeFailures) {
builder.startObject();
failure.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
}
builder.endArray();
}
}
builder.endObject();
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.test.ESTestCase;
import java.util.Optional;
import static org.hamcrest.Matchers.containsString;
public class StartDataFrameTransformRequestTests extends ESTestCase {
public void testValidate_givenNullId() {
StartDataFrameTransformRequest request = new StartDataFrameTransformRequest(null, null);
Optional<ValidationException> validate = request.validate();
assertTrue(validate.isPresent());
assertThat(validate.get().getMessage(), containsString("data frame transform id must not be null"));
}
public void testValidate_givenValid() {
StartDataFrameTransformRequest request = new StartDataFrameTransformRequest("foo", null);
Optional<ValidationException> validate = request.validate();
assertFalse(validate.isPresent());
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.test.ESTestCase;
import java.util.Optional;
import static org.hamcrest.Matchers.containsString;
public class StopDataFrameTransformRequestTests extends ESTestCase {
public void testValidate_givenNullId() {
StopDataFrameTransformRequest request = new StopDataFrameTransformRequest(null);
Optional<ValidationException> validate = request.validate();
assertTrue(validate.isPresent());
assertThat(validate.get().getMessage(), containsString("data frame transform id must not be null"));
}
public void testValidate_givenValid() {
StopDataFrameTransformRequest request = new StopDataFrameTransformRequest("foo");
Optional<ValidationException> validate = request.validate();
assertFalse(validate.isPresent());
}
}

View File

@ -27,6 +27,10 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.QueryConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
@ -35,13 +39,17 @@ import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.junit.After;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -49,6 +57,22 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTestCase {
private List<String> transformsToClean = new ArrayList<>();
@After
public void cleanUpTransforms() throws IOException {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
}
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().deleteDataFrameTransform(
new DeleteDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
}
transformsToClean = new ArrayList<>();
}
private void createIndex(String indexName) throws IOException {
XContentBuilder builder = jsonBuilder();
@ -152,6 +176,123 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
}
}
public void testStartStop() throws IOException, InterruptedException {
createIndex("source-data");
RestHighLevelClient client = highLevelClient();
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
DataFrameTransformConfig transformConfig = new DataFrameTransformConfig("mega-transform",
"source-data", "pivot-dest", queryConfig, pivotConfig);
client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
transformsToClean.add(transformConfig.getId());
{
// tag::start-data-frame-transform-request
StartDataFrameTransformRequest request =
new StartDataFrameTransformRequest("mega-transform"); // <1>
// end::start-data-frame-transform-request
// tag::start-data-frame-transform-request-options
request.setTimeout(TimeValue.timeValueSeconds(20)); // <1>
// end::start-data-frame-transform-request-options
// tag::start-data-frame-transform-execute
StartDataFrameTransformResponse response =
client.dataFrame().startDataFrameTransform(
request, RequestOptions.DEFAULT);
// end::start-data-frame-transform-execute
assertTrue(response.isStarted());
}
{
// tag::stop-data-frame-transform-request
StopDataFrameTransformRequest request =
new StopDataFrameTransformRequest("mega-transform"); // <1>
// end::stop-data-frame-transform-request
// tag::stop-data-frame-transform-request-options
request.setWaitForCompletion(Boolean.TRUE); // <1>
request.setTimeout(TimeValue.timeValueSeconds(30)); // <2>
// end::stop-data-frame-transform-request-options
// tag::stop-data-frame-transform-execute
StopDataFrameTransformResponse response =
client.dataFrame().stopDataFrameTransform(
request, RequestOptions.DEFAULT);
// end::stop-data-frame-transform-execute
assertTrue(response.isStopped());
}
{
// tag::start-data-frame-transform-execute-listener
ActionListener<StartDataFrameTransformResponse> listener =
new ActionListener<StartDataFrameTransformResponse>() {
@Override
public void onResponse(
StartDataFrameTransformResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::start-data-frame-transform-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<StartDataFrameTransformResponse> ackListener = listener;
listener = new LatchedActionListener<>(listener, latch);
StartDataFrameTransformRequest request = new StartDataFrameTransformRequest("mega-transform");
// tag::start-data-frame-transform-execute-async
client.dataFrame().startDataFrameTransformAsync(
request, RequestOptions.DEFAULT, listener); // <1>
// end::start-data-frame-transform-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
{
// tag::stop-data-frame-transform-execute-listener
ActionListener<StopDataFrameTransformResponse> listener =
new ActionListener<StopDataFrameTransformResponse>() {
@Override
public void onResponse(
StopDataFrameTransformResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::stop-data-frame-transform-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<StopDataFrameTransformResponse> ackListener = listener;
listener = new LatchedActionListener<>(listener, latch);
StopDataFrameTransformRequest request = new StopDataFrameTransformRequest("mega-transform");
// tag::stop-data-frame-transform-execute-async
client.dataFrame().stopDataFrameTransformAsync(
request, RequestOptions.DEFAULT, listener); // <1>
// end::stop-data-frame-transform-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
public void testDeleteDataFrameTransform() throws IOException, InterruptedException {
createIndex("source-data");

View File

@ -0,0 +1,37 @@
--
:api: start-data-frame-transform
:request: StartDataFrameTransformRequest
:response: StartDataFrameTransformResponse
--
[id="{upid}-{api}"]
=== Start Data Frame Transform API
Start a {dataframe-job}.
It accepts a +{request}+ object and responds with a +{response}+ object.
[id="{upid}-{api}-request"]
==== Start Data Frame Request
A +{request}+ object requires a non-null `id`.
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request]
---------------------------------------------------
<1> Constructing a new start request referencing an existing {dataframe-job}
==== Optional Arguments
The following arguments are optional.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-options]
--------------------------------------------------
<1> Controls the amount of time to wait until the {dataframe-job} starts.
include::../execution.asciidoc[]
==== Response
The returned +{response}+ object acknowledges the {dataframe-job} has started.

View File

@ -0,0 +1,38 @@
--
:api: stop-data-frame-transform
:request: StopDataFrameTransformRequest
:response: StopDataFrameTransformResponse
--
[id="{upid}-{api}"]
=== Stop Data Frame Transform API
Stop a started {dataframe-job}.
It accepts a +{request}+ object and responds with a +{response}+ object.
[id="{upid}-{api}-request"]
==== Stop Data Frame Request
A +{request}+ object requires a non-null `id`.
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request]
---------------------------------------------------
<1> Constructing a new stop request referencing an existing {dataframe-job}
==== Optional Arguments
The following arguments are optional.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-options]
--------------------------------------------------
<1> If true wait for the data frame task to stop before responding
<2> Controls the amount of time to wait until the {dataframe-job} stops.
include::../execution.asciidoc[]
==== Response
The returned +{response}+ object acknowledges the {dataframe-job} has stopped.

View File

@ -556,6 +556,10 @@ The Java High Level REST Client supports the following Data Frame APIs:
* <<{upid}-put-data-frame-transform>>
* <<{upid}-delete-data-frame-transform>>
* <<{upid}-start-data-frame-transform>>
* <<{upid}-stop-data-frame-transform>>
include::dataframe/put_data_frame.asciidoc[]
include::dataframe/delete_data_frame.asciidoc[]
include::dataframe/delete_data_frame.asciidoc[]
include::dataframe/start_data_frame.asciidoc[]
include::dataframe/stop_data_frame.asciidoc[]

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.dataframe.rest.action;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
@ -13,7 +14,6 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
import java.io.IOException;
@ -27,9 +27,11 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String id = restRequest.param(RollupField.ID.getPreferredName());
String id = restRequest.param(DataFrameField.ID.getPreferredName());
StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id);
if (restRequest.hasParam(DataFrameField.TIMEOUT.getPreferredName())) {
request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
}
return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

View File

@ -11,6 +11,13 @@
"required": true,
"description": "The id of the transform to start"
}
},
"params": {
"timeout": {
"type": "time",
"required": false,
"description": "Controls the time to wait for the transform to start"
}
}
},
"body": null