[ML] adds support for non-numeric mapped types (#40220) (#40380)

* [ML] adds support for non-numeric mapped types and mapping overrides

* correcting hlrc compilation issues after merge

* removing mapping_override option

* clearing up unnecessary changes
Benjamin Trent 2019-03-23 14:04:14 -05:00 committed by GitHub
parent 88f510ffc2
commit 2dd879abac
11 changed files with 370 additions and 56 deletions
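
The behavioral core of the change: when a single-value aggregation targets a field whose mapped type is non-numeric (for example a max on a date field), the extracted value is now taken via getValueAsString() instead of the raw double. A minimal sketch of that decision, assuming a field-name to mapped-type map has already been resolved (the names follow the diff below; this snippet itself is not part of the commit):

    // Numeric mappings keep the raw double; anything else (date, keyword, ip, ...)
    // falls back to the formatted string so no format information is lost.
    Object extractedValue(NumericMetricsAggregation.SingleValue aggResult, String mappedType) {
        return SchemaUtil.isNumericType(mappedType)
            ? aggResult.value()
            : aggResult.getValueAsString();
    }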

DataFrameMessages.java

@@ -45,7 +45,7 @@ public class DataFrameMessages {
             "Failed to create composite aggregation from pivot function";
     public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID =
             "Data frame transform configuration [{0}] has invalid elements";
+    public static final String DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]";
     public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY =
             "Failed to parse query for data frame transform";
     public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY =
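
The new constant follows the same {0} placeholder convention as the surrounding messages; assuming DataFrameMessages.getMessage resolves placeholders via java.text.MessageFormat (the helper itself is not shown in this diff), the expansion looks like:

    import java.text.MessageFormat;

    // Illustrative only: how the {0} placeholder in the new message resolves.
    String msg = MessageFormat.format(
        "Failed to gather field mappings for index [{0}]", "airline-data-by-airline");
    // msg == "Failed to gather field mappings for index [airline-data-by-airline]"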

DataFramePivotRestIT.java

@@ -18,6 +18,7 @@ import java.util.Map;
 import java.util.Set;

 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;

 public class DataFramePivotRestIT extends DataFrameRestTestCase {
@@ -267,6 +268,52 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
         });
     }

+    public void testPivotWithMaxOnDateField() throws Exception {
+        String transformId = "simpleDateHistogramPivotWithMaxTime";
+        String dataFrameIndex = "pivot_reviews_via_date_histogram_with_max_time";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
+
+        final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId,
+            BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+
+        String config = "{"
+            + " \"source\": \"" + REVIEWS_INDEX_NAME + "\","
+            + " \"dest\": \"" + dataFrameIndex + "\",";
+
+        config += " \"pivot\": { \n" +
+            "   \"group_by\": {\n" +
+            "     \"by_day\": {\"date_histogram\": {\n" +
+            "       \"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"\n" +
+            "     }}\n" +
+            "   },\n" +
+            "   \n" +
+            "   \"aggs\" :{\n" +
+            "     \"avg_rating\": {\n" +
+            "       \"avg\": {\"field\": \"stars\"}\n" +
+            "     },\n" +
+            "     \"timestamp\": {\n" +
+            "       \"max\": {\"field\": \"timestamp\"}\n" +
+            "     }\n" +
+            "   }}"
+            + "}";
+
+        createDataframeTransformRequest.setJsonEntity(config);
+
+        Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
+        assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+        assertTrue(indexExists(dataFrameIndex));
+
+        startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+
+        // we expect 21 documents as there shall be 21 days worth of docs
+        Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
+        assertEquals(21, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
+        assertOnePivotValue(dataFrameIndex + "/_search?q=by_day:2017-01-15", 3.82);
+
+        Map<String, Object> searchResult = getAsMap(dataFrameIndex + "/_search?q=by_day:2017-01-15");
+        String actual = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.timestamp", searchResult)).get(0);
+        // Do `containsString` as actual ending timestamp is indeterminate due to how data is generated
+        assertThat(actual, containsString("2017-01-15T20:"));
+    }
+
     private void assertOnePivotValue(String query, double expected) throws IOException {
         Map<String, Object> searchResult = getAsMap(query);

TransportPreviewDataFrameTransformAction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;

 import java.util.List;
@@ -57,9 +58,11 @@ public class TransportPreviewDataFrameTransformAction extends
             return;
         }

-        Pivot pivot = new Pivot(request.getConfig().getSource(),
-            request.getConfig().getQueryConfig().getQuery(),
-            request.getConfig().getPivotConfig());
+        final DataFrameTransformConfig config = request.getConfig();
+
+        Pivot pivot = new Pivot(config.getSource(),
+            config.getQueryConfig().getQuery(),
+            config.getPivotConfig());

         getPreview(pivot, ActionListener.wrap(
             previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
@@ -68,18 +71,24 @@
     }

     private void getPreview(Pivot pivot, ActionListener<List<Map<String, Object>>> listener) {
-        ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
-            ClientHelper.DATA_FRAME_ORIGIN,
-            client,
-            SearchAction.INSTANCE,
-            pivot.buildSearchRequest(null),
-            ActionListener.wrap(
-                r -> {
-                    final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
-                    DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
-                    listener.onResponse(pivot.extractResults(agg, stats).collect(Collectors.toList()));
-                },
-                listener::onFailure
-            ));
+        pivot.deduceMappings(client, ActionListener.wrap(
+            deducedMappings -> {
+                ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
+                    ClientHelper.DATA_FRAME_ORIGIN,
+                    client,
+                    SearchAction.INSTANCE,
+                    pivot.buildSearchRequest(null),
+                    ActionListener.wrap(
+                        r -> {
+                            final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
+                            DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
+                            listener.onResponse(pivot.extractResults(agg, deducedMappings, stats).collect(Collectors.toList()));
+                        },
+                        listener::onFailure
+                    ));
+            },
+            listener::onFailure
+        ));
     }
 }
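
Preview now resolves the destination mappings before running the composite-aggregation search, so extraction can consult the deduced types. The shape of that two-step chain, reduced to its essentials (runSearch and extract are hypothetical stand-ins for the client plumbing above):

    // Any failure at either stage funnels into the same listener via onFailure.
    pivot.deduceMappings(client, ActionListener.wrap(
        deducedMappings -> runSearch(client, ActionListener.wrap(
            searchResponse -> listener.onResponse(extract(searchResponse, deducedMappings)),
            listener::onFailure)),
        listener::onFailure));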

DataFrameIndexer.java

@@ -44,6 +44,8 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
     protected abstract DataFrameTransformConfig getConfig();

+    protected abstract Map<String, String> getFieldMappings();
+
     @Override
     protected void onStartJob(long now) {
         QueryBuilder queryBuilder = getConfig().getQueryConfig().getQuery();
@@ -70,7 +72,7 @@
         final DataFrameTransformConfig transformConfig = getConfig();
         String indexName = transformConfig.getDestination();

-        return pivot.extractResults(agg, getStats()).map(document -> {
+        return pivot.extractResults(agg, getFieldMappings(), getStats()).map(document -> {
             XContentBuilder builder;
             try {
                 builder = jsonBuilder();

DataFrameTransformTask.java

@@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
 import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
+import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;

 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -230,6 +231,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         private final DataFrameTransformsConfigManager transformsConfigManager;
         private final DataFrameTransformsCheckpointService transformsCheckpointService;
         private final String transformId;
+        private Map<String, String> fieldMappings = null;

         private DataFrameTransformConfig transformConfig = null;
@@ -248,6 +250,11 @@
             return transformConfig;
         }

+        @Override
+        protected Map<String, String> getFieldMappings() {
+            return fieldMappings;
+        }
+
         @Override
         protected String getJobId() {
             return transformId;
@@ -279,6 +286,27 @@
                     DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
             }

+            if (fieldMappings == null) {
+                CountDownLatch latch = new CountDownLatch(1);
+                SchemaUtil.getDestinationFieldMappings(client, transformConfig.getDestination(), new LatchedActionListener<>(
+                    ActionListener.wrap(
+                        destinationMappings -> fieldMappings = destinationMappings,
+                        e -> {
+                            throw new RuntimeException(
+                                DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,
+                                    transformConfig.getDestination()),
+                                e);
+                        }), latch));
+                try {
+                    latch.await(LOAD_TRANSFORM_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(
+                        DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,
+                            transformConfig.getDestination()),
+                        e);
+                }
+            }
+
             return super.maybeTriggerAsyncJob(now);
         }
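
Since maybeTriggerAsyncJob is a synchronous decision point, the async GetFieldMappings call is bridged with a CountDownLatch plus LatchedActionListener: the latch is released when the listener fires, and the caller waits up to LOAD_TRANSFORM_TIMEOUT_IN_SECONDS. A self-contained sketch of that bridging pattern (fetchAsync is a hypothetical stand-in for SchemaUtil.getDestinationFieldMappings, and the timeout value is illustrative):

    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Consumer;

    static Map<String, String> fetchBlocking(Consumer<Consumer<Map<String, String>>> fetchAsync)
            throws InterruptedException {
        AtomicReference<Map<String, String>> result = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        // The async callback stores the result and releases the waiting thread.
        fetchAsync.accept(mappings -> { result.set(mappings); latch.countDown(); });
        if (latch.await(120, TimeUnit.SECONDS) == false) {
            throw new RuntimeException("timed out gathering field mappings");
        }
        return result.get();
    }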

AggregationResultUtils.java

@@ -21,6 +21,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;

+import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType;
+
 final class AggregationResultUtils {
     private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class);
@@ -30,30 +32,38 @@
      * @param agg The aggregation result
      * @param groups The original groupings used for querying
      * @param aggregationBuilders the aggregation used for querying
-     * @param dataFrameIndexerTransformStats stats collector
+     * @param fieldTypeMap A Map containing "field-name": "type" entries to determine the appropriate type for the aggregation results.
+     * @param stats stats collector
      * @return a map containing the results of the aggregation in a consumable way
      */
     public static Stream<Map<String, Object>> extractCompositeAggregationResults(CompositeAggregation agg,
                                                                                  GroupConfig groups,
                                                                                  Collection<AggregationBuilder> aggregationBuilders,
-                                                                                 DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {
+                                                                                 Map<String, String> fieldTypeMap,
+                                                                                 DataFrameIndexerTransformStats stats) {
         return agg.getBuckets().stream().map(bucket -> {
-            dataFrameIndexerTransformStats.incrementNumDocuments(bucket.getDocCount());
+            stats.incrementNumDocuments(bucket.getDocCount());

             Map<String, Object> document = new HashMap<>();
-            groups.getGroups().keySet().forEach(destinationFieldName -> {
-                document.put(destinationFieldName, bucket.getKey().get(destinationFieldName));
-            });
+            groups.getGroups().keySet().forEach(destinationFieldName ->
+                document.put(destinationFieldName, bucket.getKey().get(destinationFieldName)));

             for (AggregationBuilder aggregationBuilder : aggregationBuilders) {
                 String aggName = aggregationBuilder.getName();
+                final String fieldType = fieldTypeMap.get(aggName);

                 // TODO: support other aggregation types
                 Aggregation aggResult = bucket.getAggregations().get(aggName);

                 if (aggResult instanceof NumericMetricsAggregation.SingleValue) {
                     NumericMetricsAggregation.SingleValue aggResultSingleValue = (SingleValue) aggResult;
-                    document.put(aggName, aggResultSingleValue.value());
+                    // If the type is numeric, simply gather the `value` type, otherwise utilize `getValueAsString` so we don't lose
+                    // formatted outputs.
+                    if (isNumericType(fieldType)) {
+                        document.put(aggName, aggResultSingleValue.value());
+                    } else {
+                        document.put(aggName, aggResultSingleValue.getValueAsString());
+                    }
                 } else {
                     // Execution should never reach this point!
                     // Creating transforms with unsupported aggregations shall not be possible

Pivot.java

@@ -77,12 +77,17 @@ public class Pivot {
     }

     public Stream<Map<String, Object>> extractResults(CompositeAggregation agg,
+                                                      Map<String, String> fieldTypeMap,
                                                       DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {
         GroupConfig groups = config.getGroupConfig();
         Collection<AggregationBuilder> aggregationBuilders = config.getAggregationConfig().getAggregatorFactories();

-        return AggregationResultUtils.extractCompositeAggregationResults(agg, groups, aggregationBuilders, dataFrameIndexerTransformStats);
+        return AggregationResultUtils.extractCompositeAggregationResults(agg,
+            groups,
+            aggregationBuilders,
+            fieldTypeMap,
+            dataFrameIndexerTransformStats);
     }

     private void runTestQuery(Client client, final ActionListener<Boolean> listener) {
@@ -99,7 +104,7 @@
             }
             listener.onResponse(true);
         }, e->{
-            listener.onFailure(new RuntimeException("Failed to test query",e));
+            listener.onFailure(new RuntimeException("Failed to test query", e));
         }));
     }

SchemaUtil.java

@@ -13,20 +13,51 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction
 import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
 import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
+import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;

 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

-public class SchemaUtil {
+public final class SchemaUtil {
     private static final Logger logger = LogManager.getLogger(SchemaUtil.class);

+    // Full collection of numeric field type strings
+    private static final Set<String> NUMERIC_FIELD_MAPPER_TYPES;
+    static {
+        Set<String> types = Stream.of(NumberFieldMapper.NumberType.values())
+            .map(NumberFieldMapper.NumberType::typeName)
+            .collect(Collectors.toSet());
+        types.add("scaled_float"); // have to add manually since scaled_float is in a module
+        NUMERIC_FIELD_MAPPER_TYPES = types;
+    }
+
     private SchemaUtil() {
     }

-    public static void deduceMappings(final Client client, final PivotConfig config, final String source,
+    public static boolean isNumericType(String type) {
+        return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
+    }
+
+    /**
+     * Deduce the mappings for the destination index given the source index
+     *
+     * The Listener is alerted with a {@code Map<String, String>} that is a "field-name":"type" mapping
+     *
+     * @param client Client from which to make requests against the cluster
+     * @param config The PivotConfig for which to deduce destination mapping
+     * @param source Source index that contains the data to pivot
+     * @param listener Listener to alert on success or failure.
+     */
+    public static void deduceMappings(final Client client,
+                                      final PivotConfig config,
+                                      final String source,
                                       final ActionListener<Map<String, String>> listener) {
         // collects the fieldnames used as source for aggregations
         Map<String, String> aggregationSourceFieldNames = new HashMap<>();
@@ -56,18 +87,42 @@ public class SchemaUtil {
         allFieldNames.putAll(fieldNamesForGrouping);

         getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]),
-            ActionListener.wrap(sourceMappings -> {
-                Map<String, String> targetMapping = resolveMappings(aggregationSourceFieldNames, aggregationTypes,
-                    fieldNamesForGrouping, sourceMappings);
-
-                listener.onResponse(targetMapping);
-            }, e -> {
-                listener.onFailure(e);
-            }));
+            ActionListener.wrap(
+                sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
+                    aggregationTypes,
+                    fieldNamesForGrouping,
+                    sourceMappings)),
+                listener::onFailure));
+    }
+
+    /**
+     * Gathers the field mappings for the "destination" index. Listener will receive an error, or a {@code Map<String, String>} of
+     * "field-name":"type".
+     *
+     * @param client Client used to execute the request
+     * @param index The index, or index pattern, from which to gather all the field mappings
+     * @param listener The listener to be alerted on success or failure.
+     */
+    public static void getDestinationFieldMappings(final Client client,
+                                                   final String index,
+                                                   final ActionListener<Map<String, String>> listener) {
+        GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest();
+        fieldMappingRequest.indices(index);
+        fieldMappingRequest.fields("*");
+        ClientHelper.executeAsyncWithOrigin(client,
+            ClientHelper.DATA_FRAME_ORIGIN,
+            GetFieldMappingsAction.INSTANCE,
+            fieldMappingRequest,
+            ActionListener.wrap(
+                r -> listener.onResponse(extractFieldMappings(r.mappings())),
+                listener::onFailure
+            ));
     }

     private static Map<String, String> resolveMappings(Map<String, String> aggregationSourceFieldNames,
-        Map<String, String> aggregationTypes, Map<String, String> fieldNamesForGrouping, Map<String, String> sourceMappings) {
+                                                       Map<String, String> aggregationTypes,
+                                                       Map<String, String> fieldNamesForGrouping,
+                                                       Map<String, String> sourceMappings) {
         Map<String, String> targetMapping = new HashMap<>();

         aggregationTypes.forEach((targetFieldName, aggregationName) -> {
@@ -107,14 +162,12 @@
         fieldMappingRequest.indices(index);
         fieldMappingRequest.fields(fields);

-        client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap(response -> {
-            listener.onResponse(extractSourceFieldMappings(response.mappings()));
-        }, e -> {
-            listener.onFailure(e);
-        }));
+        client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap(
+            response -> listener.onResponse(extractFieldMappings(response.mappings())),
+            listener::onFailure));
     }

-    private static Map<String, String> extractSourceFieldMappings(Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings) {
+    private static Map<String, String> extractFieldMappings(Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings) {
         Map<String, String> extractedTypes = new HashMap<>();

         mappings.forEach((indexName, docTypeToMapping) -> {
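
isNumericType therefore accepts exactly the NumberFieldMapper type names plus scaled_float, and everything else, including null for fields with no mapping, routes extraction down the getValueAsString path. Expected behavior per the implementation above:

    SchemaUtil.isNumericType("double");        // true  - a NumberFieldMapper type
    SchemaUtil.isNumericType("scaled_float");  // true  - added manually; lives in a module
    SchemaUtil.isNumericType("date");          // false - max on a date keeps its formatted string
    SchemaUtil.isNumericType("keyword");       // false
    SchemaUtil.isNumericType(null);            // false - unmapped/unknown field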

AggregationResultUtilsTests.java

@@ -140,8 +140,11 @@ public class AggregationResultUtilsTests extends ESTestCase {
                 aggName, 12.55
             )
         );

-        executeTest(groupBy, aggregationBuilders, input, expected, 20);
+        Map<String, String> fieldTypeMap = asStringMap(
+            targetField, "keyword",
+            aggName, "double"
+        );
+        executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 20);
     }

     public void testExtractCompositeAggregationResultsMultiSources() throws IOException {
@@ -222,7 +225,12 @@
                 aggName, 12.55
             )
         );

-        executeTest(groupBy, aggregationBuilders, input, expected, 10);
+        Map<String, String> fieldTypeMap = asStringMap(
+            aggName, "double",
+            targetField, "keyword",
+            targetField2, "keyword"
+        );
+        executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10);
     }

     public void testExtractCompositeAggregationResultsMultiAggregations() throws IOException {
@@ -287,11 +295,119 @@
                 aggName2, -2.44
             )
         );

-        executeTest(groupBy, aggregationBuilders, input, expected, 200);
+        Map<String, String> fieldTypeMap = asStringMap(
+            targetField, "keyword",
+            aggName, "double",
+            aggName2, "double"
+        );
+        executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 200);
+    }
+
+    public void testExtractCompositeAggregationResultsMultiAggregationsAndTypes() throws IOException {
+        String targetField = randomAlphaOfLengthBetween(5, 10);
+        String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2";
+
+        GroupConfig groupBy = parseGroupConfig("{"
+            + "\"" + targetField + "\" : {"
+            + "  \"terms\" : {"
+            + "     \"field\" : \"doesn't_matter_for_this_test\""
+            + "  } },"
+            + "\"" + targetField2 + "\" : {"
+            + "  \"terms\" : {"
+            + "     \"field\" : \"doesn't_matter_for_this_test\""
+            + "  } }"
+            + "}");
+
+        String aggName = randomAlphaOfLengthBetween(5, 10);
+        String aggTypedName = "avg#" + aggName;
+
+        String aggName2 = randomAlphaOfLengthBetween(5, 10) + "_2";
+        String aggTypedName2 = "max#" + aggName2;
+
+        Collection<AggregationBuilder> aggregationBuilders = asList(AggregationBuilders.avg(aggName), AggregationBuilders.max(aggName2));
+
+        Map<String, Object> input = asMap(
+            "buckets",
+            asList(
+                asMap(
+                    KEY, asMap(
+                        targetField, "ID1",
+                        targetField2, "ID1_2"
+                    ),
+                    aggTypedName, asMap(
+                        "value", 42.33),
+                    aggTypedName2, asMap(
+                        "value", 9.9),
+                    DOC_COUNT, 1),
+                asMap(
+                    KEY, asMap(
+                        targetField, "ID1",
+                        targetField2, "ID2_2"
+                    ),
+                    aggTypedName, asMap(
+                        "value", 8.4),
+                    aggTypedName2, asMap(
+                        "value", 222.33),
+                    DOC_COUNT, 2),
+                asMap(
+                    KEY, asMap(
+                        targetField, "ID2",
+                        targetField2, "ID1_2"
+                    ),
+                    aggTypedName, asMap(
+                        "value", 28.99),
+                    aggTypedName2, asMap(
+                        "value", -2.44),
+                    DOC_COUNT, 3),
+                asMap(
+                    KEY, asMap(
+                        targetField, "ID3",
+                        targetField2, "ID2_2"
+                    ),
+                    aggTypedName, asMap(
+                        "value", 12.55),
+                    aggTypedName2, asMap(
+                        "value", -100.44),
+                    DOC_COUNT, 4)
+            ));
+
+        List<Map<String, Object>> expected = asList(
+            asMap(
+                targetField, "ID1",
+                targetField2, "ID1_2",
+                aggName, 42.33,
+                aggName2, "9.9"
+            ),
+            asMap(
+                targetField, "ID1",
+                targetField2, "ID2_2",
+                aggName, 8.4,
+                aggName2, "222.33"
+            ),
+            asMap(
+                targetField, "ID2",
+                targetField2, "ID1_2",
+                aggName, 28.99,
+                aggName2, "-2.44"
+            ),
+            asMap(
+                targetField, "ID3",
+                targetField2, "ID2_2",
+                aggName, 12.55,
+                aggName2, "-100.44"
+            )
+        );
+        Map<String, String> fieldTypeMap = asStringMap(
+            aggName, "double",
+            aggName2, "keyword", // If the second aggregation was some non-numeric mapped field
+            targetField, "keyword",
+            targetField2, "keyword"
+        );
+        executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10);
     }

     private void executeTest(GroupConfig groups, Collection<AggregationBuilder> aggregationBuilders, Map<String, Object> input,
-                             List<Map<String, Object>> expected, long expectedDocCounts) throws IOException {
+                             Map<String, String> fieldTypeMap, List<Map<String, Object>> expected, long expectedDocCounts) throws IOException {
         DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
         XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
         builder.map(input);
@@ -299,7 +415,7 @@
         try (XContentParser parser = createParser(builder)) {
             CompositeAggregation agg = ParsedComposite.fromXContent(parser, "my_feature");
             List<Map<String, Object>> result = AggregationResultUtils
-                .extractCompositeAggregationResults(agg, groups, aggregationBuilders, stats).collect(Collectors.toList());
+                .extractCompositeAggregationResults(agg, groups, aggregationBuilders, fieldTypeMap, stats).collect(Collectors.toList());

             assertEquals(expected, result);
             assertEquals(expectedDocCounts, stats.getNumDocuments());
@@ -321,4 +437,14 @@
         }
         return map;
     }
+
+    static Map<String, String> asStringMap(String... strings) {
+        assert strings.length % 2 == 0;
+        final Map<String, String> map = new HashMap<>();
+        for (int i = 0; i < strings.length; i += 2) {
+            String field = strings[i];
+            map.put(field, strings[i + 1]);
+        }
+        return map;
+    }
 }

PivotTests.java

@@ -37,7 +37,9 @@ import org.junit.Before;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -97,7 +99,9 @@ public class PivotTests extends ESTestCase {
     public void testSearchFailure() throws Exception {
         // test a failure during the search operation, transform creation fails if
         // search has failures although they might just be temporary
-        Pivot pivot = new Pivot("existing_source_index_with_failing_shards", new MatchAllQueryBuilder(), getValidPivotConfig());
+        Pivot pivot = new Pivot("existing_source_index_with_failing_shards",
+            new MatchAllQueryBuilder(),
+            getValidPivotConfig());

         assertInvalidTransform(client, pivot);
     }
@@ -106,7 +110,9 @@
         for (String agg : supportedAggregations) {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
-            Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig));
+            Pivot pivot = new Pivot("existing_source",
+                new MatchAllQueryBuilder(),
+                getValidPivotConfig(aggregationConfig));

             assertValidTransform(client, pivot);
         }
@@ -116,7 +122,9 @@
         for (String agg : unsupportedAggregations) {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
-            Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig));
+            Pivot pivot = new Pivot("existing_source",
+                new MatchAllQueryBuilder(),
+                getValidPivotConfig(aggregationConfig));

             assertInvalidTransform(client, pivot);
         }
@@ -178,6 +186,10 @@
             + " }\n" + " }" + "}");
     }

+    private Map<String, String> getFieldMappings() {
+        return Collections.singletonMap("values", "double");
+    }
+
     private AggregationConfig parseAggregations(String json) throws IOException {
         final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(),
             DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);

transforms_crud.yml

@@ -105,7 +105,6 @@ setup:
   - match: { count: 2 }
   - match: { transforms.0.id: "airline-transform" }
   - match: { transforms.1.id: "airline-transform-dos" }
-
   - do:
       data_frame.get_data_frame_transform:
         transform_id: "airline-transform,airline-transform-dos"
@@ -135,6 +134,7 @@
         size: 1
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-dos" }
+
 ---
 "Test transform with invalid page parameter":
   - do:
@@ -143,3 +143,25 @@
         transform_id: "_all"
         from: 0
         size: 10000
+---
+"Verify put transform creates destination index with appropriate mapping":
+  - do:
+      data_frame.put_data_frame_transform:
+        transform_id: "airline-transform"
+        body: >
+          {
+            "source": "airline-data",
+            "dest": "airline-data-by-airline",
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}, "time": {"max": {"field": "time"}}}
+            }
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      indices.get_mapping:
+        index: airline-data-by-airline
+
+  - match: { airline-data-by-airline.mappings.properties.airline.type: keyword }
+  - match: { airline-data-by-airline.mappings.properties.avg_response.type: double }
+  - match: { airline-data-by-airline.mappings.properties.time.type: date }
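
The three mapping assertions line up with the deduced destination schema: the terms group_by on airline carries the source field's keyword type through, while the avg and max aggregations are typed from their source fields (double for responsetime, date for time). The date case is exactly the one the new getValueAsString extraction path preserves when documents are indexed.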