Support JsonPath functions in JsonPath expressions (#11722)

* Add jsonPath functions support

* Add jsonPath function test for Avro

* Add jsonPath function length() to Orc

* Add jsonPath function length() to Parquet

* Add more tests to ORC format

* update doc

* Fix exception during ingestion

* Add IT test case

* Revert "Fix exception during ingestion"

This reverts commit 5a5484b9ea.

* update IT test case

* Add 'keys()'

* Commit IT test case

* Fix UT
This commit is contained in:
Frank Chen 2021-12-10 10:53:23 +08:00 committed by GitHub
parent a8b916576d
commit 58245b4617
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 301 additions and 4 deletions

View File

@ -115,7 +115,17 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
}
@Nullable
private Object valueConversionFunction(JsonNode val)
private Object valueConversionFunction(Object val)
{
if (val instanceof JsonNode) {
return convertJsonNode((JsonNode) val);
} else {
return val;
}
}
@Nullable
private Object convertJsonNode(JsonNode val)
{
if (val == null || val.isNull()) {
return null;

View File

@ -27,6 +27,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -239,4 +240,36 @@ public class JSONPathParserTest
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, false);
jsonParser.parseToMap(NOT_JSON);
}
@Test
public void testJSONPathFunctions()
{
List<JSONPathFieldSpec> fields = Arrays.asList(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-length", "$.met.a.length()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-min", "$.met.a.min()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-max", "$.met.a.max()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-avg", "$.met.a.avg()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-sum", "$.met.a.sum()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-stddev", "$.met.a.stddev()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "met-array-append", "$.met.a.append(10)"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "concat", "$.concat($.foo.bar1, $.foo.bar2)")
);
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, false);
final Map<String, Object> jsonMap = jsonParser.parseToMap(NESTED_JSON);
// values of met.a array are: 7,8,9
Assert.assertEquals(3, jsonMap.get("met-array-length"));
Assert.assertEquals(7.0, jsonMap.get("met-array-min"));
Assert.assertEquals(9.0, jsonMap.get("met-array-max"));
Assert.assertEquals(8.0, jsonMap.get("met-array-avg"));
Assert.assertEquals(24.0, jsonMap.get("met-array-sum"));
//deviation of [7,8,9] is 1/3, stddev is sqrt(1/3), approximately 0.8165
Assert.assertEquals(0.8165, (double) jsonMap.get("met-array-stddev"), 0.00001);
Assert.assertEquals(ImmutableList.of(7L, 8L, 9L, 10L), jsonMap.get("met-array-append"));
Assert.assertEquals("aaabbb", jsonMap.get("concat"));
}
}

View File

