* [ML][Data Frame] Add deduced mappings to _preview response payload (#43742) * [ML][Data Frame] Add deduced mappings to _preview response payload * updating preview docs * fixing code for backport
This commit is contained in:
parent
b977f019b8
commit
82c1ddc117
|
@ -29,23 +29,32 @@ import java.util.Objects;
|
|||
public class PreviewDataFrameTransformResponse {
|
||||
|
||||
private static final String PREVIEW = "preview";
|
||||
private static final String MAPPINGS = "mappings";
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static PreviewDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
|
||||
Object previewDocs = parser.map().get(PREVIEW);
|
||||
return new PreviewDataFrameTransformResponse((List<Map<String, Object>>) previewDocs);
|
||||
Map<String, Object> previewMap = parser.mapOrdered();
|
||||
Object previewDocs = previewMap.get(PREVIEW);
|
||||
Object mappings = previewMap.get(MAPPINGS);
|
||||
return new PreviewDataFrameTransformResponse((List<Map<String, Object>>) previewDocs, (Map<String, Object>) mappings);
|
||||
}
|
||||
|
||||
private List<Map<String, Object>> docs;
|
||||
private Map<String, Object> mappings;
|
||||
|
||||
public PreviewDataFrameTransformResponse(List<Map<String, Object>> docs) {
|
||||
public PreviewDataFrameTransformResponse(List<Map<String, Object>> docs, Map<String, Object> mappings) {
|
||||
this.docs = docs;
|
||||
this.mappings = mappings;
|
||||
}
|
||||
|
||||
public List<Map<String, Object>> getDocs() {
|
||||
return docs;
|
||||
}
|
||||
|
||||
public Map<String, Object> getMappings() {
|
||||
return mappings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) {
|
||||
|
@ -57,12 +66,12 @@ public class PreviewDataFrameTransformResponse {
|
|||
}
|
||||
|
||||
PreviewDataFrameTransformResponse other = (PreviewDataFrameTransformResponse) obj;
|
||||
return Objects.equals(other.docs, docs);
|
||||
return Objects.equals(other.docs, docs) && Objects.equals(other.mappings, mappings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(docs);
|
||||
return Objects.hash(docs, mappings);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.junit.After;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -71,6 +72,7 @@ import static org.hamcrest.Matchers.containsString;
|
|||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.oneOf;
|
||||
|
@ -277,6 +279,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
|||
assertThat(taskState, is(DataFrameTransformTaskState.STOPPED));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testPreview() throws IOException {
|
||||
String sourceIndex = "transform-source";
|
||||
createIndex(sourceIndex);
|
||||
|
@ -298,6 +301,12 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
|||
Optional<Map<String, Object>> michel = docs.stream().filter(doc -> "michel".equals(doc.get("reviewer"))).findFirst();
|
||||
assertTrue(michel.isPresent());
|
||||
assertEquals(3.6d, (double) michel.get().get("avg_rating"), 0.1d);
|
||||
|
||||
Map<String, Object> mappings = preview.getMappings();
|
||||
assertThat(mappings, hasKey("properties"));
|
||||
Map<String, Object> fields = (Map<String, Object>)mappings.get("properties");
|
||||
assertThat(fields.get("reviewer"), equalTo(Collections.singletonMap("type", "keyword")));
|
||||
assertThat(fields.get("avg_rating"), equalTo(Collections.singletonMap("type", "double")));
|
||||
}
|
||||
|
||||
private DataFrameTransformConfig validDataFrameTransformConfig(String id, String source, String destination) {
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -53,8 +54,13 @@ public class PreviewDataFrameTransformResponseTests extends ESTestCase {
|
|||
}
|
||||
docs.add(doc);
|
||||
}
|
||||
int numMappingEntries = randomIntBetween(5, 10);
|
||||
Map<String, Object> mappings = new HashMap<>(numMappingEntries);
|
||||
for (int i = 0; i < numMappingEntries; i++) {
|
||||
mappings.put(randomAlphaOfLength(10), Collections.singletonMap("type", randomAlphaOfLength(10)));
|
||||
}
|
||||
|
||||
return new PreviewDataFrameTransformResponse(docs);
|
||||
return new PreviewDataFrameTransformResponse(docs, mappings);
|
||||
}
|
||||
|
||||
private void toXContent(PreviewDataFrameTransformResponse response, XContentBuilder builder) throws IOException {
|
||||
|
@ -64,6 +70,7 @@ public class PreviewDataFrameTransformResponseTests extends ESTestCase {
|
|||
builder.map(doc);
|
||||
}
|
||||
builder.endArray();
|
||||
builder.field("mappings", response.getMappings());
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -447,6 +447,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
|
|||
// end::preview-data-frame-transform-execute
|
||||
|
||||
assertNotNull(response.getDocs());
|
||||
assertNotNull(response.getMappings());
|
||||
}
|
||||
{
|
||||
// tag::preview-data-frame-transform-execute-listener
|
||||
|
|
|
@ -90,7 +90,17 @@ The data that is returned for this example is as follows:
|
|||
"customer_id" : "12"
|
||||
}
|
||||
...
|
||||
]
|
||||
],
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"max_price": {
|
||||
"type": "double"
|
||||
},
|
||||
"customer_id": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
----
|
||||
// NOTCONSOLE
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.core.dataframe.action;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
|
@ -28,6 +29,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -137,11 +139,14 @@ public class PreviewDataFrameTransformAction extends ActionType<PreviewDataFrame
|
|||
public static class Response extends ActionResponse implements ToXContentObject {
|
||||
|
||||
private List<Map<String, Object>> docs;
|
||||
private Map<String, Object> mappings;
|
||||
public static ParseField PREVIEW = new ParseField("preview");
|
||||
public static ParseField MAPPINGS = new ParseField("mappings");
|
||||
|
||||
static ObjectParser<Response, Void> PARSER = new ObjectParser<>("data_frame_transform_preview", Response::new);
|
||||
static {
|
||||
PARSER.declareObjectArray(Response::setDocs, (p, c) -> p.mapOrdered(), PREVIEW);
|
||||
PARSER.declareObject(Response::setMappings, (p, c) -> p.mapOrdered(), MAPPINGS);
|
||||
}
|
||||
public Response() {}
|
||||
|
||||
|
@ -151,6 +156,10 @@ public class PreviewDataFrameTransformAction extends ActionType<PreviewDataFrame
|
|||
for (int i = 0; i < size; i++) {
|
||||
this.docs.add(in.readMap());
|
||||
}
|
||||
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||
Map<String, Object> objectMap = in.readMap();
|
||||
this.mappings = objectMap == null ? null : Collections.unmodifiableMap(objectMap);
|
||||
}
|
||||
}
|
||||
|
||||
public Response(List<Map<String, Object>> docs) {
|
||||
|
@ -161,18 +170,56 @@ public class PreviewDataFrameTransformAction extends ActionType<PreviewDataFrame
|
|||
this.docs = new ArrayList<>(docs);
|
||||
}
|
||||
|
||||
public void setMappings(Map<String, Object> mappings) {
|
||||
this.mappings = Collections.unmodifiableMap(mappings);
|
||||
}
|
||||
|
||||
/**
|
||||
* This takes the a {@code Map<String, String>} of the type "fieldname: fieldtype" and transforms it into the
|
||||
* typical mapping format.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* input:
|
||||
* {"field1.subField1": "long", "field2": "keyword"}
|
||||
*
|
||||
* output:
|
||||
* {
|
||||
* "properties": {
|
||||
* "field1.subField1": {
|
||||
* "type": "long"
|
||||
* },
|
||||
* "field2": {
|
||||
* "type": "keyword"
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* @param mappings A Map of the form {"fieldName": "fieldType"}
|
||||
*/
|
||||
public void setMappingsFromStringMap(Map<String, String> mappings) {
|
||||
Map<String, Object> fieldMappings = new HashMap<>();
|
||||
mappings.forEach((k, v) -> fieldMappings.put(k, Collections.singletonMap("type", v)));
|
||||
this.mappings = Collections.singletonMap("properties", fieldMappings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeInt(docs.size());
|
||||
for (Map<String, Object> doc : docs) {
|
||||
out.writeMapWithConsistentOrder(doc);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||
out.writeMap(mappings);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(PREVIEW.getPreferredName(), docs);
|
||||
if (mappings != null) {
|
||||
builder.field(MAPPINGS.getPreferredName(), mappings);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -188,12 +235,12 @@ public class PreviewDataFrameTransformAction extends ActionType<PreviewDataFrame
|
|||
}
|
||||
|
||||
Response other = (Response) obj;
|
||||
return Objects.equals(other.docs, docs);
|
||||
return Objects.equals(other.docs, docs) && Objects.equals(other.mappings, mappings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(docs);
|
||||
return Objects.hash(docs, mappings);
|
||||
}
|
||||
|
||||
public static Response fromXContent(final XContentParser parser) throws IOException {
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAc
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -35,13 +36,28 @@ public class PreviewDataFrameTransformsActionResponseTests extends AbstractSeria
|
|||
int size = randomIntBetween(0, 10);
|
||||
List<Map<String, Object>> data = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
Map<String, Object> datum = new HashMap<>();
|
||||
Map<String, Object> entry = new HashMap<>();
|
||||
entry.put("value1", randomIntBetween(1, 100));
|
||||
datum.put(randomAlphaOfLength(10), entry);
|
||||
data.add(datum);
|
||||
data.add(Collections.singletonMap(randomAlphaOfLength(10), Collections.singletonMap("value1", randomIntBetween(1, 100))));
|
||||
}
|
||||
return new Response(data);
|
||||
|
||||
Response response = new Response(data);
|
||||
if (randomBoolean()) {
|
||||
size = randomIntBetween(0, 10);
|
||||
if (randomBoolean()) {
|
||||
Map<String, Object> mappings = new HashMap<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
mappings.put(randomAlphaOfLength(10), Collections.singletonMap("type", randomAlphaOfLength(10)));
|
||||
}
|
||||
response.setMappings(mappings);
|
||||
} else {
|
||||
Map<String, String> mappings = new HashMap<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
mappings.put(randomAlphaOfLength(10), randomAlphaOfLength(10));
|
||||
}
|
||||
response.setMappingsFromStringMap(mappings);
|
||||
}
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -105,17 +105,7 @@ public class TransportPreviewDataFrameTransformAction extends
|
|||
|
||||
Pivot pivot = new Pivot(config.getPivotConfig());
|
||||
|
||||
getPreview(pivot,
|
||||
config.getSource(),
|
||||
config.getDestination().getPipeline(),
|
||||
config.getDestination().getIndex(),
|
||||
ActionListener.wrap(
|
||||
previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
|
||||
error -> {
|
||||
logger.error("Failure gathering preview", error);
|
||||
listener.onFailure(error);
|
||||
}
|
||||
));
|
||||
getPreview(pivot, config.getSource(), config.getDestination().getPipeline(), config.getDestination().getIndex(), listener);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -123,7 +113,8 @@ public class TransportPreviewDataFrameTransformAction extends
|
|||
SourceConfig source,
|
||||
String pipeline,
|
||||
String dest,
|
||||
ActionListener<List<Map<String, Object>>> listener) {
|
||||
ActionListener<PreviewDataFrameTransformAction.Response> listener) {
|
||||
final PreviewDataFrameTransformAction.Response previewResponse = new PreviewDataFrameTransformAction.Response();
|
||||
ActionListener<SimulatePipelineResponse> pipelineResponseActionListener = ActionListener.wrap(
|
||||
simulatePipelineResponse -> {
|
||||
List<Map<String, Object>> response = new ArrayList<>(simulatePipelineResponse.getResults().size());
|
||||
|
@ -136,12 +127,14 @@ public class TransportPreviewDataFrameTransformAction extends
|
|||
response.add((Map<String, Object>)XContentMapValues.extractValue("doc._source", tempMap));
|
||||
}
|
||||
}
|
||||
listener.onResponse(response);
|
||||
previewResponse.setDocs(response);
|
||||
listener.onResponse(previewResponse);
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
pivot.deduceMappings(client, source, ActionListener.wrap(
|
||||
deducedMappings -> {
|
||||
previewResponse.setMappingsFromStringMap(deducedMappings);
|
||||
ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
|
||||
ClientHelper.DATA_FRAME_ORIGIN,
|
||||
client,
|
||||
|
@ -158,7 +151,8 @@ public class TransportPreviewDataFrameTransformAction extends
|
|||
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
|
||||
.peek(doc -> doc.keySet().removeIf(k -> k.startsWith("_")))
|
||||
.collect(Collectors.toList());
|
||||
listener.onResponse(results);
|
||||
previewResponse.setDocs(results);
|
||||
listener.onResponse(previewResponse);
|
||||
} else {
|
||||
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
|
||||
.map(doc -> {
|
||||
|
|
|
@ -98,6 +98,11 @@ setup:
|
|||
- match: { preview.2.avg_response: 42.0 }
|
||||
- match: { preview.2.time.max: "2017-02-18T01:01:00.000Z" }
|
||||
- match: { preview.2.time.min: "2017-02-18T01:01:00.000Z" }
|
||||
- match: { mappings.properties.airline.type: "keyword" }
|
||||
- match: { mappings.properties.by-hour.type: "date" }
|
||||
- match: { mappings.properties.avg_response.type: "double" }
|
||||
- match: { mappings.properties.time\.max.type: "date" }
|
||||
- match: { mappings.properties.time\.min.type: "date" }
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
|
@ -141,6 +146,9 @@ setup:
|
|||
- match: { preview.2.by-hour: 1487379600000 }
|
||||
- match: { preview.2.avg_response: 42.0 }
|
||||
- match: { preview.2.my_field: 42 }
|
||||
- match: { mappings.properties.airline.type: "keyword" }
|
||||
- match: { mappings.properties.by-hour.type: "date" }
|
||||
- match: { mappings.properties.avg_response.type: "double" }
|
||||
|
||||
---
|
||||
"Test preview transform with invalid config":
|
||||
|
|
Loading…
Reference in New Issue