[ML-Dataframe] Add Data Frame client to the Java HLRC (#40040)

Adds DataFrameClient to the Java HLRC and implements the PUT and
DELETE data frame transform APIs.
This commit is contained in:
David Kyle 2019-03-14 14:57:12 +00:00 committed by GitHub
parent 24973cf464
commit c02f49e9d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 937 additions and 4 deletions

View File

@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import java.io.IOException;
import java.util.Collections;
/**
 * Client hosting the Data Frame transform APIs of the Java High Level REST Client.
 * Instances are obtained via {@link RestHighLevelClient#dataFrame()}; all calls are
 * delegated to the wrapped {@link RestHighLevelClient}.
 */
public final class DataFrameClient {

    private final RestHighLevelClient restHighLevelClient;

    DataFrameClient(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    /**
     * Creates a new Data Frame Transform
     * <p>
     * For additional info
     * see <a href="https://www.TODO.com">Data Frame PUT transform documentation</a>
     *
     * @param request The PutDataFrameTransformRequest containing the
     *                {@link org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig}.
     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
     * @return An AcknowledgedResponse object indicating request success
     * @throws IOException when there is a serialization issue sending the request or receiving the response
     */
    public AcknowledgedResponse putDataFrameTransform(PutDataFrameTransformRequest request,
                                                      RequestOptions options) throws IOException {
        return restHighLevelClient.performRequestAndParseEntity(request,
            DataFrameRequestConverters::putDataFrameTransform, options,
            AcknowledgedResponse::fromXContent, Collections.emptySet());
    }

    /**
     * Creates a new Data Frame Transform asynchronously and notifies listener on completion
     * <p>
     * For additional info
     * see <a href="https://www.TODO.com">Data Frame PUT transform documentation</a>
     *
     * @param request The PutDataFrameTransformRequest containing the
     *                {@link org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig}.
     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
     * @param listener Listener to be notified upon request completion
     */
    public void putDataFrameTransformAsync(PutDataFrameTransformRequest request, RequestOptions options,
                                           ActionListener<AcknowledgedResponse> listener) {
        restHighLevelClient.performRequestAsyncAndParseEntity(request,
            DataFrameRequestConverters::putDataFrameTransform, options,
            AcknowledgedResponse::fromXContent, listener, Collections.emptySet());
    }

    /**
     * Delete a data frame transform
     * <p>
     * For additional info
     * see <a href="https://www.TODO.com">Data Frame delete transform documentation</a>
     *
     * @param request The delete data frame transform request
     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
     * @return An AcknowledgedResponse object indicating request success
     * @throws IOException when there is a serialization issue sending the request or receiving the response
     */
    public AcknowledgedResponse deleteDataFrameTransform(DeleteDataFrameTransformRequest request,
                                                         RequestOptions options) throws IOException {
        return restHighLevelClient.performRequestAndParseEntity(request,
            DataFrameRequestConverters::deleteDataFrameTransform, options,
            AcknowledgedResponse::fromXContent, Collections.emptySet());
    }

    /**
     * Delete a data frame transform asynchronously and notifies listener on completion
     * <p>
     * For additional info
     * see <a href="https://www.TODO.com">Data Frame delete transform documentation</a>
     *
     * @param request The delete data frame transform request
     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
     * @param listener Listener to be notified upon request completion
     */
    public void deleteDataFrameTransformAsync(DeleteDataFrameTransformRequest request, RequestOptions options,
                                              ActionListener<AcknowledgedResponse> listener) {
        restHighLevelClient.performRequestAsyncAndParseEntity(request,
            DataFrameRequestConverters::deleteDataFrameTransform, options,
            AcknowledgedResponse::fromXContent, listener, Collections.emptySet());
    }
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import java.io.IOException;
import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
import static org.elasticsearch.client.RequestConverters.createEntity;
/**
 * Translates data frame transform request objects into low-level {@link Request}s.
 */
final class DataFrameRequestConverters {

    private DataFrameRequestConverters() {}

    /** Builds the endpoint {@code /_data_frame/transforms/{transformId}}. */
    private static String transformEndpoint(String transformId) {
        return new RequestConverters.EndpointBuilder()
                .addPathPartAsIs("_data_frame", "transforms")
                .addPathPart(transformId)
                .build();
    }

    static Request putDataFrameTransform(PutDataFrameTransformRequest putRequest) throws IOException {
        // PUT /_data_frame/transforms/{id} with the transform config as the body.
        Request request = new Request(HttpPut.METHOD_NAME, transformEndpoint(putRequest.getConfig().getId()));
        request.setEntity(createEntity(putRequest, REQUEST_BODY_CONTENT_TYPE));
        return request;
    }

    static Request deleteDataFrameTransform(DeleteDataFrameTransformRequest deleteRequest) {
        // DELETE /_data_frame/transforms/{id}, no request body.
        return new Request(HttpDelete.METHOD_NAME, transformEndpoint(deleteRequest.getId()));
    }
}

View File

@ -256,6 +256,7 @@ public class RestHighLevelClient implements Closeable {
private final IndexLifecycleClient ilmClient = new IndexLifecycleClient(this);
private final RollupClient rollupClient = new RollupClient(this);
private final CcrClient ccrClient = new CcrClient(this);
private final DataFrameClient dataFrameClient = new DataFrameClient(this);
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
@ -466,6 +467,19 @@ public class RestHighLevelClient implements Closeable {
return securityClient;
}
    /**
     * Provides methods for accessing the Elastic Licensed Data Frame APIs that
     * are shipped with the Elastic Stack distribution of Elasticsearch. All of
     * these APIs will 404 if run against the OSS distribution of Elasticsearch.
     * <p>
     * See the <a href="TODO">Data Frame APIs on elastic.co</a> for more information.
     *
     * @return the client wrapper for making Data Frame API calls
     */
    // NOTE(review): the href above is still "TODO" — replace with the published
    // Data Frame docs URL before release.
    public DataFrameClient dataFrame() {
        return dataFrameClient;
    }
/**
* Executes a bulk request using the Bulk API.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on elastic.co</a>

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import java.util.Objects;
import java.util.Optional;
/**
* Request to delete a data frame transform
*/
/**
 * Request to delete a data frame transform, identified by its id.
 * A {@code null} id is reported through {@link #validate()} rather than rejected eagerly.
 */
public class DeleteDataFrameTransformRequest implements Validatable {

    private final String id;

    public DeleteDataFrameTransformRequest(String id) {
        this.id = id;
    }

    public String getId() {
        return id;
    }

    @Override
    public Optional<ValidationException> validate() {
        if (id != null) {
            return Optional.empty();
        }
        // Surface the missing id as a client-side validation error.
        ValidationException validationException = new ValidationException();
        validationException.addValidationError("data frame transform id must not be null");
        return Optional.of(validationException);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        return Objects.equals(id, ((DeleteDataFrameTransformRequest) obj).id);
    }
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
/**
 * Request to create a new data frame transform. The request body is exactly the
 * wrapped {@link DataFrameTransformConfig}.
 */
public class PutDataFrameTransformRequest implements ToXContentObject, Validatable {

    private final DataFrameTransformConfig config;

    public PutDataFrameTransformRequest(DataFrameTransformConfig config) {
        this.config = config;
    }

    public DataFrameTransformConfig getConfig() {
        return config;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        // Serialize as the bare transform config; the transform id lives in the URL, not the body.
        return config.toXContent(builder, params);
    }

    @Override
    public int hashCode() {
        return Objects.hash(config);
    }

    @Override
    public boolean equals(Object obj) {
        // Fast path for self-comparison; also makes this consistent with
        // DeleteDataFrameTransformRequest#equals, which the original was missing.
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        PutDataFrameTransformRequest other = (PutDataFrameTransformRequest) obj;
        return Objects.equals(config, other.config);
    }
}

View File

@ -48,7 +48,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
private final QueryConfig queryConfig;
private final PivotConfig pivotConfig;
private static final ConstructingObjectParser<DataFrameTransformConfig, String> PARSER =
public static final ConstructingObjectParser<DataFrameTransformConfig, String> PARSER =
new ConstructingObjectParser<>("data_frame_transform", true,
(args) -> {
String id = (String) args[0];

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
public class DataFrameRequestConvertersTests extends ESTestCase {

    @Override
    protected NamedXContentRegistry xContentRegistry() {
        // Queries/aggregations inside random transform configs are registered by the search module.
        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
        return new NamedXContentRegistry(searchModule.getNamedXContents());
    }

    public void testPutDataFrameTransform() throws IOException {
        PutDataFrameTransformRequest putRequest =
                new PutDataFrameTransformRequest(DataFrameTransformConfigTests.randomDataFrameTransformConfig());

        Request httpRequest = DataFrameRequestConverters.putDataFrameTransform(putRequest);

        assertEquals(HttpPut.METHOD_NAME, httpRequest.getMethod());
        assertThat(httpRequest.getEndpoint(), equalTo("/_data_frame/transforms/" + putRequest.getConfig().getId()));

        // Round-trip the entity to verify the body is the serialized transform config.
        try (XContentParser parser = createParser(JsonXContent.jsonXContent, httpRequest.getEntity().getContent())) {
            assertThat(DataFrameTransformConfig.PARSER.apply(parser, null), equalTo(putRequest.getConfig()));
        }
    }

    public void testDeleteDataFrameTransform() {
        Request httpRequest =
                DataFrameRequestConverters.deleteDataFrameTransform(new DeleteDataFrameTransformRequest("foo"));

        assertEquals(HttpDelete.METHOD_NAME, httpRequest.getMethod());
        assertThat(httpRequest.getEndpoint(), equalTo("/_data_frame/transforms/foo"));
    }
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.QueryConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import java.io.IOException;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {

    /** Creates {@code indexName} with timestamp/user_id/stars fields to act as a transform source. */
    private void createIndex(String indexName) throws IOException {
        XContentBuilder mapping = jsonBuilder();
        mapping.startObject()
            .startObject("properties")
                .startObject("timestamp")
                    .field("type", "date")
                .endObject()
                .startObject("user_id")
                    .field("type", "keyword")
                .endObject()
                .startObject("stars")
                    .field("type", "integer")
                .endObject()
            .endObject()
        .endObject();

        CreateIndexRequest createRequest = new CreateIndexRequest(indexName);
        createRequest.mapping(mapping);
        CreateIndexResponse createResponse = highLevelClient().indices().create(createRequest, RequestOptions.DEFAULT);
        assertTrue(createResponse.isAcknowledged());
    }

    public void testCreateDelete() throws IOException {
        String sourceIndex = "transform-source";
        createIndex(sourceIndex);

        // Pivot: group by user_id (labelled "reviewer") and average the stars field.
        GroupConfig groups = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
        AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
        aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
        PivotConfig pivot = new PivotConfig(groups, new AggregationConfig(aggs));

        String id = "test-crud";
        DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest",
                new QueryConfig(new MatchAllQueryBuilder()), pivot);

        DataFrameClient client = highLevelClient().dataFrame();
        AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
                client::putDataFrameTransformAsync);
        assertTrue(ack.isAcknowledged());

        ack = execute(new DeleteDataFrameTransformRequest(transform.getId()), client::deleteDataFrameTransform,
                client::deleteDataFrameTransformAsync);
        assertTrue(ack.isAcknowledged());

        // The second delete should fail
        ElasticsearchStatusException deleteError = expectThrows(ElasticsearchStatusException.class,
                () -> execute(new DeleteDataFrameTransformRequest(transform.getId()), client::deleteDataFrameTransform,
                        client::deleteDataFrameTransformAsync));
        assertThat(deleteError.getMessage(), containsString("Transform with id [test-crud] could not be found"));
    }
}

View File

@ -784,6 +784,7 @@ public class RestHighLevelClientTests extends ESTestCase {
apiName.startsWith("security.") == false &&
apiName.startsWith("index_lifecycle.") == false &&
apiName.startsWith("ccr.") == false &&
apiName.startsWith("data_frame") == false &&
apiName.endsWith("freeze") == false &&
// IndicesClientIT.getIndexTemplate should be renamed "getTemplate" in version 8.0 when we
// can get rid of 7.0's deprecated "getTemplate"

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.containsString;
public class DeleteDataFrameTransformRequestTests extends ESTestCase {

    public void testValidate() {
        // A non-null id passes client-side validation.
        DeleteDataFrameTransformRequest validRequest = new DeleteDataFrameTransformRequest("valid-id");
        assertFalse(validRequest.validate().isPresent());

        // A null id is reported through validate() rather than thrown from the constructor.
        DeleteDataFrameTransformRequest invalidRequest = new DeleteDataFrameTransformRequest(null);
        assertThat(invalidRequest.validate().get().getMessage(),
                containsString("data frame transform id must not be null"));
    }
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.Collections;
public class PutDataFrameTransformRequestTests extends AbstractXContentTestCase<PutDataFrameTransformRequest> {

    @Override
    protected NamedXContentRegistry xContentRegistry() {
        // Queries and aggregations used inside the transform config live in the search module.
        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
        return new NamedXContentRegistry(searchModule.getNamedXContents());
    }

    @Override
    protected PutDataFrameTransformRequest createTestInstance() {
        return new PutDataFrameTransformRequest(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
    }

    @Override
    protected PutDataFrameTransformRequest doParseInstance(XContentParser parser) throws IOException {
        return new PutDataFrameTransformRequest(DataFrameTransformConfig.fromXContent(parser));
    }

    @Override
    protected boolean supportsUnknownFields() {
        // The request body is exactly the transform config, which rejects unknown fields here.
        return false;
    }
}

View File

@ -31,12 +31,17 @@ import java.util.Collections;
import java.util.function.Predicate;
public class DataFrameTransformConfigTests extends AbstractXContentTestCase<DataFrameTransformConfig> {
@Override
protected DataFrameTransformConfig createTestInstance() {
public static DataFrameTransformConfig randomDataFrameTransformConfig() {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}
@Override
protected DataFrameTransformConfig createTestInstance() {
return randomDataFrameTransformConfig();
}
@Override
protected DataFrameTransformConfig doParseInstance(XContentParser parser) throws IOException {
return DataFrameTransformConfig.fromXContent(parser);

View File

@ -0,0 +1,220 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.documentation;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.QueryConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTestCase {

    /**
     * Creates {@code indexName} with a simple timestamp/user_id/stars mapping to
     * act as the source index for the transforms built in the tests below.
     */
    private void createIndex(String indexName) throws IOException {
        XContentBuilder builder = jsonBuilder();
        builder.startObject()
            .startObject("properties")
                .startObject("timestamp")
                    .field("type", "date")
                .endObject()
                .startObject("user_id")
                    .field("type", "keyword")
                .endObject()
                .startObject("stars")
                    .field("type", "integer")
                .endObject()
            .endObject()
        .endObject();

        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.mapping(builder);
        CreateIndexResponse response = highLevelClient().indices().create(request, RequestOptions.DEFAULT);
        assertTrue(response.isAcknowledged());
    }

    // Exercises the PUT _data_frame/transforms API. The tag::/end:: markers below
    // delimit snippets that are included verbatim in the reference docs, so code
    // between them must not be reformatted casually.
    public void testPutDataFrameTransform() throws IOException, InterruptedException {
        createIndex("source-index");

        RestHighLevelClient client = highLevelClient();

        // tag::put-data-frame-transform-query-config
        QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
        // end::put-data-frame-transform-query-config
        // tag::put-data-frame-transform-group-config
        GroupConfig groupConfig =
                new GroupConfig(Collections.singletonMap("reviewer", // <1>
                        new TermsGroupSource("user_id"))); // <2>
        // end::put-data-frame-transform-group-config
        // tag::put-data-frame-transform-agg-config
        AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
        aggBuilder.addAggregator(
                AggregationBuilders.avg("avg_rating").field("stars")); // <1>
        AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
        // end::put-data-frame-transform-agg-config
        // tag::put-data-frame-transform-pivot-config
        PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
        // end::put-data-frame-transform-pivot-config
        // tag::put-data-frame-transform-config
        DataFrameTransformConfig transformConfig =
                new DataFrameTransformConfig("reviewer-avg-rating", // <1>
                        "source-index", // <2>
                        "pivot-destination", // <3>
                        queryConfig, // <4>
                        pivotConfig); // <5>
        // end::put-data-frame-transform-config

        {
            // tag::put-data-frame-transform-request
            PutDataFrameTransformRequest request =
                    new PutDataFrameTransformRequest(transformConfig); // <1>
            // end::put-data-frame-transform-request

            // tag::put-data-frame-transform-execute
            AcknowledgedResponse response =
                    client.dataFrame().putDataFrameTransform(
                            request, RequestOptions.DEFAULT);
            // end::put-data-frame-transform-execute

            assertTrue(response.isAcknowledged());
        }
        {
            // Re-use the same config under a different id so the async variant
            // below creates a new transform instead of clashing with the one above.
            DataFrameTransformConfig configWithDifferentId = new DataFrameTransformConfig("reviewer-avg-rating2",
                    transformConfig.getSource(), transformConfig.getDestination(), transformConfig.getQueryConfig(),
                    transformConfig.getPivotConfig());
            PutDataFrameTransformRequest request = new PutDataFrameTransformRequest(configWithDifferentId);

            // tag::put-data-frame-transform-execute-listener
            ActionListener<AcknowledgedResponse> listener =
                    new ActionListener<AcknowledgedResponse>() {
                        @Override
                        public void onResponse(AcknowledgedResponse response) {
                            // <1>
                        }

                        @Override
                        public void onFailure(Exception e) {
                            // <2>
                        }
                    };
            // end::put-data-frame-transform-execute-listener

            // Replace the empty listener by a blocking listener in test
            final CountDownLatch latch = new CountDownLatch(1);
            // NOTE(review): ackListener is never used afterwards — presumably a
            // leftover; confirm and remove.
            ActionListener<AcknowledgedResponse> ackListener = listener;
            listener = new LatchedActionListener<>(listener, latch);

            // tag::put-data-frame-transform-execute-async
            client.dataFrame().putDataFrameTransformAsync(
                    request, RequestOptions.DEFAULT, listener); // <1>
            // end::put-data-frame-transform-execute-async

            assertTrue(latch.await(30L, TimeUnit.SECONDS));
        }
    }

    // Exercises the DELETE _data_frame/transforms/{id} API. As above, the
    // tag::/end:: markers delimit doc snippets.
    public void testDeleteDataFrameTransform() throws IOException, InterruptedException {
        createIndex("source-data");

        RestHighLevelClient client = highLevelClient();

        QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
        GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
        AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
        aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
        AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
        PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);

        // Two transforms are created up front: one is deleted synchronously,
        // the other asynchronously.
        DataFrameTransformConfig transformConfig1 = new DataFrameTransformConfig("mega-transform",
                "source-data", "pivot-dest", queryConfig, pivotConfig);
        DataFrameTransformConfig transformConfig2 = new DataFrameTransformConfig("mega-transform2",
                "source-data", "pivot-dest2", queryConfig, pivotConfig);

        client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig1), RequestOptions.DEFAULT);
        client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig2), RequestOptions.DEFAULT);

        {
            // tag::delete-data-frame-transform-request
            DeleteDataFrameTransformRequest request =
                    new DeleteDataFrameTransformRequest("mega-transform"); // <1>
            // end::delete-data-frame-transform-request

            // tag::delete-data-frame-transform-execute
            AcknowledgedResponse response =
                    client.dataFrame()
                            .deleteDataFrameTransform(request, RequestOptions.DEFAULT);
            // end::delete-data-frame-transform-execute

            assertTrue(response.isAcknowledged());
        }
        {
            // tag::delete-data-frame-transform-execute-listener
            ActionListener<AcknowledgedResponse> listener =
                    new ActionListener<AcknowledgedResponse>() {
                        @Override
                        public void onResponse(AcknowledgedResponse response) {
                            // <1>
                        }

                        @Override
                        public void onFailure(Exception e) {
                            // <2>
                        }
                    };
            // end::delete-data-frame-transform-execute-listener

            // Replace the empty listener by a blocking listener in test
            final CountDownLatch latch = new CountDownLatch(1);
            listener = new LatchedActionListener<>(listener, latch);

            DeleteDataFrameTransformRequest request = new DeleteDataFrameTransformRequest("mega-transform2");

            // tag::delete-data-frame-transform-execute-async
            client.dataFrame().deleteDataFrameTransformAsync(
                    request, RequestOptions.DEFAULT, listener); // <1>
            // end::delete-data-frame-transform-execute-async

            assertTrue(latch.await(30L, TimeUnit.SECONDS));
        }
    }
}

View File

@ -0,0 +1,25 @@
--
:api: delete-data-frame-transform
:request: DeleteDataFrameTransformRequest
:response: AcknowledgedResponse
--
[id="{upid}-{api}"]
=== Delete Data Frame Transform API
[id="{upid}-{api}-request"]
==== Delete Data Frame Transform Request
A +{request}+ object requires a non-null `id`.
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request]
---------------------------------------------------
<1> Constructing a new request referencing an existing {dataframe-transform}
include::../execution.asciidoc[]
[id="{upid}-{api}-response"]
==== Response
The returned +{response}+ object acknowledges the Data Frame Transform deletion.

View File

@ -0,0 +1,93 @@
--
:api: put-data-frame-transform
:request: PutDataFrameTransformRequest
:response: AcknowledgedResponse
--
[id="{upid}-{api}"]
=== Put Data Frame Transform API
The Put Data Frame Transform API is used to create a new {dataframe-transform}.
The API accepts a +{request}+ object as a request and returns a +{response}+.
[id="{upid}-{api}-request"]
==== Put Data Frame Transform Request
A +{request}+ requires the following argument:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request]
--------------------------------------------------
<1> The configuration of the {dataframe-transform} to create
[id="{upid}-{api}-config"]
==== Data Frame Transform Configuration
The `DataFrameTransformConfig` object contains all the details about the {dataframe-transform}
configuration and contains the following arguments:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-config]
--------------------------------------------------
<1> The {dataframe-transform} ID
<2> The source index or index pattern
<3> The destination index
<4> Optionally a QueryConfig
<5> The PivotConfig
[id="{upid}-{api}-query-config"]
==== QueryConfig
The query with which to select data from the source.
If not set a `match_all` query is used by default.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-query-config]
--------------------------------------------------
==== PivotConfig
Defines the pivot function `group by` fields and the aggregation to reduce the data.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-pivot-config]
--------------------------------------------------
===== GroupConfig
The grouping terms. Defines the group by and destination fields
which are produced by the pivot function. There are three types of
group:
* Terms
* Histogram
* Date Histogram
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-group-config]
--------------------------------------------------
<1> The destination field
<2> Group by values of the `user_id` field
===== AggregationConfig
Defines the aggregations for the group fields.
// TODO link to the supported aggregations
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-agg-config]
--------------------------------------------------
<1> Aggregate the average star rating
include::../execution.asciidoc[]
[id="{upid}-{api}-response"]
==== Response
The returned +{response}+ acknowledges the successful creation of
the new {dataframe-transform}. If the configuration is invalid, an error is returned instead.

View File

@ -550,3 +550,16 @@ include::ilm/stop_lifecycle_management.asciidoc[]
include::ilm/lifecycle_management_status.asciidoc[]
include::ilm/retry_lifecycle_policy.asciidoc[]
include::ilm/remove_lifecycle_policy_from_index.asciidoc[]
== Data Frame APIs
:upid: {mainid}-dataframe
:doc-tests-file: {doc-tests}/DataFrameTransformDocumentationIT.java
The Java High Level REST Client supports the following Data Frame APIs:
* <<{upid}-put-data-frame-transform>>
* <<{upid}-delete-data-frame-transform>>
include::dataframe/put_data_frame.asciidoc[]
include::dataframe/delete_data_frame.asciidoc[]