[ML] Add bucket_script agg support to data frames (#41594) (#41639)

This commit is contained in:
Benjamin Trent 2019-04-29 10:14:17 -05:00 committed by GitHub
parent a01f451ef7
commit 92a820bc1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 248 additions and 30 deletions

View File

@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import java.io.IOException;
@ -66,6 +67,10 @@ public class AggregationConfig implements Writeable, ToXContentObject {
return aggregations.getAggregatorFactories();
}
/**
 * Returns the pipeline aggregation builders (e.g. {@code bucket_script}) parsed from
 * this configuration, by delegating to the underlying {@code aggregations} builder.
 */
public Collection<PipelineAggregationBuilder> getPipelineAggregatorFactories() {
return aggregations.getPipelineAggregatorFactories();
}
public static AggregationConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {
NamedXContentRegistry registry = parser.getXContentRegistry();
Map<String, Object> source = parser.mapOrdered();

View File

@ -368,6 +368,57 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
assertEquals(711.0, actual.doubleValue(), 0.000001);
}
/**
 * End-to-end pivot test for bucket_script support: groups reviews by user_id, computes
 * an avg metric over stars, and adds a bucket_script pipeline aggregation that echoes
 * that metric through a script. Both fields must land in the destination index with
 * the same value for a sampled user.
 */
public void testPivotWithBucketScriptAgg() throws Exception {
    String transformId = "bucketScriptPivot";
    String dataFrameIndex = "bucket_script_pivot_reviews";
    setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
    final Request createRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId,
        BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);

    // Transform config: terms group_by on user_id, an avg of stars, and a bucket_script
    // that forwards the avg unchanged via its buckets_path parameter.
    final String config = "{"
        + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
        + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
        + " \"pivot\": {"
        + " \"group_by\": {"
        + " \"reviewer\": {"
        + " \"terms\": {"
        + " \"field\": \"user_id\""
        + " } } },"
        + " \"aggregations\": {"
        + " \"avg_rating\": {"
        + " \"avg\": {"
        + " \"field\": \"stars\""
        + " } },"
        + " \"avg_rating_again\": {"
        + " \"bucket_script\": {"
        + " \"buckets_path\": {\"param_1\": \"avg_rating\"},"
        + " \"script\": \"return params.param_1\""
        + " } }"
        + " } }"
        + "}";
    createRequest.setJsonEntity(config);

    Map<String, Object> createResponse = entityAsMap(client().performRequest(createRequest));
    assertThat(createResponse.get("acknowledged"), equalTo(Boolean.TRUE));

    startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
    assertTrue(indexExists(dataFrameIndex));

    // One output document per distinct user_id; the reviews fixture contains 27 users.
    Map<String, Object> stats = getAsMap(dataFrameIndex + "/_stats");
    assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", stats));

    // Spot-check a single user: the pipeline field must mirror the metric field exactly.
    Map<String, Object> searchResponse = getAsMap(dataFrameIndex + "/_search?q=reviewer:user_4");
    assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResponse));
    Number avgRating =
        (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResponse)).get(0);
    assertEquals(3.878048780, avgRating.doubleValue(), 0.000001);
    Number avgRatingAgain =
        (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating_again", searchResponse)).get(0);
    assertEquals(3.878048780, avgRatingAgain.doubleValue(), 0.000001);
}
private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

View File