@ -620,6 +620,20 @@ Each entry in the `fields` list can have the following components:
* If `useFieldDiscovery` is enabled, any discovered field with the same name as one already defined in the `fields` list will be skipped, rather than added twice.
* [http://jsonpath.herokuapp.com/](http://jsonpath.herokuapp.com/) is useful for testing `path`-type expressions.
* jackson-jq supports a subset of the full [jq](https://stedolan.github.io/jq/) syntax. Please refer to the [jackson-jq documentation](https://github.com/eiiches/jackson-jq) for details.
* [JsonPath](https://github.com/jayway/JsonPath) supports a bunch of functions, but not all of these functions are supported by Druid now. Following matrix shows the current supported JsonPath functions and corresponding data formats. Please also note the output data type of these functions.
| Function | Description | Output type | json | orc | avro | parquet |
| :----------| :------------------------------------------------------------------ |:----------- |:-----|:----|:-----|:-----|
| min() | Provides the min value of an array of numbers | Double | &#10003; | &#10003; | &#10003; | &#10003; |
| max() | Provides the max value of an array of numbers | Double | &#10003; | &#10003; | &#10003; | &#10003; |
| avg() | Provides the average value of an array of numbers | Double | &#10003; | &#10003; | &#10003; | &#10003; |
| stddev() | Provides the standard deviation value of an array of numbers | Double | &#10003; | &#10003; | &#10003; | &#10003; |
| length() | Provides the length of an array | Integer | &#10003; | &#10003; | &#10003; | &#10003; |
| sum() | Provides the sum value of an array of numbers | Double | &#10003; | &#10003; | &#10003; | &#10003; |
| concat(X) | Provides a concatenated version of the path output with a new item | like input | &#10003; | &#10007; | &#10007; | &#10007; |
| append(X) | add an item to the json path output array | like input | &#10003; | &#10007; | &#10007; | &#10007; |
| keys() | Provides the property keys (An alternative for terminal tilde ~) | Set<E> | &#10007; | &#10007; | &#10007; | &#10007; |
## Parser

View File

@ -262,6 +262,37 @@ public class AvroFlattenerMakerTest
record.getSomeIntArray(),
flattener.makeJsonPathExtractor("$.someIntArray").apply(record)
);
Assert.assertEquals(
(double) record.getSomeIntArray().stream().mapToInt(Integer::intValue).min().getAsInt(),
//return type of min is double
flattener.makeJsonPathExtractor("$.someIntArray.min()").apply(record)
);
Assert.assertEquals(
(double) record.getSomeIntArray().stream().mapToInt(Integer::intValue).max().getAsInt(),
//return type of max is double
flattener.makeJsonPathExtractor("$.someIntArray.max()").apply(record)
);
Assert.assertEquals(
record.getSomeIntArray().stream().mapToInt(Integer::intValue).average().getAsDouble(),
flattener.makeJsonPathExtractor("$.someIntArray.avg()").apply(record)
);
Assert.assertEquals(
record.getSomeIntArray().size(),
flattener.makeJsonPathExtractor("$.someIntArray.length()").apply(record)
);
Assert.assertEquals(
(double) record.getSomeIntArray().stream().mapToInt(Integer::intValue).sum(),
//return type of sum is double
flattener.makeJsonPathExtractor("$.someIntArray.sum()").apply(record)
);
Assert.assertEquals(
2.681,
(double) flattener.makeJsonPathExtractor("$.someIntArray.stddev()").apply(record),
0.0001
);
Assert.assertEquals(
record.getSomeStringArray(),
flattener.makeJsonPathExtractor("$.someStringArray").apply(record)

View File

@ -185,6 +185,6 @@ public class OrcStructJsonProvider implements JsonProvider
@Override
public Object unwrap(final Object o)
{
throw new UnsupportedOperationException("Unused");
return o;
}
}

View File

@ -112,6 +112,11 @@ public class OrcReaderTest
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "struct_list_struct_int", "$.middle.list[1].int1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "struct_list_struct_intlist", "$.middle.list[*].int1"),
new JSONPathFieldSpec(
JSONPathFieldType.PATH,
"struct_list_struct_middleListLength",
"$.middle.list.length()"
),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "list_struct_string", "$.list[0].string1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "map_struct_int", "$.map.chani.int1")
)
@ -145,6 +150,8 @@ public class OrcReaderTest
Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("struct_list_struct_int")));
Assert.assertEquals(ImmutableList.of("1", "2"), row.getDimension("struct_list_struct_intlist"));
Assert.assertEquals("good", Iterables.getOnlyElement(row.getDimension("list_struct_string")));
Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("struct_list_struct_middleListLength")));
Assert.assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"), row.getTimestamp());
while (iterator.hasNext()) {
@ -253,6 +260,58 @@ public class OrcReaderTest
}
}
/**
* schema: struct<string1:string, list:array<int>, ts:timestamp>
* data: {"dim1","[7,8,9]","2000-03-12 15:00:00"}
*/
@Test
public void testJsonPathFunctions() throws IOException
{
final OrcInputFormat inputFormat = new OrcInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "min", "$.list.min()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "max", "$.list.max()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "avg", "$.list.avg()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "len", "$.list.length()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "sum", "$.list.sum()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "stddev", "$.list.stddev()"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "append", "$.list.append(10)")
)
),
null,
new Configuration()
);
final InputEntityReader reader = createReader(
new TimestampSpec("ts", "millis", null),
new DimensionsSpec(null),
inputFormat,
"example/test_json_path_functions.orc"
);
try (CloseableIterator<InputRow> iterator = reader.read()) {
int actualRowCount = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
actualRowCount++;
Assert.assertEquals("7.0", Iterables.getOnlyElement(row.getDimension("min")));
Assert.assertEquals("8.0", Iterables.getOnlyElement(row.getDimension("avg")));
Assert.assertEquals("9.0", Iterables.getOnlyElement(row.getDimension("max")));
Assert.assertEquals("24.0", Iterables.getOnlyElement(row.getDimension("sum")));
Assert.assertEquals("3", Iterables.getOnlyElement(row.getDimension("len")));
//deviation of [7,8,9] is 1/3, stddev is sqrt(1/3), approximately 0.8165
Assert.assertEquals(0.8165, Double.parseDouble(Iterables.getOnlyElement(row.getDimension("stddev"))), 0.0001);
//append is not supported
Assert.assertEquals(Collections.emptyList(), row.getDimension("append"));
}
Assert.assertEquals(1, actualRowCount);
}
}
private InputEntityReader createReader(
TimestampSpec timestampSpec,
DimensionsSpec dimensionsSpec,

View File

@ -46,6 +46,11 @@
"type": "path",
"name": "list",
"expr": "$.listDim"
},
{
"type": "path",
"name": "listLength",
"expr": "$.listDim.length()"
}
]
},

