mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 21:05:23 +00:00
insert explicit mappings for objects in nested output to avoid clashes with index templates fixes #51321
This commit is contained in:
parent
61663b495e
commit
2239ba8c6e
@ -0,0 +1,105 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.transform.integration;
|
||||
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
|
||||
private static boolean indicesCreated = false;
|
||||
|
||||
// preserve indices in order to reuse source indices in several test cases
|
||||
@Override
|
||||
protected boolean preserveIndicesUponCompletion() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void createIndexes() throws IOException {
|
||||
|
||||
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
|
||||
if (indicesCreated) {
|
||||
return;
|
||||
}
|
||||
|
||||
createReviewsIndex();
|
||||
indicesCreated = true;
|
||||
}
|
||||
|
||||
public void testIndexTemplateMappingClash() throws Exception {
|
||||
String transformId = "special_pivot_template_mappings_clash";
|
||||
String transformIndex = "special_pivot_template_mappings_clash";
|
||||
|
||||
// create a template that defines a field "rating" with a type "float" which will clash later with
|
||||
// output field "rating.avg" in the pivot config
|
||||
final Request createIndexTemplateRequest = new Request("PUT", "_template/special_pivot_template");
|
||||
|
||||
String template = "{"
|
||||
+ "\"index_patterns\" : [\"special_pivot_template*\"],"
|
||||
+ " \"mappings\" : {"
|
||||
+ " \"properties\": {"
|
||||
+ " \"rating\":{"
|
||||
+ " \"type\": \"float\"\n"
|
||||
+ " }"
|
||||
+ " }"
|
||||
+ " }"
|
||||
+ "}";
|
||||
|
||||
createIndexTemplateRequest.setJsonEntity(template);
|
||||
Map<String, Object> createIndexTemplateResponse = entityAsMap(client().performRequest(createIndexTemplateRequest));
|
||||
assertThat(createIndexTemplateResponse.get("acknowledged"), equalTo(Boolean.TRUE));
|
||||
|
||||
final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);
|
||||
|
||||
String config = "{"
|
||||
+ " \"source\": {\"index\":\""
|
||||
+ REVIEWS_INDEX_NAME
|
||||
+ "\"},"
|
||||
+ " \"dest\": {\"index\":\""
|
||||
+ transformIndex
|
||||
+ "\"},";
|
||||
|
||||
config += " \"pivot\": {"
|
||||
+ " \"group_by\": {"
|
||||
+ " \"reviewer\": {"
|
||||
+ " \"terms\": {"
|
||||
+ " \"field\": \"user_id\""
|
||||
+ " } } },"
|
||||
+ " \"aggregations\": {"
|
||||
+ " \"rating.avg\": {"
|
||||
+ " \"avg\": {"
|
||||
+ " \"field\": \"stars\""
|
||||
+ " } }"
|
||||
+ " } }"
|
||||
+ "}";
|
||||
|
||||
createTransformRequest.setJsonEntity(config);
|
||||
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
|
||||
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
|
||||
|
||||
startAndWaitForTransform(transformId, transformIndex);
|
||||
assertTrue(indexExists(transformIndex));
|
||||
|
||||
// we expect 27 documents as there shall be 27 user_id's
|
||||
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
|
||||
assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
|
||||
|
||||
// get and check some users
|
||||
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
|
||||
|
||||
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
|
||||
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0);
|
||||
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
|
||||
}
|
||||
}
|
@ -427,6 +427,9 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
||||
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));
|
||||
|
||||
Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
|
||||
|
||||
// assert that the transform did not fail
|
||||
assertNotEquals("failed", XContentMapValues.extractValue("state", transformStatsAsMap));
|
||||
return (int) XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap);
|
||||
}
|
||||
|
||||
|
@ -42,8 +42,7 @@ public final class SchemaUtil {
|
||||
NUMERIC_FIELD_MAPPER_TYPES = types;
|
||||
}
|
||||
|
||||
private SchemaUtil() {
|
||||
}
|
||||
private SchemaUtil() {}
|
||||
|
||||
public static boolean isNumericType(String type) {
|
||||
return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
|
||||
@ -59,10 +58,12 @@ public final class SchemaUtil {
|
||||
* @param source Source index that contains the data to pivot
|
||||
* @param listener Listener to alert on success or failure.
|
||||
*/
|
||||
public static void deduceMappings(final Client client,
|
||||
final PivotConfig config,
|
||||
final String[] source,
|
||||
final ActionListener<Map<String, String>> listener) {
|
||||
public static void deduceMappings(
|
||||
final Client client,
|
||||
final PivotConfig config,
|
||||
final String[] source,
|
||||
final ActionListener<Map<String, String>> listener
|
||||
) {
|
||||
// collects the fieldnames used as source for aggregations
|
||||
Map<String, String> aggregationSourceFieldNames = new HashMap<>();
|
||||
// collects the aggregation types by source name
|
||||
@ -70,16 +71,16 @@ public final class SchemaUtil {
|
||||
// collects the fieldnames and target fieldnames used for grouping
|
||||
Map<String, String> fieldNamesForGrouping = new HashMap<>();
|
||||
|
||||
config.getGroupConfig().getGroups().forEach((destinationFieldName, group) -> {
|
||||
fieldNamesForGrouping.put(destinationFieldName, group.getField());
|
||||
});
|
||||
config.getGroupConfig()
|
||||
.getGroups()
|
||||
.forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); });
|
||||
|
||||
for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
|
||||
if (agg instanceof ValuesSourceAggregationBuilder) {
|
||||
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
|
||||
aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
|
||||
aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType());
|
||||
} else if(agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
|
||||
} else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
|
||||
aggregationTypes.put(agg.getName(), agg.getType());
|
||||
} else {
|
||||
// execution should not reach this point
|
||||
@ -98,13 +99,17 @@ public final class SchemaUtil {
|
||||
allFieldNames.putAll(aggregationSourceFieldNames);
|
||||
allFieldNames.putAll(fieldNamesForGrouping);
|
||||
|
||||
getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]),
|
||||
getSourceFieldMappings(
|
||||
client,
|
||||
source,
|
||||
allFieldNames.values().toArray(new String[0]),
|
||||
ActionListener.wrap(
|
||||
sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
|
||||
aggregationTypes,
|
||||
fieldNamesForGrouping,
|
||||
sourceMappings)),
|
||||
listener::onFailure));
|
||||
sourceMappings -> listener.onResponse(
|
||||
resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, sourceMappings)
|
||||
),
|
||||
listener::onFailure
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -115,27 +120,29 @@ public final class SchemaUtil {
|
||||
* @param index The index, or index pattern, from which to gather all the field mappings
|
||||
* @param listener The listener to be alerted on success or failure.
|
||||
*/
|
||||
public static void getDestinationFieldMappings(final Client client,
|
||||
final String index,
|
||||
final ActionListener<Map<String, String>> listener) {
|
||||
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
|
||||
.indices(index)
|
||||
public static void getDestinationFieldMappings(
|
||||
final Client client,
|
||||
final String index,
|
||||
final ActionListener<Map<String, String>> listener
|
||||
) {
|
||||
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
|
||||
.fields("*")
|
||||
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
ClientHelper.executeAsyncWithOrigin(client,
|
||||
ClientHelper.executeAsyncWithOrigin(
|
||||
client,
|
||||
ClientHelper.TRANSFORM_ORIGIN,
|
||||
FieldCapabilitiesAction.INSTANCE,
|
||||
fieldCapabilitiesRequest,
|
||||
ActionListener.wrap(
|
||||
r -> listener.onResponse(extractFieldMappings(r)),
|
||||
listener::onFailure
|
||||
));
|
||||
ActionListener.wrap(r -> listener.onResponse(extractFieldMappings(r)), listener::onFailure)
|
||||
);
|
||||
}
|
||||
|
||||
private static Map<String, String> resolveMappings(Map<String, String> aggregationSourceFieldNames,
|
||||
Map<String, String> aggregationTypes,
|
||||
Map<String, String> fieldNamesForGrouping,
|
||||
Map<String, String> sourceMappings) {
|
||||
private static Map<String, String> resolveMappings(
|
||||
Map<String, String> aggregationSourceFieldNames,
|
||||
Map<String, String> aggregationTypes,
|
||||
Map<String, String> fieldNamesForGrouping,
|
||||
Map<String, String> sourceMappings
|
||||
) {
|
||||
Map<String, String> targetMapping = new HashMap<>();
|
||||
|
||||
aggregationTypes.forEach((targetFieldName, aggregationName) -> {
|
||||
@ -143,8 +150,7 @@ public final class SchemaUtil {
|
||||
String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName);
|
||||
String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);
|
||||
|
||||
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]",
|
||||
targetFieldName, aggregationName, destinationMapping);
|
||||
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, aggregationName, destinationMapping);
|
||||
|
||||
if (Aggregations.isDynamicMapping(destinationMapping)) {
|
||||
logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName);
|
||||
@ -165,34 +171,75 @@ public final class SchemaUtil {
|
||||
targetMapping.put(targetFieldName, "keyword");
|
||||
}
|
||||
});
|
||||
|
||||
// insert object mappings for nested fields
|
||||
insertNestedObjectMappings(targetMapping);
|
||||
|
||||
return targetMapping;
|
||||
}
|
||||
|
||||
/*
|
||||
* Very "magic" helper method to extract the source mappings
|
||||
*/
|
||||
private static void getSourceFieldMappings(Client client, String[] index, String[] fields,
|
||||
ActionListener<Map<String, String>> listener) {
|
||||
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
|
||||
.indices(index)
|
||||
private static void getSourceFieldMappings(
|
||||
Client client,
|
||||
String[] index,
|
||||
String[] fields,
|
||||
ActionListener<Map<String, String>> listener
|
||||
) {
|
||||
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
|
||||
.fields(fields)
|
||||
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap(
|
||||
response -> listener.onResponse(extractFieldMappings(response)),
|
||||
listener::onFailure));
|
||||
client.execute(
|
||||
FieldCapabilitiesAction.INSTANCE,
|
||||
fieldCapabilitiesRequest,
|
||||
ActionListener.wrap(response -> listener.onResponse(extractFieldMappings(response)), listener::onFailure)
|
||||
);
|
||||
}
|
||||
|
||||
private static Map<String, String> extractFieldMappings(FieldCapabilitiesResponse response) {
|
||||
Map<String, String> extractedTypes = new HashMap<>();
|
||||
|
||||
response.get().forEach((fieldName, capabilitiesMap) -> {
|
||||
// TODO: overwrites types, requires resolve if
|
||||
// types are mixed
|
||||
capabilitiesMap.forEach((name, capability) -> {
|
||||
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
|
||||
extractedTypes.put(fieldName, capability.getType());
|
||||
});
|
||||
});
|
||||
response.get()
|
||||
.forEach(
|
||||
(fieldName, capabilitiesMap) -> {
|
||||
// TODO: overwrites types, requires resolve if
|
||||
// types are mixed
|
||||
capabilitiesMap.forEach((name, capability) -> {
|
||||
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
|
||||
extractedTypes.put(fieldName, capability.getType());
|
||||
});
|
||||
}
|
||||
);
|
||||
return extractedTypes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert object mappings for fields like:
|
||||
*
|
||||
* a.b.c : some_type
|
||||
*
|
||||
* in which case it creates additional mappings:
|
||||
*
|
||||
* a.b : object
|
||||
* a : object
|
||||
*
|
||||
* avoids snafu with index templates injecting incompatible mappings
|
||||
*
|
||||
* @param fieldMappings field mappings to inject to
|
||||
*/
|
||||
static void insertNestedObjectMappings(Map<String, String> fieldMappings) {
|
||||
Map<String, String> additionalMappings = new HashMap<>();
|
||||
fieldMappings.keySet().stream().filter(key -> key.contains(".")).forEach(key -> {
|
||||
int pos;
|
||||
String objectKey = key;
|
||||
// lastIndexOf returns -1 on mismatch, but to disallow empty strings check for > 0
|
||||
while ((pos = objectKey.lastIndexOf(".")) > 0) {
|
||||
objectKey = objectKey.substring(0, pos);
|
||||
additionalMappings.putIfAbsent(objectKey, "object");
|
||||
}
|
||||
});
|
||||
|
||||
additionalMappings.forEach(fieldMappings::putIfAbsent);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.transform.transforms.pivot;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class SchemaUtilTests extends ESTestCase {
|
||||
|
||||
public void testInsertNestedObjectMappings() {
|
||||
Map<String, String> fieldMappings = new HashMap<>();
|
||||
|
||||
// creates: a.b, a
|
||||
fieldMappings.put("a.b.c", "long");
|
||||
fieldMappings.put("a.b.d", "double");
|
||||
// creates: c.b, c
|
||||
fieldMappings.put("c.b.a", "double");
|
||||
// creates: c.d
|
||||
fieldMappings.put("c.d.e", "object");
|
||||
fieldMappings.put("d", "long");
|
||||
fieldMappings.put("e.f.g", "long");
|
||||
// cc: already there
|
||||
fieldMappings.put("e.f", "object");
|
||||
// cc: already there but different type (should not be possible)
|
||||
fieldMappings.put("e", "long");
|
||||
// cc: start with . (should not be possible)
|
||||
fieldMappings.put(".x", "long");
|
||||
// cc: start and ends with . (should not be possible), creates: .y
|
||||
fieldMappings.put(".y.", "long");
|
||||
// cc: ends with . (should not be possible), creates: .z
|
||||
fieldMappings.put(".z.", "long");
|
||||
|
||||
SchemaUtil.insertNestedObjectMappings(fieldMappings);
|
||||
|
||||
assertEquals(18, fieldMappings.size());
|
||||
assertEquals("long", fieldMappings.get("a.b.c"));
|
||||
assertEquals("object", fieldMappings.get("a.b"));
|
||||
assertEquals("double", fieldMappings.get("a.b.d"));
|
||||
assertEquals("object", fieldMappings.get("a"));
|
||||
assertEquals("object", fieldMappings.get("c.d"));
|
||||
assertEquals("object", fieldMappings.get("e.f"));
|
||||
assertEquals("long", fieldMappings.get("e"));
|
||||
assertEquals("object", fieldMappings.get(".y"));
|
||||
assertEquals("object", fieldMappings.get(".z"));
|
||||
assertFalse(fieldMappings.containsKey("."));
|
||||
assertFalse(fieldMappings.containsKey(""));
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user