@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
@ -21,7 +22,9 @@ import org.elasticsearch.xpack.dataframe.transforms.IDGenerator;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType;
@ -42,6 +45,7 @@ final class AggregationResultUtils {
public static Stream<Map<String, Object>> extractCompositeAggregationResults(CompositeAggregation agg,
GroupConfig groups,
Collection<AggregationBuilder> aggregationBuilders,
Collection<PipelineAggregationBuilder> pipelineAggs,
Map<String, String> fieldTypeMap,
DataFrameIndexerTransformStats stats) {
return agg.getBuckets().stream().map(bucket -> {
@ -58,8 +62,10 @@ final class AggregationResultUtils {
document.put(destinationFieldName, value);
});
for (AggregationBuilder aggregationBuilder : aggregationBuilders) {
String aggName = aggregationBuilder.getName();
List<String> aggNames = aggregationBuilders.stream().map(AggregationBuilder::getName).collect(Collectors.toList());
aggNames.addAll(pipelineAggs.stream().map(PipelineAggregationBuilder::getName).collect(Collectors.toList()));
for (String aggName: aggNames) {
final String fieldType = fieldTypeMap.get(aggName);
// TODO: support other aggregation types
@ -67,9 +73,10 @@ final class AggregationResultUtils {
if (aggResult instanceof NumericMetricsAggregation.SingleValue) {
NumericMetricsAggregation.SingleValue aggResultSingleValue = (SingleValue) aggResult;
// If the type is numeric, simply gather the `value` type, otherwise utilize `getValueAsString` so we don't lose
// formatted outputs.
if (isNumericType(fieldType)) {
// If the type is numeric or if the formatted string is the same as simply making the value a string,
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
if (isNumericType(fieldType) ||
(aggResultSingleValue.getValueAsString().equals(String.valueOf(aggResultSingleValue.value())))) {
document.put(aggName, aggResultSingleValue.value());
} else {
document.put(aggName, aggResultSingleValue.getValueAsString());

View File

@ -35,7 +35,8 @@ public final class Aggregations {
MAX("max", SOURCE),
MIN("min", SOURCE),
SUM("sum", SOURCE),
SCRIPTED_METRIC("scripted_metric", DYNAMIC);
SCRIPTED_METRIC("scripted_metric", DYNAMIC),
BUCKET_SCRIPT("bucket_script", DYNAMIC);
private final String aggregationType;
private final String targetMapping;

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -102,10 +103,12 @@ public class Pivot {
GroupConfig groups = config.getGroupConfig();
Collection<AggregationBuilder> aggregationBuilders = config.getAggregationConfig().getAggregatorFactories();
Collection<PipelineAggregationBuilder> pipelineAggregationBuilders = config.getAggregationConfig().getPipelineAggregatorFactories();
return AggregationResultUtils.extractCompositeAggregationResults(agg,
groups,
aggregationBuilders,
pipelineAggregationBuilders,
fieldTypeMap,
dataFrameIndexerTransformStats);
}
@ -148,6 +151,7 @@ public class Pivot {
LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput());
compositeAggregation = CompositeAggregationBuilder.parse(COMPOSITE_AGGREGATION_NAME, parser);
config.getAggregationConfig().getAggregatorFactories().forEach(agg -> compositeAggregation.subAggregation(agg));
config.getAggregationConfig().getPipelineAggregatorFactories().forEach(agg -> compositeAggregation.subAggregation(agg));
} catch (IOException e) {
throw new RuntimeException(DataFrameMessages.DATA_FRAME_TRANSFORM_PIVOT_FAILED_TO_CREATE_COMPOSITE_AGGREGATION, e);
}

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRespon
import org.elasticsearch.client.Client;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
@ -85,6 +86,12 @@ public final class SchemaUtil {
}
}
// For pipeline aggs: since they reference other aggregations in the payload, they do not have any
// source field names to put into the payload. However, certain ones (e.g. avg_bucket) do have determinate value types
for (PipelineAggregationBuilder agg : config.getAggregationConfig().getPipelineAggregatorFactories()) {
aggregationTypes.put(agg.getName(), agg.getType());
}
Map<String, String> allFieldNames = new HashMap<>();
allFieldNames.putAll(aggregationSourceFieldNames);
allFieldNames.putAll(fieldNamesForGrouping);

View File

@ -14,9 +14,12 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.ParsedComposite;
import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
@ -43,6 +46,8 @@ import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBu
import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.test.ESTestCase;
@ -78,6 +83,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
map.put(MaxAggregationBuilder.NAME, (p, c) -> ParsedMax.fromXContent(p, (String) c));
map.put(SumAggregationBuilder.NAME, (p, c) -> ParsedSum.fromXContent(p, (String) c));
map.put(AvgAggregationBuilder.NAME, (p, c) -> ParsedAvg.fromXContent(p, (String) c));
map.put(BucketScriptPipelineAggregationBuilder.NAME, (p, c) -> ParsedSimpleValue.fromXContent(p, (String) c));
map.put(ScriptedMetricAggregationBuilder.NAME, (p, c) -> ParsedScriptedMetric.fromXContent(p, (String) c));
map.put(ValueCountAggregationBuilder.NAME, (p, c) -> ParsedValueCount.fromXContent(p, (String) c));
map.put(StatsAggregationBuilder.NAME, (p, c) -> ParsedStats.fromXContent(p, (String) c));
@ -150,7 +156,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
targetField, "keyword",
aggName, "double"
);
executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 20);
executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 20);
}
public void testExtractCompositeAggregationResultsMultipleGroups() throws IOException {
@ -236,7 +242,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
targetField, "keyword",
targetField2, "keyword"
);
executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10);
executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 10);
}
public void testExtractCompositeAggregationResultsMultiAggregations() throws IOException {
@ -306,7 +312,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
aggName, "double",
aggName2, "double"
);
executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 200);
executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 200);
}
public void testExtractCompositeAggregationResultsMultiAggregationsAndTypes() throws IOException {
@ -343,7 +349,8 @@ public class AggregationResultUtilsTests extends ESTestCase {
aggTypedName, asMap(
"value", 42.33),
aggTypedName2, asMap(
"value", 9.9),
"value", 9.9,
"value_as_string", "9.9F"),
DOC_COUNT, 1),
asMap(
KEY, asMap(
@ -353,7 +360,8 @@ public class AggregationResultUtilsTests extends ESTestCase {
aggTypedName, asMap(
"value", 8.4),
aggTypedName2, asMap(
"value", 222.33),
"value", 222.33,
"value_as_string", "222.33F"),
DOC_COUNT, 2),
asMap(
KEY, asMap(
@ -363,7 +371,8 @@ public class AggregationResultUtilsTests extends ESTestCase {
aggTypedName, asMap(
"value", 28.99),
aggTypedName2, asMap(
"value", -2.44),
"value", -2.44,
"value_as_string", "-2.44F"),
DOC_COUNT, 3),
asMap(
KEY, asMap(
@ -373,7 +382,8 @@ public class AggregationResultUtilsTests extends ESTestCase {
aggTypedName, asMap(
"value", 12.55),
aggTypedName2, asMap(
"value", -100.44),
"value", -100.44,
"value_as_string", "-100.44F"),
DOC_COUNT, 4)
));
@ -382,25 +392,25 @@ public class AggregationResultUtilsTests extends ESTestCase {
targetField, "ID1",
targetField2, "ID1_2",
aggName, 42.33,
aggName2, "9.9"
aggName2, "9.9F"
),
asMap(
targetField, "ID1",
targetField2, "ID2_2",
aggName, 8.4,
aggName2, "222.33"
aggName2, "222.33F"
),
asMap(
targetField, "ID2",
targetField2, "ID1_2",
aggName, 28.99,
aggName2, "-2.44"
aggName2, "-2.44F"
),
asMap(
targetField, "ID3",
targetField2, "ID2_2",
aggName, 12.55,
aggName2, "-100.44"
aggName2, "-100.44F"
)
);
Map<String, String> fieldTypeMap = asStringMap(
@ -409,7 +419,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
targetField, "keyword",
targetField2, "keyword"
);
executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10);
executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 10);
}
public void testExtractCompositeAggregationResultsWithDynamicType() throws IOException {
@ -495,7 +505,112 @@ public class AggregationResultUtilsTests extends ESTestCase {
targetField, "keyword",
targetField2, "keyword"
);
executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10);
executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 10);
}
/**
 * Verifies that results of pipeline aggregations (here: bucket_script) are extracted
 * from a composite aggregation response alongside regular metric aggregations. The
 * mocked response gives the pipeline agg the same value as the avg metric in every
 * bucket, so each extracted document must contain both fields with equal values.
 *
 * <p>Note the pipeline agg name is intentionally absent from {@code fieldTypeMap}:
 * pipeline aggs have no source field, so extraction must handle a {@code null} type.
 */
