parent
20cb95ca5e
commit
92acc732de
|
@ -5,6 +5,9 @@ setup:
|
|||
body:
|
||||
mappings:
|
||||
properties:
|
||||
time_alias:
|
||||
type: alias
|
||||
path: time
|
||||
time:
|
||||
type: date
|
||||
airline:
|
||||
|
@ -322,3 +325,27 @@ teardown:
|
|||
- do:
|
||||
data_frame.delete_data_frame_transform:
|
||||
transform_id: "airline-transform-stop-all"
|
||||
---
|
||||
"Test start/stop with field alias":
|
||||
- do:
|
||||
data_frame.put_data_frame_transform:
|
||||
transform_id: "airline_via_field_alias"
|
||||
body: >
|
||||
{
|
||||
"source": {"index": "airline-data"},
|
||||
"dest": {"index": "airline-data-time-alias"},
|
||||
"pivot": {
|
||||
"group_by": {"time": {"date_histogram": {"field": "time_alias", "calendar_interval": "1m"}}},
|
||||
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
|
||||
}
|
||||
}
|
||||
- do:
|
||||
data_frame.start_data_frame_transform:
|
||||
transform_id: "airline_via_field_alias"
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
indices.get_mapping:
|
||||
index: airline-data-time-alias
|
||||
- match: { airline-data-time-alias.mappings.properties.time.type: date }
|
||||
- match: { airline-data-time-alias.mappings.properties.avg_response.type: double }
|
||||
|
|
|
@ -9,9 +9,9 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.index.mapper.NumberFieldMapper;
|
||||
|
@ -99,12 +99,12 @@ public final class SchemaUtil {
|
|||
allFieldNames.putAll(fieldNamesForGrouping);
|
||||
|
||||
getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]),
|
||||
ActionListener.wrap(
|
||||
sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
|
||||
aggregationTypes,
|
||||
fieldNamesForGrouping,
|
||||
sourceMappings)),
|
||||
listener::onFailure));
|
||||
ActionListener.wrap(
|
||||
sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
|
||||
aggregationTypes,
|
||||
fieldNamesForGrouping,
|
||||
sourceMappings)),
|
||||
listener::onFailure));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -118,15 +118,16 @@ public final class SchemaUtil {
|
|||
public static void getDestinationFieldMappings(final Client client,
|
||||
final String index,
|
||||
final ActionListener<Map<String, String>> listener) {
|
||||
GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest();
|
||||
fieldMappingRequest.indices(index);
|
||||
fieldMappingRequest.fields("*");
|
||||
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
|
||||
.indices(index)
|
||||
.fields("*")
|
||||
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
ClientHelper.executeAsyncWithOrigin(client,
|
||||
ClientHelper.DATA_FRAME_ORIGIN,
|
||||
GetFieldMappingsAction.INSTANCE,
|
||||
fieldMappingRequest,
|
||||
FieldCapabilitiesAction.INSTANCE,
|
||||
fieldCapabilitiesRequest,
|
||||
ActionListener.wrap(
|
||||
r -> listener.onResponse(extractFieldMappings(r.mappings())),
|
||||
r -> listener.onResponse(extractFieldMappings(r)),
|
||||
listener::onFailure
|
||||
));
|
||||
}
|
||||
|
@ -143,7 +144,7 @@ public final class SchemaUtil {
|
|||
String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);
|
||||
|
||||
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]",
|
||||
targetFieldName, aggregationName, destinationMapping);
|
||||
targetFieldName, aggregationName, destinationMapping);
|
||||
|
||||
if (Aggregations.isDynamicMapping(destinationMapping)) {
|
||||
logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName);
|
||||
|
@ -171,42 +172,25 @@ public final class SchemaUtil {
|
|||
* Very "magic" helper method to extract the source mappings
|
||||
*/
|
||||
private static void getSourceFieldMappings(Client client, String[] index, String[] fields,
|
||||
ActionListener<Map<String, String>> listener) {
|
||||
GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest();
|
||||
fieldMappingRequest.indices(index);
|
||||
fieldMappingRequest.fields(fields);
|
||||
fieldMappingRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
|
||||
client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap(
|
||||
response -> listener.onResponse(extractFieldMappings(response.mappings())),
|
||||
ActionListener<Map<String, String>> listener) {
|
||||
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
|
||||
.indices(index)
|
||||
.fields(fields)
|
||||
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap(
|
||||
response -> listener.onResponse(extractFieldMappings(response)),
|
||||
listener::onFailure));
|
||||
}
|
||||
|
||||
private static Map<String, String> extractFieldMappings(Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings) {
|
||||
private static Map<String, String> extractFieldMappings(FieldCapabilitiesResponse response) {
|
||||
Map<String, String> extractedTypes = new HashMap<>();
|
||||
|
||||
mappings.forEach((indexName, docTypeToMapping) -> {
|
||||
// "_doc" ->
|
||||
docTypeToMapping.forEach((docType, fieldNameToMapping) -> {
|
||||
// "my_field" ->
|
||||
fieldNameToMapping.forEach((fieldName, fieldMapping) -> {
|
||||
// "mapping" -> "my_field" ->
|
||||
fieldMapping.sourceAsMap().forEach((name, typeMap) -> {
|
||||
// expected object: { "type": type }
|
||||
if (typeMap instanceof Map) {
|
||||
final Map<?, ?> map = (Map<?, ?>) typeMap;
|
||||
if (map.containsKey("type")) {
|
||||
String type = map.get("type").toString();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Extracted type for [" + fieldName + "] : [" + type + "] from index [" + indexName + "]");
|
||||
}
|
||||
// TODO: overwrites types, requires resolve if
|
||||
// types are mixed
|
||||
extractedTypes.put(fieldName, type);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
response.get().forEach((fieldName, capabilitiesMap) -> {
|
||||
// TODO: overwrites types, requires resolve if
|
||||
// types are mixed
|
||||
capabilitiesMap.forEach((name, capability) -> {
|
||||
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
|
||||
extractedTypes.put(fieldName, capability.getType());
|
||||
});
|
||||
});
|
||||
return extractedTypes;
|
||||
|
|
Loading…
Reference in New Issue