From 58245b46173959bde30376ead92a3765614f5309 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Fri, 10 Dec 2021 10:53:23 +0800 Subject: [PATCH] Support JsonPath functions in JsonPath expressions (#11722) * Add jsonPath functions support * Add jsonPath function test for Avro * Add jsonPath function length() to Orc * Add jsonPath function length() to Parquet * Add more tests to ORC format * update doc * Fix exception during ingestion * Add IT test case * Revert "Fix exception during ingestion" This reverts commit 5a5484b9ea9d984622149c8113a566269cc10842. * update IT test case * Add 'keys()' * Commit IT test case * Fix UT --- .../common/parsers/JSONFlattenerMaker.java | 12 ++- .../common/parsers/JSONPathParserTest.java | 33 ++++++++ docs/ingestion/data-formats.md | 14 +++ .../input/avro/AvroFlattenerMakerTest.java | 31 +++++++ .../example/test_json_path_functions.orc | Bin 0 -> 473 bytes .../data/input/orc/OrcStructJsonProvider.java | 2 +- .../druid/data/input/orc/OrcReaderTest.java | 59 +++++++++++++ .../example/flattening/flat_1_flatten.json | 5 ++ .../simple/ParquetGroupJsonProvider.java | 2 +- .../parquet/FlattenSpecParquetInputTest.java | 1 + .../druid/tests/indexer/ITIndexerTest.java | 16 +++- .../indexer/json_path_index_queries.json | 49 +++++++++++ .../indexer/json_path_index_task.json | 80 ++++++++++++++++++ website/.spelling | 1 + 14 files changed, 301 insertions(+), 4 deletions(-) create mode 100644 extensions-core/orc-extensions/example/test_json_path_functions.orc create mode 100644 integration-tests/src/test/resources/indexer/json_path_index_queries.json create mode 100644 integration-tests/src/test/resources/indexer/json_path_index_task.json diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java index e799dc5b803..6df37ae1d11 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java +++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java @@ -115,7 +115,17 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, false); jsonParser.parseToMap(NOT_JSON); } + + @Test + public void testJSONPathFunctions() + { + List fields = Arrays.asList( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-length", "$.met.a.length()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-min", "$.met.a.min()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-max", "$.met.a.max()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-avg", "$.met.a.avg()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-sum", "$.met.a.sum()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-stddev", "$.met.a.stddev()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-append", "$.met.a.append(10)"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "concat", "$.concat($.foo.bar1, $.foo.bar2)") + ); + + final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, false); + final Map jsonMap = jsonParser.parseToMap(NESTED_JSON); + + // values of met.a array are: 7,8,9 + Assert.assertEquals(3, jsonMap.get("met-array-length")); + Assert.assertEquals(7.0, jsonMap.get("met-array-min")); + Assert.assertEquals(9.0, jsonMap.get("met-array-max")); + Assert.assertEquals(8.0, jsonMap.get("met-array-avg")); + Assert.assertEquals(24.0, jsonMap.get("met-array-sum")); + + //population variance of [7,8,9] is 2/3, stddev is sqrt(2/3), approximately 0.8165 + Assert.assertEquals(0.8165, (double) jsonMap.get("met-array-stddev"), 0.00001); + + Assert.assertEquals(ImmutableList.of(7L, 8L, 9L, 10L), jsonMap.get("met-array-append")); + Assert.assertEquals("aaabbb", jsonMap.get("concat")); + } + } diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 36c5f155570..fc56018c015 100644 --- 
a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -620,6 +620,20 @@ Each entry in the `fields` list can have the following components: * If `useFieldDiscovery` is enabled, any discovered field with the same name as one already defined in the `fields` list will be skipped, rather than added twice. * [http://jsonpath.herokuapp.com/](http://jsonpath.herokuapp.com/) is useful for testing `path`-type expressions. * jackson-jq supports a subset of the full [jq](https://stedolan.github.io/jq/) syntax. Please refer to the [jackson-jq documentation](https://github.com/eiiches/jackson-jq) for details. +* [JsonPath](https://github.com/jayway/JsonPath) supports a number of functions, but Druid does not currently support all of them. The following matrix shows the currently supported JsonPath functions and the data formats that support them. Please also note the output data type of each function. + + | Function | Description | Output type | json | orc | avro | parquet | + | :----------| :------------------------------------------------------------------ |:----------- |:-----|:----|:-----|:-----| + | min() | Provides the min value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ | + | max() | Provides the max value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ | + | avg() | Provides the average value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ | + | stddev() | Provides the standard deviation value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ | + | length() | Provides the length of an array | Integer | ✓ | ✓ | ✓ | ✓ | + | sum() | Provides the sum value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ | + | concat(X) | Provides a concatenated version of the path output with a new item | like input | ✓ | ✗ | ✗ | ✗ | + | append(X) | Adds an item to the JSON path output array | like input | ✓ | ✗ | ✗ | ✗ | + | keys() | Provides the property keys (An alternative for terminal tilde ~) | Set | ✗ | ✗ | ✗ | ✗ | + ## Parser diff --git 
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java index db2fea429c7..e7d503ab971 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java @@ -262,6 +262,37 @@ public class AvroFlattenerMakerTest record.getSomeIntArray(), flattener.makeJsonPathExtractor("$.someIntArray").apply(record) ); + Assert.assertEquals( + (double) record.getSomeIntArray().stream().mapToInt(Integer::intValue).min().getAsInt(), + + //return type of min is double + flattener.makeJsonPathExtractor("$.someIntArray.min()").apply(record) + ); + Assert.assertEquals( + (double) record.getSomeIntArray().stream().mapToInt(Integer::intValue).max().getAsInt(), + + //return type of max is double + flattener.makeJsonPathExtractor("$.someIntArray.max()").apply(record) + ); + Assert.assertEquals( + record.getSomeIntArray().stream().mapToInt(Integer::intValue).average().getAsDouble(), + flattener.makeJsonPathExtractor("$.someIntArray.avg()").apply(record) + ); + Assert.assertEquals( + record.getSomeIntArray().size(), + flattener.makeJsonPathExtractor("$.someIntArray.length()").apply(record) + ); + Assert.assertEquals( + (double) record.getSomeIntArray().stream().mapToInt(Integer::intValue).sum(), + + //return type of sum is double + flattener.makeJsonPathExtractor("$.someIntArray.sum()").apply(record) + ); + Assert.assertEquals( + 2.681, + (double) flattener.makeJsonPathExtractor("$.someIntArray.stddev()").apply(record), + 0.0001 + ); Assert.assertEquals( record.getSomeStringArray(), flattener.makeJsonPathExtractor("$.someStringArray").apply(record) diff --git a/extensions-core/orc-extensions/example/test_json_path_functions.orc 
b/extensions-core/orc-extensions/example/test_json_path_functions.orc new file mode 100644 index 0000000000000000000000000000000000000000..e3916d6a23308afc61f3c1a4573022ea9aab116b GIT binary patch literal 473 zcmeYdau#G@;9?VE;b074Fk@hNJn3mtDg&dUAfKD6o!+!5XJVWc4Kxo59$;XQ1}fy{ zVrF1qfT-36@`WLM9u8(9HV!@kAqj&31{07ekCT!am_erK+jX|3nwP&d3vUQuZ-gR@EpIhRB>n#>juW@i>K zR!r86JO1d5%ijk*NA`T_IpV|60W|Z|3`0N-JIhhsaGcUQPOA>Pw@?{4L;V_4Ym=ntRp9~%iJ2}X`CMgc~Nh6Vv8CI%i2cLVmOFU-vT HLCz8YkPMkW literal 0 HcmV?d00001 diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java index de84bfbd3ba..780de1930ca 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java @@ -185,6 +185,6 @@ public class OrcStructJsonProvider implements JsonProvider @Override public Object unwrap(final Object o) { - throw new UnsupportedOperationException("Unused"); + return o; } } diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java index 9726c0e1467..49ebd54d5e3 100644 --- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java +++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java @@ -112,6 +112,11 @@ public class OrcReaderTest ImmutableList.of( new JSONPathFieldSpec(JSONPathFieldType.PATH, "struct_list_struct_int", "$.middle.list[1].int1"), new JSONPathFieldSpec(JSONPathFieldType.PATH, "struct_list_struct_intlist", "$.middle.list[*].int1"), + new JSONPathFieldSpec( + JSONPathFieldType.PATH, + "struct_list_struct_middleListLength", + 
"$.middle.list.length()" + ), new JSONPathFieldSpec(JSONPathFieldType.PATH, "list_struct_string", "$.list[0].string1"), new JSONPathFieldSpec(JSONPathFieldType.PATH, "map_struct_int", "$.map.chani.int1") ) @@ -145,6 +150,8 @@ public class OrcReaderTest Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("struct_list_struct_int"))); Assert.assertEquals(ImmutableList.of("1", "2"), row.getDimension("struct_list_struct_intlist")); Assert.assertEquals("good", Iterables.getOnlyElement(row.getDimension("list_struct_string"))); + + Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("struct_list_struct_middleListLength"))); Assert.assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"), row.getTimestamp()); while (iterator.hasNext()) { @@ -253,6 +260,58 @@ public class OrcReaderTest } } + /** + * schema: struct, ts:timestamp> + * data: {"dim1","[7,8,9]","2000-03-12 15:00:00"} + */ + @Test + public void testJsonPathFunctions() throws IOException + { + final OrcInputFormat inputFormat = new OrcInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "min", "$.list.min()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "max", "$.list.max()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "avg", "$.list.avg()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "len", "$.list.length()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "sum", "$.list.sum()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "stddev", "$.list.stddev()"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "append", "$.list.append(10)") + ) + ), + null, + new Configuration() + ); + final InputEntityReader reader = createReader( + new TimestampSpec("ts", "millis", null), + new DimensionsSpec(null), + inputFormat, + "example/test_json_path_functions.orc" + ); + try (CloseableIterator iterator = reader.read()) { + int actualRowCount = 0; + + while (iterator.hasNext()) { + final InputRow row = iterator.next(); + 
actualRowCount++; + + Assert.assertEquals("7.0", Iterables.getOnlyElement(row.getDimension("min"))); + Assert.assertEquals("8.0", Iterables.getOnlyElement(row.getDimension("avg"))); + Assert.assertEquals("9.0", Iterables.getOnlyElement(row.getDimension("max"))); + Assert.assertEquals("24.0", Iterables.getOnlyElement(row.getDimension("sum"))); + Assert.assertEquals("3", Iterables.getOnlyElement(row.getDimension("len"))); + + //population variance of [7,8,9] is 2/3, stddev is sqrt(2/3), approximately 0.8165 + Assert.assertEquals(0.8165, Double.parseDouble(Iterables.getOnlyElement(row.getDimension("stddev"))), 0.0001); + + //append is not supported + Assert.assertEquals(Collections.emptyList(), row.getDimension("append")); + } + Assert.assertEquals(1, actualRowCount); + } + } + private InputEntityReader createReader( TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec, diff --git a/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json b/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json index 75caf256f2a..760dea7757b 100644 --- a/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json +++ b/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json @@ -46,6 +46,11 @@ "type": "path", "name": "list", "expr": "$.listDim" + }, + { + "type": "path", + "name": "listLength", + "expr": "$.listDim.length()" } ] }, diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java index 3ba15e41a98..3190c7e28ad 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java @@ -191,7 +191,7 @@ public class 
ParquetGroupJsonProvider implements JsonProvider @Override public Object unwrap(final Object o) { - throw new UnsupportedOperationException("Unused"); + return o; } } diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java index 6532ab222df..38a916c93e0 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java @@ -116,6 +116,7 @@ public class FlattenSpecParquetInputTest extends BaseParquetInputTest Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0)); Assert.assertEquals("listDim1v1", rows.get(0).getDimension("list").get(0)); Assert.assertEquals("listDim1v2", rows.get(0).getDimension("list").get(1)); + Assert.assertEquals("2", rows.get(0).getDimension("listLength").get(0)); Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue()); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index 833a0ac5855..93c8d462165 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -362,7 +362,21 @@ public class ITIndexerTest extends AbstractITBatchIndexTest waitForAllTasksToCompleteForDataSource(datasourceName); } - } + @Test + public void testJsonFunctions() throws Exception + { + final String taskSpec = getResourceAsString("/indexer/json_path_index_task.json"); + + submitTaskAndWait( + taskSpec, + "json_path_index_test", + false, + true, + new Pair<>(false, false) + ); + + doTestQuery("json_path_index_test", 
"/indexer/json_path_index_queries.json"); + } } diff --git a/integration-tests/src/test/resources/indexer/json_path_index_queries.json b/integration-tests/src/test/resources/indexer/json_path_index_queries.json new file mode 100644 index 00000000000..1940cc6ea02 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/json_path_index_queries.json @@ -0,0 +1,49 @@ +[ + { + "description": "timeseries", + "query": { + "queryType": "timeseries", + "dataSource": "json_path_index_test", + "intervals": [ + "1000/3000" + ], + "aggregations": [ + { + "type": "longSum", + "name": "len", + "fieldName": "len" + }, + { + "type": "longSum", + "name": "max", + "fieldName": "max" + }, + { + "type": "longSum", + "name": "min", + "fieldName": "min" + }, + { + "type": "longSum", + "name": "sum", + "fieldName": "sum" + } + ], + "granularity": { + "type": "all" + } + }, + "expectedResults": [ + { + "timestamp": "2013-08-31T01:02:33.000Z", + "result": { + "sum": 10, + "min": 0, + "len": 5, + "max": 4 + } + } + ] + } +] + diff --git a/integration-tests/src/test/resources/indexer/json_path_index_task.json b/integration-tests/src/test/resources/indexer/json_path_index_task.json new file mode 100644 index 00000000000..2fd6990b116 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/json_path_index_task.json @@ -0,0 +1,80 @@ +{ + "type": "index", + "dataSource": "json_path_index_test", + "spec": { + "dataSchema": { + "dataSource": "json_path_index_test", + "timestampSpec": { + "column": "timestamp", + "format": "iso" + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "long", + "name": "len" + }, + { + "type": "long", + "name": "min" + }, + { + "type": "long", + "name": "max" + }, + { + "type": "long", + "name": "sum" + } + ], + "dimensionExclusions": [ + "__time", + "timestamp" + ] + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + } + } + }, + "ioConfig": { + "type": 
"index", + "inputSource": { + "type": "inline", + "data": "{\"timestamp\": \"2013-08-31T01:02:33Z\", \"values\": [0,1,2,3,4] }" + }, + "inputFormat": { + "type": "json", + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "len", + "expr": "$.values.length()" + }, + { + "type": "path", + "name": "min", + "expr": "$.values.min()" + }, + { + "type": "path", + "name": "max", + "expr": "$.values.max()" + }, + { + "type": "path", + "name": "sum", + "expr": "$.values.sum()" + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/website/.spelling b/website/.spelling index 58e8f819100..e093cbc9cd0 100644 --- a/website/.spelling +++ b/website/.spelling @@ -38,6 +38,7 @@ BCP Base64 Base64-encoded ByteBuffer +concat CIDR CORS CNF