From 92acc732de3bb872272b2ede83dbb09de78604b7 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 16 Sep 2019 10:05:55 -0400 Subject: [PATCH] [ML][Transform] Use field caps for mapping deductino (#46703) (#46742) --- .../test/data_frame/transforms_start_stop.yml | 27 +++++++ .../transforms/pivot/SchemaUtil.java | 78 ++++++++----------- 2 files changed, 58 insertions(+), 47 deletions(-) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 044f5212a99..199146a8f0d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -5,6 +5,9 @@ setup: body: mappings: properties: + time_alias: + type: alias + path: time time: type: date airline: @@ -322,3 +325,27 @@ teardown: - do: data_frame.delete_data_frame_transform: transform_id: "airline-transform-stop-all" +--- +"Test start/stop with field alias": + - do: + data_frame.put_data_frame_transform: + transform_id: "airline_via_field_alias" + body: > + { + "source": {"index": "airline-data"}, + "dest": {"index": "airline-data-time-alias"}, + "pivot": { + "group_by": {"time": {"date_histogram": {"field": "time_alias", "calendar_interval": "1m"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - do: + data_frame.start_data_frame_transform: + transform_id: "airline_via_field_alias" + - match: { acknowledged: true } + + - do: + indices.get_mapping: + index: airline-data-time-alias + - match: { airline-data-time-alias.mappings.properties.time.type: date } + - match: { airline-data-time-alias.mappings.properties.avg_response.type: double } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java index 26bec18ea59..bc24ed10c07 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java @@ -9,9 +9,9 @@ package org.elasticsearch.xpack.transform.transforms.pivot; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction; -import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest; -import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.index.mapper.NumberFieldMapper; @@ -99,12 +99,12 @@ public final class SchemaUtil { allFieldNames.putAll(fieldNamesForGrouping); getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]), - ActionListener.wrap( - sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames, - aggregationTypes, - fieldNamesForGrouping, - sourceMappings)), - listener::onFailure)); + ActionListener.wrap( + sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames, + aggregationTypes, + fieldNamesForGrouping, + sourceMappings)), + listener::onFailure)); } /** @@ -118,15 +118,16 @@ public final class SchemaUtil { public static void getDestinationFieldMappings(final Client client, final String index, final ActionListener> listener) { - GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest(); - fieldMappingRequest.indices(index); - fieldMappingRequest.fields("*"); + FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest() + .indices(index) + .fields("*") + .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); ClientHelper.executeAsyncWithOrigin(client, ClientHelper.DATA_FRAME_ORIGIN, - GetFieldMappingsAction.INSTANCE, - fieldMappingRequest, + FieldCapabilitiesAction.INSTANCE, + fieldCapabilitiesRequest, ActionListener.wrap( - r -> listener.onResponse(extractFieldMappings(r.mappings())), + r -> listener.onResponse(extractFieldMappings(r)), listener::onFailure )); } @@ -143,7 +144,7 @@ public final class SchemaUtil { String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping); logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", - targetFieldName, aggregationName, destinationMapping); + targetFieldName, aggregationName, destinationMapping); if (Aggregations.isDynamicMapping(destinationMapping)) { logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName); @@ -171,42 +172,25 @@ public final class SchemaUtil { * Very "magic" helper method to extract the source mappings */ private static void getSourceFieldMappings(Client client, String[] index, String[] fields, - ActionListener> listener) { - GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest(); - fieldMappingRequest.indices(index); - fieldMappingRequest.fields(fields); - fieldMappingRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - - client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap( - response -> listener.onResponse(extractFieldMappings(response.mappings())), + ActionListener> listener) { + FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest() + .indices(index) + .fields(fields) + .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); + client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap( + response -> listener.onResponse(extractFieldMappings(response)), listener::onFailure)); } - private static Map extractFieldMappings(Map>> mappings) { + private static Map extractFieldMappings(FieldCapabilitiesResponse response) { Map extractedTypes = new HashMap<>(); - mappings.forEach((indexName, docTypeToMapping) -> { - // "_doc" -> - docTypeToMapping.forEach((docType, fieldNameToMapping) -> { - // "my_field" -> - fieldNameToMapping.forEach((fieldName, fieldMapping) -> { - // "mapping" -> "my_field" -> - fieldMapping.sourceAsMap().forEach((name, typeMap) -> { - // expected object: { "type": type } - if (typeMap instanceof Map) { - final Map map = (Map) typeMap; - if (map.containsKey("type")) { - String type = map.get("type").toString(); - if (logger.isTraceEnabled()) { - logger.trace("Extracted type for [" + fieldName + "] : [" + type + "] from index [" + indexName + "]"); - } - // TODO: overwrites types, requires resolve if - // types are mixed - extractedTypes.put(fieldName, type); - } - } - }); - }); + response.get().forEach((fieldName, capabilitiesMap) -> { + // TODO: overwrites types, requires resolve if + // types are mixed + capabilitiesMap.forEach((name, capability) -> { + logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType()); + extractedTypes.put(fieldName, capability.getType()); }); }); return extractedTypes;