Added TYPE(native) data type for external tables (#13958)

This commit is contained in:
Paul Rogers 2023-03-22 21:43:29 -07:00 committed by GitHub
parent 2ad133c06e
commit da42ee5bfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 78 additions and 5 deletions

View File

@ -402,6 +402,13 @@ FROM TABLE(
) (x VARCHAR, y VARCHAR)
```
The JSON function allows columns to be of type `TYPE('COMPLEX<json>')` which indicates that the column contains
some form of complex JSON: a JSON object, a JSON array, or an array of JSON objects or arrays.
Note that the case must match exactly as shown: uppercase `COMPLEX`, lowercase `json`.
The SQL type simply names a native Druid type. However, the segment column actually
produced may be of some other type if Druid infers that a simpler type can be used
instead.
### Parameters
Starting with the Druid 26.0 release, you can use query parameters with MSQ queries. You may find

View File

@ -81,7 +81,11 @@ public class Columns
if (sqlType == null) {
return null;
}
return SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(sqlType));
ColumnType druidType = SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(sqlType));
if (druidType != null) {
return druidType;
}
return ColumnType.fromString(sqlType);
}
public static void validateScalarColumn(String name, String type)
@ -105,7 +109,7 @@ public class Columns
for (ColumnSpec col : columns) {
ColumnType druidType = null;
if (col.sqlType() != null) {
druidType = Columns.SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(col.sqlType()));
druidType = Columns.druidType(col.sqlType());
}
if (druidType == null) {
druidType = ColumnType.STRING;

View File

@ -402,6 +402,7 @@ data: {
# Return type of method implementation should be "SqlTypeNameSpec".
# Example: SqlParseTimeStampZ().
dataTypeParserMethods: [
"DruidType()"
]
# List of methods for parsing builtin function calls.

View File

@ -92,3 +92,18 @@ SqlNodeList ClusterItems() :
return new SqlNodeList(list, s.addAll(list).pos());
}
}
SqlTypeNameSpec DruidType() :
{
String typeName;
}
{
<TYPE> <LPAREN> <QUOTED_STRING>
{
typeName = SqlParserUtil.trim(token.image, "'");
}
<RPAREN>
{
return new SqlUserDefinedTypeNameSpec(typeName, span().pos());
}
}

View File

@ -238,11 +238,15 @@ public class Externals
if (spec == null) {
throw unsupportedType(name, dataType);
}
SqlIdentifier typeName = spec.getTypeName();
if (typeName == null || !typeName.isSimple()) {
SqlIdentifier typeNameIdentifier = spec.getTypeName();
if (typeNameIdentifier == null || !typeNameIdentifier.isSimple()) {
throw unsupportedType(name, dataType);
}
SqlTypeName type = SqlTypeName.get(typeName.getSimple());
String simpleName = typeNameIdentifier.getSimple();
if (StringUtils.toLowerCase(simpleName).startsWith(("complex<"))) {
return simpleName;
}
SqlTypeName type = SqlTypeName.get(simpleName);
if (type == null) {
throw unsupportedType(name, dataType);
}

View File

@ -35,6 +35,8 @@ public class DruidTypeSystem implements RelDataTypeSystem
*/
public static final int DEFAULT_TIMESTAMP_PRECISION = 3;
public static final String VARIANT_TYPE_NAME = "VARIANT";
private DruidTypeSystem()
{
// Singleton.

View File

@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.UOE;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.Externals;
import org.apache.druid.sql.calcite.filtration.Filtration;
@ -207,6 +208,45 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
.verify();
}
@Test
public void testHttpJson()
{
final ExternalDataSource httpDataSource = new ExternalDataSource(
new HttpInputSource(
Collections.singletonList(toURI("http://foo.com/bar.json")),
"bob",
new DefaultPasswordProvider("secret"),
new HttpInputSourceConfig(null)
),
new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
RowSignature.builder()
.add("x", ColumnType.STRING)
.add("y", ColumnType.STRING)
.add("z", NestedDataComplexTypeSerde.TYPE)
.build()
);
testIngestionQuery()
.sql("INSERT INTO dst SELECT *\n" +
"FROM TABLE(http(userName => 'bob',\n" +
" password => 'secret',\n" +
" uris => ARRAY['http://foo.com/bar.json'],\n" +
" format => 'csv'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z TYPE('COMPLEX<json>'))\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", httpDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), Externals.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
/**
* Basic use of an inline input source via EXTERN
*/