diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index ba4143dd1ce..7fd633a1e9d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -45,7 +45,7 @@ public class DataFrameMessages { "Failed to create composite aggregation from pivot function"; public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID = "Data frame transform configuration [{0}] has invalid elements"; - + public static final String DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]"; public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY = "Failed to parse query for data frame transform"; public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY = diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index 17e918625a2..635038e2a48 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.Set; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class DataFramePivotRestIT extends DataFrameRestTestCase { @@ -267,6 +268,52 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase { }); } + public void testPivotWithMaxOnDateField() throws Exception { + String transformId = "simpleDateHistogramPivotWithMaxTime"; + String dataFrameIndex = "pivot_reviews_via_date_histogram_with_max_time"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex); + + final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, + BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + + String config = "{" + + " \"source\": \"" + REVIEWS_INDEX_NAME + "\"," + + " \"dest\": \"" + dataFrameIndex + "\","; + + config +=" \"pivot\": { \n" + + " \"group_by\": {\n" + + " \"by_day\": {\"date_histogram\": {\n" + + " \"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"\n" + + " }}\n" + + " },\n" + + " \n" + + " \"aggs\" :{\n" + + " \"avg_rating\": {\n" + + " \"avg\": {\"field\": \"stars\"}\n" + + " },\n" + + " \"timestamp\": {\n" + + " \"max\": {\"field\": \"timestamp\"}\n" + + " }\n" + + " }}" + + "}"; + + createDataframeTransformRequest.setJsonEntity(config); + Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); + assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + assertTrue(indexExists(dataFrameIndex)); + + startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + + // we expect 21 documents as there shall be 21 days worth of docs + Map indexStats = getAsMap(dataFrameIndex + "/_stats"); + assertEquals(21, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + assertOnePivotValue(dataFrameIndex + "/_search?q=by_day:2017-01-15", 3.82); + Map searchResult = getAsMap(dataFrameIndex + "/_search?q=by_day:2017-01-15"); + String actual = (String) ((List) XContentMapValues.extractValue("hits.hits._source.timestamp", searchResult)).get(0); + // Do `containsString` as actual ending timestamp is indeterminate due to how data is generated + assertThat(actual, containsString("2017-01-15T20:")); + } + private void assertOnePivotValue(String query, double expected) throws IOException { Map searchResult = getAsMap(query); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java index 21a87fa743a..8cafb33cc62 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.util.List; @@ -57,9 +58,11 @@ public class TransportPreviewDataFrameTransformAction extends return; } - Pivot pivot = new Pivot(request.getConfig().getSource(), - request.getConfig().getQueryConfig().getQuery(), - request.getConfig().getPivotConfig()); + final DataFrameTransformConfig config = request.getConfig(); + + Pivot pivot = new Pivot(config.getSource(), + config.getQueryConfig().getQuery(), + config.getPivotConfig()); getPreview(pivot, ActionListener.wrap( previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)), @@ -68,18 +71,24 @@ public class TransportPreviewDataFrameTransformAction extends } private void getPreview(Pivot pivot, ActionListener>> listener) { - ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(), - ClientHelper.DATA_FRAME_ORIGIN, - client, - SearchAction.INSTANCE, - pivot.buildSearchRequest(null), - ActionListener.wrap( - r -> { - final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME); - DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats(); - listener.onResponse(pivot.extractResults(agg, stats).collect(Collectors.toList())); - }, - listener::onFailure - )); + pivot.deduceMappings(client, ActionListener.wrap( + deducedMappings -> { + ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(), + ClientHelper.DATA_FRAME_ORIGIN, + client, + SearchAction.INSTANCE, + pivot.buildSearchRequest(null), + ActionListener.wrap( + r -> { + final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME); + DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats(); + listener.onResponse(pivot.extractResults(agg, deducedMappings, stats).collect(Collectors.toList())); + }, + listener::onFailure + )); + }, + listener::onFailure + )); + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 8fa5a36b3d8..f32e9086940 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -44,6 +44,8 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer getFieldMappings(); + @Override protected void onStartJob(long now) { QueryBuilder queryBuilder = getConfig().getQueryConfig().getQuery(); @@ -70,7 +72,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer { + return pivot.extractResults(agg, getFieldMappings(), getStats()).map(document -> { XContentBuilder builder; try { builder = jsonBuilder(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index a8d94f0ae70..107adc3b2bf 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; +import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -230,6 +231,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final DataFrameTransformsConfigManager transformsConfigManager; private final DataFrameTransformsCheckpointService transformsCheckpointService; private final String transformId; + private Map fieldMappings = null; private DataFrameTransformConfig transformConfig = null; @@ -248,6 +250,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return transformConfig; } + @Override + protected Map getFieldMappings() { + return fieldMappings; + } + @Override protected String getJobId() { return transformId; @@ -279,6 +286,27 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId)); } + if (fieldMappings == null) { + CountDownLatch latch = new CountDownLatch(1); + SchemaUtil.getDestinationFieldMappings(client, transformConfig.getDestination(), new LatchedActionListener<>( + ActionListener.wrap( + destinationMappings -> fieldMappings = destinationMappings, + e -> { + throw new RuntimeException( + DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS, + transformConfig.getDestination()), + e); + }), latch)); + try { + latch.await(LOAD_TRANSFORM_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException( + DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS, + transformConfig.getDestination()), + e); + } + } + return super.maybeTriggerAsyncJob(now); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java index 496f1e0ac13..fa7536497c4 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java @@ -21,6 +21,8 @@ import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; +import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType; + final class AggregationResultUtils { private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class); @@ -30,30 +32,38 @@ final class AggregationResultUtils { * @param agg The aggregation result * @param groups The original groupings used for querying * @param aggregationBuilders the aggregation used for querying - * @param dataFrameIndexerTransformStats stats collector + * @param fieldTypeMap A Map containing "field-name": "type" entries to determine the appropriate type for the aggregation results. + * @param stats stats collector * @return a map containing the results of the aggregation in a consumable way */ public static Stream> extractCompositeAggregationResults(CompositeAggregation agg, - GroupConfig groups, - Collection aggregationBuilders, - DataFrameIndexerTransformStats dataFrameIndexerTransformStats) { + GroupConfig groups, + Collection aggregationBuilders, + Map fieldTypeMap, + DataFrameIndexerTransformStats stats) { return agg.getBuckets().stream().map(bucket -> { - dataFrameIndexerTransformStats.incrementNumDocuments(bucket.getDocCount()); + stats.incrementNumDocuments(bucket.getDocCount()); Map document = new HashMap<>(); - groups.getGroups().keySet().forEach(destinationFieldName -> { - document.put(destinationFieldName, bucket.getKey().get(destinationFieldName)); - }); + groups.getGroups().keySet().forEach(destinationFieldName -> + document.put(destinationFieldName, bucket.getKey().get(destinationFieldName))); for (AggregationBuilder aggregationBuilder : aggregationBuilders) { String aggName = aggregationBuilder.getName(); + final String fieldType = fieldTypeMap.get(aggName); // TODO: support other aggregation types Aggregation aggResult = bucket.getAggregations().get(aggName); if (aggResult instanceof NumericMetricsAggregation.SingleValue) { NumericMetricsAggregation.SingleValue aggResultSingleValue = (SingleValue) aggResult; - document.put(aggName, aggResultSingleValue.value()); + // If the type is numeric, simply gather the `value` type, otherwise utilize `getValueAsString` so we don't lose + // formatted outputs. + if (isNumericType(fieldType)) { + document.put(aggName, aggResultSingleValue.value()); + } else { + document.put(aggName, aggResultSingleValue.getValueAsString()); + } } else { // Execution should never reach this point! // Creating transforms with unsupported aggregations shall not be possible diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java index 2d23444e872..26ac7d93bf3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java @@ -77,12 +77,17 @@ public class Pivot { } public Stream> extractResults(CompositeAggregation agg, - DataFrameIndexerTransformStats dataFrameIndexerTransformStats) { + Map fieldTypeMap, + DataFrameIndexerTransformStats dataFrameIndexerTransformStats) { GroupConfig groups = config.getGroupConfig(); Collection aggregationBuilders = config.getAggregationConfig().getAggregatorFactories(); - return AggregationResultUtils.extractCompositeAggregationResults(agg, groups, aggregationBuilders, dataFrameIndexerTransformStats); + return AggregationResultUtils.extractCompositeAggregationResults(agg, + groups, + aggregationBuilders, + fieldTypeMap, + dataFrameIndexerTransformStats); } private void runTestQuery(Client client, final ActionListener listener) { @@ -99,7 +104,7 @@ public class Pivot { } listener.onResponse(true); }, e->{ - listener.onFailure(new RuntimeException("Failed to test query",e)); + listener.onFailure(new RuntimeException("Failed to test query", e)); })); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java index a8be175cf53..175be3ea30e 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java @@ -13,20 +13,51 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData; import org.elasticsearch.client.Client; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; -public class SchemaUtil { +public final class SchemaUtil { private static final Logger logger = LogManager.getLogger(SchemaUtil.class); + // Full collection of numeric field type strings + private static final Set NUMERIC_FIELD_MAPPER_TYPES; + static { + Set types = Stream.of(NumberFieldMapper.NumberType.values()) + .map(NumberFieldMapper.NumberType::typeName) + .collect(Collectors.toSet()); + types.add("scaled_float"); // have to add manually since scaled_float is in a module + NUMERIC_FIELD_MAPPER_TYPES = types; + } + private SchemaUtil() { } - public static void deduceMappings(final Client client, final PivotConfig config, final String source, + public static boolean isNumericType(String type) { + return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type); + } + + /** + * Deduce the mappings for the destination index given the source index + * + * The Listener is alerted with a {@code Map} that is a "field-name":"type" mapping + * + * @param client Client from which to make requests against the cluster + * @param config The PivotConfig for which to deduce destination mapping + * @param source Source index that contains the data to pivot + * @param listener Listener to alert on success or failure. + */ + public static void deduceMappings(final Client client, + final PivotConfig config, + final String source, final ActionListener> listener) { // collects the fieldnames used as source for aggregations Map aggregationSourceFieldNames = new HashMap<>(); @@ -56,18 +87,42 @@ public class SchemaUtil { allFieldNames.putAll(fieldNamesForGrouping); getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]), - ActionListener.wrap(sourceMappings -> { - Map targetMapping = resolveMappings(aggregationSourceFieldNames, aggregationTypes, - fieldNamesForGrouping, sourceMappings); + ActionListener.wrap( + sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames, + aggregationTypes, + fieldNamesForGrouping, + sourceMappings)), + listener::onFailure)); + } - listener.onResponse(targetMapping); - }, e -> { - listener.onFailure(e); - })); + /** + * Gathers the field mappings for the "destination" index. Listener will receive an error, or a {@code Map} of + * "field-name":"type". + * + * @param client Client used to execute the request + * @param index The index, or index pattern, from which to gather all the field mappings + * @param listener The listener to be alerted on success or failure. + */ + public static void getDestinationFieldMappings(final Client client, + final String index, + final ActionListener> listener) { + GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest(); + fieldMappingRequest.indices(index); + fieldMappingRequest.fields("*"); + ClientHelper.executeAsyncWithOrigin(client, + ClientHelper.DATA_FRAME_ORIGIN, + GetFieldMappingsAction.INSTANCE, + fieldMappingRequest, + ActionListener.wrap( + r -> listener.onResponse(extractFieldMappings(r.mappings())), + listener::onFailure + )); } private static Map resolveMappings(Map aggregationSourceFieldNames, - Map aggregationTypes, Map fieldNamesForGrouping, Map sourceMappings) { + Map aggregationTypes, + Map fieldNamesForGrouping, + Map sourceMappings) { Map targetMapping = new HashMap<>(); aggregationTypes.forEach((targetFieldName, aggregationName) -> { @@ -107,14 +162,12 @@ public class SchemaUtil { fieldMappingRequest.indices(index); fieldMappingRequest.fields(fields); - client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap(response -> { - listener.onResponse(extractSourceFieldMappings(response.mappings())); - }, e -> { - listener.onFailure(e); - })); + client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap( + response -> listener.onResponse(extractFieldMappings(response.mappings())), + listener::onFailure)); } - private static Map extractSourceFieldMappings(Map>> mappings) { + private static Map extractFieldMappings(Map>> mappings) { Map extractedTypes = new HashMap<>(); mappings.forEach((indexName, docTypeToMapping) -> { diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java index 30b710ac13c..287f327d0f6 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java @@ -140,8 +140,11 @@ public class AggregationResultUtilsTests extends ESTestCase { aggName, 12.55 ) ); - - executeTest(groupBy, aggregationBuilders, input, expected, 20); + Map fieldTypeMap = asStringMap( + targetField, "keyword", + aggName, "double" + ); + executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 20); } public void testExtractCompositeAggregationResultsMultiSources() throws IOException { @@ -222,7 +225,12 @@ public class AggregationResultUtilsTests extends ESTestCase { aggName, 12.55 ) ); - executeTest(groupBy, aggregationBuilders, input, expected, 10); + Map fieldTypeMap = asStringMap( + aggName, "double", + targetField, "keyword", + targetField2, "keyword" + ); + executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10); } public void testExtractCompositeAggregationResultsMultiAggregations() throws IOException { @@ -287,11 +295,119 @@ public class AggregationResultUtilsTests extends ESTestCase { aggName2, -2.44 ) ); - executeTest(groupBy, aggregationBuilders, input, expected, 200); + Map fieldTypeMap = asStringMap( + targetField, "keyword", + aggName, "double", + aggName2, "double" + ); + executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 200); + } + + public void testExtractCompositeAggregationResultsMultiAggregationsAndTypes() throws IOException { + String targetField = randomAlphaOfLengthBetween(5, 10); + String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2"; + + GroupConfig groupBy = parseGroupConfig("{" + + "\"" + targetField + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }," + + "\"" + targetField2 + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }" + + "}"); + + String aggName = randomAlphaOfLengthBetween(5, 10); + String aggTypedName = "avg#" + aggName; + + String aggName2 = randomAlphaOfLengthBetween(5, 10) + "_2"; + String aggTypedName2 = "max#" + aggName2; + + Collection aggregationBuilders = asList(AggregationBuilders.avg(aggName), AggregationBuilders.max(aggName2)); + + Map input = asMap( + "buckets", + asList( + asMap( + KEY, asMap( + targetField, "ID1", + targetField2, "ID1_2" + ), + aggTypedName, asMap( + "value", 42.33), + aggTypedName2, asMap( + "value", 9.9), + DOC_COUNT, 1), + asMap( + KEY, asMap( + targetField, "ID1", + targetField2, "ID2_2" + ), + aggTypedName, asMap( + "value", 8.4), + aggTypedName2, asMap( + "value", 222.33), + DOC_COUNT, 2), + asMap( + KEY, asMap( + targetField, "ID2", + targetField2, "ID1_2" + ), + aggTypedName, asMap( + "value", 28.99), + aggTypedName2, asMap( + "value", -2.44), + DOC_COUNT, 3), + asMap( + KEY, asMap( + targetField, "ID3", + targetField2, "ID2_2" + ), + aggTypedName, asMap( + "value", 12.55), + aggTypedName2, asMap( + "value", -100.44), + DOC_COUNT, 4) + )); + + List> expected = asList( + asMap( + targetField, "ID1", + targetField2, "ID1_2", + aggName, 42.33, + aggName2, "9.9" + ), + asMap( + targetField, "ID1", + targetField2, "ID2_2", + aggName, 8.4, + aggName2, "222.33" + ), + asMap( + targetField, "ID2", + targetField2, "ID1_2", + aggName, 28.99, + aggName2, "-2.44" + ), + asMap( + targetField, "ID3", + targetField2, "ID2_2", + aggName, 12.55, + aggName2, "-100.44" + ) + ); + Map fieldTypeMap = asStringMap( + aggName, "double", + aggName2, "keyword", // If the second aggregation was some non-numeric mapped field + targetField, "keyword", + targetField2, "keyword" + ); + executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10); } private void executeTest(GroupConfig groups, Collection aggregationBuilders, Map input, - List> expected, long expectedDocCounts) throws IOException { + Map fieldTypeMap, List> expected, long expectedDocCounts) throws IOException { DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats(); XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); builder.map(input); @@ -299,7 +415,7 @@ public class AggregationResultUtilsTests extends ESTestCase { try (XContentParser parser = createParser(builder)) { CompositeAggregation agg = ParsedComposite.fromXContent(parser, "my_feature"); List> result = AggregationResultUtils - .extractCompositeAggregationResults(agg, groups, aggregationBuilders, stats).collect(Collectors.toList()); + .extractCompositeAggregationResults(agg, groups, aggregationBuilders, fieldTypeMap, stats).collect(Collectors.toList()); assertEquals(expected, result); assertEquals(expectedDocCounts, stats.getNumDocuments()); @@ -321,4 +437,14 @@ public class AggregationResultUtilsTests extends ESTestCase { } return map; } + + static Map asStringMap(String... strings) { + assert strings.length % 2 == 0; + final Map map = new HashMap<>(); + for (int i = 0; i < strings.length; i += 2) { + String field = strings[i]; + map.put(field, strings[i + 1]); + } + return map; + } } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java index 1621c714805..f253efad0e9 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java @@ -37,7 +37,9 @@ import org.junit.Before; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -97,7 +99,9 @@ public class PivotTests extends ESTestCase { public void testSearchFailure() throws Exception { // test a failure during the search operation, transform creation fails if // search has failures although they might just be temporary - Pivot pivot = new Pivot("existing_source_index_with_failing_shards", new MatchAllQueryBuilder(), getValidPivotConfig()); + Pivot pivot = new Pivot("existing_source_index_with_failing_shards", + new MatchAllQueryBuilder(), + getValidPivotConfig()); assertInvalidTransform(client, pivot); } @@ -106,7 +110,9 @@ public class PivotTests extends ESTestCase { for (String agg : supportedAggregations) { AggregationConfig aggregationConfig = getAggregationConfig(agg); - Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig)); + Pivot pivot = new Pivot("existing_source", + new MatchAllQueryBuilder(), + getValidPivotConfig(aggregationConfig)); assertValidTransform(client, pivot); } @@ -116,7 +122,9 @@ public class PivotTests extends ESTestCase { for (String agg : unsupportedAggregations) { AggregationConfig aggregationConfig = getAggregationConfig(agg); - Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig)); + Pivot pivot = new Pivot("existing_source", + new MatchAllQueryBuilder(), + getValidPivotConfig(aggregationConfig)); assertInvalidTransform(client, pivot); } @@ -178,6 +186,10 @@ public class PivotTests extends ESTestCase { + " }\n" + " }" + "}"); } + private Map getFieldMappings() { + return Collections.singletonMap("values", "double"); + } + private AggregationConfig parseAggregations(String json) throws IOException { final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index d007a51fe44..013731116f3 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -105,7 +105,6 @@ setup: - match: { count: 2 } - match: { transforms.0.id: "airline-transform" } - match: { transforms.1.id: "airline-transform-dos" } - - do: data_frame.get_data_frame_transform: transform_id: "airline-transform,airline-transform-dos" @@ -135,6 +134,7 @@ setup: size: 1 - match: { count: 1 } - match: { transforms.0.id: "airline-transform-dos" } + --- "Test transform with invalid page parameter": - do: @@ -143,3 +143,25 @@ setup: transform_id: "_all" from: 0 size: 10000 + +--- +"Verify put transform creates destination index with appropriate mapping": + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + body: > + { + "source": "airline-data", + "dest": "airline-data-by-airline", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}, "time": {"max": {"field": "time"}}} + } + } + - match: { acknowledged: true } + - do: + indices.get_mapping: + index: airline-data-by-airline + - match: { airline-data-by-airline.mappings.properties.airline.type: keyword } + - match: { airline-data-by-airline.mappings.properties.avg_response.type: double } + - match: { airline-data-by-airline.mappings.properties.time.type: date }