[7.x] [ML][Data Frame] adds new pipeline field to dest config (#43124) (#43388)

* [ML][Data Frame] adds new pipeline field to dest config (#43124)

* [ML][Data Frame] adds new pipeline field to dest config

* Adding pipeline support to _preview

* removing unused import

* moving towards extracting _source from pipeline simulation

* fixing permission requirement, adding _index entry to doc

* adjusting for java 8 compatibility

* adjusting bwc serialization version to 7.3.0
This commit is contained in:
Benjamin Trent 2019-06-19 16:18:27 -05:00 committed by GitHub
parent b957aa46ce
commit b333ced5a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 412 additions and 78 deletions

View File

@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
* Configuration containing the destination index for the {@link DataFrameTransformConfig}
@ -35,29 +36,40 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
public class DestConfig implements ToXContentObject {
public static final ParseField INDEX = new ParseField("index");
public static final ParseField PIPELINE = new ParseField("pipeline");
public static final ConstructingObjectParser<DestConfig, Void> PARSER = new ConstructingObjectParser<>("data_frame_config_dest",
true,
args -> new DestConfig((String)args[0]));
args -> new DestConfig((String)args[0], (String)args[1]));
static {
PARSER.declareString(constructorArg(), INDEX);
PARSER.declareString(optionalConstructorArg(), PIPELINE);
}
private final String index;
private final String pipeline;
public DestConfig(String index) {
DestConfig(String index, String pipeline) {
this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
this.pipeline = pipeline;
}
public String getIndex() {
return index;
}
public String getPipeline() {
return pipeline;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(INDEX.getPreferredName(), index);
if (pipeline != null) {
builder.field(PIPELINE.getPreferredName(), pipeline);
}
builder.endObject();
return builder;
}
@ -72,11 +84,45 @@ public class DestConfig implements ToXContentObject {
}
DestConfig that = (DestConfig) other;
return Objects.equals(index, that.index);
return Objects.equals(index, that.index) &&
Objects.equals(pipeline, that.pipeline);
}
@Override
public int hashCode(){
return Objects.hash(index);
return Objects.hash(index, pipeline);
}
public static Builder builder() {
return new Builder();
}
/**
 * Builder for {@link DestConfig}. The destination {@code index} is required;
 * {@code pipeline} is optional.
 */
public static class Builder {
private String index;
private String pipeline;
/**
* Sets the index to which to write the data
* @param index where to write the data
* @return The {@link Builder} with index set
*/
public Builder setIndex(String index) {
this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
return this;
}
/**
* Sets the pipeline through which the indexed documents should be processed
* @param pipeline The pipeline ID
* @return The {@link Builder} with pipeline set
*/
public Builder setPipeline(String pipeline) {
this.pipeline = pipeline;
return this;
}
/**
* Builds the {@link DestConfig}.
* NOTE(review): index is null-checked in setIndex, not here — calling build()
* without setIndex throws from the DestConfig constructor instead.
* @return a new {@link DestConfig} with the configured index and pipeline
*/
public DestConfig build() {
return new DestConfig(index, pipeline);
}
}
}

View File

@ -307,7 +307,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
PivotConfig pivotConfig = PivotConfig.builder().setGroups(groupConfig).setAggregations(aggBuilder).build();
DestConfig destConfig = (destination != null) ? new DestConfig(destination) : null;
DestConfig destConfig = (destination != null) ? DestConfig.builder().setIndex(destination).build() : null;
return DataFrameTransformConfig.builder()
.setId(id)
@ -334,7 +334,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
DataFrameTransformConfig transform = DataFrameTransformConfig.builder()
.setId(id)
.setSource(SourceConfig.builder().setIndex(sourceIndex).setQuery(new MatchAllQueryBuilder()).build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.setDescription("transform for testing stats")
.build();

View File

@ -27,7 +27,8 @@ import java.io.IOException;
public class DestConfigTests extends AbstractXContentTestCase<DestConfig> {
public static DestConfig randomDestConfig() {
return new DestConfig(randomAlphaOfLength(10));
return new DestConfig(randomAlphaOfLength(10),
randomBoolean() ? null : randomAlphaOfLength(10));
}
@Override

View File

@ -125,6 +125,11 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
.setIndex("source-index")
.setQueryConfig(queryConfig).build();
// end::put-data-frame-transform-source-config
// tag::put-data-frame-transform-dest-config
DestConfig destConfig = DestConfig.builder()
.setIndex("pivot-destination")
.setPipeline("my-pipeline").build();
// end::put-data-frame-transform-dest-config
// tag::put-data-frame-transform-group-config
GroupConfig groupConfig = GroupConfig.builder()
.groupBy("reviewer", // <1>
@ -149,7 +154,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
.builder()
.setId("reviewer-avg-rating") // <1>
.setSource(sourceConfig) // <2>
.setDest(new DestConfig("pivot-destination")) // <3>
.setDest(destConfig) // <3>
.setPivotConfig(pivotConfig) // <4>
.setDescription("This is my test transform") // <5>
.build();
@ -222,7 +227,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
DataFrameTransformConfig transformConfig = DataFrameTransformConfig.builder()
.setId("mega-transform")
.setSource(SourceConfig.builder().setIndex("source-data").setQueryConfig(queryConfig).build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();
@ -344,7 +349,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();
DataFrameTransformConfig transformConfig2 = DataFrameTransformConfig.builder()
@ -353,7 +358,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest2"))
.setDest(DestConfig.builder().setIndex("pivot-dest2").build())
.setPivotConfig(pivotConfig)
.build();
@ -488,7 +493,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();
client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
@ -574,7 +579,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();

View File

@ -33,7 +33,7 @@ include-tagged::{doc-tests-file}[{api}-config]
--------------------------------------------------
<1> The {dataframe-transform} ID
<2> The source indices and query from which to gather data
<3> The destination index
<3> The destination index and optional pipeline
<4> The PivotConfig
<5> Optional free text description of the transform
@ -49,6 +49,16 @@ If query is not set, a `match_all` query is used by default.
include-tagged::{doc-tests-file}[{api}-source-config]
--------------------------------------------------
==== DestConfig
The index to which to write the data, and the optional pipeline
through which the documents should be indexed
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-dest-config]
--------------------------------------------------
===== QueryConfig
The query with which to select data from the source.

View File

@ -38,7 +38,8 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}.
`source` (required):: (object) The source configuration, consisting of `index` and optionally
a `query`.
`dest` (required):: (object) The destination configuration, consisting of `index`.
`dest` (required):: (object) The destination configuration, consisting of `index` and optionally a
`pipeline` id.
`pivot`:: (object) Defines the pivot function `group by` fields and the aggregation to
reduce the data. See <<data-frame-transform-pivot, data frame transform pivot objects>>.
@ -76,7 +77,8 @@ PUT _data_frame/transforms/ecommerce_transform
}
},
"dest": {
"index": "kibana_sample_data_ecommerce_transform"
"index": "kibana_sample_data_ecommerce_transform",
"pipeline": "add_timestamp_pipeline"
},
"pivot": {
"group_by": {

View File

@ -24,10 +24,11 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -66,8 +67,20 @@ public class PreviewDataFrameTransformAction extends Action<PreviewDataFrameTran
public static Request fromXContent(final XContentParser parser) throws IOException {
Map<String, Object> content = parser.map();
// Destination and ID are not required for Preview, so we just supply our own
content.put(DataFrameField.DESTINATION.getPreferredName(), Collections.singletonMap("index", "unused-transform-preview-index"));
// dest.index and ID are not required for Preview, so we just supply our own
Map<String, String> tempDestination = new HashMap<>();
tempDestination.put(DestConfig.INDEX.getPreferredName(), "unused-transform-preview-index");
// Users can still provide just dest.pipeline to preview what their data would look like given the pipeline ID
Object providedDestination = content.get(DataFrameField.DESTINATION.getPreferredName());
if (providedDestination instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, String> destMap = (Map<String, String>)providedDestination;
String pipeline = destMap.get(DestConfig.PIPELINE.getPreferredName());
if (pipeline != null) {
tempDestination.put(DestConfig.PIPELINE.getPreferredName(), pipeline);
}
}
content.put(DataFrameField.DESTINATION.getPreferredName(), tempDestination);
content.put(DataFrameField.ID.getPreferredName(), "transform-preview");
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
XContentParser newParser = XContentType.JSON

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -20,10 +21,12 @@ import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DestConfig implements Writeable, ToXContentObject {
public static final ParseField INDEX = new ParseField("index");
public static final ParseField PIPELINE = new ParseField("pipeline");
public static final ConstructingObjectParser<DestConfig, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<DestConfig, Void> LENIENT_PARSER = createParser(true);
@ -31,25 +34,37 @@ public class DestConfig implements Writeable, ToXContentObject {
private static ConstructingObjectParser<DestConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<DestConfig, Void> parser = new ConstructingObjectParser<>("data_frame_config_dest",
lenient,
args -> new DestConfig((String)args[0]));
args -> new DestConfig((String)args[0], (String) args[1]));
parser.declareString(constructorArg(), INDEX);
parser.declareString(optionalConstructorArg(), PIPELINE);
return parser;
}
private final String index;
private final String pipeline;
public DestConfig(String index) {
public DestConfig(String index, String pipeline) {
this.index = ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName());
this.pipeline = pipeline;
}
public DestConfig(final StreamInput in) throws IOException {
index = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
pipeline = in.readOptionalString();
} else {
pipeline = null;
}
}
public String getIndex() {
return index;
}
public String getPipeline() {
return pipeline;
}
public boolean isValid() {
return index.isEmpty() == false;
}
@ -57,12 +72,18 @@ public class DestConfig implements Writeable, ToXContentObject {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalString(pipeline);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(INDEX.getPreferredName(), index);
if (pipeline != null) {
builder.field(PIPELINE.getPreferredName(), pipeline);
}
builder.endObject();
return builder;
}
@ -77,12 +98,13 @@ public class DestConfig implements Writeable, ToXContentObject {
}
DestConfig that = (DestConfig) other;
return Objects.equals(index, that.index);
return Objects.equals(index, that.index) &&
Objects.equals(pipeline, that.pipeline);
}
@Override
public int hashCode(){
return Objects.hash(index);
return Objects.hash(index, pipeline);
}
public static DestConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {

View File

@ -40,7 +40,7 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractSeriali
@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomSourceConfig(),
new DestConfig("unused-transform-preview-index"),
new DestConfig("unused-transform-preview-index", null),
null, PivotConfigTests.randomPivotConfig(), null);
return new Request(config);
}

View File

@ -17,7 +17,8 @@ public class DestConfigTests extends AbstractSerializingDataFrameTestCase<DestCo
private boolean lenient;
public static DestConfig randomDestConfig() {
return new DestConfig(randomAlphaOfLength(10));
return new DestConfig(randomAlphaOfLength(10),
randomBoolean() ? null : randomAlphaOfLength(10));
}
@Before

View File

@ -205,7 +205,7 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
return DataFrameTransformConfig.builder()
.setId(id)
.setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build())
.setDest(new DestConfig(destinationIndex))
.setDest(DestConfig.builder().setIndex(destinationIndex).build())
.setPivotConfig(createPivotConfig(groups, aggregations))
.setDescription("Test data frame transform config id: " + id)
.build();

