Enhanced MSQ table functions (#13360)

* Enhanced MSQ table functions
* HTTP, LOCALFILES and INLINE table functions powered by
catalog metadata.
* Documentation
Paul Rogers 2022-12-08 13:56:02 -08:00 committed by GitHub
parent d8e27eaab4
commit 013a12e86f
43 changed files with 2333 additions and 357 deletions

View File

@ -76,7 +76,7 @@ public class HttpInputSourceConfig
public String toString()
{
return "HttpInputSourceConfig{" +
", allowedProtocols=" + allowedProtocols +
"allowedProtocols=" + allowedProtocols +
'}';
}
}

View File

@ -228,4 +228,14 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI
{
return Objects.hash(baseDir, filter, files);
}
@Override
public String toString()
{
return "LocalInputSource{" +
"baseDir=\"" + baseDir +
"\", filter=" + filter +
", files=" + files +
"}";
}
}

View File

@ -26,7 +26,6 @@ import java.util.Objects;
*/
public class Pair<T1, T2>
{
public static <T1, T2> Pair<T1, T2> of(@Nullable T1 lhs, @Nullable T2 rhs)
{
return new Pair<>(lhs, rhs);
@ -56,7 +55,7 @@ public class Pair<T1, T2>
if (!(o instanceof Pair)) {
return false;
}
Pair pair = (Pair) o;
Pair<?, ?> pair = (Pair<?, ?>) o;
return Objects.equals(lhs, pair.lhs) && Objects.equals(rhs, pair.rhs);
}

View File

@ -32,9 +32,9 @@ sidebar_label: Reference
This topic is a reference guide for the multi-stage query architecture in Apache Druid. For examples of real-world
usage, refer to the [Examples](examples.md) page.
### EXTERN
### `EXTERN`
Use the EXTERN function to read external data.
Use the `EXTERN` function to read external data.
Function format:
@ -50,7 +50,7 @@ FROM TABLE(
)
```
EXTERN consists of the following parts:
`EXTERN` consists of the following parts:
1. Any [Druid input source](../ingestion/native-batch-input-source.md) as a JSON-encoded string.
2. Any [Druid input format](../ingestion/data-formats.md) as a JSON-encoded string.
@ -58,12 +58,134 @@ EXTERN consists of the following parts:
For more information, see [Read external data with EXTERN](concepts.md#extern).
### INSERT
### `HTTP`, `INLINE` and `LOCALFILES`
Use the INSERT statement to insert data.
While `EXTERN` allows you to specify an external table using JSON, other table functions allow you to
describe the external table using SQL syntax. Each function works for one specific kind of input
source. You provide properties using SQL named arguments. The row signature is given using the
Druid SQL `EXTEND` keyword, with SQL column names and types. Function format:
Unlike standard SQL, INSERT loads data into the target table according to column name, not positionally. If necessary,
use `AS` in your SELECT column list to assign the correct names. Do not rely on their positions within the SELECT
```sql
SELECT
<column>
FROM TABLE(
http(
userName => 'bob',
password => 'secret',
uris => 'http://foo.com/bar.csv',
format => 'csv'
)
) EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
```
Note that the `EXTEND` keyword is optional. The following is equally valid (and perhaps
more convenient):
```sql
SELECT
<column>
FROM TABLE(
http(
userName => 'bob',
password => 'secret',
uris => 'http://foo.com/bar.csv',
format => 'csv'
)
) (x VARCHAR, y VARCHAR, z BIGINT)
```
The set of table functions and formats is preliminary in this release.
#### `HTTP`
The `HTTP` table function represents the `HttpInputSource` class in Druid which allows you to
read from an HTTP server. The function accepts the following arguments:
| Name | Description | JSON equivalent | Required |
| ---- | ----------- | --------------- | -------- |
| `userName` | Basic authentication user name | `httpAuthenticationUsername` | No |
| `password` | Basic authentication password | `httpAuthenticationPassword` | No |
| `passwordEnvVar` | Environment variable that contains the basic authentication password| `httpAuthenticationPassword` | No |
| `uris` | Comma-separated list of URIs to read. | `uris` | Yes |
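For example, here is a sketch of an `HTTP` read that authenticates with an environment variable; the URI and variable name are hypothetical:

```sql
SELECT x, y, z
FROM TABLE(
  http(
    userName => 'bob',
    passwordEnvVar => 'HTTP_SECRET',           -- hypothetical environment variable
    uris => 'http://example.com/data.csv',     -- hypothetical URI
    format => 'csv'
  )
) (x VARCHAR, y VARCHAR, z BIGINT)
```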
#### `INLINE`
The `INLINE` table function represents the `InlineInputSource` class in Druid which provides
data directly in the table function. The function accepts the following arguments:
| Name | Description | JSON equivalent | Required |
| ---- | ----------- | --------------- | -------- |
| `data` | Text lines of inline data. Separate lines with a newline. | `data` | Yes |
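As a sketch, an `INLINE` query might embed two rows of CSV data directly in the statement; the newline inside the string literal separates the rows:

```sql
SELECT x, y
FROM TABLE(
  inline(
    data => 'a,b
c,d',                -- two CSV rows separated by a newline
    format => 'csv'
  )
) (x VARCHAR, y VARCHAR)
```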
#### `LOCALFILES`
The `LOCALFILES` table function represents the `LocalInputSource` class in Druid which reads
files from the file system of the node running Druid. This is most useful for single-node
installations. The function accepts the following arguments:
| Name | Description | JSON equivalent | Required |
| ---- | ----------- | --------------- | -------- |
| `baseDir` | Directory to read from. | `baseDir` | No |
| `filter` | Filter pattern to read. Example: `*.csv`. | `filter` | No |
| `files` | Comma-separated list of files to read. | `files` | No |
You must either provide the `baseDir` or the list of `files`. You can provide both, in which case
the files are assumed to be relative to the `baseDir`. If you provide a `filter`, you must also provide the
`baseDir`.
Note that, due to [Issue #13359](https://github.com/apache/druid/issues/13359), the functionality
described above is broken. Until that issue is resolved, you must provide one or more absolute
file paths in the `files` property and the other two properties are unavailable.
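Given the limitation above, a sketch of a `LOCALFILES` read that passes absolute paths in `files`; the path is hypothetical:

```sql
SELECT x, y, z
FROM TABLE(
  localfiles(
    files => '/tmp/data/my.csv',   -- hypothetical absolute path
    format => 'csv'
  )
) (x VARCHAR, y VARCHAR, z BIGINT)
```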
#### Table Function Format
Each of the table functions above requires that you specify a format.
| Name | Description | JSON equivalent | Required |
| ---- | ----------- | --------------- | -------- |
| `format` | The input format, using the same names as for `EXTERN`. | `inputFormat.type` | Yes |
#### CSV Format
Use the `csv` format to read from CSV. This choice selects the Druid `CsvInputFormat` class.
| Name | Description | JSON equivalent | Required |
| ---- | ----------- | --------------- | -------- |
| `listDelimiter` | The delimiter to use for fields that represent a list of strings. | `listDelimiter` | No |
| `skipRows` | The number of rows to skip at the start of the file. Default is 0. | `skipHeaderRows` | No |
MSQ does not have the ability to infer schema from a CSV file, so the `findColumnsFromHeader` property
is unavailable. Instead, columns are given using the `EXTEND` syntax described above.
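For example, assuming format properties are passed as additional named arguments of the same table function, a read that skips a header row might look like this; the URI is hypothetical:

```sql
SELECT x, y
FROM TABLE(
  http(
    uris => 'http://example.com/data.csv',   -- hypothetical URI
    format => 'csv',
    skipRows => 1                            -- skip the header row
  )
) (x VARCHAR, y VARCHAR)
```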
#### Delimited Text Format
Use the `tsv` format to read from an arbitrary delimited (CSV-like) file such as tab-delimited,
pipe-delimited, etc. This choice selects the Druid `DelimitedInputFormat` class.
| Name | Description | JSON equivalent | Required |
| ---- | ----------- | --------------- | -------- |
| `delimiter` | The delimiter which separates fields. | `delimiter` | Yes |
| `listDelimiter` | The delimiter to use for fields that represent a list of strings. | `listDelimiter` | No |
| `skipRows` | The number of rows to skip at the start of the file. Default is 0. | `skipHeaderRows` | No |
As noted above, MSQ cannot infer schema using headers. Use `EXTEND` instead.
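Under the same assumption, a sketch of a pipe-delimited read; the URI is hypothetical:

```sql
SELECT x, y
FROM TABLE(
  http(
    uris => 'http://example.com/data.psv',   -- hypothetical URI
    format => 'tsv',
    delimiter => '|'                         -- pipe-delimited fields
  )
) (x VARCHAR, y VARCHAR)
```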
#### JSON Format
Use the `json` format to read from a JSON input source. This choice selects the Druid `JsonInputFormat` class.
| Name | Description | JSON equivalent | Required |
| ---- | ----------- | --------------- | -------- |
| `keepNulls` | Whether to keep null values. Defaults to `false`. | `keepNullColumns` | No |
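And a sketch of a JSON read that keeps null values; the URI is hypothetical:

```sql
SELECT x, y
FROM TABLE(
  http(
    uris => 'http://example.com/data.json',   -- hypothetical URI
    format => 'json',
    keepNulls => TRUE
  )
) (x VARCHAR, y BIGINT)
```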
### `INSERT`
Use the `INSERT` statement to insert data.
Unlike standard SQL, `INSERT` loads data into the target table according to column name, not positionally. If necessary,
use `AS` in your `SELECT` column list to assign the correct names. Do not rely on their positions within the SELECT
clause.
Statement format:
@ -85,15 +207,15 @@ INSERT consists of the following parts:
For more information, see [Load data with INSERT](concepts.md#insert).
### REPLACE
### `REPLACE`
You can use the REPLACE function to replace all or some of the data.
You can use the `REPLACE` function to replace all or some of the data.
Unlike standard SQL, REPLACE loads data into the target table according to column name, not positionally. If necessary,
use `AS` in your SELECT column list to assign the correct names. Do not rely on their positions within the SELECT
Unlike standard SQL, `REPLACE` loads data into the target table according to column name, not positionally. If necessary,
use `AS` in your `SELECT` column list to assign the correct names. Do not rely on their positions within the SELECT
clause.
#### REPLACE all data
#### `REPLACE` all data
Function format to replace all data:
@ -105,7 +227,7 @@ PARTITIONED BY <time granularity>
[ CLUSTERED BY <column list> ]
```
#### REPLACE specific time ranges
#### `REPLACE` specific time ranges
Function format to replace specific time ranges:
@ -117,7 +239,7 @@ PARTITIONED BY <time granularity>
[ CLUSTERED BY <column list> ]
```
REPLACE consists of the following parts:
`REPLACE` consists of the following parts:
1. Optional [context parameters](./reference.md#context-parameters).
2. A `REPLACE INTO <dataSource>` clause at the start of your query, such as `REPLACE INTO "your-table".`
@ -132,7 +254,7 @@ REPLACE consists of the following parts:
For more information, see [Overwrite data with REPLACE](concepts.md#replace).
### PARTITIONED BY
### `PARTITIONED BY`
The `PARTITIONED BY <time granularity>` clause is required for [INSERT](#insert) and [REPLACE](#replace). See
[Partitioning](concepts.md#partitioning) for details.
@ -164,7 +286,7 @@ The following ISO 8601 periods are supported for `TIME_FLOOR`:
For more information about partitioning, see [Partitioning](concepts.md#partitioning).
### CLUSTERED BY
### `CLUSTERED BY`
The `CLUSTERED BY <column list>` clause is optional for [INSERT](#insert) and [REPLACE](#replace). It accepts a list of
column names or expressions.
@ -227,12 +349,13 @@ than 100 workers or if the combined sketch size among all workers is more than 1
`PARALLEL` is chosen.
## Durable Storage
This section enumerates the advantages and performance implications of enabling durable storage while executing MSQ tasks.
To prevent durable storage from filling up with temporary files when tasks fail to clean them up, a periodic
cleaner can be scheduled to clean the directories for which there is no controller task running. It uses
the storage connector to operate on the durable storage. The durable storage location should only be used to store the output
of the cluster's MSQ tasks. If the location contains other files or directories, they will be cleaned up as well.
The following table lists the properties that can be set to control the behavior of the durable storage of the cluster.
|Parameter |Default | Description |

View File

@ -32,6 +32,9 @@ import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.external.HttpOperatorConversion;
import org.apache.druid.sql.calcite.external.InlineOperatorConversion;
import org.apache.druid.sql.calcite.external.LocalOperatorConversion;
import org.apache.druid.sql.guice.SqlBindings;
import java.util.List;
@ -59,6 +62,9 @@ public class MSQSqlModule implements DruidModule
// Set up the EXTERN macro.
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HttpOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, InlineOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, LocalOperatorConversion.class);
}
@Provides

View File

@ -21,15 +21,17 @@ package org.apache.druid.catalog.model;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.joda.time.Period;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -46,8 +48,28 @@ public interface ModelProperties
{
interface PropertyDefn<T>
{
/**
* Name of the property as visible to catalog users. All properties are top-level within
* the {@code properties} object within a catalog spec.
*/
String name();
/**
* Metadata about properties, such as how they apply to SQL table functions.
*
* @see {@link PropertyAttributes} for details.
*/
Map<String, Object> attributes();
/**
* The name of the type of this property to be displayed in error messages.
*/
String typeName();
/**
* Validates that the object given is valid for this property. Provides the JSON
* mapper in case JSON decoding is required.
*/
void validate(Object value, ObjectMapper jsonMapper);
/**
@ -57,16 +79,32 @@ public interface ModelProperties
* value.
*/
Object merge(Object existing, Object update);
/**
* Decodes a JSON-encoded value into a corresponding Java value.
*/
T decode(Object value, ObjectMapper jsonMapper);
/**
* Decodes a SQL-encoded value into a corresponding Java value.
*/
T decodeSqlValue(Object value, ObjectMapper jsonMapper);
}
abstract class BasePropertyDefn<T> implements PropertyDefn<T>
{
protected final String name;
protected final Map<String, Object> attributes;
public BasePropertyDefn(final String name, Map<String, Object> attributes)
{
this.name = name;
this.attributes = attributes == null ? ImmutableMap.of() : attributes;
}
public BasePropertyDefn(final String name)
{
this.name = name;
this(name, null);
}
@Override
@ -75,41 +113,63 @@ public interface ModelProperties
return name;
}
@Override
public Map<String, Object> attributes()
{
return attributes;
}
@Override
public String typeName()
{
return PropertyAttributes.typeName(this);
}
@Override
public Object merge(Object existing, Object update)
{
return update == null ? existing : update;
}
@Override
public T decodeSqlValue(Object value, ObjectMapper jsonMapper)
{
return decode(value, jsonMapper);
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{"
+ "name: " + name
+ ", type: " + typeName()
+ ", attributes: " + attributes()
+ "}";
}
}
class SimplePropertyDefn<T> extends BasePropertyDefn<T>
abstract class SimplePropertyDefn<T> extends BasePropertyDefn<T>
{
public final Class<T> valueClass;
public SimplePropertyDefn(
final String name,
final Class<T> valueClass
final Class<T> valueClass,
final Map<String, Object> attribs
)
{
super(name);
super(
name,
PropertyAttributes.merge(
ImmutableMap.of(
PropertyAttributes.TYPE_NAME,
valueClass.getSimpleName()
),
attribs
)
);
this.valueClass = valueClass;
}
@Override
public String typeName()
{
return valueClass.getSimpleName();
}
/**
* Convert the value from the deserialized JSON format to the type
* required by this field data type. Also used to decode values from
@ -144,30 +204,49 @@ public interface ModelProperties
{
decode(value, jsonMapper);
}
protected T decodeJson(Object value, ObjectMapper jsonMapper)
{
if (value == null) {
return null;
}
try {
return jsonMapper.readValue((String) value, valueClass);
}
catch (Exception e) {
throw new IAE(
"Value [%s] is not valid for property [%s]",
value,
name
);
}
}
}
class TypeRefPropertyDefn<T> extends BasePropertyDefn<T>
{
public final String typeName;
public final TypeReference<T> valueType;
public TypeRefPropertyDefn(
final String name,
final String typeName,
final TypeReference<T> valueType
final TypeReference<T> valueType,
final Map<String, Object> attribs
)
{
super(name);
this.typeName = Preconditions.checkNotNull(typeName);
super(
name,
PropertyAttributes.merge(
ImmutableMap.of(
PropertyAttributes.TYPE_NAME,
typeName
),
attribs
)
);
this.valueType = valueType;
}
@Override
public String typeName()
{
return typeName;
}
@Override
public T decode(Object value, ObjectMapper jsonMapper)
{
@ -202,17 +281,27 @@ public interface ModelProperties
class StringPropertyDefn extends SimplePropertyDefn<String>
{
public StringPropertyDefn(String name)
public StringPropertyDefn(String name, Map<String, Object> attribs)
{
super(name, String.class);
super(
name,
String.class,
PropertyAttributes.merge(
ImmutableMap.of(
PropertyAttributes.SQL_JAVA_TYPE,
String.class
),
attribs
)
);
}
}
class GranularityPropertyDefn extends StringPropertyDefn
{
public GranularityPropertyDefn(String name)
public GranularityPropertyDefn(String name, Map<String, Object> attribs)
{
super(name);
super(name, attribs);
}
@Override
@ -239,17 +328,37 @@ public interface ModelProperties
class IntPropertyDefn extends SimplePropertyDefn<Integer>
{
public IntPropertyDefn(String name)
public IntPropertyDefn(String name, Map<String, Object> attribs)
{
super(name, Integer.class);
super(
name,
Integer.class,
PropertyAttributes.merge(
ImmutableMap.of(
PropertyAttributes.SQL_JAVA_TYPE,
Integer.class
),
attribs
)
);
}
}
class BooleanPropertyDefn extends SimplePropertyDefn<Boolean>
{
public BooleanPropertyDefn(String name)
public BooleanPropertyDefn(String name, Map<String, Object> attribs)
{
super(name, Boolean.class);
super(
name,
Boolean.class,
PropertyAttributes.merge(
ImmutableMap.of(
PropertyAttributes.SQL_JAVA_TYPE,
Boolean.class
),
attribs
)
);
}
}
@ -258,10 +367,11 @@ public interface ModelProperties
public ListPropertyDefn(
final String name,
final String typeName,
final TypeReference<List<T>> valueType
final TypeReference<List<T>> valueType,
final Map<String, Object> attribs
)
{
super(name, typeName, valueType);
super(name, typeName, valueType, attribs);
}
@SuppressWarnings("unchecked")
@ -300,13 +410,33 @@ public interface ModelProperties
class StringListPropertyDefn extends ListPropertyDefn<String>
{
public StringListPropertyDefn(String name)
public StringListPropertyDefn(
final String name,
final Map<String, Object> attribs
)
{
super(
name,
"string list",
new TypeReference<List<String>>() {}
new TypeReference<List<String>>() {},
PropertyAttributes.merge(
ImmutableMap.of(
PropertyAttributes.SQL_JAVA_TYPE,
String.class
),
attribs
)
);
}
@Override
public List<String> decodeSqlValue(Object value, ObjectMapper jsonMapper)
{
if (!(value instanceof String)) {
throw new IAE(StringUtils.format("Argument [%s] is not a VARCHAR", value));
}
String[] values = ((String) value).split(",\\s*");
return Arrays.asList(values);
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.catalog.model;
import org.apache.druid.catalog.model.ModelProperties.PropertyDefn;
import org.apache.druid.catalog.model.table.ExternalTableSpec;
import java.util.List;
@ -33,37 +34,6 @@ import java.util.Map;
*/
public interface ParameterizedDefn
{
interface ParameterDefn
{
String name();
Class<?> valueClass();
}
class ParameterImpl implements ParameterDefn
{
private final String name;
private final Class<?> type;
public ParameterImpl(final String name, final Class<?> type)
{
this.name = name;
this.type = type;
}
@Override
public String name()
{
return name;
}
@Override
public Class<?> valueClass()
{
return type;
}
}
List<ParameterDefn> parameters();
ParameterDefn parameter(String name);
List<PropertyDefn<?>> parameters();
ExternalTableSpec applyParameters(ResolvedTable table, Map<String, Object> parameters);
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.model;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.model.ModelProperties.PropertyDefn;
import java.util.HashMap;
import java.util.Map;
/**
* Definition and access of attributes of table definition properties. These
* are meta-attributes: attributes of attributes. These are primarily used to
* indicate the role of each table property when used in a SQL table function.
*/
public class PropertyAttributes
{
/**
* If set to {@code true}, then the property is also a SQL function parameter.
*/
public static final String IS_SQL_FN_PARAM_KEY = "sqlFnArg";
/**
* If set to {@code true}, then this SQL function parameter is optional. That is,
* it can take a SQL {@code NULL} value if parameters are listed in order, or can
* be omitted if parameters are provided by name.
*/
public static final String IS_SQL_FN_OPTIONAL = "optional";
public static final String IS_PARAMETER = "param";
/**
* The type name to display in error messages.
*/
public static final String TYPE_NAME = "typeName";
/**
* The type to use when creating a SQL function parameter.
*/
public static final String SQL_JAVA_TYPE = "sqlJavaType";
public static final Map<String, Object> SQL_FN_PARAM =
ImmutableMap.of(IS_SQL_FN_PARAM_KEY, true);
public static final Map<String, Object> OPTIONAL_SQL_FN_PARAM =
ImmutableMap.of(IS_SQL_FN_PARAM_KEY, true, IS_SQL_FN_OPTIONAL, true);
public static final Map<String, Object> TABLE_PARAM =
ImmutableMap.of(IS_PARAMETER, true);
public static final Map<String, Object> SQL_AND_TABLE_PARAM =
ImmutableMap.of(IS_SQL_FN_PARAM_KEY, true, IS_PARAMETER, true);
private static boolean getBoolean(PropertyDefn<?> defn, String key)
{
Object value = defn.attributes().get(key);
return value != null && (Boolean) value;
}
public static boolean isSqlFunctionParameter(PropertyDefn<?> defn)
{
return getBoolean(defn, IS_SQL_FN_PARAM_KEY);
}
public static boolean isOptional(PropertyDefn<?> defn)
{
return getBoolean(defn, IS_SQL_FN_OPTIONAL);
}
public static String typeName(PropertyDefn<?> defn)
{
return (String) defn.attributes().get(TYPE_NAME);
}
public static Class<?> sqlParameterType(PropertyDefn<?> defn)
{
return (Class<?>) defn.attributes().get(SQL_JAVA_TYPE);
}
public static boolean isExternTableParameter(PropertyDefn<?> defn)
{
return getBoolean(defn, IS_PARAMETER);
}
public static Map<String, Object> merge(Map<String, Object> attribs1, Map<String, Object> attribs2)
{
if (attribs1 == null) {
return attribs2;
}
if (attribs2 == null) {
return attribs1;
}
Map<String, Object> merged = new HashMap<>(attribs1);
merged.putAll(attribs2);
return ImmutableMap.copyOf(merged);
}
}

View File

@ -61,7 +61,7 @@ public class TableDefn extends ObjectDefn
typeValue,
CatalogUtils.concatLists(
Collections.singletonList(
new ModelProperties.StringPropertyDefn(DESCRIPTION_PROPERTY)
new ModelProperties.StringPropertyDefn(DESCRIPTION_PROPERTY, null)
),
properties
)

View File

@ -26,6 +26,7 @@ import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.HttpTableDefn;
import org.apache.druid.catalog.model.table.InlineTableDefn;
import org.apache.druid.catalog.model.table.LocalTableDefn;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.IAE;
import javax.inject.Inject;
@ -79,7 +80,7 @@ public class TableDefnRegistry
@Inject
public TableDefnRegistry(
final ObjectMapper jsonMapper
@Json ObjectMapper jsonMapper
)
{
this(TABLE_DEFNS, jsonMapper);

View File

@ -70,7 +70,7 @@ public class AbstractDatasourceDefn extends TableDefn
{
public SegmentGranularityFieldDefn()
{
super(SEGMENT_GRANULARITY_PROPERTY);
super(SEGMENT_GRANULARITY_PROPERTY, null);
}
@Override
@ -88,7 +88,7 @@ public class AbstractDatasourceDefn extends TableDefn
{
public HiddenColumnsDefn()
{
super(HIDDEN_COLUMNS_PROPERTY);
super(HIDDEN_COLUMNS_PROPERTY, null);
}
@Override
@ -121,11 +121,12 @@ public class AbstractDatasourceDefn extends TableDefn
CatalogUtils.concatLists(
Arrays.asList(
new SegmentGranularityFieldDefn(),
new ModelProperties.IntPropertyDefn(TARGET_SEGMENT_ROWS_PROPERTY),
new ModelProperties.IntPropertyDefn(TARGET_SEGMENT_ROWS_PROPERTY, null),
new ModelProperties.ListPropertyDefn<ClusterKeySpec>(
CLUSTER_KEYS_PROPERTY,
"cluster keys",
new TypeReference<List<ClusterKeySpec>>() { }
new TypeReference<List<ClusterKeySpec>>() { },
null
),
new HiddenColumnsDefn()
),

View File

@ -26,9 +26,10 @@ import org.apache.druid.catalog.model.CatalogUtils;
import org.apache.druid.catalog.model.ColumnDefn;
import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ModelProperties;
import org.apache.druid.catalog.model.ModelProperties.PropertyDefn;
import org.apache.druid.catalog.model.ParameterizedDefn;
import org.apache.druid.catalog.model.ParameterizedDefn.ParameterDefn;
import org.apache.druid.catalog.model.PropertyAttributes;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableDefn;
import org.apache.druid.catalog.model.table.InputFormats.InputFormatDefn;
@ -36,13 +37,13 @@ import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Definition of an external input source, primarily for ingestion.
@ -70,16 +71,14 @@ public abstract class ExternalTableDefn extends TableDefn
final String typeValue,
final List<PropertyDefn<?>> properties,
final List<ColumnDefn> columnDefns,
final List<InputFormatDefn> formats,
final List<ParameterDefn> parameters
final List<InputFormatDefn> formats
)
{
super(
name,
typeValue,
addFormatProperties(properties, formats),
columnDefns,
parameters
columnDefns
);
ImmutableMap.Builder<String, InputFormatDefn> builder = ImmutableMap.builder();
for (InputFormatDefn format : formats) {
@ -99,6 +98,8 @@ public abstract class ExternalTableDefn extends TableDefn
)
{
List<PropertyDefn<?>> toAdd = new ArrayList<>();
PropertyDefn<?> formatProp = new ModelProperties.StringPropertyDefn(FORMAT_PROPERTY, PropertyAttributes.SQL_FN_PARAM);
toAdd.add(formatProp);
Map<String, PropertyDefn<?>> formatProps = new HashMap<>();
for (InputFormatDefn format : formats) {
for (PropertyDefn<?> prop : format.properties()) {
@ -180,41 +181,32 @@ public abstract class ExternalTableDefn extends TableDefn
}
protected static final ExternalColumnDefn INPUT_COLUMN_DEFN = new ExternalColumnDefn();
private final List<ParameterDefn> parameterList;
private final Map<String, ParameterDefn> parameterMap;
private final List<PropertyDefn<?>> fields;
public ExternalTableDefn(
final String name,
final String typeValue,
final List<PropertyDefn<?>> fields,
final List<ColumnDefn> columnDefns,
final List<ParameterDefn> parameters
final List<ColumnDefn> columnDefns
)
{
super(name, typeValue, fields, columnDefns);
if (CollectionUtils.isNullOrEmpty(parameters)) {
this.parameterMap = null;
this.parameterList = null;
} else {
this.parameterList = parameters;
Map<String, ParameterDefn> params = new HashMap<>();
for (ParameterDefn param : parameters) {
if (params.put(param.name(), param) != null) {
throw new ISE("Duplicate parameter: %s", param.name());
}
}
this.parameterMap = ImmutableMap.copyOf(params);
}
this.fields = fields;
}
public List<ParameterDefn> parameters()
public List<PropertyDefn<?>> parameters()
{
return parameterList;
return fields.stream()
.filter(f -> PropertyAttributes.isExternTableParameter(f))
.collect(Collectors.toList());
}
public ParameterDefn parameter(String key)
public List<PropertyDefn<?>> tableFunctionParameters()
{
return parameterMap.get(key);
return fields.stream()
.filter(f -> PropertyAttributes.isSqlFunctionParameter(f))
.collect(Collectors.toList());
}
/**

View File

@ -23,16 +23,19 @@ import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
/**
* Catalog form of an external table specification used to
* pass along the three components needed for an external table
* in MSQ ingest.
* Catalog form of an external table specification used to pass along the three
* components needed for an external table in MSQ ingest. Just like
* {@code ExternalTableSource}, except that the parameters are not required
* to be non-null.
*/
public class ExternalTableSpec
{
protected final InputSource inputSource;
protected final InputFormat inputFormat;
protected final RowSignature signature;
@Nullable public final InputSource inputSource;
@Nullable public final InputFormat inputFormat;
@Nullable public final RowSignature signature;
public ExternalTableSpec(
final InputSource inputSource,
@ -43,19 +46,4 @@ public class ExternalTableSpec
this.inputFormat = inputFormat;
this.signature = signature;
}
public InputSource inputSource()
{
return inputSource;
}
public InputFormat inputFormat()
{
return inputFormat;
}
public RowSignature signature()
{
return signature;
}
}

View File

@ -25,6 +25,7 @@ import org.apache.druid.catalog.model.CatalogUtils;
import org.apache.druid.catalog.model.ModelProperties.StringListPropertyDefn;
import org.apache.druid.catalog.model.ModelProperties.StringPropertyDefn;
import org.apache.druid.catalog.model.ParameterizedDefn;
import org.apache.druid.catalog.model.PropertyAttributes;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.table.ExternalTableDefn.FormattedExternalTableDefn;
import org.apache.druid.data.input.InputSource;
@ -52,7 +53,7 @@ import java.util.regex.Pattern;
* properties as the {@link HttpInputSource}, but as top-level properties
* that can be mapped to SQL function parameters. Property names are
* cleaned up for ease-of-use. The HTTP input source has multiple quirks,
* the conversion method smooths over those quirks for a simpler catalog
* the conversion method smoothes over those quirks for a simpler catalog
* experience. Provides a parameterized
* form where the user provides the partial URLs to use for a particular
* query.
@ -65,11 +66,13 @@ public class HttpTableDefn extends FormattedExternalTableDefn implements Paramet
// that class for the meaning of these properties.
public static final String URI_TEMPLATE_PROPERTY = "uriTemplate";
public static final String USER_PROPERTY = "user";
// Note, cannot be the simpler "user" since USER is a reserved word in SQL
// and we don't want to require users to quote "user" each time it is used.
public static final String USER_PROPERTY = "userName";
public static final String PASSWORD_PROPERTY = "password";
public static final String PASSWORD_ENV_VAR_PROPERTY = "passwordEnvVar";
public static final String URIS_PROPERTY = "uris";
public static final String URIS_PARAMETER = "uris";
public HttpTableDefn()
{
@ -77,27 +80,24 @@ public class HttpTableDefn extends FormattedExternalTableDefn implements Paramet
"HTTP input table",
TABLE_TYPE,
Arrays.asList(
new StringListPropertyDefn(URIS_PROPERTY),
new StringPropertyDefn(USER_PROPERTY),
new StringPropertyDefn(PASSWORD_PROPERTY),
new StringPropertyDefn(PASSWORD_ENV_VAR_PROPERTY),
new StringPropertyDefn(URI_TEMPLATE_PROPERTY)
new StringListPropertyDefn(URIS_PROPERTY, PropertyAttributes.SQL_AND_TABLE_PARAM),
new StringPropertyDefn(USER_PROPERTY, PropertyAttributes.OPTIONAL_SQL_FN_PARAM),
new StringPropertyDefn(PASSWORD_PROPERTY, PropertyAttributes.OPTIONAL_SQL_FN_PARAM),
new StringPropertyDefn(PASSWORD_ENV_VAR_PROPERTY, PropertyAttributes.OPTIONAL_SQL_FN_PARAM),
new StringPropertyDefn(URI_TEMPLATE_PROPERTY, null)
),
Collections.singletonList(INPUT_COLUMN_DEFN),
InputFormats.ALL_FORMATS,
Collections.singletonList(
new ParameterImpl(URIS_PARAMETER, String.class)
)
InputFormats.ALL_FORMATS
);
}
@Override
public ResolvedTable mergeParameters(ResolvedTable table, Map<String, Object> values)
{
String urisValue = CatalogUtils.safeGet(values, URIS_PARAMETER, String.class);
String urisValue = CatalogUtils.safeGet(values, URIS_PROPERTY, String.class);
List<String> uriValues = CatalogUtils.stringToList(urisValue);
if (CollectionUtils.isNullOrEmpty(uriValues)) {
throw new IAE("One or more values are required for parameter %s", URIS_PARAMETER);
throw new IAE("One or more values are required for parameter %s", URIS_PROPERTY);
}
String uriTemplate = table.stringProperty(URI_TEMPLATE_PROPERTY);
if (Strings.isNullOrEmpty(uriTemplate)) {

View File

@ -19,16 +19,20 @@
package org.apache.druid.catalog.model.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.catalog.model.CatalogUtils;
import org.apache.druid.catalog.model.ModelProperties.StringListPropertyDefn;
import org.apache.druid.catalog.model.PropertyAttributes;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.table.ExternalTableDefn.FormattedExternalTableDefn;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.utils.CollectionUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -43,17 +47,41 @@ public class InlineTableDefn extends FormattedExternalTableDefn
public static final String TABLE_TYPE = InlineInputSource.TYPE_KEY;
public static final String DATA_PROPERTY = "data";
/**
* Special handling of the data property which, in SQL, is a newline-delimited
* list of rows. The user will usually provide a trailing newline which should
* not be interpreted as an empty data row.
*/
private static class DataPropertyDefn extends StringListPropertyDefn
{
public DataPropertyDefn(
final Map<String, Object> attribs
)
{
super(DATA_PROPERTY, attribs);
}
@Override
public List<String> decodeSqlValue(Object value, ObjectMapper jsonMapper)
{
if (!(value instanceof String)) {
throw new IAE(StringUtils.format("Argument [%s] is not a VARCHAR", value));
}
String[] values = ((String) value).trim().split("\n");
return Arrays.asList(values);
}
}
public InlineTableDefn()
{
super(
"Inline input table",
TABLE_TYPE,
Collections.singletonList(
new StringListPropertyDefn(DATA_PROPERTY)
new DataPropertyDefn(PropertyAttributes.SQL_FN_PARAM)
),
Collections.singletonList(INPUT_COLUMN_DEFN),
InputFormats.ALL_FORMATS,
null
InputFormats.ALL_FORMATS
);
}

View File

@ -19,6 +19,8 @@
package org.apache.druid.catalog.model.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.curator.shaded.com.google.common.collect.ImmutableList;
import org.apache.druid.catalog.model.CatalogUtils;
import org.apache.druid.catalog.model.ModelProperties.BooleanPropertyDefn;
@ -26,6 +28,7 @@ import org.apache.druid.catalog.model.ModelProperties.IntPropertyDefn;
import org.apache.druid.catalog.model.ModelProperties.PropertyDefn;
import org.apache.druid.catalog.model.ModelProperties.SimplePropertyDefn;
import org.apache.druid.catalog.model.ModelProperties.StringPropertyDefn;
import org.apache.druid.catalog.model.PropertyAttributes;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CsvInputFormat;
@ -127,8 +130,8 @@ public class InputFormats
typeTag,
CatalogUtils.concatLists(
Arrays.asList(
new StringPropertyDefn(LIST_DELIMITER_PROPERTY),
new IntPropertyDefn(SKIP_ROWS_PROPERTY)
new StringPropertyDefn(LIST_DELIMITER_PROPERTY, PropertyAttributes.OPTIONAL_SQL_FN_PARAM),
new IntPropertyDefn(SKIP_ROWS_PROPERTY, PropertyAttributes.OPTIONAL_SQL_FN_PARAM)
),
properties
)
@ -200,7 +203,7 @@ public class InputFormats
"Delimited Text",
DELIMITED_FORMAT_TYPE,
Collections.singletonList(
new StringPropertyDefn(DELIMITER_PROPERTY)
new StringPropertyDefn(DELIMITER_PROPERTY, PropertyAttributes.OPTIONAL_SQL_FN_PARAM)
)
);
}
@ -236,7 +239,7 @@ public class InputFormats
"JSON",
JSON_FORMAT_TYPE,
Collections.singletonList(
new BooleanPropertyDefn(KEEP_NULLS_PROPERTY)
new BooleanPropertyDefn(KEEP_NULLS_PROPERTY, PropertyAttributes.OPTIONAL_SQL_FN_PARAM)
)
);
}
@ -262,13 +265,37 @@ public class InputFormats
public static final String INPUT_FORMAT_SPEC_PROPERTY = "inputFormatSpec";
public static final String FORMAT_KEY = "generic";
private static class FormatPropertyDefn extends SimplePropertyDefn<InputFormat>
{
public FormatPropertyDefn()
{
super(
INPUT_FORMAT_SPEC_PROPERTY,
InputFormat.class,
PropertyAttributes.merge(
ImmutableMap.of(
PropertyAttributes.SQL_JAVA_TYPE,
String.class
),
PropertyAttributes.OPTIONAL_SQL_FN_PARAM
)
);
}
@Override
public InputFormat decodeSqlValue(Object value, ObjectMapper jsonMapper)
{
return decodeJson(value, jsonMapper);
}
}
public GenericFormatDefn()
{
super(
"Generic",
FORMAT_KEY,
Collections.singletonList(
new SimplePropertyDefn<InputFormat>(INPUT_FORMAT_SPEC_PROPERTY, InputFormat.class)
new FormatPropertyDefn()
)
);
}

View File

@ -23,6 +23,7 @@ import org.apache.druid.catalog.model.CatalogUtils;
import org.apache.druid.catalog.model.ModelProperties.StringListPropertyDefn;
import org.apache.druid.catalog.model.ModelProperties.StringPropertyDefn;
import org.apache.druid.catalog.model.ParameterizedDefn;
import org.apache.druid.catalog.model.PropertyAttributes;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.table.ExternalTableDefn.FormattedExternalTableDefn;
import org.apache.druid.data.input.InputSource;
@ -61,16 +62,12 @@ public class LocalTableDefn extends FormattedExternalTableDefn implements Parame
"Local file input table",
TABLE_TYPE,
Arrays.asList(
new StringPropertyDefn(BASE_DIR_PROPERTY),
new StringPropertyDefn(FILE_FILTER_PROPERTY),
new StringListPropertyDefn(FILES_PROPERTY)
new StringPropertyDefn(BASE_DIR_PROPERTY, PropertyAttributes.OPTIONAL_SQL_FN_PARAM),
new StringPropertyDefn(FILE_FILTER_PROPERTY, PropertyAttributes.OPTIONAL_SQL_FN_PARAM),
new StringListPropertyDefn(FILES_PROPERTY, PropertyAttributes.SQL_AND_TABLE_PARAM)
),
Collections.singletonList(INPUT_COLUMN_DEFN),
InputFormats.ALL_FORMATS,
Arrays.asList(
new ParameterImpl(FILE_FILTER_PROPERTY, String.class),
new ParameterImpl(FILES_PROPERTY, String.class)
)
InputFormats.ALL_FORMATS
);
}
@ -78,9 +75,9 @@ public class LocalTableDefn extends FormattedExternalTableDefn implements Parame
public ResolvedTable mergeParameters(ResolvedTable table, Map<String, Object> values)
{
// The safe get can only check
String filesParam = CatalogUtils.safeGet(values, FILES_PROPERTY, String.class);
String filterParam = CatalogUtils.safeGet(values, FILE_FILTER_PROPERTY, String.class);
Map<String, Object> revisedProps = new HashMap<>(table.properties());
final String filesParam = CatalogUtils.safeGet(values, FILES_PROPERTY, String.class);
final String filterParam = CatalogUtils.safeGet(values, FILE_FILTER_PROPERTY, String.class);
final Map<String, Object> revisedProps = new HashMap<>(table.properties());
if (filesParam != null) {
revisedProps.put(FILES_PROPERTY, CatalogUtils.stringToList(filesParam));
}
@ -93,11 +90,11 @@ public class LocalTableDefn extends FormattedExternalTableDefn implements Parame
@Override
protected InputSource convertSource(ResolvedTable table)
{
Map<String, Object> jsonMap = new HashMap<>();
final Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put(InputSource.TYPE_PROPERTY, LocalInputSource.TYPE_KEY);
String baseDir = table.stringProperty(BASE_DIR_PROPERTY);
final String baseDir = table.stringProperty(BASE_DIR_PROPERTY);
jsonMap.put("baseDir", baseDir);
List<String> files = table.stringListProperty(FILES_PROPERTY);
final List<String> files = table.stringListProperty(FILES_PROPERTY);
jsonMap.put("files", files);
// Note the odd semantics of this class.

View File

@ -39,6 +39,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@ -50,9 +51,11 @@ public class PropertyDefnTest
@Test
public void testString()
{
StringPropertyDefn prop = new StringPropertyDefn("prop");
StringPropertyDefn prop = new StringPropertyDefn("prop", ImmutableMap.of("foo", "bar"));
assertEquals("prop", prop.name());
assertEquals("String", prop.typeName());
assertEquals("bar", prop.attributes.get("foo"));
assertEquals(String.class, PropertyAttributes.sqlParameterType(prop));
assertNull(prop.decode(null, mapper));
assertEquals("value", prop.decode("value", mapper));
@ -67,12 +70,21 @@ public class PropertyDefnTest
assertThrows(Exception.class, () -> prop.validate(Arrays.asList("a", "b"), mapper));
}
@Test
public void testCustomTypeName()
{
// Custom type name
StringPropertyDefn prop = new StringPropertyDefn("prop", ImmutableMap.of(PropertyAttributes.TYPE_NAME, "MyType"));
assertEquals("MyType", prop.typeName());
}
@Test
public void testBoolean()
{
BooleanPropertyDefn prop = new BooleanPropertyDefn("prop");
BooleanPropertyDefn prop = new BooleanPropertyDefn("prop", null);
assertEquals("prop", prop.name());
assertEquals("Boolean", prop.typeName());
assertEquals(Boolean.class, PropertyAttributes.sqlParameterType(prop));
assertNull(prop.decode(null, mapper));
assertTrue(prop.decode("true", mapper));
@ -86,9 +98,10 @@ public class PropertyDefnTest
@Test
public void testInt()
{
IntPropertyDefn prop = new IntPropertyDefn("prop");
IntPropertyDefn prop = new IntPropertyDefn("prop", null);
assertEquals("prop", prop.name());
assertEquals("Integer", prop.typeName());
assertEquals(Integer.class, PropertyAttributes.sqlParameterType(prop));
assertNull(prop.decode(null, mapper));
assertEquals((Integer) 0, prop.decode(0, mapper));
@ -101,9 +114,10 @@ public class PropertyDefnTest
@Test
public void testStringList()
{
StringListPropertyDefn prop = new StringListPropertyDefn("prop");
StringListPropertyDefn prop = new StringListPropertyDefn("prop", null);
assertEquals("prop", prop.name());
assertEquals("string list", prop.typeName());
assertSame(String.class, PropertyAttributes.sqlParameterType(prop));
assertNull(prop.decode(null, mapper));
prop.validate(null, mapper);
@ -120,10 +134,12 @@ public class PropertyDefnTest
ListPropertyDefn<ClusterKeySpec> prop = new ListPropertyDefn<ClusterKeySpec>(
"prop",
"cluster key list",
new TypeReference<List<ClusterKeySpec>>() { }
new TypeReference<List<ClusterKeySpec>>() { },
null
);
assertEquals("prop", prop.name());
assertEquals("cluster key list", prop.typeName());
assertNull(PropertyAttributes.sqlParameterType(prop));
assertNull(prop.decode(null, mapper));
List<Map<String, Object>> value = Arrays.asList(

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.model.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.catalog.model.ModelProperties.PropertyDefn;
import java.util.List;
public class BaseExternTableTest
{
protected final ObjectMapper mapper = new ObjectMapper();
protected PropertyDefn<?> findProperty(List<PropertyDefn<?>> props, String name)
{
for (PropertyDefn<?> prop : props) {
if (prop.name().equals(name)) {
return prop;
}
}
return null;
}
}

View File

@ -20,13 +20,15 @@
package org.apache.druid.catalog.model.table;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.CatalogTest;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ModelProperties.PropertyDefn;
import org.apache.druid.catalog.model.ParameterizedDefn;
import org.apache.druid.catalog.model.PropertyAttributes;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.table.ExternalTableDefn.FormattedExternalTableDefn;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.HttpInputSource;
import org.apache.druid.data.input.impl.HttpInputSourceConfig;
@ -40,16 +42,17 @@ import org.junit.experimental.categories.Category;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
@Category(CatalogTest.class)
public class HttpInputTableTest
public class HttpInputTableTest extends BaseExternTableTest
{
private final ObjectMapper mapper = new ObjectMapper();
private final HttpTableDefn tableDefn = new HttpTableDefn();
private final TableBuilder baseBuilder = TableBuilder.of(tableDefn)
.description("http input")
@ -84,16 +87,16 @@ public class HttpInputTableTest
// Convert to an external spec
ExternalTableSpec externSpec = tableDefn.convertToExtern(table);
HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource();
HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource;
assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
assertEquals("secret", ((DefaultPasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
assertEquals("http://foo.com/my.csv", sourceSpec.getUris().get(0).toString());
// Just a sanity check: details of CSV conversion are tested elsewhere.
CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat();
CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat;
assertEquals(Arrays.asList("x", "y"), csvFormat.getColumns());
RowSignature sig = externSpec.signature();
RowSignature sig = externSpec.signature;
assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
@ -114,7 +117,7 @@ public class HttpInputTableTest
// Convert to an external spec
ExternalTableSpec externSpec = tableDefn.convertToExtern(table);
HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource();
HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource;
assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
assertEquals("SECRET", ((EnvironmentVariablePasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable());
}
@ -134,17 +137,17 @@ public class HttpInputTableTest
// Parameters
ParameterizedDefn parameterizedTable = tableDefn;
assertEquals(1, parameterizedTable.parameters().size());
assertNotNull(parameterizedTable.parameter(HttpTableDefn.URIS_PARAMETER));
assertNotNull(findProperty(parameterizedTable.parameters(), HttpTableDefn.URIS_PROPERTY));
// Apply parameters
Map<String, Object> params = ImmutableMap.of(
HttpTableDefn.URIS_PARAMETER, "foo.csv,bar.csv"
HttpTableDefn.URIS_PROPERTY, "foo.csv,bar.csv"
);
// Convert to an external spec
ExternalTableSpec externSpec = parameterizedTable.applyParameters(table, params);
HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource();
HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource;
assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
assertEquals("SECRET", ((EnvironmentVariablePasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable());
assertEquals(
@ -165,7 +168,7 @@ public class HttpInputTableTest
// Apply parameters
Map<String, Object> params = ImmutableMap.of(
HttpTableDefn.URIS_PARAMETER, "foo.csv,bar.csv"
HttpTableDefn.URIS_PROPERTY, "foo.csv,bar.csv"
);
// Convert to an external spec
@ -193,7 +196,7 @@ public class HttpInputTableTest
.buildResolved(mapper);
Map<String, Object> params = ImmutableMap.of(
HttpTableDefn.URIS_PARAMETER, "foo.csv"
HttpTableDefn.URIS_PROPERTY, "foo.csv"
);
assertThrows(IAE.class, () -> tableDefn.applyParameters(table, params));
}
@ -209,4 +212,48 @@ public class HttpInputTableTest
assertThrows(IAE.class, () -> table.validate());
}
@Test
public void testSqlFunction()
{
List<PropertyDefn<?>> params = tableDefn.tableFunctionParameters();
// Ensure the relevant properties are available as SQL function parameters
PropertyDefn<?> userProp = findProperty(params, HttpTableDefn.USER_PROPERTY);
assertNotNull(userProp);
assertEquals(String.class, PropertyAttributes.sqlParameterType(userProp));
PropertyDefn<?> pwdProp = findProperty(params, HttpTableDefn.PASSWORD_PROPERTY);
assertNotNull(pwdProp);
assertEquals(String.class, PropertyAttributes.sqlParameterType(pwdProp));
PropertyDefn<?> urisProp = findProperty(params, HttpTableDefn.URIS_PROPERTY);
assertNotNull(urisProp);
assertEquals(String.class, PropertyAttributes.sqlParameterType(urisProp));
assertNull(findProperty(params, HttpTableDefn.URI_TEMPLATE_PROPERTY));
PropertyDefn<?> formatProp = findProperty(params, FormattedExternalTableDefn.FORMAT_PROPERTY);
assertNotNull(formatProp);
assertEquals(String.class, PropertyAttributes.sqlParameterType(formatProp));
// Pretend to accept values for the SQL parameters.
final ResolvedTable table = TableBuilder.of(tableDefn)
.property(userProp.name(), userProp.decodeSqlValue("bob", mapper))
.property(pwdProp.name(), pwdProp.decodeSqlValue("secret", mapper))
.property(urisProp.name(), urisProp.decodeSqlValue("http://foo.com/foo.csv, http://foo.com/bar.csv", mapper))
.property(formatProp.name(), formatProp.decodeSqlValue(InputFormats.CSV_FORMAT_TYPE, mapper))
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.buildResolved(mapper);
ExternalTableSpec externSpec = tableDefn.convertToExtern(table);
HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource;
assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
assertEquals("secret", ((DefaultPasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
assertEquals(
HttpTableDefn.convertUriList(Arrays.asList("http://foo.com/foo.csv", "http://foo.com/bar.csv")),
sourceSpec.getUris()
);
}
}

View File

@ -75,14 +75,14 @@ public class InlineTableTest
// Convert to an external spec
ExternalTableSpec externSpec = tableDefn.convertToExtern(table);
InlineInputSource inlineSpec = (InlineInputSource) externSpec.inputSource();
InlineInputSource inlineSpec = (InlineInputSource) externSpec.inputSource;
assertEquals("a,b\nc,d\n", inlineSpec.getData());
// Just a sanity check: details of CSV conversion are tested elsewhere.
CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat();
CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat;
assertEquals(Arrays.asList("x", "y"), csvFormat.getColumns());
RowSignature sig = externSpec.signature();
RowSignature sig = externSpec.signature;
assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
assertEquals(ColumnType.LONG, sig.getColumnType(1).get());

View File

@ -19,13 +19,15 @@
package org.apache.druid.catalog.model.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.CatalogTest;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ModelProperties.PropertyDefn;
import org.apache.druid.catalog.model.ParameterizedDefn;
import org.apache.druid.catalog.model.PropertyAttributes;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.table.ExternalTableDefn.FormattedExternalTableDefn;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.java.util.common.IAE;
@ -37,6 +39,7 @@ import org.junit.experimental.categories.Category;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@ -45,9 +48,8 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@Category(CatalogTest.class)
public class LocalTableTest
public class LocalTableTest extends BaseExternTableTest
{
private final ObjectMapper mapper = new ObjectMapper();
private final LocalTableDefn tableDefn = new LocalTableDefn();
private final TableBuilder baseBuilder = TableBuilder.of(tableDefn)
.description("local file input")
@ -74,16 +76,16 @@ public class LocalTableTest
// Convert to an external spec
ExternalTableSpec externSpec = tableDefn.convertToExtern(table);
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource();
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource;
assertEquals("/tmp", sourceSpec.getBaseDir().toString());
assertEquals("*.csv", sourceSpec.getFilter());
assertEquals("my.csv", sourceSpec.getFiles().get(0).toString());
// Just a sanity check: details of CSV conversion are tested elsewhere.
CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat();
CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat;
assertEquals(Arrays.asList("x", "y"), csvFormat.getColumns());
RowSignature sig = externSpec.signature();
RowSignature sig = externSpec.signature;
assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
@ -103,7 +105,7 @@ public class LocalTableTest
// Convert to an external spec
ExternalTableSpec externSpec = tableDefn.convertToExtern(table);
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource();
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource;
assertEquals("/tmp", sourceSpec.getBaseDir().toString());
assertEquals("*", sourceSpec.getFilter());
assertEquals("my.csv", sourceSpec.getFiles().get(0).toString());
@ -123,7 +125,7 @@ public class LocalTableTest
// Convert to an external spec
ExternalTableSpec externSpec = tableDefn.convertToExtern(table);
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource();
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource;
assertEquals("/tmp", sourceSpec.getBaseDir().toString());
assertEquals("*.csv", sourceSpec.getFilter());
assertTrue(sourceSpec.getFiles().isEmpty());
@ -159,10 +161,7 @@ public class LocalTableTest
.buildResolved(mapper);
ParameterizedDefn parameterizedTable = tableDefn;
assertEquals(2, parameterizedTable.parameters().size());
assertNotNull(parameterizedTable.parameter(LocalTableDefn.FILE_FILTER_PROPERTY));
assertNotNull(parameterizedTable.parameter(LocalTableDefn.FILES_PROPERTY));
assertEquals(1, parameterizedTable.parameters().size());
// Apply files parameter
Map<String, Object> params = ImmutableMap.of(
@ -172,7 +171,7 @@ public class LocalTableTest
// Convert to an external spec
ExternalTableSpec externSpec = parameterizedTable.applyParameters(table, params);
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource();
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource;
assertEquals("/tmp", sourceSpec.getBaseDir().toString());
assertEquals("*", sourceSpec.getFilter());
assertEquals(
@ -196,9 +195,45 @@ public class LocalTableTest
// Convert to an external spec
ExternalTableSpec externSpec = tableDefn.applyParameters(table, params);
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource();
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource;
assertEquals("/tmp", sourceSpec.getBaseDir().toString());
assertEquals("Oct*.csv", sourceSpec.getFilter());
assertTrue(sourceSpec.getFiles().isEmpty());
}
@Test
public void testSqlFunction()
{
List<PropertyDefn<?>> params = tableDefn.tableFunctionParameters();
// Ensure the relevant properties are available as SQL function parameters
PropertyDefn<?> fileDirProp = findProperty(params, LocalTableDefn.BASE_DIR_PROPERTY);
assertNotNull(fileDirProp);
assertEquals(String.class, PropertyAttributes.sqlParameterType(fileDirProp));
PropertyDefn<?> filesProp = findProperty(params, LocalTableDefn.FILES_PROPERTY);
assertNotNull(filesProp);
assertEquals(String.class, PropertyAttributes.sqlParameterType(filesProp));
PropertyDefn<?> formatProp = findProperty(params, FormattedExternalTableDefn.FORMAT_PROPERTY);
assertNotNull(formatProp);
assertEquals(String.class, PropertyAttributes.sqlParameterType(formatProp));
// Pretend to accept values for the SQL parameters.
final ResolvedTable table = TableBuilder.of(tableDefn)
.property(fileDirProp.name(), fileDirProp.decodeSqlValue("/tmp", mapper))
.property(filesProp.name(), filesProp.decodeSqlValue("Oct.csv, Nov.csv", mapper))
.property(formatProp.name(), formatProp.decodeSqlValue(InputFormats.CSV_FORMAT_TYPE, mapper))
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.buildResolved(mapper);
ExternalTableSpec externSpec = tableDefn.convertToExtern(table);
LocalInputSource sourceSpec = (LocalInputSource) externSpec.inputSource;
assertEquals("/tmp", sourceSpec.getBaseDir().toString());
assertEquals(
Arrays.asList(new File("Oct.csv"), new File("Nov.csv")),
sourceSpec.getFiles()
);
}
}

134
sql/edit-parser.py Normal file
View File

@ -0,0 +1,134 @@
#! /bin/python3
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------------------
# Revise the parser to add two elements of Druid syntax to the FROM
# clause:
#
# id [ (<args>) ]
#
# And
#
# TABLE(<fn>(<args>)) (<schema>)
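#
# For illustration only (the table and argument names below are hypothetical),
# the first form lets a catalog-defined table be parameterized in place:
#
#   SELECT * FROM "myCatalogInput"('Oct.csv', 'Nov.csv')
#
# and the second form supplies an explicit schema for a table function:
#
#   SELECT * FROM TABLE(http(uris => 'http:example.com/data.csv', format => 'csv'))
#              (x VARCHAR, y BIGINT)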
import os
import os.path
# Ensure this can run from the main pom, or within the
# module directory.
baseDir = ""
if os.path.isdir("sql"):
baseDir = "sql/"
source = baseDir + "target/codegen/templates/Parser.jj"
dest = baseDir + "target/codegen/templates/DruidParser.jj"
inFile = open(source)
outFile = open(dest, "w")
# Look for the rule to remove, copying lines as we go.
while True:
line = inFile.readline()
if not line:
break
outFile.write(line)
if line == "SqlNode TableRef2(boolean lateral) :\n":
break
# Find close of the rule, after the variable definitions
while True:
line = inFile.readline()
if not line:
break
if line == "}\n":
break
outFile.write(line)
outFile.write(
''' List<SqlNode> paramList;
}
''')
# Find the table identifier rule
while True:
line = inFile.readline()
if not line:
break
outFile.write(line)
if line == " tableRef = CompoundIdentifier()\n":
break
# Add the Druid parameterization
outFile.write(
''' [
paramList = FunctionParameterList(ExprContext.ACCEPT_NONCURSOR)
{
tableRef = ParameterizeOperator.PARAM.createCall(tableRef, paramList);
}
]
''')
# Skip over the unwanted EXTENDS clause
while True:
line = inFile.readline()
if not line:
break
if line == " over = TableOverOpt() {\n":
outFile.write(line)
break
# Find the table function rule
while True:
line = inFile.readline()
if not line:
break
outFile.write(line)
if line == " tableRef = TableFunctionCall(s.pos())\n":
break
# Find the closing paren
while True:
line = inFile.readline()
if not line:
break
outFile.write(line)
if line == " <RPAREN>\n":
break
# Add the additional clause
outFile.write(
''' [
[ <EXTEND> ]
extendList = ExtendList()
{
tableRef = ExtendOperator.EXTEND.createCall(
Span.of(tableRef, extendList).pos(), tableRef, extendList);
}
]
''')
# Copy everything else
while True:
line = inFile.readline()
if not line:
break
outFile.write(line)
inFile.close()
outFile.close()
# Switch the files.
os.remove(source)
os.rename(dest, source)

View File

@ -285,7 +285,7 @@
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<phase>generate-sources</phase>
<goals>
<goal>unpack</goal>
</goals>
@ -306,6 +306,30 @@
</executions>
</plugin>
<!-- Edit the parser. Add an additional clause to the table function
rule. This produces a new parser, DruidParser.jj.
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>edit-parser</id>
<phase>generate-sources</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>python3</executable>
<workingDirectory>${project.basedir}</workingDirectory>
<arguments>
<argument>edit-parser.py</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
<!-- Copy the templates present in the codegen directory of druid-sql containing custom SQL rules to
${project.build.directory}/codegen -->
<plugin>
@ -313,7 +337,7 @@
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<phase>generate-sources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
@ -344,14 +368,14 @@
</goals>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>${project.build.directory}/generated-sources/annotations</outputDirectory>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- Creates a Java class for the custom parser from the Parser.jj -->
<!-- Creates a Java class for the custom parser from Parser.jj -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
@ -363,13 +387,13 @@
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/annotations</sourceDirectory>
<sourceDirectory>${project.build.directory}/generated-sources/javacc</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
<include>Parser.jj</include>
</includes>
<lookAhead>2</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/annotations</outputDirectory>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
</configuration>
</execution>
</executions>
@ -388,7 +412,7 @@
</goals>
<configuration>
<sources>
<source>src/generated-sources/annotations</source>
<source>src/generated-sources</source>
</sources>
</configuration>
</execution>

View File

@ -58,6 +58,8 @@ data: {
"org.apache.druid.java.util.common.granularity.Granularities"
"org.apache.druid.sql.calcite.parser.DruidSqlInsert"
"org.apache.druid.sql.calcite.parser.DruidSqlParserUtils"
"org.apache.druid.sql.calcite.external.ExtendOperator"
"org.apache.druid.sql.calcite.external.ParameterizeOperator"
]
# List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.external;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.FunctionParameter;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.table.ExternalTableDefn;
import org.apache.druid.catalog.model.table.ExternalTableSpec;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.external.UserDefinedTableMacroFunction.ExtendedTableMacro;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* Base class for input-source-specific table functions with arguments derived from
* a catalog external table definition. Such functions work in conjunction with the
* EXTEND keyword to provide a schema. Example of the HTTP form:
* <code><pre>
* INSERT INTO myTable SELECT ...
* FROM TABLE(http(
* userName => 'bob',
* password => 'secret',
* uris => 'http:foo.com/bar.csv',
* format => 'csv'))
* EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
* PARTITIONED BY ...
* </pre></code>
*/
public abstract class CatalogExternalTableOperatorConversion implements SqlOperatorConversion
{
private final SqlUserDefinedTableMacro operator;
public CatalogExternalTableOperatorConversion(
final String name,
final TableDefnRegistry registry,
final String tableType,
final ObjectMapper jsonMapper
)
{
ExternalTableDefn tableDefn = (ExternalTableDefn) registry.defnFor(tableType);
this.operator = new CatalogExternalTableOperator(
new CatalogTableMacro(
name,
tableDefn,
jsonMapper
)
);
}
@Override
public SqlOperator calciteOperator()
{
return operator;
}
@Nullable
@Override
public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode)
{
return null;
}
public static class CatalogExternalTableOperator extends UserDefinedTableMacroFunction implements AuthorizableOperator
{
public CatalogExternalTableOperator(final CatalogTableMacro macro)
{
super(
new SqlIdentifier(macro.name, SqlParserPos.ZERO),
ReturnTypes.CURSOR,
null,
// Use our own definition of variadic since Calcite's doesn't allow
// optional parameters.
Externals.variadic(macro.parameters),
Externals.dataTypes(macro.parameters),
macro
);
}
@Override
public Set<ResourceAction> computeResources(final SqlCall call)
{
return Collections.singleton(ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION);
}
}
public static class CatalogTableMacro implements ExtendedTableMacro
{
private final String name;
private final List<FunctionParameter> parameters;
private final ExternalTableDefn tableDefn;
private final ObjectMapper jsonMapper;
public CatalogTableMacro(
final String name,
final ExternalTableDefn tableDefn,
final ObjectMapper jsonMapper
)
{
this.name = name;
this.tableDefn = tableDefn;
this.jsonMapper = jsonMapper;
this.parameters = Externals.convertParameters(tableDefn);
}
@Override
public TranslatableTable apply(final List<Object> arguments)
{
throw new IAE(
"The %s table function requires an EXTEND clause with a schema.",
name
);
}
@Override
public TranslatableTable apply(List<Object> arguments, SqlNodeList schema)
{
final ExternalTableSpec externSpec = Externals.convertArguments(
tableDefn,
parameters,
arguments,
schema,
jsonMapper
);
return Externals.buildExternalTable(externSpec, jsonMapper);
}
@Override
public List<FunctionParameter> getParameters()
{
return parameters;
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.external;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlInternalOperator;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.fun.SqlCollectionTableOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.druid.java.util.common.ISE;
/**
* Druid-specific implementation of the EXTEND operator in Calcite, which
* is said to have been added for Apache Phoenix, and which we repurpose to
* supply a schema for an ingest input table.
*
* @see {@link UserDefinedTableMacroFunction} for details
*/
public class ExtendOperator extends SqlInternalOperator
{
public static final ExtendOperator EXTEND = new ExtendOperator();
ExtendOperator()
{
super("EXTEND", SqlKind.EXTEND, MDX_PRECEDENCE);
}
/**
* Rewrite the EXTEND node (which, in Druid, has a structure different
* than what Calcite expects), into a table macro, with the schema
* squirreled away in an ad-hoc instance of the macro. We must do it
* this way because we can't change Calcite to define a new node type
* that holds onto the schema.
*/
@Override
public SqlNode rewriteCall(SqlValidator validator, SqlCall call)
{
SqlBasicCall tableOpCall = (SqlBasicCall) call.operand(0);
if (!(tableOpCall.getOperator() instanceof SqlCollectionTableOperator)) {
throw new ISE("First argument to EXTEND must be a table function");
}
SqlBasicCall tableFnCall = (SqlBasicCall) tableOpCall.operand(0);
if (!(tableFnCall.getOperator() instanceof UserDefinedTableMacroFunction)) {
// May be an unresolved function.
return call;
}
UserDefinedTableMacroFunction macro = (UserDefinedTableMacroFunction) tableFnCall.getOperator();
SqlNodeList schema = (SqlNodeList) call.operand(1);
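// Roughly: EXTEND(TABLE(fn(args)), schema) is rewritten to TABLE(shim(args)),
// where the shim macro carries the EXTEND schema, so the rest of validation
// sees an ordinary table function call.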
SqlCall newCall = macro.rewriteCall(tableFnCall, schema);
return SqlStdOperatorTable.COLLECTION_TABLE.createCall(call.getParserPosition(), newCall);
}
}

View File

@ -19,8 +19,8 @@
package org.apache.druid.sql.calcite.external;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
@ -28,9 +28,9 @@ import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
@ -48,7 +48,8 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
* Registers the "EXTERN" operator, which is used in queries like "INSERT INTO dst SELECT * FROM TABLE(EXTERN(...))".
* Registers the "EXTERN" operator, which is used in queries like
* "INSERT INTO dst SELECT * FROM TABLE(EXTERN(...))".
*
* This class is exercised in CalciteInsertDmlTest but is not currently exposed to end users.
*/
@ -60,14 +61,12 @@ public class ExternalOperatorConversion implements SqlOperatorConversion
public static final ResourceAction EXTERNAL_RESOURCE_ACTION =
new ResourceAction(new Resource("EXTERNAL", ResourceType.EXTERNAL), Action.READ);
private static final RelDataTypeFactory TYPE_FACTORY = new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE);
private final SqlUserDefinedTableMacro operator;
@Inject
public ExternalOperatorConversion(final ExternalTableMacro macro)
public ExternalOperatorConversion(@Json final ObjectMapper jsonMapper)
{
this.operator = new ExternalOperator(macro);
this.operator = new ExternalOperator(new ExternalTableMacro(jsonMapper));
}
@Override
@ -92,14 +91,14 @@ public class ExternalOperatorConversion implements SqlOperatorConversion
ReturnTypes.CURSOR,
null,
OperandTypes.sequence(
"(inputSource, inputFormat, signature)",
macro.signature(),
OperandTypes.family(SqlTypeFamily.STRING),
OperandTypes.family(SqlTypeFamily.STRING),
OperandTypes.family(SqlTypeFamily.STRING)
),
macro.getParameters()
.stream()
.map(parameter -> parameter.getType(TYPE_FACTORY))
.map(parameter -> parameter.getType(DruidTypeSystem.TYPE_FACTORY))
.collect(Collectors.toList()),
macro
);

View File

@ -22,63 +22,56 @@ package org.apache.druid.sql.calcite.external;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.FunctionParameter;
import org.apache.calcite.schema.TableMacro;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.druid.catalog.model.table.ExternalTableSpec;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.table.ExternalTable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Used by {@link ExternalOperatorConversion} to generate {@link DruidTable} that reference {@link ExternalDataSource}.
* Used by {@link ExternalOperatorConversion} to generate a {@link DruidTable}
* that references an {@link ExternalDataSource}.
*
* This class is exercised in CalciteInsertDmlTest but is not currently exposed to end users.
*/
public class ExternalTableMacro implements TableMacro
{
private final List<FunctionParameter> parameters = ImmutableList.of(
new FunctionParameterImpl(0, "inputSource", DruidTypeSystem.TYPE_FACTORY.createJavaType(String.class)),
new FunctionParameterImpl(1, "inputFormat", DruidTypeSystem.TYPE_FACTORY.createJavaType(String.class)),
new FunctionParameterImpl(2, "signature", DruidTypeSystem.TYPE_FACTORY.createJavaType(String.class))
);
private final ObjectMapper jsonMapper;
@Inject
public ExternalTableMacro(@Json final ObjectMapper jsonMapper)
public ExternalTableMacro(final ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}
public String signature()
{
final List<String> names = parameters.stream().map(p -> p.getName()).collect(Collectors.toList());
return "(" + String.join(", ", names) + ")";
}
@Override
public TranslatableTable apply(final List<Object> arguments)
{
try {
final InputSource inputSource = jsonMapper.readValue((String) arguments.get(0), InputSource.class);
final InputFormat inputFormat = jsonMapper.readValue((String) arguments.get(1), InputFormat.class);
final RowSignature signature = jsonMapper.readValue((String) arguments.get(2), RowSignature.class);
// Prevent a RowSignature that has a ColumnSignature with name "__time" and type that is not LONG because it
// will be automatically casted to LONG while processing in RowBasedColumnSelectorFactory.
// This can cause an issue when the incorrectly typecasted data is ingested or processed upon. One such example
// of inconsistency is that functions such as TIME_PARSE evaluate incorrectly
Optional<ColumnType> timestampColumnTypeOptional = signature.getColumnType(ColumnHolder.TIME_COLUMN_NAME);
if (timestampColumnTypeOptional.isPresent() && !timestampColumnTypeOptional.get().equals(ColumnType.LONG)) {
throw new ISE("EXTERN function with __time column can be used when __time column is of type long. "
+ "Please change the column name to something other than __time");
}
return new ExternalTable(
new ExternalDataSource(inputSource, inputFormat, signature),
signature,
jsonMapper
ExternalTableSpec spec = new ExternalTableSpec(
jsonMapper.readValue((String) arguments.get(0), InputSource.class),
jsonMapper.readValue((String) arguments.get(1), InputFormat.class),
jsonMapper.readValue((String) arguments.get(2), RowSignature.class)
);
return Externals.buildExternalTable(spec, jsonMapper);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
@ -88,85 +81,6 @@ public class ExternalTableMacro implements TableMacro
@Override
public List<FunctionParameter> getParameters()
{
return ImmutableList.of(
new FunctionParameter()
{
@Override
public int getOrdinal()
{
return 0;
}
@Override
public String getName()
{
return "inputSource";
}
@Override
public RelDataType getType(RelDataTypeFactory typeFactory)
{
return typeFactory.createJavaType(String.class);
}
@Override
public boolean isOptional()
{
return false;
}
},
new FunctionParameter()
{
@Override
public int getOrdinal()
{
return 1;
}
@Override
public String getName()
{
return "inputFormat";
}
@Override
public RelDataType getType(RelDataTypeFactory typeFactory)
{
return typeFactory.createJavaType(String.class);
}
@Override
public boolean isOptional()
{
return false;
}
},
new FunctionParameter()
{
@Override
public int getOrdinal()
{
return 2;
}
@Override
public String getName()
{
return "signature";
}
@Override
public RelDataType getType(RelDataTypeFactory typeFactory)
{
return typeFactory.createJavaType(String.class);
}
@Override
public boolean isOptional()
{
return false;
}
}
);
return parameters;
}
}

View File

@ -0,0 +1,278 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.external;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.avatica.SqlType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.FunctionParameter;
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperandCountRange;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlTypeNameSpec;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.catalog.model.ModelProperties;
import org.apache.druid.catalog.model.ModelProperties.PropertyDefn;
import org.apache.druid.catalog.model.PropertyAttributes;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.table.ExternalTableDefn;
import org.apache.druid.catalog.model.table.ExternalTableSpec;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
import org.apache.druid.sql.calcite.table.ExternalTable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Conversion functions to-from the SQL, catalog and MSQ representations
* of external tables.
*/
public class Externals
{
/**
* Convert parameters from Catalog external table definition form to the SQL form
* used for a table macro and its function.
*/
public static List<FunctionParameter> convertParameters(final ExternalTableDefn tableDefn)
{
List<ModelProperties.PropertyDefn<?>> props = tableDefn.tableFunctionParameters();
ImmutableList.Builder<FunctionParameter> params = ImmutableList.builder();
for (int i = 0; i < props.size(); i++) {
ModelProperties.PropertyDefn<?> prop = props.get(i);
params.add(new FunctionParameterImpl(
i,
prop.name(),
DruidTypeSystem.TYPE_FACTORY.createJavaType(PropertyAttributes.sqlParameterType(prop)),
PropertyAttributes.isOptional(prop)
));
}
return params.build();
}
/**
* Extract the data types (only) from a list of SQL parameters.
*/
public static List<RelDataType> dataTypes(List<FunctionParameter> parameters)
{
return parameters
.stream()
.map(parameter -> parameter.getType(DruidTypeSystem.TYPE_FACTORY))
.collect(Collectors.toList());
}
/**
* Define a variadic (variable arity) type checker that allows an argument
* count that ranges from the number of required parameters to the number of
* available parameters. We have to define this because the Calcite form does
* not allow optional parameters, but we allow any parameter to be optional.
* We are also not fussy about the type: we catch any type errors from the
* declared types. We catch missing required parameters at conversion time,
* where we also catch invalid values, incompatible values, and so on.
*/
public static SqlOperandTypeChecker variadic(List<FunctionParameter> params)
{
int min = 0;
for (FunctionParameter param : params) {
if (!param.isOptional()) {
min++;
}
}
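// Accept any argument count between the number of required (non-optional)
// parameters and the total number of declared parameters.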
SqlOperandCountRange range = SqlOperandCountRanges.between(min, params.size());
return new SqlOperandTypeChecker()
{
@Override
public boolean checkOperandTypes(
SqlCallBinding callBinding,
boolean throwOnFailure)
{
return range.isValidCount(callBinding.getOperandCount());
}
@Override
public SqlOperandCountRange getOperandCountRange()
{
return range;
}
@Override
public String getAllowedSignatures(SqlOperator op, String opName)
{
return opName + "(...)";
}
@Override
public boolean isOptional(int i)
{
return true;
}
@Override
public Consistency getConsistency()
{
return Consistency.NONE;
}
};
}
/**
* Convert the actual arguments to SQL external table function into a catalog
* resolved table, then convert that to an external table spec usable by MSQ.
*
* @param tableDefn catalog definition of the kind of external table
* @param parameters the parameters to the SQL table macro
* @param arguments the arguments that match the parameters. Optional arguments
* may be null
* @param schema the external table schema provided by the EXTEND clause
* @param jsonMapper the JSON mapper to use for value conversions
* @return a spec with the three values that MSQ needs to create an external table
*/
public static ExternalTableSpec convertArguments(
final ExternalTableDefn tableDefn,
final List<FunctionParameter> parameters,
final List<Object> arguments,
final SqlNodeList schema,
final ObjectMapper jsonMapper
)
{
final TableBuilder builder = TableBuilder.of(tableDefn);
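// Copy each non-null SQL argument into the catalog table spec, letting the
// property definition decode the SQL value into its catalog form.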
for (int i = 0; i < parameters.size(); i++) {
String name = parameters.get(i).getName();
Object value = arguments.get(i);
if (value == null) {
continue;
}
PropertyDefn<?> prop = tableDefn.property(name);
builder.property(name, prop.decodeSqlValue(value, jsonMapper));
}
// Converts from a list of (identifier, type, ...) pairs to
// a Druid row signature. The schema itself comes from the
// Druid-specific EXTEND syntax added to the parser.
for (int i = 0; i < schema.size(); i += 2) {
final String name = convertName((SqlIdentifier) schema.get(i));
String sqlType = convertType(name, (SqlDataTypeSpec) schema.get(i + 1));
builder.column(name, sqlType);
}
ResolvedTable table = builder.buildResolved(jsonMapper);
return tableDefn.convertToExtern(table);
}
/**
* Define the Druid input schema from a name provided in the EXTEND
* clause. Calcite allows any form of name: a.b.c, say. But, Druid
* requires only simple names: "a", or "x".
*/
private static String convertName(SqlIdentifier ident)
{
if (!ident.isSimple()) {
throw new IAE(StringUtils.format(
"Column [%s] must have a simple name",
ident));
}
return ident.getSimple();
}
/**
* Define the SQL input column type from a type provided in the
* EXTEND clause. Calcite allows any form of type. But, Druid
* requires only the Druid-supported types (and their aliases).
* <p>
* Druid has its own rules for nullability. We ignore any nullability
* clause in the EXTEND list.
*/
private static String convertType(String name, SqlDataTypeSpec dataType)
{
SqlTypeNameSpec spec = dataType.getTypeNameSpec();
if (spec == null) {
throw unsupportedType(name, dataType);
}
SqlIdentifier typeName = spec.getTypeName();
if (typeName == null || !typeName.isSimple()) {
throw unsupportedType(name, dataType);
}
SqlTypeName type = SqlTypeName.get(typeName.getSimple());
if (type == null) {
throw unsupportedType(name, dataType);
}
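// Collapse SQL types onto the types Druid supports: any character type maps
// to VARCHAR, any integer type to BIGINT; DOUBLE, FLOAT and REAL map to the
// corresponding Druid numeric types below.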
if (SqlTypeName.CHAR_TYPES.contains(type)) {
return SqlTypeName.VARCHAR.name();
}
if (SqlTypeName.INT_TYPES.contains(type)) {
return SqlTypeName.BIGINT.name();
}
switch (type) {
case DOUBLE:
return SqlType.DOUBLE.name();
case FLOAT:
case REAL:
return SqlType.FLOAT.name();
default:
throw unsupportedType(name, dataType);
}
}
private static RuntimeException unsupportedType(String name, SqlDataTypeSpec dataType)
{
return new IAE(StringUtils.format(
"Column [%s] has an unsupported type: [%s]",
name,
dataType));
}
/**
* Create an MSQ ExternalTable given an external table spec. Enforces type restrictions
* (which should be revisited).
*/
public static ExternalTable buildExternalTable(ExternalTableSpec spec, ObjectMapper jsonMapper)
{
// Prevent a RowSignature that has a ColumnSignature with name "__time" and type that is not LONG because it
// will be automatically cast to LONG while processing in RowBasedColumnSelectorFactory.
// This can cause an issue when the incorrectly type-casted data is ingested or processed upon. One such example
// of inconsistency is that functions such as TIME_PARSE evaluate incorrectly
//
// TODO: Fix the underlying problem: we should not make assumptions about the input
// data, nor restrict the form of that data.
Optional<ColumnType> timestampColumnTypeOptional = spec.signature.getColumnType(ColumnHolder.TIME_COLUMN_NAME);
if (timestampColumnTypeOptional.isPresent() && !timestampColumnTypeOptional.get().equals(ColumnType.LONG)) {
throw new ISE("EXTERN function with __time column can be used when __time column is of type long. "
+ "Please change the column name to something other than __time");
}
return new ExternalTable(
new ExternalDataSource(spec.inputSource, spec.inputFormat, spec.signature),
spec.signature,
jsonMapper
);
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.external;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.FunctionParameter;
public class FunctionParameterImpl implements FunctionParameter
{
private final int ordinal;
private final String name;
private final RelDataType type;
private final boolean isOptional;
public FunctionParameterImpl(int ordinal, String name, RelDataType type, boolean isOptional)
{
this.ordinal = ordinal;
this.name = name;
this.type = type;
this.isOptional = isOptional;
}
public FunctionParameterImpl(int ordinal, String name, RelDataType type)
{
this(ordinal, name, type, false);
}
@Override
public int getOrdinal()
{
return ordinal;
}
@Override
public String getName()
{
return name;
}
@Override
public RelDataType getType(RelDataTypeFactory typeFactory)
{
return type;
}
@Override
public boolean isOptional()
{
return isOptional;
}
@Override
public String toString()
{
return "FunctionParameter{" +
"name=\"" + name + "\"" +
", ordinal=" + ordinal +
", type=" + type +
", optional=" + isOptional +
"}";
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.external;
import com.google.inject.Inject;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.table.HttpTableDefn;
public class HttpOperatorConversion extends CatalogExternalTableOperatorConversion
{
public static final String FUNCTION_NAME = "http";
@Inject
public HttpOperatorConversion(
final TableDefnRegistry registry
)
{
super(FUNCTION_NAME, registry, HttpTableDefn.TABLE_TYPE, registry.jsonMapper());
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.external;
import com.google.inject.Inject;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.table.InlineTableDefn;
public class InlineOperatorConversion extends CatalogExternalTableOperatorConversion
{
public static final String FUNCTION_NAME = "inline";
@Inject
public InlineOperatorConversion(
final TableDefnRegistry registry
)
{
super(FUNCTION_NAME, registry, InlineTableDefn.TABLE_TYPE, registry.jsonMapper());
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.external;
import com.google.inject.Inject;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.table.LocalTableDefn;
public class LocalOperatorConversion extends CatalogExternalTableOperatorConversion
{
// Cannot use "local" because it is a SQL keyword and the user would
// be required to quote the name.
public static final String FUNCTION_NAME = "localfiles";
@Inject
public LocalOperatorConversion(
final TableDefnRegistry registry
)
{
super(FUNCTION_NAME, registry, LocalTableDefn.TABLE_TYPE, registry.jsonMapper());
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.external;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlInternalOperator;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.Span;
import org.apache.calcite.sql.validate.SqlValidator;
import java.util.List;
/**
* Internal operator used to create an on-the-fly table function from a
* partial table definition within the catalog. Represents a table reference
* of the form: <i>table_name</i>( <i>arguments</i> ). That is, we treat the
* table name as a function name, then pass arguments that represent the additional
* information needed to convert a partial table into a completed table.
* For example, for a local input source, we might pass the list of files to
* read.
* <p>
* Calcite doesn't understand this form. So, early in the process, we rewrite
* nodes of this type into a table macro node which Calcite does understand.
*/
public class ParameterizeOperator extends SqlInternalOperator
{
public static final ParameterizeOperator PARAM = new ParameterizeOperator();
ParameterizeOperator()
{
super("PARAMETERS", SqlKind.OTHER, MDX_PRECEDENCE);
}
public SqlNode createCall(SqlNode tableRef, List<SqlNode> paramList)
{
SqlNode[] argArray = new SqlNode[paramList.size()];
// Not entirely valid to use this operator for two purposes. But, since
// we're going to rewrite the clause, should be OK.
SqlBasicCall args = new SqlBasicCall(this, paramList.toArray(argArray), Span.of(paramList).pos());
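// The resulting node is, roughly, PARAMETERS(tableRef, PARAMETERS(arg1, ..., argN));
// it is rewritten into a table macro call early in planning, before Calcite
// validates it.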
return createCall(Span.of(tableRef, args).pos(), ImmutableList.of(tableRef, args));
}
@Override
public SqlNode rewriteCall(SqlValidator validator, SqlCall call)
{
return call;
}
}

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.external;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.FunctionParameter;
import org.apache.calcite.schema.TableMacro;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.SqlWriter.Frame;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlOperandTypeInference;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
import java.util.List;
import java.util.Set;
/**
* Table macro designed for use with the Druid EXTEND operator. Example:
* <code><pre>
* INSERT INTO dst
* SELECT *
* FROM TABLE(staged(
* source => 'inline',
* format => 'csv',
* data => 'a,b,1
* c,d,2
* '
* ))
* EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
* PARTITIONED BY ALL TIME
* </pre></code>
* <p>
* Calcite supports the Apache Phoenix EXTEND operator of the form:
* <code><pre>
* SELECT ..
* FROM myTable EXTEND (x VARCHAR, ...)
* </pre></code>
* Though, oddly, a search of Apache Phoenix itself does not find a hit for
* EXTEND, so perhaps the feature was never completed?
* <p>
* For Druid, we want the above form: extend a table function, not a
* literal table. Since we can't change the Calcite parser, we instead use
* tricks within the constraints of the parser.
* <ul>
* <li>First, we use a Python script to modify the parser to add the
* EXTEND rule for a table function.</li>
* <li>Calcite expects the EXTEND operator to have two arguments: an identifier
* and the column list. Since our case has a function call as the first argument,
* we can't let Calcite see our AST. So, we use a rewrite trick to convert the
* EXTEND node into the usual TABLE(.) node, and we modify the associated macro
* to hold onto the schema, which is now out of sight of Calcite, and so will not
* cause problems with rules that don't understand our usage.</li>
* <li>Calcite will helpfully rewrite calls, replacing our modified operator with
* the original. So, we override those to keep our modified operator.</li>
* <li>When asked to produce a table ({@code apply(.)}), we call a Druid-specific
* version that passes along the schema saved previously.</li>
* <li>The extended {@code DruidTableMacro} uses the schema to define the
* input source.</li>
* <li>Care is taken that the same {@code DruidTableMacro} can be used without
* EXTEND. In this case, the schema will be empty and the input source must have
* a way of providing the schema. The batch ingest feature does not yet support
* this use case, but it seems a reasonable extension. Example: CSV that has a
* header row, or a "classic" lookup table that, by definition, has only two
* columns.</li>
* </ul>
* <p>
* Note that unparsing is a bit of a nuisance. Our trick places the EXTEND
* list in the wrong place, and we'll unparse SQL as:
* <code><pre>
* FROM TABLE(fn(arg1, arg2) EXTEND (x VARCHAR, ...))
* </pre></code>
* Since we seldom use unparse, we can perhaps live with this limitation for now.
*/
public abstract class UserDefinedTableMacroFunction extends SqlUserDefinedTableMacro implements AuthorizableOperator
{
protected final ExtendedTableMacro macro;
public UserDefinedTableMacroFunction(
SqlIdentifier opName,
SqlReturnTypeInference returnTypeInference,
SqlOperandTypeInference operandTypeInference,
SqlOperandTypeChecker operandTypeChecker,
List<RelDataType> paramTypes,
ExtendedTableMacro tableMacro
)
{
super(opName, returnTypeInference, operandTypeInference, operandTypeChecker, paramTypes, tableMacro);
// Because Calcite's copy of the macro is private
this.macro = tableMacro;
}
/**
* Rewrite the call to the original table macro function to a new "shim" version that
* holds both the original one and the schema from EXTEND.
*/
public SqlBasicCall rewriteCall(SqlBasicCall oldCall, SqlNodeList schema)
{
return new ExtendedCall(oldCall, new ShimTableMacroFunction(this, schema));
}
private static class ShimTableMacroFunction extends SqlUserDefinedTableMacro implements AuthorizableOperator
{
protected final UserDefinedTableMacroFunction base;
protected final SqlNodeList schema;
public ShimTableMacroFunction(final UserDefinedTableMacroFunction base, final SqlNodeList schema)
{
super(
base.getNameAsId(),
ReturnTypes.CURSOR,
null,
base.getOperandTypeChecker(),
base.getParamTypes(),
new ShimTableMacro(base.macro, schema)
);
this.base = base;
this.schema = schema;
}
@Override
public Set<ResourceAction> computeResources(final SqlCall call)
{
return base.computeResources(call);
}
}
/**
* Call that exists primarily to (nearly) recreate the EXTEND clause during unparse.
*/
private static class ExtendedCall extends SqlBasicCall
{
private final SqlNodeList schema;
public ExtendedCall(SqlBasicCall oldCall, ShimTableMacroFunction macro)
{
super(
macro,
oldCall.getOperands(),
oldCall.getParserPosition(),
false,
oldCall.getFunctionQuantifier()
);
this.schema = macro.schema;
}
public ExtendedCall(ExtendedCall from, SqlParserPos pos)
{
super(
from.getOperator(),
from.getOperands(),
pos,
false,
from.getFunctionQuantifier()
);
this.schema = from.schema;
}
/**
* Politely decline to revise the operator: we want the one we
* constructed to hold the schema, not the one re-resolved during
* validation.
*/
@Override
public void setOperator(SqlOperator operator)
{
// Do nothing: do not call super.setOperator().
}
@Override
public SqlNode clone(SqlParserPos pos)
{
return new ExtendedCall(this, pos);
}
@Override
public void unparse(
SqlWriter writer,
int leftPrec,
int rightPrec)
{
super.unparse(writer, leftPrec, rightPrec);
writer.keyword("EXTEND");
Frame frame = writer.startList("(", ")");
schema.unparse(writer, leftPrec, rightPrec);
writer.endList(frame);
}
}
public interface ExtendedTableMacro extends TableMacro
{
TranslatableTable apply(List<Object> arguments, SqlNodeList schema);
}
/**
* Calcite table macro created dynamically to squirrel away the
* schema provided by the EXTEND clause to allow <pre><code>
* SELECT ... FROM TABLE(fn(arg => value, ...)) (col1 <type1>, ...)
* </code></pre>
* This macro wraps the actual input table macro, which does the
* actual work to build the Druid table. This macro also caches the
* translated table to avoid the need to recompute the table multiple
* times.
*/
protected static class ShimTableMacro implements TableMacro
{
private final ExtendedTableMacro delegate;
private final SqlNodeList schema;
private TranslatableTable table;
public ShimTableMacro(ExtendedTableMacro delegate, SqlNodeList schema)
{
this.delegate = delegate;
this.schema = schema;
}
@Override
public TranslatableTable apply(List<Object> arguments)
{
if (table == null) {
table = delegate.apply(arguments, schema);
}
return table;
}
@Override
public List<FunctionParameter> getParameters()
{
return delegate.getParameters();
}
}
}

View File

@ -424,7 +424,7 @@ public class DruidOperatorTable implements SqlOperatorTable
for (SqlAggregator aggregator : aggregators) {
final OperatorKey operatorKey = OperatorKey.of(aggregator.calciteFunction());
if (this.aggregators.put(operatorKey, aggregator) != null) {
throw new ISE("Cannot have two operators with key[%s]", operatorKey);
throw new ISE("Cannot have two operators with key [%s]", operatorKey);
}
}
@ -439,7 +439,7 @@ public class DruidOperatorTable implements SqlOperatorTable
final OperatorKey operatorKey = OperatorKey.of(operatorConversion.calciteOperator());
if (this.aggregators.containsKey(operatorKey)
|| this.operatorConversions.put(operatorKey, operatorConversion) != null) {
throw new ISE("Cannot have two operators with key[%s]", operatorKey);
throw new ISE("Cannot have two operators with key [%s]", operatorKey);
}
}
@ -557,16 +557,6 @@ public class DruidOperatorTable implements SqlOperatorTable
return new OperatorKey(operator.getName(), operator.getSyntax());
}
public String getName()
{
return name;
}
public SqlSyntax getSyntax()
{
return syntax;
}
@Override
public boolean equals(final Object o)
{

View File

@ -22,11 +22,13 @@ package org.apache.druid.sql.calcite.planner;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
public class DruidTypeSystem implements RelDataTypeSystem
{
public static final DruidTypeSystem INSTANCE = new DruidTypeSystem();
public static final RelDataTypeFactory TYPE_FACTORY = new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE);
/**
* Druid uses millisecond precision for timestamps internally. This is also the default at the SQL layer.

View File

@ -35,13 +35,18 @@ import org.apache.druid.sql.calcite.external.ExternalTableScan;
* Each such table represents one of Druid's {@link DataSource} types. Since SQL
* requires knowledge of the schema of that input source, the user must provide
* that information in SQL (via the `EXTERN` or up-coming `STAGED` function) or
* from the upcoming Druid Catalog.
* from the Druid Catalog.
*/
public class ExternalTable extends DruidTable
{
private final DataSource dataSource;
private final ObjectMapper objectMapper;
/**
* Cached row type, to avoid recreating types multiple times.
*/
private RelDataType rowType;
public ExternalTable(
final DataSource dataSource,
final RowSignature rowSignature,
@ -74,11 +79,14 @@ public class ExternalTable extends DruidTable
@Override
public RelDataType getRowType(final RelDataTypeFactory typeFactory)
{
// For external datasources, the row type should be determined by whatever the row signature has been explicitly
// passed in. Typecasting directly to SqlTypeName.TIMESTAMP will lead to inconsistencies with the Calcite functions
// For example, TIME_PARSE(__time) where __time is specified to be a string field in the external datasource
// would lead to an exception because __time would be interpreted as timestamp if we typecast it.
return RowSignatures.toRelDataType(getRowSignature(), typeFactory, true);
if (rowType == null) {
// For external datasources, the row type should be determined by whatever the row signature has been explicitly
// passed in. Typecasting directly to SqlTypeName.TIMESTAMP will lead to inconsistencies with the Calcite functions
// For example, TIME_PARSE(__time) where __time is specified to be a string field in the external datasource
// would lead to an exception because __time would be interpreted as timestamp if we typecast it.
rowType = RowSignatures.toRelDataType(getRowSignature(), typeFactory, true);
}
return rowType;
}
@Override

View File

@ -26,6 +26,7 @@ import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.NativeQuery;
@ -101,6 +102,8 @@ public class SqlModule implements Module
NoopDruidSchemaManager.TYPE
);
binder.bind(TableDefnRegistry.class).in(LazySingleton.class);
binder.install(new DruidCalciteSchemaModule());
binder.install(new CalcitePlannerModule());
binder.install(new SqlAggregationModule());

View File

@ -48,6 +48,9 @@ import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.external.HttpOperatorConversion;
import org.apache.druid.sql.calcite.external.InlineOperatorConversion;
import org.apache.druid.sql.calcite.external.LocalOperatorConversion;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@ -146,6 +149,9 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
// Set up the EXTERN macro.
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HttpOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, InlineOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, LocalOperatorConversion.class);
}
});
}

View File

@ -60,7 +60,7 @@ import java.util.Map;
public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
{
public static final Map<String, Object> PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT = ImmutableMap.of(
protected static final Map<String, Object> PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT = ImmutableMap.of(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
"{\"type\":\"all\"}"
);

View File

@ -0,0 +1,327 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.HttpInputSource;
import org.apache.druid.data.input.impl.HttpInputSourceConfig;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Test;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
/**
 * Tests the input-source-specific table functions: http, inline and localfiles.
 * Each of these uses metadata defined by the catalog to identify the allowed
 * function arguments. The table functions work best with by-name argument syntax.
 * <p>
 * The tests first verify the baseline EXTERN form, then do the same ingest using
 * the simpler functions. Verification against both the logical plan and the native
 * query ensures that the resulting MSQ task is identical regardless of the path
 * taken.
 */
public class CatalogIngestionTest extends CalciteIngestionDmlTest
{
protected static URI toURI(String uri)
{
try {
return new URI(uri);
}
catch (URISyntaxException e) {
throw new ISE("Bad URI: %s", uri);
}
}
protected final ExternalDataSource httpDataSource = new ExternalDataSource(
new HttpInputSource(
Collections.singletonList(toURI("http:foo.com/bar.csv")),
"bob",
new DefaultPasswordProvider("secret"),
new HttpInputSourceConfig(null)
),
new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
RowSignature.builder()
.add("x", ColumnType.STRING)
.add("y", ColumnType.STRING)
.add("z", ColumnType.LONG)
.build()
);
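// Illustrative note (not in the original test): the httpDataSource above is the native
// ExternalDataSource that the HTTP tests below expect the planner to produce, whether the
// SQL uses the EXTERN form or the by-name http() table function, i.e.:
//
//   TABLE(http(userName => 'bob', password => 'secret',
//              uris => 'http:foo.com/bar.csv', format => 'csv'))
//     EXTEND (x VARCHAR, y VARCHAR, z BIGINT)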
/**
 * Basic use of EXTERN with the HTTP input source.
*/
@Test
public void testHttpExtern()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(httpDataSource))
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", httpDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("httpExtern")
.verify();
}
protected String externSqlByName(final ExternalDataSource externalDataSource)
{
ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
try {
return StringUtils.format(
"TABLE(extern(inputSource => %s,\n" +
" inputFormat => %s,\n" +
" signature => %s))",
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputSource())),
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputFormat())),
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getSignature()))
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
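/**
 * Hypothetical helper, shown only as a sketch and not part of the original test: builds the
 * by-name http() table function SQL that testHttpFn() writes inline. Assumes the same
 * argument names and literal values used in that test.
 */
protected String httpFnSql()
{
  return "TABLE(http(userName => 'bob', password => 'secret',\n" +
         "           uris => 'http:foo.com/bar.csv', format => 'csv'))\n" +
         "  EXTEND (x VARCHAR, y VARCHAR, z BIGINT)";
}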
/**
* EXTERN with parameters by name. Logical plan and native query are identical
* to the basic EXTERN.
*/
@Test
public void testHttpExternByName()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT *\nFROM %s\nPARTITIONED BY ALL TIME", externSqlByName(httpDataSource))
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", httpDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("httpExtern")
.verify();
}
/**
* HTTP with parameters by name. Logical plan and native query are identical
* to the basic EXTERN.
*/
@Test
public void testHttpFn()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT *\n" +
"FROM TABLE(http(userName => 'bob', password => 'secret',\n" +
" uris => 'http:foo.com/bar.csv', format => 'csv'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", httpDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("httpExtern")
.verify();
}
/**
 * Basic use of EXTERN with the inline input source.
*/
@Test
public void testInlineExtern()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(externalDataSource))
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", externalDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("insertFromExternal")
.verify();
}
/**
* Inline with parameters by name. Logical plan and native query are identical
* to the basic EXTERN.
*/
@Test
public void testInlineFn()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT *\n" +
"FROM TABLE(inline(data => 'a,b,1\nc,d,2\n',\n" +
" format => 'csv'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", externalDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("insertFromExternal")
.verify();
}
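// Illustrative note (not in the original test): the data argument of inline() carries the raw
// CSV text, with embedded newlines separating rows, so the call in testInlineFn() is expected
// to plan to the same inline-based externalDataSource that testInlineExtern() verifies.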
protected final ExternalDataSource localDataSource = new ExternalDataSource(
// The preferred form for this test, but it does not work.
// See Apache Druid issue #13359.
//new LocalInputSource(
// new File("/tmp"),
// "*.csv",
// Arrays.asList(new File("foo.csv"), new File("bar.csv"))
//),
new LocalInputSource(
null,
null,
Arrays.asList(new File("/tmp/foo.csv"), new File("/tmp/bar.csv"))
),
new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
RowSignature.builder()
.add("x", ColumnType.STRING)
.add("y", ColumnType.STRING)
.add("z", ColumnType.LONG)
.build()
);
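// Illustrative note (not in the original test): the localfiles() function used below passes
// files => '/tmp/foo.csv, /tmp/bar.csv', which is expected to resolve to the explicit file
// list in the LocalInputSource above; the baseDir/filter form is commented out pending
// Apache Druid issue #13359.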
/**
 * Basic use of EXTERN with the local input source.
*/
@Test
public void testLocalExtern()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(localDataSource))
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", localDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(localDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("localExtern")
.verify();
}
/**
* Local with parameters by name. Logical plan and native query are identical
* to the basic EXTERN.
*/
@Test
public void testLocalFn()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT *\n" +
"FROM TABLE(localfiles(files => '/tmp/foo.csv, /tmp/bar.csv',\n" +
" format => 'csv'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", localDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(localDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("localExtern")
.verify();
}
/**
 * Local with parameters by name. Shows that the EXTEND keyword is optional.
* Logical plan and native query are identical to the basic EXTERN.
*/
@Test
public void testLocalFnOmitExtend()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT *\n" +
"FROM TABLE(localfiles(files => '/tmp/foo.csv, /tmp/bar.csv',\n" +
" format => 'csv'))\n" +
" (x VARCHAR, y VARCHAR, z BIGINT)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", localDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(localDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.expectLogicalPlanFrom("localExtern")
.verify();
}
}