public void testExtractCompositeAggregationResultsWithPipelineAggregation() throws IOException {
    String targetField = randomAlphaOfLengthBetween(5, 10);
    String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2";

    GroupConfig groupBy = parseGroupConfig("{"
        + "\"" + targetField + "\" : {"
        + "  \"terms\" : {"
        + "     \"field\" : \"doesn't_matter_for_this_test\""
        + "  } },"
        + "\"" + targetField2 + "\" : {"
        + "  \"terms\" : {"
        + "     \"field\" : \"doesn't_matter_for_this_test\""
        + "  } }"
        + "}");

    String aggName = randomAlphaOfLengthBetween(5, 10);
    String aggTypedName = "avg#" + aggName;
    String pipelineAggName = randomAlphaOfLengthBetween(5, 10) + "_2";
    String pipelineAggTypedName = "bucket_script#" + pipelineAggName;

    // Use an avg builder so the builder type is consistent with the "avg#" typed name
    // mocked in the response below (extraction itself only consumes the agg *names*).
    Collection<AggregationBuilder> aggregationBuilders = asList(AggregationBuilders.avg(aggName));
    Collection<PipelineAggregationBuilder> pipelineAggregationBuilders =
        asList(PipelineAggregatorBuilders.bucketScript(pipelineAggName,
            Collections.singletonMap("param_1", aggName),
            new Script("return params.param_1")));

    Map<String, Object> input = asMap(
        "buckets",
        asList(
            asMap(
                KEY, asMap(
                    targetField, "ID1",
                    targetField2, "ID1_2"
                ),
                aggTypedName, asMap(
                    "value", 123.0),
                pipelineAggTypedName, asMap(
                    "value", 123.0),
                DOC_COUNT, 1),
            asMap(
                KEY, asMap(
                    targetField, "ID1",
                    targetField2, "ID2_2"
                ),
                aggTypedName, asMap(
                    "value", 1.0),
                pipelineAggTypedName, asMap(
                    "value", 1.0),
                DOC_COUNT, 2),
            asMap(
                KEY, asMap(
                    targetField, "ID2",
                    targetField2, "ID1_2"
                ),
                aggTypedName, asMap(
                    "value", 2.13),
                pipelineAggTypedName, asMap(
                    "value", 2.13),
                DOC_COUNT, 3),
            asMap(
                KEY, asMap(
                    targetField, "ID3",
                    targetField2, "ID2_2"
                ),
                aggTypedName, asMap(
                    "value", 12.0),
                pipelineAggTypedName, asMap(
                    "value", 12.0),
                DOC_COUNT, 4)
        ));

    // Expected documents: group keys plus BOTH the metric and the pipeline value.
    List<Map<String, Object>> expected = asList(
        asMap(
            targetField, "ID1",
            targetField2, "ID1_2",
            aggName, 123.0,
            pipelineAggName, 123.0
        ),
        asMap(
            targetField, "ID1",
            targetField2, "ID2_2",
            aggName, 1.0,
            pipelineAggName, 1.0
        ),
        asMap(
            targetField, "ID2",
            targetField2, "ID1_2",
            aggName, 2.13,
            pipelineAggName, 2.13
        ),
        asMap(
            targetField, "ID3",
            targetField2, "ID2_2",
            aggName, 12.0,
            pipelineAggName, 12.0
        )
    );
    Map<String, String> fieldTypeMap = asStringMap(
        targetField, "keyword",
        targetField2, "keyword",
        aggName, "double"
    );

    executeTest(groupBy, aggregationBuilders, pipelineAggregationBuilders, input, fieldTypeMap, expected, 10);
}
public void testExtractCompositeAggregationResultsDocIDs() throws IOException {
@ -598,8 +713,10 @@ public class AggregationResultUtilsTests extends ESTestCase {
targetField2, "keyword"
);
List<Map<String, Object>> resultFirstRun = runExtraction(groupBy, aggregationBuilders, inputFirstRun, fieldTypeMap, stats);
List<Map<String, Object>> resultSecondRun = runExtraction(groupBy, aggregationBuilders, inputSecondRun, fieldTypeMap, stats);
List<Map<String, Object>> resultFirstRun =
runExtraction(groupBy, aggregationBuilders, Collections.emptyList(), inputFirstRun, fieldTypeMap, stats);
List<Map<String, Object>> resultSecondRun =
runExtraction(groupBy, aggregationBuilders, Collections.emptyList(), inputSecondRun, fieldTypeMap, stats);
assertNotEquals(resultFirstRun, resultSecondRun);
@ -619,15 +736,23 @@ public class AggregationResultUtilsTests extends ESTestCase {
assertEquals(documentIdsFirstRun, documentIdsSecondRun);
}
private void executeTest(GroupConfig groups, Collection<AggregationBuilder> aggregationBuilders, Map<String, Object> input,
Map<String, String> fieldTypeMap, List<Map<String, Object>> expected, long expectedDocCounts) throws IOException {
private void executeTest(GroupConfig groups,
Collection<AggregationBuilder> aggregationBuilders,
Collection<PipelineAggregationBuilder> pipelineAggregationBuilders,
Map<String, Object> input,
Map<String, String> fieldTypeMap,
List<Map<String, Object>> expected,
long expectedDocCounts) throws IOException {
DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId();
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
builder.map(input);
List<Map<String, Object>> result = runExtraction(groups, aggregationBuilders, input, fieldTypeMap, stats);
List<Map<String, Object>> result = runExtraction(groups,
aggregationBuilders,
pipelineAggregationBuilders,
input,
fieldTypeMap,
stats);
// remove the document ids and test uniqueness
Set<String> documentIds = new HashSet<>();
@ -641,16 +766,24 @@ public class AggregationResultUtilsTests extends ESTestCase {
}
private List<Map<String, Object>> runExtraction(GroupConfig groups, Collection<AggregationBuilder> aggregationBuilders,
Map<String, Object> input, Map<String, String> fieldTypeMap, DataFrameIndexerTransformStats stats) throws IOException {
/**
 * Serializes {@code input} to a random XContent type, parses it back as a composite
 * aggregation response, and runs the production extraction over it with the given
 * group config, (pipeline) aggregation builders, field type map, and stats collector.
 */
private List<Map<String, Object>> runExtraction(GroupConfig groups,
                                                Collection<AggregationBuilder> aggregationBuilders,
                                                Collection<PipelineAggregationBuilder> pipelineAggregationBuilders,
                                                Map<String, Object> input,
                                                Map<String, String> fieldTypeMap,
                                                DataFrameIndexerTransformStats stats) throws IOException {
    // Round-trip the mocked response through a randomly chosen XContent format.
    XContentBuilder contentBuilder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
    contentBuilder.map(input);

    try (XContentParser xContentParser = createParser(contentBuilder)) {
        CompositeAggregation compositeAgg = ParsedComposite.fromXContent(xContentParser, "my_feature");
        Stream<Map<String, Object>> documents = AggregationResultUtils.extractCompositeAggregationResults(
            compositeAgg,
            groups,
            aggregationBuilders,
            pipelineAggregationBuilders,
            fieldTypeMap,
            stats);
        return documents.collect(Collectors.toList());
    }
}

View File

@ -41,5 +41,9 @@ public class AggregationsTests extends ESTestCase {
// scripted_metric
assertEquals("_dynamic", Aggregations.resolveTargetMapping("scripted_metric", null));
assertEquals("_dynamic", Aggregations.resolveTargetMapping("scripted_metric", "int"));
// bucket_script
assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_script", null));
assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_script", "int"));
}
}

View File

@ -184,6 +184,12 @@ public class PivotTests extends ESTestCase {
" }\n" +
"}}");
}
if (agg.equals(AggregationType.BUCKET_SCRIPT.getName())) {
return parseAggregations("{\"pivot_bucket_script\":{" +
"\"bucket_script\":{" +
"\"buckets_path\":{\"param_1\":\"other_bucket\"}," +
"\"script\":\"return params.param_1\"}}}");
}
return parseAggregations("{\n" + " \"pivot_" + agg + "\": {\n" + " \"" + agg + "\": {\n" + " \"field\": \"values\"\n"
+ " }\n" + " }" + "}");
}