[ML-DataFrame] Add _preview endpoint (#38924) (#39319)

* [DATA-FRAME] add preview endpoint

* adjusting preview tests and fixing parser

* adjusting preview transport

* remove unused import

* adjusting test

* Addressing PR comments

* Fixing failing test and adjusting for PR comments

* fixing integration test
Benjamin Trent 2019-02-22 10:55:38 -06:00 committed by GitHub
parent 4f2bd238d2
commit 3262d6c917
9 changed files with 490 additions and 8 deletions
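
For orientation, here is a minimal sketch of calling the new endpoint with the low-level REST client, in the same spirit as the integration test further down. The localhost:9200 address, the "reviews" index, and the assumption that DataFrameField.REST_BASE_PATH resolves to /_data_frame/ (giving /_data_frame/transforms/_preview) are illustrative, not fixed by this commit.

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class PreviewEndpointSketch {
    public static void main(String[] args) throws Exception {
        // Sketch only: POST a transform config (no id, no dest) to the new _preview endpoint.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request preview = new Request("POST", "/_data_frame/transforms/_preview");
            preview.setJsonEntity("{"
                + "  \"source\": \"reviews\","
                + "  \"pivot\": {"
                + "    \"group_by\": { \"reviewer\": { \"terms\": { \"field\": \"user_id\" } } },"
                + "    \"aggregations\": { \"avg_rating\": { \"avg\": { \"field\": \"stars\" } } }"
                + "  }"
                + "}");
            Response response = client.performRequest(preview);
            // The body has the shape {"preview": [ { "reviewer": ..., "avg_rating": ... }, ... ]}
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}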

View File

@@ -12,8 +12,11 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
@@ -224,6 +227,34 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
assertOnePivotValue(dataFrameIndex + "/_search?q=by_day:2017-01-15", 3.82);
}
@SuppressWarnings("unchecked")
public void testPreviewTransform() throws Exception {
final Request createPreviewRequest = new Request("POST", DATAFRAME_ENDPOINT + "_preview");
String config = "{"
+ " \"source\": \"reviews\",";
config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {\"terms\": { \"field\": \"user_id\" }},"
+ " \"by_day\": {\"date_histogram\": {\"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"}}},"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ "}";
createPreviewRequest.setJsonEntity(config);
Map<String, Object> previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest));
List<Map<String, Object>> preview = (List<Map<String, Object>>)previewDataframeResponse.get("preview");
assertThat(preview.size(), equalTo(393));
Set<String> expectedFields = new HashSet<>(Arrays.asList("reviewer", "by_day", "avg_rating"));
preview.forEach(p -> {
Set<String> keys = p.keySet();
assertThat(keys, equalTo(expectedFields));
});
}
private void startAndWaitForTransform(String transformId, String dataFrameIndex) throws IOException, Exception {
// start the transform
final Request startTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_start");

View File

@@ -50,12 +50,14 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.dataframe.action.PreviewDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.PutDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.StartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction;
import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.dataframe.action.TransportPreviewDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction;
@@ -64,6 +66,7 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa
import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestGetDataFrameTransformsAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestGetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestPreviewDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestPutDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestStartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestStopDataFrameTransformAction;
@@ -138,7 +141,8 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
new RestStopDataFrameTransformAction(settings, restController),
new RestDeleteDataFrameTransformAction(settings, restController),
new RestGetDataFrameTransformsAction(settings, restController),
new RestGetDataFrameTransformsStatsAction(settings, restController)
new RestGetDataFrameTransformsStatsAction(settings, restController),
new RestPreviewDataFrameTransformAction(settings, restController)
);
}
@@ -154,7 +158,8 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
new ActionHandler<>(StopDataFrameTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class),
new ActionHandler<>(DeleteDataFrameTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class),
new ActionHandler<>(GetDataFrameTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class),
new ActionHandler<>(GetDataFrameTransformsStatsAction.INSTANCE, TransportGetDataFrameTransformsStatsAction.class)
new ActionHandler<>(GetDataFrameTransformsStatsAction.INSTANCE, TransportGetDataFrameTransformsStatsAction.class),
new ActionHandler<>(PreviewDataFrameTransformAction.INSTANCE, TransportPreviewDataFrameTransformAction.class)
);
}

View File

@@ -0,0 +1,202 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class PreviewDataFrameTransformAction extends Action<PreviewDataFrameTransformAction.Response> {
public static final PreviewDataFrameTransformAction INSTANCE = new PreviewDataFrameTransformAction();
public static final String NAME = "cluster:admin/data_frame/preview";
private PreviewDataFrameTransformAction() {
super(NAME);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
private DataFrameTransformConfig config;
public Request(DataFrameTransformConfig config) {
this.setConfig(config);
}
public Request() { }
public static Request fromXContent(final XContentParser parser) throws IOException {
Map<String, Object> content = parser.map();
// Destination and ID are not required for Preview, so we just supply our own
content.put(DataFrameTransformConfig.DESTINATION.getPreferredName(), "unused-transform-preview-index");
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
XContentParser newParser = XContentType.JSON
.xContent()
.createParser(parser.getXContentRegistry(),
LoggingDeprecationHandler.INSTANCE,
BytesReference.bytes(xContentBuilder).streamInput())) {
return new Request(DataFrameTransformConfig.fromXContent(newParser, "transform-preview", true));
}
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return this.config.toXContent(builder, params);
}
public DataFrameTransformConfig getConfig() {
return config;
}
public void setConfig(DataFrameTransformConfig config) {
this.config = config;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.config = new DataFrameTransformConfig(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
this.config.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(config);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(config, other.config);
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, PreviewDataFrameTransformAction action) {
super(client, action, new Request());
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private List<Map<String, Object>> docs;
public static ParseField PREVIEW = new ParseField("preview");
static ObjectParser<Response, Void> PARSER = new ObjectParser<>("data_frame_transform_preview", Response::new);
static {
PARSER.declareObjectArray(Response::setDocs, (p, c) -> p.mapOrdered(), PREVIEW);
}
public Response() {}
public Response(StreamInput in) throws IOException {
int size = in.readInt();
this.docs = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
this.docs.add(in.readMap());
}
}
public Response(List<Map<String, Object>> docs) {
this.docs = new ArrayList<>(docs);
}
public void setDocs(List<Map<String, Object>> docs) {
this.docs = new ArrayList<>(docs);
}
@Override
public void readFrom(StreamInput in) throws IOException {
int size = in.readInt();
this.docs = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
this.docs.add(in.readMap());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(docs.size());
for (Map<String, Object> doc : docs) {
out.writeMapWithConsistentOrder(doc);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(PREVIEW.getPreferredName(), docs);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(other.docs, docs);
}
@Override
public int hashCode() {
return Objects.hashCode(docs);
}
public static Response fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
}
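
As a usage note, here is a minimal sketch of the body this Response renders through toXContent, assuming the plugin classes above are on the classpath; the sample document is invented for illustration.

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.dataframe.action.PreviewDataFrameTransformAction;

public class PreviewResponseShapeSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative document; a real preview holds one map per composite bucket.
        List<Map<String, Object>> docs = Collections.singletonList(
                Collections.<String, Object>singletonMap("avg_rating", 4.2));
        PreviewDataFrameTransformAction.Response response =
                new PreviewDataFrameTransformAction.Response(docs);
        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
            response.toXContent(builder, ToXContent.EMPTY_PARAMS);
            // Prints: {"preview":[{"avg_rating":4.2}]}
            System.out.println(Strings.toString(builder));
        }
    }
}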

View File

@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.dataframe.transform.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.dataframe.transforms.DataFrameIndexer.COMPOSITE_AGGREGATION_NAME;
public class TransportPreviewDataFrameTransformAction extends
HandledTransportAction<PreviewDataFrameTransformAction.Request, PreviewDataFrameTransformAction.Response> {
private final XPackLicenseState licenseState;
private final Client client;
private final ThreadPool threadPool;
@Inject
public TransportPreviewDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
Client client, ThreadPool threadPool, XPackLicenseState licenseState) {
super(PreviewDataFrameTransformAction.NAME, transportService, actionFilters,
(Supplier<PreviewDataFrameTransformAction.Request>) PreviewDataFrameTransformAction.Request::new);
this.licenseState = licenseState;
this.client = client;
this.threadPool = threadPool;
}
@Override
protected void doExecute(Task task,
PreviewDataFrameTransformAction.Request request,
ActionListener<PreviewDataFrameTransformAction.Response> listener) {
if (!licenseState.isDataFrameAllowed()) {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
return;
}
Pivot pivot = new Pivot(request.getConfig().getSource(),
request.getConfig().getQueryConfig().getQuery(),
request.getConfig().getPivotConfig());
getPreview(pivot, ActionListener.wrap(
previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
listener::onFailure
));
}
private void getPreview(Pivot pivot, ActionListener<List<Map<String, Object>>> listener) {
ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
ClientHelper.DATA_FRAME_ORIGIN,
client,
SearchAction.INSTANCE,
pivot.buildSearchRequest(null),
ActionListener.wrap(
r -> {
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
listener.onResponse(pivot.extractResults(agg, stats).collect(Collectors.toList()));
},
listener::onFailure
));
}
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.rest.action;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.dataframe.action.PreviewDataFrameTransformAction;
import java.io.IOException;
public class RestPreviewDataFrameTransformAction extends BaseRestHandler {
public RestPreviewDataFrameTransformAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST, DataFrameField.REST_BASE_PATH + "transforms/_preview", this);
}
@Override
public String getName() {
return "data_frame_preview_transform_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
XContentParser parser = restRequest.contentParser();
PreviewDataFrameTransformAction.Request request = PreviewDataFrameTransformAction.Request.fromXContent(parser);
return channel -> client.execute(PreviewDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@@ -32,7 +32,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, DataFrameIndexerTransformStats> {
private static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
public static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
private static final Logger logger = LogManager.getLogger(DataFrameIndexer.class);
private Pivot pivot;

View File

@@ -35,13 +35,13 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
*/
public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransformConfig> implements Writeable, ToXContentObject {
private static final String NAME = "data_frame_transforms";
private static final ParseField SOURCE = new ParseField("source");
private static final ParseField DESTINATION = new ParseField("dest");
private static final ParseField QUERY = new ParseField("query");
public static final String NAME = "data_frame_transforms";
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField QUERY = new ParseField("query");
// types of transforms
private static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
private static final ConstructingObjectParser<DataFrameTransformConfig, String> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<DataFrameTransformConfig, String> LENIENT_PARSER = createParser(true);

View File

@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.dataframe.action.PreviewDataFrameTransformAction.Request;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.transforms.QueryConfigTests;
import org.elasticsearch.xpack.dataframe.transforms.pivot.PivotConfigTests;
import org.junit.Before;
import java.io.IOException;
import static java.util.Collections.emptyList;
public class PreviewDataFrameTransformActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
private NamedWriteableRegistry namedWriteableRegistry;
private NamedXContentRegistry namedXContentRegistry;
@Before
public void registerAggregationNamedObjects() throws Exception {
// register aggregations as NamedWriteable
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return namedWriteableRegistry;
}
@Override
protected NamedXContentRegistry xContentRegistry() {
return namedXContentRegistry;
}
@Override
protected Request doParseInstance(XContentParser parser) throws IOException {
return Request.fromXContent(parser);
}
@Override
protected Request createBlankInstance() {
return new Request();
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomAlphaOfLength(10),
"unused-transform-preview-index", QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
return new Request(config);
}
}

View File

@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.dataframe.action.PreviewDataFrameTransformAction.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PreviewDataFrameTransformsActionResponseTests extends AbstractStreamableXContentTestCase<Response> {
@Override
protected Response doParseInstance(XContentParser parser) throws IOException {
return Response.fromXContent(parser);
}
@Override
protected Response createBlankInstance() {
return new Response();
}
@Override
protected Response createTestInstance() {
int size = randomIntBetween(0, 10);
List<Map<String, Object>> data = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Map<String, Object> datum = new HashMap<>();
Map<String, Object> entry = new HashMap<>();
entry.put("value1", randomIntBetween(1, 100));
datum.put(randomAlphaOfLength(10), entry);
data.add(datum);
}
return new Response(data);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
}