View File

@ -191,7 +191,7 @@ public class ParquetGroupJsonProvider implements JsonProvider
@Override
public Object unwrap(final Object o)
{
throw new UnsupportedOperationException("Unused");
return o;
}
}

View File

@ -116,6 +116,7 @@ public class FlattenSpecParquetInputTest extends BaseParquetInputTest
Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0));
Assert.assertEquals("listDim1v1", rows.get(0).getDimension("list").get(0));
Assert.assertEquals("listDim1v2", rows.get(0).getDimension("list").get(1));
Assert.assertEquals("2", rows.get(0).getDimension("listLength").get(0));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
}

View File

@ -362,7 +362,21 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
waitForAllTasksToCompleteForDataSource(datasourceName);
}
}
@Test
public void testJsonFunctions() throws Exception
{
final String taskSpec = getResourceAsString("/indexer/json_path_index_task.json");
submitTaskAndWait(
taskSpec,
"json_path_index_test",
false,
true,
new Pair<>(false, false)
);
doTestQuery("json_path_index_test", "/indexer/json_path_index_queries.json");
}
}

View File

@ -0,0 +1,49 @@
[
{
"description": "timeseries",
"query": {
"queryType": "timeseries",
"dataSource": "json_path_index_test",
"intervals": [
"1000/3000"
],
"aggregations": [
{
"type": "longSum",
"name": "len",
"fieldName": "len"
},
{
"type": "longSum",
"name": "max",
"fieldName": "max"
},
{
"type": "longSum",
"name": "min",
"fieldName": "min"
},
{
"type": "longSum",
"name": "sum",
"fieldName": "sum"
}
],
"granularity": {
"type": "all"
}
},
"expectedResults": [
{
"timestamp": "2013-08-31T01:02:33.000Z",
"result": {
"sum": 10,
"min": 0,
"len": 5,
"max": 4
}
}
]
}
]

View File

@ -0,0 +1,80 @@
{
"type": "index",
"dataSource": "json_path_index_test",
"spec": {
"dataSchema": {
"dataSource": "json_path_index_test",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
{
"type": "long",
"name": "len"
},
{
"type": "long",
"name": "min"
},
{
"type": "long",
"name": "max"
},
{
"type": "long",
"name": "sum"
}
],
"dimensionExclusions": [
"__time",
"timestamp"
]
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": {
"type": "none"
}
}
},
"ioConfig": {
"type": "index",
"inputSource": {
"type": "inline",
"data": "{\"timestamp\": \"2013-08-31T01:02:33Z\", \"values\": [0,1,2,3,4] }"
},
"inputFormat": {
"type": "json",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "len",
"expr": "$.values.length()"
},
{
"type": "path",
"name": "min",
"expr": "$.values.min()"
},
{
"type": "path",
"name": "max",
"expr": "$.values.max()"
},
{
"type": "path",
"name": "sum",
"expr": "$.values.sum()"
}
]
}
}
}
}
}

View File

@ -38,6 +38,7 @@ BCP
Base64
Base64-encoded
ByteBuffer
concat
CIDR
CORS
CNF