[ML] Data Frame HLRC Preview API (#40258)

This commit is contained in:
David Kyle 2019-03-21 09:38:27 +00:00 committed by GitHub
parent d485be631b
commit a4cb92a300
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 651 additions and 21 deletions

View File

@ -22,6 +22,8 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest; import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest; import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse; import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
@ -120,6 +122,45 @@ public final class DataFrameClient {
Collections.emptySet()); Collections.emptySet());
} }
/**
* Preview the result of a data frame transform
* <p>
* For additional info
* see <a href="https://www.TODO.com">Preview Data Frame transform documentation</a>
*
* @param request The preview data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return A response containing the results of the applied transform
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public PreviewDataFrameTransformResponse previewDataFrameTransform(PreviewDataFrameTransformRequest request, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
DataFrameRequestConverters::previewDataFrameTransform,
options,
PreviewDataFrameTransformResponse::fromXContent,
Collections.emptySet());
}
/**
* Preview the result of a data frame transform asynchronously and notifies listener on completion
* <p>
* For additional info
* see <a href="https://www.TODO.com">Preview Data Frame transform documentation</a>
*
* @param request The preview data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void previewDataFrameTransformAsync(PreviewDataFrameTransformRequest request, RequestOptions options,
ActionListener<PreviewDataFrameTransformResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
DataFrameRequestConverters::previewDataFrameTransform,
options,
PreviewDataFrameTransformResponse::fromXContent,
listener,
Collections.emptySet());
}
/** /**
* Start a data frame transform * Start a data frame transform

View File

@ -23,6 +23,7 @@ import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut; import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest; import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest; import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
@ -84,4 +85,13 @@ final class DataFrameRequestConverters {
} }
return request; return request;
} }
static Request previewDataFrameTransform(PreviewDataFrameTransformRequest previewRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_data_frame", "transforms", "_preview")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
request.setEntity(createEntity(previewRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
} }

View File

@ -0,0 +1,85 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
public class PreviewDataFrameTransformRequest implements ToXContentObject, Validatable {
private final DataFrameTransformConfig config;
public PreviewDataFrameTransformRequest(DataFrameTransformConfig config) {
this.config = config;
}
public DataFrameTransformConfig getConfig() {
return config;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
return config.toXContent(builder, params);
}
@Override
public Optional<ValidationException> validate() {
ValidationException validationException = new ValidationException();
if (config == null) {
validationException.addValidationError("preview requires a non-null data frame config");
return Optional.of(validationException);
} else {
if (config.getSource() == null) {
validationException.addValidationError("data frame transform source cannot be null");
}
}
if (validationException.validationErrors().isEmpty()) {
return Optional.empty();
} else {
return Optional.of(validationException);
}
}
@Override
public int hashCode() {
return Objects.hash(config);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
PreviewDataFrameTransformRequest other = (PreviewDataFrameTransformRequest) obj;
return Objects.equals(config, other.config);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class PreviewDataFrameTransformResponse {
private static final String PREVIEW = "preview";
@SuppressWarnings("unchecked")
public static PreviewDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
Object previewDocs = parser.map().get(PREVIEW);
return new PreviewDataFrameTransformResponse((List<Map<String, Object>>) previewDocs);
}
private List<Map<String, Object>> docs;
public PreviewDataFrameTransformResponse(List<Map<String, Object>> docs) {
this.docs = docs;
}
public List<Map<String, Object>> getDocs() {
return docs;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
PreviewDataFrameTransformResponse other = (PreviewDataFrameTransformResponse) obj;
return Objects.equals(other.docs, docs);
}
@Override
public int hashCode() {
return Objects.hashCode(docs);
}
}

View File

@ -20,12 +20,14 @@
package org.elasticsearch.client.dataframe; package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.Validatable; import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
public class PutDataFrameTransformRequest implements ToXContentObject, Validatable { public class PutDataFrameTransformRequest implements ToXContentObject, Validatable {
@ -39,6 +41,31 @@ public class PutDataFrameTransformRequest implements ToXContentObject, Validatab
return config; return config;
} }
@Override
public Optional<ValidationException> validate() {
ValidationException validationException = new ValidationException();
if (config == null) {
validationException.addValidationError("put requires a non-null data frame config");
return Optional.of(validationException);
} else {
if (config.getId() == null) {
validationException.addValidationError("data frame transform id cannot be null");
}
if (config.getSource() == null) {
validationException.addValidationError("data frame transform source cannot be null");
}
if (config.getDestination() == null) {
validationException.addValidationError("data frame transform destination cannot be null");
}
}
if (validationException.validationErrors().isEmpty()) {
return Optional.empty();
} else {
return Optional.of(validationException);
}
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return config.toXContent(builder, params); return config.toXContent(builder, params);

View File

@ -77,9 +77,9 @@ public class DataFrameTransformConfig implements ToXContentObject {
final String dest, final String dest,
final QueryConfig queryConfig, final QueryConfig queryConfig,
final PivotConfig pivotConfig) { final PivotConfig pivotConfig) {
this.id = Objects.requireNonNull(id); this.id = id;
this.source = Objects.requireNonNull(source); this.source = source;
this.dest = Objects.requireNonNull(dest); this.dest = dest;
this.queryConfig = queryConfig; this.queryConfig = queryConfig;
this.pivotConfig = pivotConfig; this.pivotConfig = pivotConfig;
} }
@ -104,24 +104,16 @@ public class DataFrameTransformConfig implements ToXContentObject {
return queryConfig; return queryConfig;
} }
public boolean isValid() {
if (queryConfig != null && queryConfig.isValid() == false) {
return false;
}
if (pivotConfig == null || pivotConfig.isValid() == false) {
return false;
}
return true;
}
@Override @Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field(ID.getPreferredName(), id); if (id != null) {
builder.field(ID.getPreferredName(), id);
}
builder.field(SOURCE.getPreferredName(), source); builder.field(SOURCE.getPreferredName(), source);
builder.field(DEST.getPreferredName(), dest); if (dest != null) {
builder.field(DEST.getPreferredName(), dest);
}
if (queryConfig != null) { if (queryConfig != null) {
builder.field(QUERY.getPreferredName(), queryConfig); builder.field(QUERY.getPreferredName(), queryConfig);
} }

View File

@ -23,6 +23,7 @@ import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut; import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest; import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest; import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
@ -122,4 +123,18 @@ public class DataFrameRequestConvertersTests extends ESTestCase {
assertFalse(request.getParameters().containsKey("timeout")); assertFalse(request.getParameters().containsKey("timeout"));
} }
} }
public void testPreviewDataFrameTransform() throws IOException {
PreviewDataFrameTransformRequest previewRequest = new PreviewDataFrameTransformRequest(
DataFrameTransformConfigTests.randomDataFrameTransformConfig());
Request request = DataFrameRequestConverters.previewDataFrameTransform(previewRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/_preview"));
try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
DataFrameTransformConfig parsedConfig = DataFrameTransformConfig.PARSER.apply(parser, null);
assertThat(parsedConfig, equalTo(previewRequest.getConfig()));
}
}
} }

View File

@ -20,8 +20,14 @@
package org.elasticsearch.client; package org.elasticsearch.client;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest; import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest; import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse; import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
@ -36,19 +42,29 @@ import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource;
import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.junit.After;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
private List<String> transformsToClean = new ArrayList<>();
private void createIndex(String indexName) throws IOException { private void createIndex(String indexName) throws IOException {
XContentBuilder builder = jsonBuilder(); XContentBuilder builder = jsonBuilder();
@ -72,6 +88,58 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
assertTrue(response.isAcknowledged()); assertTrue(response.isAcknowledged());
} }
private void indexData(String indexName) throws IOException {
BulkRequest request = new BulkRequest();
{
Map<String, Object> doc = new HashMap<>();
doc.put("timestamp", "2019-03-10T12:00:00+00");
doc.put("user_id", "theresa");
doc.put("stars", 2);
request.add(new IndexRequest(indexName).source(doc, XContentType.JSON));
doc = new HashMap<>();
doc.put("timestamp", "2019-03-10T18:00:00+00");
doc.put("user_id", "theresa");
doc.put("stars", 3);
request.add(new IndexRequest(indexName).source(doc, XContentType.JSON));
doc = new HashMap<>();
doc.put("timestamp", "2019-03-10T12:00:00+00");
doc.put("user_id", "michel");
doc.put("stars", 5);
request.add(new IndexRequest(indexName).source(doc, XContentType.JSON));
doc = new HashMap<>();
doc.put("timestamp", "2019-03-10T18:00:00+00");
doc.put("user_id", "michel");
doc.put("stars", 3);
request.add(new IndexRequest(indexName).source(doc, XContentType.JSON));
doc = new HashMap<>();
doc.put("timestamp", "2019-03-11T12:00:00+00");
doc.put("user_id", "michel");
doc.put("stars", 3);
request.add(new IndexRequest(indexName).source(doc, XContentType.JSON));
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
BulkResponse response = highLevelClient().bulk(request, RequestOptions.DEFAULT);
assertFalse(response.hasFailures());
}
@After
public void cleanUpTransforms() throws IOException {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
}
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().deleteDataFrameTransform(
new DeleteDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
}
transformsToClean = new ArrayList<>();
}
public void testCreateDelete() throws IOException { public void testCreateDelete() throws IOException {
String sourceIndex = "transform-source"; String sourceIndex = "transform-source";
createIndex(sourceIndex); createIndex(sourceIndex);
@ -120,6 +188,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform, AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
client::putDataFrameTransformAsync); client::putDataFrameTransformAsync);
assertTrue(ack.isAcknowledged()); assertTrue(ack.isAcknowledged());
transformsToClean.add(id);
StartDataFrameTransformRequest startRequest = new StartDataFrameTransformRequest(id); StartDataFrameTransformRequest startRequest = new StartDataFrameTransformRequest(id);
StartDataFrameTransformResponse startResponse = StartDataFrameTransformResponse startResponse =
@ -137,5 +206,35 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
assertThat(stopResponse.getNodeFailures(), empty()); assertThat(stopResponse.getNodeFailures(), empty());
assertThat(stopResponse.getTaskFailures(), empty()); assertThat(stopResponse.getTaskFailures(), empty());
} }
public void testPreview() throws IOException {
String sourceIndex = "transform-source";
createIndex(sourceIndex);
indexData(sourceIndex);
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview", sourceIndex, null, queryConfig, pivotConfig);
DataFrameClient client = highLevelClient().dataFrame();
PreviewDataFrameTransformResponse preview = execute(new PreviewDataFrameTransformRequest(transform),
client::previewDataFrameTransform,
client::previewDataFrameTransformAsync);
List<Map<String, Object>> docs = preview.getDocs();
assertThat(docs, hasSize(2));
Optional<Map<String, Object>> theresa = docs.stream().filter(doc -> "theresa".equals(doc.get("reviewer"))).findFirst();
assertTrue(theresa.isPresent());
assertEquals(2.5d, (double)theresa.get().get("avg_rating"), 0.01d);
Optional<Map<String, Object>> michel = docs.stream().filter(doc -> "michel".equals(doc.get("reviewer"))).findFirst();
assertTrue(michel.isPresent());
assertEquals(3.6d, (double)michel.get().get("avg_rating"), 0.1d);
}
} }

View File

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.client.dataframe.transforms.QueryConfigTests;
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import static org.hamcrest.Matchers.containsString;
public class PreviewDataFrameTransformRequestTests extends AbstractXContentTestCase<PreviewDataFrameTransformRequest> {
@Override
protected PreviewDataFrameTransformRequest createTestInstance() {
return new PreviewDataFrameTransformRequest(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
}
@Override
protected PreviewDataFrameTransformRequest doParseInstance(XContentParser parser) throws IOException {
return new PreviewDataFrameTransformRequest(DataFrameTransformConfig.fromXContent(parser));
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
@Override
protected NamedXContentRegistry xContentRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
return new NamedXContentRegistry(searchModule.getNamedXContents());
}
public void testValidate() {
assertFalse(new PreviewDataFrameTransformRequest(DataFrameTransformConfigTests.randomDataFrameTransformConfig())
.validate().isPresent());
assertThat(new PreviewDataFrameTransformRequest(null).validate().get().getMessage(),
containsString("preview requires a non-null data frame config"));
// null id and destination is valid
DataFrameTransformConfig config = new DataFrameTransformConfig(null, "source", null,
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
assertFalse(new PreviewDataFrameTransformRequest(config).validate().isPresent());
// null source is not valid
config = new DataFrameTransformConfig(null, null, null,
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
Optional<ValidationException> error = new PreviewDataFrameTransformRequest(config).validate();
assertTrue(error.isPresent());
assertThat(error.get().getMessage(), containsString("data frame transform source cannot be null"));
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
public class PreviewDataFrameTransformResponseTests extends ESTestCase {
public void testFromXContent() throws IOException {
xContentTester(this::createParser,
this::createTestInstance,
this::toXContent,
PreviewDataFrameTransformResponse::fromXContent)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(path -> path.isEmpty() == false)
.test();
}
private PreviewDataFrameTransformResponse createTestInstance() {
int numDocs = randomIntBetween(5, 10);
List<Map<String, Object>> docs = new ArrayList<>(numDocs);
for (int i=0; i<numDocs; i++) {
int numFields = randomIntBetween(1, 4);
Map<String, Object> doc = new HashMap<>();
for (int j=0; j<numFields; j++) {
doc.put(randomAlphaOfLength(5), randomAlphaOfLength(5));
}
docs.add(doc);
}
return new PreviewDataFrameTransformResponse(docs);
}
private void toXContent(PreviewDataFrameTransformResponse response, XContentBuilder builder) throws IOException {
builder.startObject();
builder.startArray("preview");
for (Map<String, Object> doc : response.getDocs()) {
builder.map(doc);
}
builder.endArray();
builder.endObject();
}
}

View File

@ -19,8 +19,11 @@
package org.elasticsearch.client.dataframe; package org.elasticsearch.client.dataframe;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.client.dataframe.transforms.QueryConfigTests;
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -29,8 +32,29 @@ import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Optional;
import static org.hamcrest.Matchers.containsString;
public class PutDataFrameTransformRequestTests extends AbstractXContentTestCase<PutDataFrameTransformRequest> { public class PutDataFrameTransformRequestTests extends AbstractXContentTestCase<PutDataFrameTransformRequest> {
public void testValidate() {
assertFalse(createTestInstance().validate().isPresent());
DataFrameTransformConfig config = new DataFrameTransformConfig(null, null, null,
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
Optional<ValidationException> error = new PutDataFrameTransformRequest(config).validate();
assertTrue(error.isPresent());
assertThat(error.get().getMessage(), containsString("data frame transform id cannot be null"));
assertThat(error.get().getMessage(), containsString("data frame transform source cannot be null"));
assertThat(error.get().getMessage(), containsString("data frame transform destination cannot be null"));
error = new PutDataFrameTransformRequest(null).validate();
assertTrue(error.isPresent());
assertThat(error.get().getMessage(), containsString("put requires a non-null data frame config"));
}
@Override @Override
protected PutDataFrameTransformRequest createTestInstance() { protected PutDataFrameTransformRequest createTestInstance() {
return new PutDataFrameTransformRequest(DataFrameTransformConfigTests.randomDataFrameTransformConfig()); return new PutDataFrameTransformRequest(DataFrameTransformConfigTests.randomDataFrameTransformConfig());

View File

@ -26,6 +26,8 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest; import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest; import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse; import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
@ -356,6 +358,67 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
assertTrue(latch.await(30L, TimeUnit.SECONDS)); assertTrue(latch.await(30L, TimeUnit.SECONDS));
} }
}
public void testPreview() throws IOException, InterruptedException {
createIndex("source-data");
RestHighLevelClient client = highLevelClient();
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
// tag::preview-data-frame-transform-request
DataFrameTransformConfig transformConfig =
new DataFrameTransformConfig(null, // <1>
"source-data",
null, // <2>
queryConfig,
pivotConfig);
PreviewDataFrameTransformRequest request =
new PreviewDataFrameTransformRequest(transformConfig); // <3>
// end::preview-data-frame-transform-request
{
// tag::preview-data-frame-transform-execute
PreviewDataFrameTransformResponse response =
client.dataFrame()
.previewDataFrameTransform(request, RequestOptions.DEFAULT);
// end::preview-data-frame-transform-execute
assertNotNull(response.getDocs());
}
{
// tag::preview-data-frame-transform-execute-listener
ActionListener<PreviewDataFrameTransformResponse> listener =
new ActionListener<PreviewDataFrameTransformResponse>() {
@Override
public void onResponse(PreviewDataFrameTransformResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::preview-data-frame-transform-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::preview-data-frame-transform-execute-async
client.dataFrame().previewDataFrameTransformAsync(
request, RequestOptions.DEFAULT, listener); // <1>
// end::preview-data-frame-transform-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
} }
} }

View File

@ -0,0 +1,32 @@
--
:api: preview-data-frame-transform
:request: PreviewDataFrameTransformRequest
:response: PreviewDataFrameTransformResponse
--
[id="{upid}-{api}"]
=== Preview Data Frame Transform API
The Preview Data Frame Transform API is used to preview the results of
a {dataframe-transform}.
The API accepts a +{request}+ object as a request and returns a +{response}+.
[id="{upid}-{api}-request"]
==== Preview Data Frame Request
A +{request}+ takes a single argument: a valid data frame transform config.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request]
--------------------------------------------------
<1> The transform Id may be null for the preview
<2> The destination may be null for the preview
<3> The configuration of the {dataframe-job} to preview
include::../execution.asciidoc[]
[id="{upid}-{api}-response"]
==== Response
The returned +{response}+ contains the preview documents

View File

@ -556,10 +556,12 @@ The Java High Level REST Client supports the following Data Frame APIs:
* <<{upid}-put-data-frame-transform>> * <<{upid}-put-data-frame-transform>>
* <<{upid}-delete-data-frame-transform>> * <<{upid}-delete-data-frame-transform>>
* <<{upid}-preview-data-frame-transform>>
* <<{upid}-start-data-frame-transform>> * <<{upid}-start-data-frame-transform>>
* <<{upid}-stop-data-frame-transform>> * <<{upid}-stop-data-frame-transform>>
include::dataframe/put_data_frame.asciidoc[] include::dataframe/put_data_frame.asciidoc[]
include::dataframe/delete_data_frame.asciidoc[] include::dataframe/delete_data_frame.asciidoc[]
include::dataframe/preview_data_frame.asciidoc[]
include::dataframe/start_data_frame.asciidoc[] include::dataframe/start_data_frame.asciidoc[]
include::dataframe/stop_data_frame.asciidoc[] include::dataframe/stop_data_frame.asciidoc[]

View File

@ -60,6 +60,7 @@ public class PreviewDataFrameTransformAction extends Action<PreviewDataFrameTran
Map<String, Object> content = parser.map(); Map<String, Object> content = parser.map();
// Destination and ID are not required for Preview, so we just supply our own // Destination and ID are not required for Preview, so we just supply our own
content.put(DataFrameField.DESTINATION.getPreferredName(), "unused-transform-preview-index"); content.put(DataFrameField.DESTINATION.getPreferredName(), "unused-transform-preview-index");
content.put(DataFrameField.ID.getPreferredName(), "transform-preview");
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content); try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
XContentParser newParser = XContentType.JSON XContentParser newParser = XContentType.JSON
.xContent() .xContent()

View File

@ -6,10 +6,13 @@
package org.elasticsearch.xpack.core.dataframe.action; package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractStreamableXContentTestCase; import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction.Request; import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction.Request;
@ -28,7 +31,7 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractStreama
private NamedXContentRegistry namedXContentRegistry; private NamedXContentRegistry namedXContentRegistry;
@Before @Before
public void registerAggregationNamedObjects() throws Exception { public void registerAggregationNamedObjects() {
// register aggregations as NamedWriteable // register aggregations as NamedWriteable
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList()); SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables()); namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
@ -67,4 +70,24 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractStreama
return new Request(config); return new Request(config);
} }
public void testParsingOverwritesIdAndDestFields() throws IOException {
// id & dest fields will be set by the parser
BytesArray json = new BytesArray(
"{ " +
"\"source\":\"foo\", " +
"\"query\": {\"match_all\": {}}," +
"\"pivot\": {" +
"\"group_by\": {\"destination-field2\": {\"terms\": {\"field\": \"term-field\"}}}," +
"\"aggs\": {\"avg_response\": {\"avg\": {\"field\": \"responsetime\"}}}" +
"}" +
"}");
try (XContentParser parser = JsonXContent.jsonXContent
.createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json.streamInput())) {
Request request = Request.fromXContent(parser);
assertEquals("transform-preview", request.getConfig().getId());
assertEquals("unused-transform-preview-index", request.getConfig().getDestination());
}
}
} }

View File

@ -29,9 +29,7 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String id = restRequest.param(DataFrameField.ID.getPreferredName()); String id = restRequest.param(DataFrameField.ID.getPreferredName());
StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id); StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id);
if (restRequest.hasParam(DataFrameField.TIMEOUT.getPreferredName())) { request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
}
return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
} }