View File

@ -55,7 +55,7 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
String dataFrameIndex = "pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
createPivotReviewsTransform(transformId, dataFrameIndex, null, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
createPivotReviewsTransform(transformId, dataFrameIndex, null, null, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
@ -77,7 +77,7 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
String query = "\"match\": {\"user_id\": \"user_26\"}";
createPivotReviewsTransform(transformId, dataFrameIndex, query, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
createPivotReviewsTransform(transformId, dataFrameIndex, query, null, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
@ -87,6 +87,46 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
}
/**
 * End-to-end test for dest.pipeline support: creates an ingest pipeline that sets
 * a constant field, runs a pivot transform configured with that pipeline, and
 * verifies both the pivoted aggregation values and that every indexed document
 * passed through the pipeline (i.e. carries the pipeline-set field).
 */
public void testPivotWithPipeline() throws Exception {
String transformId = "simple_pivot_with_pipeline";
String dataFrameIndex = "pivot_with_pipeline";
String pipelineId = "my-pivot-pipeline";
int pipelineValue = 42;
// Create an ingest pipeline whose only processor stamps pipeline_field = 42
// onto every document it processes.
Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId);
pipelineRequest.setJsonEntity("{\n" +
" \"description\" : \"my pivot pipeline\",\n" +
" \"processors\" : [\n" +
" {\n" +
" \"set\" : {\n" +
" \"field\": \"pipeline_field\",\n" +
" \"value\": " + pipelineValue +
" }\n" +
" }\n" +
" ]\n" +
"}");
client().performRequest(pipelineRequest);
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
// Transform is created with the pipeline id so indexing goes through the pipeline.
createPivotReviewsTransform(transformId, dataFrameIndex, null, pipelineId, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
// we expect 27 documents, one per distinct user_id in the reviews index
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
// spot-check the pivoted avg_rating values for a few users
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
// Verify the pipeline actually ran: the stamped field must be on the stored doc.
Map<String, Object> searchResult = getAsMap(dataFrameIndex + "/_search?q=reviewer:user_0");
Integer actual = (Integer) ((List<?>) XContentMapValues.extractValue("hits.hits._source.pipeline_field", searchResult)).get(0);
assertThat(actual, equalTo(pipelineValue));
}
public void testHistogramPivot() throws Exception {
String transformId = "simple_histogram_pivot";
String dataFrameIndex = "pivot_reviews_via_histogram";
@ -138,38 +178,38 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
+ " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"sum_rating\": {"
+ " \"sum\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"cardinality_business\": {"
+ " \"cardinality\": {"
+ " \"field\": \"business_id\""
+ " } },"
+ " \"min_rating\": {"
+ " \"min\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"max_rating\": {"
+ " \"max\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"count\": {"
+ " \"value_count\": {"
+ " \"field\": \"business_id\""
+ " } }"
+ " } }"
+ "}";
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"sum_rating\": {"
+ " \"sum\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"cardinality_business\": {"
+ " \"cardinality\": {"
+ " \"field\": \"business_id\""
+ " } },"
+ " \"min_rating\": {"
+ " \"min\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"max_rating\": {"
+ " \"max\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"count\": {"
+ " \"value_count\": {"
+ " \"field\": \"business_id\""
+ " } }"
+ " } }"
+ "}";
createDataframeTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
@ -260,7 +300,7 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
createPreviewRequest.setJsonEntity(config);
Map<String, Object> previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest));
List<Map<String, Object>> preview = (List<Map<String, Object>>)previewDataframeResponse.get("preview");
List<Map<String, Object>> preview = (List<Map<String, Object>>) previewDataframeResponse.get("preview");
// preview is limited to 100
assertThat(preview.size(), equalTo(100));
Set<String> expectedTopLevelFields = new HashSet<>(Arrays.asList("user", "by_day"));
@ -268,6 +308,57 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
preview.forEach(p -> {
Set<String> keys = p.keySet();
assertThat(keys, equalTo(expectedTopLevelFields));
Map<String, Object> nestedObj = (Map<String, Object>) p.get("user");
keys = nestedObj.keySet();
assertThat(keys, equalTo(expectedNestedFields));
});
}
@SuppressWarnings("unchecked")
public void testPreviewTransformWithPipeline() throws Exception {
String pipelineId = "my-preview-pivot-pipeline";
int pipelineValue = 42;
Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId);
pipelineRequest.setJsonEntity("{\n" +
" \"description\" : \"my pivot preview pipeline\",\n" +
" \"processors\" : [\n" +
" {\n" +
" \"set\" : {\n" +
" \"field\": \"pipeline_field\",\n" +
" \"value\": " + pipelineValue +
" }\n" +
" }\n" +
" ]\n" +
"}");
client().performRequest(pipelineRequest);
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
final Request createPreviewRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + "_preview", null);
String config = "{ \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"} ,"
+ "\"dest\": {\"pipeline\": \"" + pipelineId + "\"},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"user.id\": {\"terms\": { \"field\": \"user_id\" }},"
+ " \"by_day\": {\"date_histogram\": {\"fixed_interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-dd\"}}},"
+ " \"aggregations\": {"
+ " \"user.avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ "}";
createPreviewRequest.setJsonEntity(config);
Map<String, Object> previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest));
List<Map<String, Object>> preview = (List<Map<String, Object>>)previewDataframeResponse.get("preview");
// preview is limited to 100
assertThat(preview.size(), equalTo(100));
Set<String> expectedTopLevelFields = new HashSet<>(Arrays.asList("user", "by_day", "pipeline_field"));
Set<String> expectedNestedFields = new HashSet<>(Arrays.asList("id", "avg_rating"));
preview.forEach(p -> {
Set<String> keys = p.keySet();
assertThat(keys, equalTo(expectedTopLevelFields));
assertThat(p.get("pipeline_field"), equalTo(pipelineValue));
Map<String, Object> nestedObj = (Map<String, Object>)p.get("user");
keys = nestedObj.keySet();
assertThat(keys, equalTo(expectedNestedFields));

View File

@ -147,12 +147,23 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
createPivotReviewsTransform(transformId, dataFrameIndex, query, null);
}
protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String authHeader)
protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline)
throws IOException {
createPivotReviewsTransform(transformId, dataFrameIndex, query, pipeline, null);
}
protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline, String authHeader)
throws IOException {
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, authHeader);
String config = "{"
+ " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
String config = "{";
if (pipeline != null) {
config += " \"dest\": {\"index\":\"" + dataFrameIndex + "\", \"pipeline\":\"" + pipeline + "\"},";
} else {
config += " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
}
if (query != null) {
config += " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\", \"query\":{" + query + "}},";

View File

@ -120,7 +120,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase {
public void testGetProgress() throws Exception {
createReviewsIndex();
SourceConfig sourceConfig = new SourceConfig(REVIEWS_INDEX_NAME);
DestConfig destConfig = new DestConfig("unnecessary");
DestConfig destConfig = new DestConfig("unnecessary", null);
GroupConfig histgramGroupConfig = new GroupConfig(Collections.emptyMap(),
Collections.singletonMap("every_50", new HistogramGroupSource("count", 50.0)));
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();

View File

@ -6,8 +6,14 @@
package org.elasticsearch.xpack.dataframe.action;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulatePipelineAction;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@ -16,7 +22,14 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
@ -26,6 +39,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
@ -34,15 +48,19 @@ import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.dataframe.transforms.DataFrameIndexer.COMPOSITE_AGGREGATION_NAME;
public class TransportPreviewDataFrameTransformAction extends
HandledTransportAction<PreviewDataFrameTransformAction.Request, PreviewDataFrameTransformAction.Response> {
private static final Logger logger = LogManager.getLogger(TransportPreviewDataFrameTransformAction.class);
private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
private final XPackLicenseState licenseState;
private final Client client;
@ -87,13 +105,41 @@ public class TransportPreviewDataFrameTransformAction extends
Pivot pivot = new Pivot(config.getPivotConfig());
getPreview(pivot, config.getSource(), ActionListener.wrap(
previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
listener::onFailure
getPreview(pivot,
config.getSource(),
config.getDestination().getPipeline(),
config.getDestination().getIndex(),
ActionListener.wrap(
previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
error -> {
logger.error("Failure gathering preview", error);
listener.onFailure(error);
}
));
}
private void getPreview(Pivot pivot, SourceConfig source, ActionListener<List<Map<String, Object>>> listener) {
@SuppressWarnings("unchecked")
private void getPreview(Pivot pivot,
SourceConfig source,
String pipeline,
String dest,
ActionListener<List<Map<String, Object>>> listener) {
ActionListener<SimulatePipelineResponse> pipelineResponseActionListener = ActionListener.wrap(
simulatePipelineResponse -> {
List<Map<String, Object>> response = new ArrayList<>(simulatePipelineResponse.getResults().size());
for(SimulateDocumentResult simulateDocumentResult : simulatePipelineResponse.getResults()) {
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
Map<String, Object> tempMap = XContentHelper.convertToMap(BytesReference.bytes(content),
true,
XContentType.JSON).v2();
response.add((Map<String, Object>)XContentMapValues.extractValue("doc._source", tempMap));
}
}
listener.onResponse(response);
},
listener::onFailure
);
pivot.deduceMappings(client, source, ActionListener.wrap(
deducedMappings -> {
ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
@ -103,17 +149,41 @@ public class TransportPreviewDataFrameTransformAction extends
pivot.buildSearchRequest(source, null, NUMBER_OF_PREVIEW_BUCKETS),
ActionListener.wrap(
r -> {
try {
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId();
// remove all internal fields
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
.peek(record -> {
record.keySet().removeIf(k -> k.startsWith("_"));
}).collect(Collectors.toList());
listener.onResponse(results);
if (pipeline == null) {
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
.peek(doc -> doc.keySet().removeIf(k -> k.startsWith("_")))
.collect(Collectors.toList());
listener.onResponse(results);
} else {
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
.map(doc -> {
Map<String, Object> src = new HashMap<>();
String id = (String) doc.get(DataFrameField.DOCUMENT_ID_FIELD);
doc.keySet().removeIf(k -> k.startsWith("_"));
src.put("_source", doc);
src.put("_id", id);
src.put("_index", dest);
return src;
}).collect(Collectors.toList());
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.field("docs", results);
builder.endObject();
SimulatePipelineRequest pipelineRequest =
new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON);
pipelineRequest.setId(pipeline);
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.DATA_FRAME_ORIGIN,
SimulatePipelineAction.INSTANCE,
pipelineRequest,
pipelineResponseActionListener);
}
}
} catch (AggregationResultUtils.AggregationExtractionException extractionException) {
listener.onFailure(
new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST));

View File

@ -179,6 +179,9 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
}
IndexRequest request = new IndexRequest(indexName).source(builder).id(id);
if (transformConfig.getDestination().getPipeline() != null) {
request.setPipeline(transformConfig.getDestination().getPipeline());
}
return request;
});
}

View File

@ -99,6 +99,49 @@ setup:
- match: { preview.2.time.max: "2017-02-18T01:01:00.000Z" }
- match: { preview.2.time.min: "2017-02-18T01:01:00.000Z" }
- do:
ingest.put_pipeline:
id: "data_frame_simple_pipeline"
body: >
{
"processors": [
{
"set" : {
"field" : "my_field",
"value": 42
}
}
]
}
- match: { acknowledged: true }
- do:
data_frame.preview_data_frame_transform:
body: >
{
"source": { "index": "airline-data" },
"dest": { "pipeline": "data_frame_simple_pipeline" },
"pivot": {
"group_by": {
"airline": {"terms": {"field": "airline"}},
"by-hour": {"date_histogram": {"fixed_interval": "1h", "field": "time", "format": "yyyy-MM-dd HH"}}},
"aggs": {
"avg_response": {"avg": {"field": "responsetime"}}
}
}
}
- match: { preview.0.airline: foo }
- match: { preview.0.by-hour: "2017-02-18 00" }
- match: { preview.0.avg_response: 1.0 }
- match: { preview.0.my_field: 42 }
- match: { preview.1.airline: bar }
- match: { preview.1.by-hour: "2017-02-18 01" }
- match: { preview.1.avg_response: 42.0 }
- match: { preview.1.my_field: 42 }
- match: { preview.2.airline: foo }
- match: { preview.2.by-hour: "2017-02-18 01" }
- match: { preview.2.avg_response: 42.0 }
- match: { preview.2.my_field: 42 }
---
"Test preview transform with invalid config":
- do:
@ -127,7 +170,6 @@ setup:
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
---
"Test preview returns bad request with invalid agg":
- do:
@ -161,4 +203,21 @@ setup:
}
}
}
---
"Test preview with missing pipeline":
- do:
catch: bad_request
data_frame.preview_data_frame_transform:
body: >
{
"source": { "index": "airline-data" },
"dest": { "pipeline": "missing-pipeline" },
"pivot": {
"group_by": {
"time": {"date_histogram": {"fixed_interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}},
"aggs": {
"avg_response": {"avg": {"field": "responsetime"}},
"time.min": {"min": {"field": "time"}}
}
}
}

View File

@ -74,7 +74,7 @@ setup:
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-again" },
"dest": { "index": "airline-data-by-airline-again", "pipeline": "airline-pipeline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}