mirror of https://github.com/apache/druid.git
Refinements to input-source specific table functions (#13780)
Refinements to table functions Fixes various bugs Improves the structure of the table function classes Adds unit and integration tests
This commit is contained in:
parent
306997be87
commit
842ee554de
|
@ -36,8 +36,8 @@ import org.apache.druid.guice.LifecycleModule;
|
|||
import org.apache.druid.guice.ManageLifecycle;
|
||||
import org.apache.druid.guice.annotations.LoadScope;
|
||||
import org.apache.druid.initialization.DruidModule;
|
||||
import org.apache.druid.metadata.input.InputSourceModule;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -82,12 +82,17 @@ public class CatalogCoordinatorModule implements DruidModule
|
|||
|
||||
// Public REST API and private cache sync API.
|
||||
Jerseys.addResource(binder, CatalogResource.class);
|
||||
|
||||
// The HTTP input source requires a HttpInputSourceConfig instance
|
||||
// which is defined by the InputSourceModule. Note that MSQ also includes
|
||||
// this module, but MSQ is not included in the Coordinator.
|
||||
binder.install(new InputSourceModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return Collections.emptyList();
|
||||
// We want this module to bring input sources along for the ride.
|
||||
return new InputSourceModule().getJacksonModules();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,13 +19,10 @@
|
|||
|
||||
package org.apache.druid.testsEx.msq;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.msq.sql.SqlTaskStatus;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
||||
import org.apache.druid.testing.clients.SqlResourceTestClient;
|
||||
import org.apache.druid.testing.utils.DataLoaderHelper;
|
||||
import org.apache.druid.testing.utils.MsqTestQueryHelper;
|
||||
import org.apache.druid.testsEx.categories.MultiStageQuery;
|
||||
|
@ -42,15 +39,6 @@ public class ITMultiStageQuery
|
|||
@Inject
|
||||
private MsqTestQueryHelper msqHelper;
|
||||
|
||||
@Inject
|
||||
private SqlResourceTestClient msqClient;
|
||||
|
||||
@Inject
|
||||
private IntegrationTestingConfig config;
|
||||
|
||||
@Inject
|
||||
private ObjectMapper jsonMapper;
|
||||
|
||||
@Inject
|
||||
private DataLoaderHelper dataLoaderHelper;
|
||||
|
||||
|
@ -102,8 +90,72 @@ public class ITMultiStageQuery
|
|||
+ " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
|
||||
+ " )\n"
|
||||
+ ")\n"
|
||||
+ "PARTITIONED BY DAY\n"
|
||||
+ "CLUSTERED BY \"__time\"",
|
||||
+ "PARTITIONED BY DAY\n",
|
||||
datasource
|
||||
);
|
||||
|
||||
// Submit the task and wait for the datasource to get loaded
|
||||
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(queryLocal);
|
||||
|
||||
if (sqlTaskStatus.getState().isFailure()) {
|
||||
Assert.fail(StringUtils.format(
|
||||
"Unable to start the task successfully.\nPossible exception: %s",
|
||||
sqlTaskStatus.getError()
|
||||
));
|
||||
}
|
||||
|
||||
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
|
||||
dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
|
||||
|
||||
msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
|
||||
{
|
||||
String datasource = "dst";
|
||||
|
||||
// Clear up the datasource from the previous runs
|
||||
coordinatorClient.unloadSegmentsForDataSource(datasource);
|
||||
|
||||
String queryLocal =
|
||||
StringUtils.format(
|
||||
"INSERT INTO %s\n"
|
||||
+ "SELECT\n"
|
||||
+ " TIME_PARSE(\"timestamp\") AS __time,\n"
|
||||
+ " isRobot,\n"
|
||||
+ " diffUrl,\n"
|
||||
+ " added,\n"
|
||||
+ " countryIsoCode,\n"
|
||||
+ " regionName,\n"
|
||||
+ " channel,\n"
|
||||
+ " flags,\n"
|
||||
+ " delta,\n"
|
||||
+ " isUnpatrolled,\n"
|
||||
+ " isNew,\n"
|
||||
+ " deltaBucket,\n"
|
||||
+ " isMinor,\n"
|
||||
+ " isAnonymous,\n"
|
||||
+ " deleted,\n"
|
||||
+ " cityName,\n"
|
||||
+ " metroCode,\n"
|
||||
+ " namespace,\n"
|
||||
+ " comment,\n"
|
||||
+ " page,\n"
|
||||
+ " commentLength,\n"
|
||||
+ " countryName,\n"
|
||||
+ " user,\n"
|
||||
+ " regionIsoCode\n"
|
||||
+ "FROM TABLE(\n"
|
||||
+ " LOCALFILES(\n"
|
||||
+ " files => ARRAY['/resources/data/batch_index/json/wikipedia_index_data1.json'],\n"
|
||||
+ " format => 'json'\n"
|
||||
+ " ))\n"
|
||||
+ " (\"timestamp\" VARCHAR, isRobot VARCHAR, diffUrl VARCHAR, added BIGINT, countryIsoCode VARCHAR, regionName VARCHAR,\n"
|
||||
+ " channel VARCHAR, flags VARCHAR, delta BIGINT, isUnpatrolled VARCHAR, isNew VARCHAR, deltaBucket DOUBLE,\n"
|
||||
+ " isMinor VARCHAR, isAnonymous VARCHAR, deleted BIGINT, cityName VARCHAR, metroCode BIGINT, namespace VARCHAR,\n"
|
||||
+ " comment VARCHAR, page VARCHAR, commentLength BIGINT, countryName VARCHAR, \"user\" VARCHAR, regionIsoCode VARCHAR)\n"
|
||||
+ "PARTITIONED BY DAY\n",
|
||||
datasource
|
||||
);
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.druid.catalog.model.TableDefnRegistry;
|
|||
import org.apache.druid.data.input.InputFormat;
|
||||
import org.apache.druid.data.input.InputSource;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.utils.CollectionUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
@ -40,6 +41,8 @@ import java.util.Map;
|
|||
*/
|
||||
public abstract class BaseInputSourceDefn implements InputSourceDefn
|
||||
{
|
||||
private static final Logger LOG = new Logger(BaseInputSourceDefn.class);
|
||||
|
||||
/**
|
||||
* The "from-scratch" table function for this input source. The parameters
|
||||
* are those defined by the subclass, and the apply simply turns around and
|
||||
|
@ -238,6 +241,7 @@ public abstract class BaseInputSourceDefn implements InputSourceDefn
|
|||
return jsonMapper.convertValue(jsonMap, inputSourceClass());
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.debug(e, "Invalid input source specification");
|
||||
throw new IAE(e, "Invalid input source specification");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,6 +61,15 @@ public abstract class BaseTableFunction implements TableFunction
|
|||
{
|
||||
return optional;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "Parameter{name=" + name
|
||||
+ ", type=" + type
|
||||
+ ", optional=" + optional
|
||||
+ "}";
|
||||
}
|
||||
}
|
||||
|
||||
private final List<ParameterDefn> parameters;
|
||||
|
@ -76,11 +85,11 @@ public abstract class BaseTableFunction implements TableFunction
|
|||
return parameters;
|
||||
}
|
||||
|
||||
protected void requireSchema(String fnName, List<ColumnSpec> columns)
|
||||
protected static void requireSchema(String fnName, List<ColumnSpec> columns)
|
||||
{
|
||||
if (columns == null) {
|
||||
throw new IAE(
|
||||
"The %s table function requires an EXTEND clause with a schema",
|
||||
"Function requires a schema: TABLE(%s(...)) (<col> <type>...)",
|
||||
fnName
|
||||
);
|
||||
}
|
||||
|
|
|
@ -95,7 +95,10 @@ public abstract class FormattedInputSourceDefn extends BaseInputSourceDefn
|
|||
)
|
||||
{
|
||||
final List<ParameterDefn> toAdd = new ArrayList<>();
|
||||
final ParameterDefn formatProp = new Parameter(FORMAT_PARAMETER, ParameterType.VARCHAR, false);
|
||||
// While the format parameter is required, we mark it as optional. Else
|
||||
// if the source defines optional parameters, they will still be ignored
|
||||
// as Calcite treats (optional, optional, required) as (required, required, required)
|
||||
final ParameterDefn formatProp = new Parameter(FORMAT_PARAMETER, ParameterType.VARCHAR, true);
|
||||
toAdd.add(formatProp);
|
||||
final Map<String, ParameterDefn> formatProps = new HashMap<>();
|
||||
for (InputFormatDefn format : formats.values()) {
|
||||
|
@ -122,7 +125,7 @@ public abstract class FormattedInputSourceDefn extends BaseInputSourceDefn
|
|||
{
|
||||
final String formatTag = CatalogUtils.getString(table.inputFormatMap, InputFormat.TYPE_PROPERTY);
|
||||
if (formatTag == null) {
|
||||
throw new IAE("%s property must be set", InputFormat.TYPE_PROPERTY);
|
||||
throw new IAE("[%s] property must be provided", InputFormat.TYPE_PROPERTY);
|
||||
}
|
||||
final InputFormatDefn formatDefn = formats.get(formatTag);
|
||||
if (formatDefn == null) {
|
||||
|
@ -140,12 +143,12 @@ public abstract class FormattedInputSourceDefn extends BaseInputSourceDefn
|
|||
{
|
||||
final String formatTag = CatalogUtils.getString(args, FORMAT_PARAMETER);
|
||||
if (formatTag == null) {
|
||||
throw new IAE("%s parameter must be set", FORMAT_PARAMETER);
|
||||
throw new IAE("Must provide a value for the [%s] parameter", FORMAT_PARAMETER);
|
||||
}
|
||||
final InputFormatDefn formatDefn = formats.get(formatTag);
|
||||
if (formatDefn == null) {
|
||||
throw new IAE(
|
||||
"Format type [%s] for property %s is not valid",
|
||||
"Format type [%s] for property [%s] is not valid",
|
||||
formatTag,
|
||||
FORMAT_PARAMETER
|
||||
);
|
||||
|
|
|
@ -104,9 +104,9 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn
|
|||
);
|
||||
|
||||
// Field names in the HttpInputSource
|
||||
private static final String URIS_FIELD = "uris";
|
||||
private static final String PASSWORD_FIELD = "httpAuthenticationPassword";
|
||||
private static final String USERNAME_FIELD = "httpAuthenticationUsername";
|
||||
protected static final String URIS_FIELD = "uris";
|
||||
protected static final String PASSWORD_FIELD = "httpAuthenticationPassword";
|
||||
protected static final String USERNAME_FIELD = "httpAuthenticationUsername";
|
||||
|
||||
@Override
|
||||
public String typeValue()
|
||||
|
@ -172,6 +172,18 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn
|
|||
super.validate(table);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void auditInputSource(Map<String, Object> jsonMap)
|
||||
{
|
||||
// A partial table may not include the URI parameter. For example, we might
|
||||
// define an HTTP input source "with URIs to be named later." Even though the
|
||||
// input source is partial, we still want to validate the other parameters.
|
||||
// The HttpInputSource will fail if the URIs is not set. So, we have to make
|
||||
// up a fake one just so we can validate the other fields by asking the
|
||||
// input source to deserialize itself from the jsonMap.
|
||||
jsonMap.putIfAbsent(URIS_PARAMETER, "http://bogus.com");
|
||||
}
|
||||
|
||||
private Matcher templateMatcher(String uriTemplate)
|
||||
{
|
||||
Pattern p = Pattern.compile("\\{}");
|
||||
|
|
|
@ -181,10 +181,11 @@ public class LocalInputSourceDefn extends FormattedInputSourceDefn
|
|||
public TableFunction partialTableFn(ResolvedExternalTable table)
|
||||
{
|
||||
final Map<String, Object> sourceMap = new HashMap<>(table.inputSourceMap);
|
||||
final boolean hasBaseDir = !Strings.isNullOrEmpty(CatalogUtils.getString(sourceMap, BASE_DIR_FIELD));
|
||||
final boolean hasFiles = !CollectionUtils.isNullOrEmpty(CatalogUtils.safeGet(sourceMap, FILES_FIELD, List.class));
|
||||
final boolean hasFilter = !Strings.isNullOrEmpty(CatalogUtils.getString(sourceMap, FILTER_FIELD));
|
||||
List<ParameterDefn> params = new ArrayList<>();
|
||||
if (!hasFiles && !hasFilter) {
|
||||
if (hasBaseDir && !hasFiles && !hasFilter) {
|
||||
params.add(FILES_PARAM_DEFN);
|
||||
params.add(FILTER_PARAM_DEFN);
|
||||
}
|
||||
|
@ -219,8 +220,8 @@ public class LocalInputSourceDefn extends FormattedInputSourceDefn
|
|||
}
|
||||
final boolean hasFilter = !Strings.isNullOrEmpty(CatalogUtils.getString(sourceMap, FILTER_FIELD));
|
||||
final List<String> filesParam = CatalogUtils.getStringArray(args, FILES_PARAMETER);
|
||||
final String filterParam = CatalogUtils.getString(args, FILTER_PARAMETER);
|
||||
final boolean hasFilesParam = !CollectionUtils.isNullOrEmpty(filesParam);
|
||||
final String filterParam = CatalogUtils.getString(args, FILTER_PARAMETER);
|
||||
final boolean hasFilterParam = !Strings.isNullOrEmpty(filterParam);
|
||||
if (!hasFilter && !hasFilesParam && !hasFilterParam) {
|
||||
throw new IAE(
|
||||
|
|
|
@ -199,7 +199,7 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
args.put(HttpInputSourceDefn.PASSWORD_PARAMETER, "secret");
|
||||
args.put(FormattedInputSourceDefn.FORMAT_PARAMETER, CsvFormatDefn.TYPE_KEY);
|
||||
ExternalTableSpec externSpec = fn.apply("x", args, COLUMNS, mapper);
|
||||
validateHappyPath(externSpec);
|
||||
validateHappyPath(externSpec, true);
|
||||
|
||||
// But, it fails if there are no columns.
|
||||
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), mapper));
|
||||
|
@ -231,7 +231,7 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
// Convert to an external spec
|
||||
ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
|
||||
ExternalTableSpec externSpec = externDefn.convert(resolved);
|
||||
validateHappyPath(externSpec);
|
||||
validateHappyPath(externSpec, true);
|
||||
|
||||
// Get the partial table function
|
||||
TableFunction fn = externDefn.tableFn(resolved);
|
||||
|
@ -239,23 +239,17 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
|
||||
// Convert to an external table.
|
||||
externSpec = fn.apply("x", Collections.emptyMap(), Collections.emptyList(), mapper);
|
||||
validateHappyPath(externSpec);
|
||||
validateHappyPath(externSpec, true);
|
||||
|
||||
// But, it fails columns are provided since the table already has them.
|
||||
assertThrows(IAE.class, () -> fn.apply("x", Collections.emptyMap(), COLUMNS, mapper));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTemplateSpecWithFormatHappyPath() throws URISyntaxException
|
||||
public void testTemplateSpecWithFormatHappyPath()
|
||||
{
|
||||
HttpInputSource inputSource = new HttpInputSource(
|
||||
Collections.singletonList(new URI("http://foo.com/my.csv")), // removed
|
||||
"bob",
|
||||
new DefaultPasswordProvider("secret"),
|
||||
new HttpInputSourceConfig(null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(httpToMap(inputSource))
|
||||
.inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY))
|
||||
.inputFormat(CSV_FORMAT)
|
||||
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}")
|
||||
.column("x", Columns.VARCHAR)
|
||||
|
@ -264,8 +258,54 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
|
||||
// Check validation
|
||||
table.validate();
|
||||
ResolvedTable resolved = registry.resolve(table.spec());
|
||||
assertNotNull(resolved);
|
||||
|
||||
// Check registry
|
||||
// Not a full table, can't directly convert
|
||||
// Convert to an external spec
|
||||
ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
|
||||
assertThrows(IAE.class, () -> externDefn.convert(resolved));
|
||||
|
||||
// Get the partial table function
|
||||
TableFunction fn = externDefn.tableFn(resolved);
|
||||
assertEquals(4, fn.parameters().size());
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER));
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.USER_PARAMETER));
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_PARAMETER));
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_ENV_VAR_PARAMETER));
|
||||
|
||||
// Convert to an external table.
|
||||
ExternalTableSpec externSpec = fn.apply(
|
||||
"x",
|
||||
ImmutableMap.of(
|
||||
HttpInputSourceDefn.URIS_PARAMETER,
|
||||
Collections.singletonList("my.csv")
|
||||
),
|
||||
Collections.emptyList(),
|
||||
mapper
|
||||
);
|
||||
validateHappyPath(externSpec, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTemplateSpecWithFormatAndPassword()
|
||||
{
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(ImmutableMap.of(
|
||||
"type", HttpInputSource.TYPE_KEY,
|
||||
HttpInputSourceDefn.USERNAME_FIELD, "bob",
|
||||
HttpInputSourceDefn.PASSWORD_FIELD, ImmutableMap.of(
|
||||
"type", "default",
|
||||
"password", "secret"
|
||||
)
|
||||
))
|
||||
.inputFormat(CSV_FORMAT)
|
||||
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}")
|
||||
.column("x", Columns.VARCHAR)
|
||||
.column("y", Columns.BIGINT)
|
||||
.build();
|
||||
|
||||
table.validate();
|
||||
ResolvedTable resolved = registry.resolve(table.spec());
|
||||
assertNotNull(resolved);
|
||||
|
||||
|
@ -289,7 +329,7 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
Collections.emptyList(),
|
||||
mapper
|
||||
);
|
||||
validateHappyPath(externSpec);
|
||||
validateHappyPath(externSpec, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -326,7 +366,7 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
args.put(HttpInputSourceDefn.URIS_PARAMETER, Collections.singletonList("my.csv"));
|
||||
args.put(FormattedInputSourceDefn.FORMAT_PARAMETER, CsvFormatDefn.TYPE_KEY);
|
||||
ExternalTableSpec externSpec = fn.apply("x", args, COLUMNS, mapper);
|
||||
validateHappyPath(externSpec);
|
||||
validateHappyPath(externSpec, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -455,11 +495,13 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
assertEquals("SECRET", ((EnvironmentVariablePasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable());
|
||||
}
|
||||
|
||||
private void validateHappyPath(ExternalTableSpec externSpec)
|
||||
private void validateHappyPath(ExternalTableSpec externSpec, boolean withUser)
|
||||
{
|
||||
HttpInputSource sourceSpec = (HttpInputSource) externSpec.inputSource;
|
||||
assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
|
||||
assertEquals("secret", ((DefaultPasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
|
||||
if (withUser) {
|
||||
assertEquals("bob", sourceSpec.getHttpAuthenticationUsername());
|
||||
assertEquals("secret", ((DefaultPasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
|
||||
}
|
||||
assertEquals("http://foo.com/my.csv", sourceSpec.getUris().get(0).toString());
|
||||
|
||||
// Just a sanity check: details of CSV conversion are tested elsewhere.
|
||||
|
|
|
@ -1,178 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.sql.calcite.external;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.schema.FunctionParameter;
|
||||
import org.apache.calcite.schema.TranslatableTable;
|
||||
import org.apache.calcite.sql.SqlCall;
|
||||
import org.apache.calcite.sql.SqlIdentifier;
|
||||
import org.apache.calcite.sql.SqlNodeList;
|
||||
import org.apache.calcite.sql.SqlOperator;
|
||||
import org.apache.calcite.sql.parser.SqlParserPos;
|
||||
import org.apache.calcite.sql.type.ReturnTypes;
|
||||
import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
|
||||
import org.apache.druid.catalog.model.TableDefnRegistry;
|
||||
import org.apache.druid.catalog.model.table.ExternalTableSpec;
|
||||
import org.apache.druid.catalog.model.table.InputSourceDefn;
|
||||
import org.apache.druid.catalog.model.table.TableFunction;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.apache.druid.server.security.ResourceAction;
|
||||
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
|
||||
import org.apache.druid.sql.calcite.expression.DruidExpression;
|
||||
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
|
||||
import org.apache.druid.sql.calcite.external.UserDefinedTableMacroFunction.ExtendedTableMacro;
|
||||
import org.apache.druid.sql.calcite.planner.PlannerContext;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Base class for input-source-specific table functions with arguments derived from
|
||||
* a catalog external table definition. Such functions work in conjunction with the
|
||||
* EXTERN key word to provide a schema. Example of the HTTP form:
|
||||
* <code><pre>
|
||||
* INSERT INTO myTable SELECT ...
|
||||
* FROM TABLE(http(
|
||||
* userName => 'bob',
|
||||
* password => 'secret',
|
||||
* uris => ARRAY['http:foo.com/bar.csv'],
|
||||
* format => 'csv'))
|
||||
* EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
|
||||
* PARTITIONED BY ...
|
||||
* </pre></code>
|
||||
*/
|
||||
public abstract class CatalogExternalTableOperatorConversion implements SqlOperatorConversion
|
||||
{
|
||||
private final SqlUserDefinedTableMacro operator;
|
||||
|
||||
public CatalogExternalTableOperatorConversion(
|
||||
final String name,
|
||||
final TableDefnRegistry registry,
|
||||
final String tableType,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this(
|
||||
name,
|
||||
((InputSourceDefn) registry.inputSourceDefnFor(tableType)).adHocTableFn(),
|
||||
jsonMapper
|
||||
);
|
||||
}
|
||||
|
||||
public CatalogExternalTableOperatorConversion(
|
||||
final String name,
|
||||
final TableFunction fn,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.operator = new CatalogExternalTableOperator(
|
||||
new CatalogTableMacro(name, fn, jsonMapper)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SqlOperator calciteOperator()
|
||||
{
|
||||
return operator;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
public static class CatalogExternalTableOperator extends UserDefinedTableMacroFunction implements AuthorizableOperator
|
||||
{
|
||||
public CatalogExternalTableOperator(final CatalogTableMacro macro)
|
||||
{
|
||||
super(
|
||||
new SqlIdentifier(macro.name, SqlParserPos.ZERO),
|
||||
ReturnTypes.CURSOR,
|
||||
null,
|
||||
// Use our own definition of variadic since Calcite's doesn't allow
|
||||
// optional parameters.
|
||||
Externals.variadic(macro.parameters),
|
||||
Externals.dataTypes(macro.parameters),
|
||||
macro
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<ResourceAction> computeResources(final SqlCall call)
|
||||
{
|
||||
return Collections.singleton(Externals.EXTERNAL_RESOURCE_ACTION);
|
||||
}
|
||||
}
|
||||
|
||||
public static class CatalogTableMacro implements ExtendedTableMacro
|
||||
{
|
||||
private final String name;
|
||||
private final List<FunctionParameter> parameters;
|
||||
private final TableFunction fn;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
public CatalogTableMacro(
|
||||
final String name,
|
||||
final TableFunction fn,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.fn = fn;
|
||||
this.parameters = Externals.convertParameters(fn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the function is used without an {@code EXTEND} clause.
|
||||
* {@code EXTERN} allows this, most others do not.
|
||||
*/
|
||||
@Override
|
||||
public TranslatableTable apply(final List<Object> arguments)
|
||||
{
|
||||
return apply(arguments, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TranslatableTable apply(List<Object> arguments, SqlNodeList schema)
|
||||
{
|
||||
final ExternalTableSpec externSpec = fn.apply(
|
||||
name,
|
||||
Externals.convertArguments(fn, arguments),
|
||||
schema == null ? null : Externals.convertColumns(schema),
|
||||
jsonMapper
|
||||
);
|
||||
return Externals.buildExternalTable(externSpec, jsonMapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FunctionParameter> getParameters()
|
||||
{
|
||||
return parameters;
|
||||
}
|
||||
}
|
||||
}
|
101
sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
vendored
Normal file
101
sql/src/main/java/org/apache/druid/sql/calcite/external/DruidTableMacro.java
vendored
Normal file
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.sql.calcite.external;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.calcite.schema.FunctionParameter;
|
||||
import org.apache.calcite.schema.TranslatableTable;
|
||||
import org.apache.calcite.sql.SqlNodeList;
|
||||
import org.apache.druid.catalog.model.ResolvedTable;
|
||||
import org.apache.druid.catalog.model.table.ExternalTableDefn;
|
||||
import org.apache.druid.catalog.model.table.ExternalTableSpec;
|
||||
import org.apache.druid.catalog.model.table.TableFunction;
|
||||
import org.apache.druid.sql.calcite.external.SchemaAwareUserDefinedTableMacro.ExtendedTableMacro;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Table macro which wraps a catalog table function and which accepts
|
||||
* a schema from an EXTENDS clause. This macro is wrapped by the
|
||||
* {@link DruidUserDefinedTableMacro} operator that itself extends
|
||||
* {@link SchemaAwareUserDefinedTableMacro} which interfaces with the
|
||||
* extend operator to pass the schema via a "back channel." The plumbing
|
||||
* is complex because we're adding functionality a bit outside the SQL
|
||||
* standard, and we have to fit our logic into the Calcite stack.
|
||||
*/
|
||||
public class DruidTableMacro implements ExtendedTableMacro
|
||||
{
|
||||
protected final String name;
|
||||
final List<FunctionParameter> parameters;
|
||||
private final TableFunction fn;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
public DruidTableMacro(
|
||||
final String name,
|
||||
final TableFunction fn,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.fn = fn;
|
||||
this.parameters = Externals.convertParameters(fn);
|
||||
}
|
||||
|
||||
public DruidTableMacro(
|
||||
final String tableName,
|
||||
final ResolvedTable externalTable
|
||||
)
|
||||
{
|
||||
this.name = tableName;
|
||||
ExternalTableDefn tableDefn = (ExternalTableDefn) externalTable.defn();
|
||||
this.fn = tableDefn.tableFn(externalTable);
|
||||
this.parameters = Externals.convertParameters(fn);
|
||||
this.jsonMapper = externalTable.jsonMapper();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the function is used without an {@code EXTEND} clause.
|
||||
* {@code EXTERN} allows this, most others do not.
|
||||
*/
|
||||
@Override
|
||||
public TranslatableTable apply(final List<Object> arguments)
|
||||
{
|
||||
return apply(arguments, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TranslatableTable apply(List<Object> arguments, SqlNodeList schema)
|
||||
{
|
||||
final ExternalTableSpec externSpec = fn.apply(
|
||||
name,
|
||||
Externals.convertArguments(fn, arguments),
|
||||
schema == null ? null : Externals.convertColumns(schema),
|
||||
jsonMapper
|
||||
);
|
||||
return Externals.buildExternalTable(externSpec, jsonMapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FunctionParameter> getParameters()
|
||||
{
|
||||
return parameters;
|
||||
}
|
||||
}
|
70
sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
vendored
Normal file
70
sql/src/main/java/org/apache/druid/sql/calcite/external/DruidUserDefinedTableMacro.java
vendored
Normal file
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.sql.calcite.external;
|
||||
|
||||
import org.apache.calcite.sql.SqlCall;
|
||||
import org.apache.calcite.sql.SqlIdentifier;
|
||||
import org.apache.calcite.sql.parser.SqlParserPos;
|
||||
import org.apache.calcite.sql.type.ReturnTypes;
|
||||
import org.apache.druid.server.security.ResourceAction;
|
||||
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Base class for input-source-specific table functions with arguments derived from
|
||||
* a catalog external table definition. Such functions work in conjunction with the
|
||||
* EXTEND key word to provide a schema. Example of the HTTP form:
|
||||
* <code><pre>
|
||||
* INSERT INTO myTable SELECT ...
|
||||
* FROM TABLE(http(
|
||||
* userName => 'bob',
|
||||
* password => 'secret',
|
||||
* uris => ARRAY['http:foo.com/bar.csv'],
|
||||
* format => 'csv'))
|
||||
* EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
|
||||
* PARTITIONED BY ...
|
||||
* </pre></code>
|
||||
* <p>
|
||||
* This version
|
||||
*/
|
||||
public class DruidUserDefinedTableMacro extends SchemaAwareUserDefinedTableMacro implements AuthorizableOperator
|
||||
{
|
||||
public DruidUserDefinedTableMacro(final DruidTableMacro macro)
|
||||
{
|
||||
super(
|
||||
new SqlIdentifier(macro.name, SqlParserPos.ZERO),
|
||||
ReturnTypes.CURSOR,
|
||||
null,
|
||||
// Use our own definition of variadic since Calcite's doesn't allow
|
||||
// optional parameters.
|
||||
Externals.variadic(macro.parameters),
|
||||
Externals.dataTypes(macro.parameters),
|
||||
macro
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<ResourceAction> computeResources(final SqlCall call)
|
||||
{
|
||||
return Collections.singleton(Externals.EXTERNAL_RESOURCE_ACTION);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.sql.calcite.external;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.SqlOperator;
|
||||
import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
|
||||
import org.apache.druid.catalog.model.TableDefnRegistry;
|
||||
import org.apache.druid.catalog.model.table.InputSourceDefn;
|
||||
import org.apache.druid.catalog.model.table.TableFunction;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.apache.druid.sql.calcite.expression.DruidExpression;
|
||||
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
|
||||
import org.apache.druid.sql.calcite.planner.PlannerContext;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Operator conversion for user-defined table macros (functions) based on the
|
||||
* {@link TableFunction} abstraction defined by the catalog.
|
||||
*/
|
||||
public abstract class DruidUserDefinedTableMacroConversion implements SqlOperatorConversion
|
||||
{
|
||||
private final SqlUserDefinedTableMacro operator;
|
||||
|
||||
public DruidUserDefinedTableMacroConversion(
|
||||
final String name,
|
||||
final TableDefnRegistry registry,
|
||||
final String tableType,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this(
|
||||
name,
|
||||
((InputSourceDefn) registry.inputSourceDefnFor(tableType)).adHocTableFn(),
|
||||
jsonMapper
|
||||
);
|
||||
}
|
||||
|
||||
public DruidUserDefinedTableMacroConversion(
|
||||
final String name,
|
||||
final TableFunction fn,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.operator = new DruidUserDefinedTableMacro(
|
||||
new DruidTableMacro(name, fn, jsonMapper)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SqlOperator calciteOperator()
|
||||
{
|
||||
return operator;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -28,6 +28,7 @@ import org.apache.calcite.sql.SqlNodeList;
|
|||
import org.apache.calcite.sql.fun.SqlCollectionTableOperator;
|
||||
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
import org.apache.calcite.sql.validate.SqlValidator;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
|
||||
/**
|
||||
|
@ -35,7 +36,7 @@ import org.apache.druid.java.util.common.ISE;
|
|||
* is said to have been added for Apache Phoenix, and which we repurpose to
|
||||
* supply a schema for an ingest input table.
|
||||
*
|
||||
* @see {@link UserDefinedTableMacroFunction} for details
|
||||
* @see {@link SchemaAwareUserDefinedTableMacro} for details
|
||||
*/
|
||||
public class ExtendOperator extends SqlInternalOperator
|
||||
{
|
||||
|
@ -52,23 +53,47 @@ public class ExtendOperator extends SqlInternalOperator
|
|||
* squirreled away in an ad-hoc instance of the macro. We must do it
|
||||
* this way because we can't change Calcite to define a new node type
|
||||
* that holds onto the schema.
|
||||
* <p>
|
||||
* The node structure is:<pre>{@code
|
||||
* EXTEND(
|
||||
* TABLE(
|
||||
* <table fn>(
|
||||
* <table macro>
|
||||
* )
|
||||
* ),
|
||||
* <schema>
|
||||
* )}</pre>
|
||||
* <p>
|
||||
* Note that the table macro is not an operand: it is an implicit
|
||||
* member of the table macro function operator.
|
||||
*/
|
||||
@Override
|
||||
public SqlNode rewriteCall(SqlValidator validator, SqlCall call)
|
||||
{
|
||||
SqlBasicCall tableOpCall = (SqlBasicCall) call.operand(0);
|
||||
if (!(tableOpCall.getOperator() instanceof SqlCollectionTableOperator)) {
|
||||
throw new ISE("First argument to EXTEND must be a table function");
|
||||
throw new ISE("First argument to EXTEND must be TABLE");
|
||||
}
|
||||
SqlBasicCall tableFnCall = (SqlBasicCall) tableOpCall.operand(0);
|
||||
if (!(tableFnCall.getOperator() instanceof UserDefinedTableMacroFunction)) {
|
||||
// May be an unresolved function.
|
||||
return call;
|
||||
}
|
||||
UserDefinedTableMacroFunction macro = (UserDefinedTableMacroFunction) tableFnCall.getOperator();
|
||||
|
||||
// The table function must be a Druid-defined table macro function
|
||||
// which is aware of the EXTEND schema.
|
||||
SqlBasicCall tableFnCall = (SqlBasicCall) tableOpCall.operand(0);
|
||||
if (!(tableFnCall.getOperator() instanceof SchemaAwareUserDefinedTableMacro)) {
|
||||
// May be an unresolved function.
|
||||
throw new IAE(
|
||||
"Function %s does not accept an EXTEND clause (or a schema list)",
|
||||
tableFnCall.getOperator().getName()
|
||||
);
|
||||
}
|
||||
|
||||
// Move the schema from the second operand of EXTEND into a member
|
||||
// function of a shim table macro.
|
||||
SchemaAwareUserDefinedTableMacro tableFn = (SchemaAwareUserDefinedTableMacro) tableFnCall.getOperator();
|
||||
SqlNodeList schema = (SqlNodeList) call.operand(1);
|
||||
SqlCall newCall = macro.rewriteCall(tableFnCall, schema);
|
||||
SqlCall newCall = tableFn.rewriteCall(tableFnCall, schema);
|
||||
|
||||
// Create a new TABLE(table_fn) node to replace the EXTEND node. After this,
|
||||
// the table macro function acts just like a standard Calcite version.
|
||||
return SqlStdOperatorTable.COLLECTION_TABLE.createCall(call.getParserPosition(), newCall);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ import java.util.Map;
|
|||
* a Druid JSON signature, or an SQL {@code EXTEND} list of columns.
|
||||
* As with all table functions, the {@code EXTEND} is optional.
|
||||
*/
|
||||
public class ExternalOperatorConversion extends CatalogExternalTableOperatorConversion
|
||||
public class ExternalOperatorConversion extends DruidUserDefinedTableMacroConversion
|
||||
{
|
||||
public static final String FUNCTION_NAME = "EXTERN";
|
||||
|
||||
|
@ -72,12 +72,12 @@ public class ExternalOperatorConversion extends CatalogExternalTableOperatorConv
|
|||
public ExternFunction()
|
||||
{
|
||||
super(Arrays.asList(
|
||||
new Parameter(INPUT_SOURCE_PARAM, ParameterType.VARCHAR, true),
|
||||
new Parameter(INPUT_FORMAT_PARAM, ParameterType.VARCHAR, true),
|
||||
new Parameter(INPUT_SOURCE_PARAM, ParameterType.VARCHAR, false),
|
||||
new Parameter(INPUT_FORMAT_PARAM, ParameterType.VARCHAR, false),
|
||||
|
||||
// Not required: the user can either provide the signature OR
|
||||
// Optional: the user can either provide the signature OR
|
||||
// an EXTEND clause. Checked in the implementation.
|
||||
new Parameter(SIGNATURE_PARAM, ParameterType.VARCHAR, false)
|
||||
new Parameter(SIGNATURE_PARAM, ParameterType.VARCHAR, true)
|
||||
));
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ import com.google.inject.Inject;
|
|||
import org.apache.druid.catalog.model.TableDefnRegistry;
|
||||
import org.apache.druid.catalog.model.table.HttpInputSourceDefn;
|
||||
|
||||
public class HttpOperatorConversion extends CatalogExternalTableOperatorConversion
|
||||
public class HttpOperatorConversion extends DruidUserDefinedTableMacroConversion
|
||||
{
|
||||
public static final String FUNCTION_NAME = "http";
|
||||
|
||||
|
|
|
@ -23,14 +23,12 @@ import com.google.inject.Inject;
|
|||
import org.apache.druid.catalog.model.TableDefnRegistry;
|
||||
import org.apache.druid.catalog.model.table.InlineInputSourceDefn;
|
||||
|
||||
public class InlineOperatorConversion extends CatalogExternalTableOperatorConversion
|
||||
public class InlineOperatorConversion extends DruidUserDefinedTableMacroConversion
|
||||
{
|
||||
public static final String FUNCTION_NAME = "inline";
|
||||
|
||||
@Inject
|
||||
public InlineOperatorConversion(
|
||||
final TableDefnRegistry registry
|
||||
)
|
||||
public InlineOperatorConversion(final TableDefnRegistry registry)
|
||||
{
|
||||
super(FUNCTION_NAME, registry, InlineInputSourceDefn.TYPE_KEY, registry.jsonMapper());
|
||||
}
|
||||
|
|
|
@ -23,16 +23,14 @@ import com.google.inject.Inject;
|
|||
import org.apache.druid.catalog.model.TableDefnRegistry;
|
||||
import org.apache.druid.catalog.model.table.LocalInputSourceDefn;
|
||||
|
||||
public class LocalOperatorConversion extends CatalogExternalTableOperatorConversion
|
||||
public class LocalOperatorConversion extends DruidUserDefinedTableMacroConversion
|
||||
{
|
||||
// Cannot use "local" because it is a SQL keyword and the user would
|
||||
// be required to quote the name.
|
||||
public static final String FUNCTION_NAME = "localfiles";
|
||||
|
||||
@Inject
|
||||
public LocalOperatorConversion(
|
||||
final TableDefnRegistry registry
|
||||
)
|
||||
public LocalOperatorConversion(final TableDefnRegistry registry)
|
||||
{
|
||||
super(FUNCTION_NAME, registry, LocalInputSourceDefn.TYPE_KEY, registry.jsonMapper());
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.sql.calcite.external;
|
||||
|
||||
import org.apache.calcite.rel.type.RelDataType;
|
||||
import org.apache.calcite.rel.type.RelDataTypeFactory;
|
||||
import org.apache.calcite.schema.FunctionParameter;
|
||||
import org.apache.calcite.schema.TableMacro;
|
||||
import org.apache.calcite.schema.TranslatableTable;
|
||||
|
@ -36,6 +37,7 @@ import org.apache.calcite.sql.type.ReturnTypes;
|
|||
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
|
||||
import org.apache.calcite.sql.type.SqlOperandTypeInference;
|
||||
import org.apache.calcite.sql.type.SqlReturnTypeInference;
|
||||
import org.apache.druid.java.util.common.UOE;
|
||||
import org.apache.druid.server.security.ResourceAction;
|
||||
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
|
||||
|
||||
|
@ -63,14 +65,12 @@ import java.util.Set;
|
|||
* SELECT ..
|
||||
* FROM myTable EXTEND (x VARCHAR, ...)
|
||||
* </pre></code>
|
||||
* Though, oddly, a search of Apache Phoenix itself does not find a hit for
|
||||
* EXTEND, so perhaps the feature was never completed?
|
||||
* <p>
|
||||
* For Druid, we want the above form: extend a table function, not a
|
||||
* literal table. Since we can't change the Calcite parser, we instead use
|
||||
* tricks within the constraints of the parser.
|
||||
* <ul>
|
||||
* <li>First, use use a Python script to modify the parser to add the
|
||||
* <li>The Calcite parser is revised to add the
|
||||
* EXTEND rule for a table function.</li>
|
||||
* <li>Calcite expects the EXTEND operator to have two arguments: an identifier
|
||||
* and the column list. Since our case has a function call as the first argument,
|
||||
|
@ -99,9 +99,10 @@ import java.util.Set;
|
|||
* </pre></code>
|
||||
* Since we seldom use unparse, we can perhaps live with this limitation for now.
|
||||
*/
|
||||
public abstract class UserDefinedTableMacroFunction extends BaseUserDefinedTableMacro implements AuthorizableOperator
|
||||
public abstract class SchemaAwareUserDefinedTableMacro
|
||||
extends BaseUserDefinedTableMacro implements AuthorizableOperator
|
||||
{
|
||||
public UserDefinedTableMacroFunction(
|
||||
public SchemaAwareUserDefinedTableMacro(
|
||||
SqlIdentifier opName,
|
||||
SqlReturnTypeInference returnTypeInference,
|
||||
SqlOperandTypeInference operandTypeInference,
|
||||
|
@ -119,15 +120,20 @@ public abstract class UserDefinedTableMacroFunction extends BaseUserDefinedTable
|
|||
*/
|
||||
public SqlBasicCall rewriteCall(SqlBasicCall oldCall, SqlNodeList schema)
|
||||
{
|
||||
return new ExtendedCall(oldCall, new ShimTableMacroFunction(this, schema));
|
||||
return new ExtendedCall(oldCall, new ShimUserDefinedTableMacro(this, schema));
|
||||
}
|
||||
|
||||
private static class ShimTableMacroFunction extends BaseUserDefinedTableMacro implements AuthorizableOperator
|
||||
// Note the confusing use of "table macro". A TablMacro is a non-SqlNode that does the
|
||||
// actual translation to a table. A *UserDefinedTableMacro is a function that wraps
|
||||
// a table macro. The result is that "macro" by itself is ambiguous: it can be the
|
||||
// implementation (TableMacro) or the function that wraps the implementation.
|
||||
private static class ShimUserDefinedTableMacro extends BaseUserDefinedTableMacro implements AuthorizableOperator
|
||||
{
|
||||
protected final UserDefinedTableMacroFunction base;
|
||||
protected final SchemaAwareUserDefinedTableMacro base;
|
||||
protected final SqlNodeList schema;
|
||||
private TranslatableTable table;
|
||||
|
||||
public ShimTableMacroFunction(final UserDefinedTableMacroFunction base, final SqlNodeList schema)
|
||||
public ShimUserDefinedTableMacro(final SchemaAwareUserDefinedTableMacro base, final SqlNodeList schema)
|
||||
{
|
||||
super(
|
||||
base.getNameAsId(),
|
||||
|
@ -141,6 +147,21 @@ public abstract class UserDefinedTableMacroFunction extends BaseUserDefinedTable
|
|||
this.schema = schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TranslatableTable getTable(
|
||||
RelDataTypeFactory typeFactory,
|
||||
List<SqlNode> operandList
|
||||
)
|
||||
{
|
||||
if (table == null) {
|
||||
// Cache the table to avoid multiple conversions
|
||||
// Possible because each call has a distinct instance
|
||||
// of this operator.
|
||||
table = super.getTable(typeFactory, operandList);
|
||||
}
|
||||
return table;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<ResourceAction> computeResources(final SqlCall call)
|
||||
{
|
||||
|
@ -155,7 +176,7 @@ public abstract class UserDefinedTableMacroFunction extends BaseUserDefinedTable
|
|||
{
|
||||
private final SqlNodeList schema;
|
||||
|
||||
public ExtendedCall(SqlBasicCall oldCall, ShimTableMacroFunction macro)
|
||||
public ExtendedCall(SqlBasicCall oldCall, ShimUserDefinedTableMacro macro)
|
||||
{
|
||||
super(
|
||||
macro,
|
||||
|
@ -208,6 +229,16 @@ public abstract class UserDefinedTableMacroFunction extends BaseUserDefinedTable
|
|||
schema.unparse(writer, leftPrec, rightPrec);
|
||||
writer.endList(frame);
|
||||
}
|
||||
|
||||
/**
|
||||
* Required by GHA CodeQL even though Calcite doesn't use this
|
||||
* particular method.
|
||||
*/
|
||||
@Override
|
||||
public Object clone()
|
||||
{
|
||||
throw new UOE("Not supported");
|
||||
}
|
||||
}
|
||||
|
||||
public interface ExtendedTableMacro extends TableMacro
|
|
@ -114,7 +114,12 @@ public class SqlParameterizerShuttle extends SqlShuttle
|
|||
*/
|
||||
private SqlNode createArrayLiteral(Object value)
|
||||
{
|
||||
List<?> list = Arrays.asList((Object[]) value);
|
||||
List<?> list;
|
||||
if (value instanceof List) {
|
||||
list = (List<?>) value;
|
||||
} else {
|
||||
list = Arrays.asList((Object[]) value);
|
||||
}
|
||||
List<SqlNode> args = new ArrayList<>(list.size());
|
||||
for (Object element : list) {
|
||||
if (element == null) {
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.druid.sql.calcite.external.ExternalDataSource;
|
|||
import org.apache.druid.sql.calcite.external.Externals;
|
||||
import org.apache.druid.sql.calcite.filtration.Filtration;
|
||||
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
|
||||
import org.apache.druid.sql.calcite.planner.Calcites;
|
||||
import org.apache.druid.sql.calcite.planner.IngestHandler;
|
||||
import org.apache.druid.sql.calcite.planner.PlannerConfig;
|
||||
import org.apache.druid.sql.calcite.planner.PlannerContext;
|
||||
|
@ -306,6 +307,50 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
|
|||
.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertFromExternalWithSchema()
|
||||
{
|
||||
String extern;
|
||||
ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
|
||||
try {
|
||||
extern = StringUtils.format(
|
||||
"TABLE(extern(%s, %s))",
|
||||
Calcites.escapeStringLiteral(
|
||||
queryJsonMapper.writeValueAsString(
|
||||
new InlineInputSource("a,b,1\nc,d,2\n")
|
||||
)
|
||||
),
|
||||
Calcites.escapeStringLiteral(
|
||||
queryJsonMapper.writeValueAsString(
|
||||
new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
testIngestionQuery()
|
||||
.sql("INSERT INTO dst SELECT * FROM %s\n" +
|
||||
" (x VARCHAR, y VARCHAR, z BIGINT)\n" +
|
||||
"PARTITIONED BY ALL TIME",
|
||||
extern
|
||||
)
|
||||
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
|
||||
.expectTarget("dst", externalDataSource.getSignature())
|
||||
.expectResources(dataSourceWrite("dst"), Externals.EXTERNAL_RESOURCE_ACTION)
|
||||
.expectQuery(
|
||||
newScanQueryBuilder()
|
||||
.dataSource(externalDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("insertFromExternal")
|
||||
.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertWithPartitionedBy()
|
||||
{
|
||||
|
|
|
@ -101,7 +101,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(httpDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("httpExtern")
|
||||
|
@ -143,7 +143,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(httpDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("httpExtern")
|
||||
|
@ -173,7 +173,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(httpDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("httpExtern")
|
||||
|
@ -200,7 +200,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(httpDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("httpExtern")
|
||||
|
@ -223,7 +223,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(externalDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("insertFromExternal")
|
||||
|
@ -291,7 +291,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(externalDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("insertFromExternal")
|
||||
|
@ -319,7 +319,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(externalDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("insertFromExternal")
|
||||
|
@ -356,7 +356,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(localDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("localExtern")
|
||||
|
@ -384,7 +384,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(localDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("localExtern")
|
||||
|
@ -412,7 +412,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(localDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("localExtern")
|
||||
|
@ -442,7 +442,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(localDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("localExtern")
|
||||
|
@ -472,7 +472,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
.dataSource(localDataSource)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.columns("x", "y", "z")
|
||||
.context(CalciteInsertDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
|
||||
.build()
|
||||
)
|
||||
.expectLogicalPlanFrom("localExtern")
|
||||
|
|
Loading…
Reference in New Issue