Fix typos, add tests for http() function (#13954)

This commit is contained in:
Paul Rogers 2023-03-28 14:41:06 -07:00 committed by GitHub
parent 2f98675285
commit 76fe26d4ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 143 additions and 22 deletions

View File

@ -78,8 +78,8 @@ FROM TABLE(
EXTERN(
inputSource => '<Druid input source>',
inputFormat => '<Druid input format>'
) (<columns>)
)
)) (<columns>)
```
The input source and format are as above. The columns are expressed as in a SQL `CREATE TABLE`.
@ -106,7 +106,7 @@ FROM TABLE(
http(
userName => 'bob',
password => 'secret',
uris => ARRAY['http:example.com/foo.csv', 'http:example.com/bar.csv'],
uris => ARRAY['http://example.com/foo.csv', 'http://example.com/bar.csv'],
format => 'csv'
)
) EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
@ -129,7 +129,7 @@ FROM TABLE(
http(
userName => 'bob',
password => 'secret',
uris => ARRAY['http:example.com/foo.csv', 'http:example.com/bar.csv'],
uris => ARRAY['http://example.com/foo.csv', 'http://example.com/bar.csv'],
format => 'csv'
)
) (x VARCHAR, y VARCHAR, z BIGINT)

View File

@ -169,4 +169,14 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
{
return true;
}
@Override
public String toString()
{
return "HttpInputSource{" +
"uris=\"" + uris +
"\", httpAuthenticationUsername=" + httpAuthenticationUsername +
", httpAuthenticationPasswordProvider=" + httpAuthenticationPasswordProvider +
"}";
}
}

View File

@ -22,6 +22,8 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class DefaultPasswordProvider implements PasswordProvider
{
public static final String TYPE_KEY = "default";
@ -64,8 +66,7 @@ public class DefaultPasswordProvider implements PasswordProvider
}
DefaultPasswordProvider that = (DefaultPasswordProvider) o;
return getPassword() != null ? getPassword().equals(that.getPassword()) : that.getPassword() == null;
return Objects.equals(getPassword(), that.getPassword());
}
@Override

View File

@ -29,6 +29,7 @@ import org.joda.time.Interval;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
*/
@ -76,12 +77,7 @@ public class MultipleIntervalSegmentSpec implements QuerySegmentSpec
}
MultipleIntervalSegmentSpec that = (MultipleIntervalSegmentSpec) o;
if (intervals != null ? !intervals.equals(that.intervals) : that.intervals != null) {
return false;
}
return true;
return Objects.equals(intervals, that.intervals);
}
@Override

View File

@ -47,7 +47,7 @@ public class ExternalTableScanRule extends RelOptRule
return super.matches(call);
} else {
plannerContext.setPlanningError(
"Cannot use '%s' with SQL engine '%s'.",
"Cannot use [%s] with SQL engine [%s].",
ExternalOperatorConversion.FUNCTION_NAME,
plannerContext.getEngine().name()
);

View File

@ -34,6 +34,8 @@ import org.apache.druid.java.util.common.UOE;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.Externals;
@ -73,7 +75,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
protected final ExternalDataSource httpDataSource = new ExternalDataSource(
new HttpInputSource(
Collections.singletonList(toURI("http:foo.com/bar.csv")),
Collections.singletonList(toURI("http://foo.com/bar.csv")),
"bob",
new DefaultPasswordProvider("secret"),
new HttpInputSourceConfig(null)
@ -159,10 +161,10 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
public void testHttpFn()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT *\n" +
.sql("INSERT INTO dst SELECT x, y, z\n" +
"FROM TABLE(http(userName => 'bob',\n" +
" password => 'secret',\n" +
" uris => ARRAY['http:foo.com/bar.csv'],\n" +
" password => 'secret',\n" +
" uris => ARRAY['http://foo.com/bar.csv'],\n" +
" format => 'csv'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
"PARTITIONED BY ALL TIME")
@ -181,6 +183,119 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
.verify();
}
@Test
public void testHttpFn2()
{
final ExternalDataSource httpDataSource = new ExternalDataSource(
new HttpInputSource(
Arrays.asList(toURI("http://example.com/foo.csv"), toURI("http://example.com/bar.csv")),
"bob",
new DefaultPasswordProvider("secret"),
new HttpInputSourceConfig(null)
),
new CsvInputFormat(ImmutableList.of("timestamp", "isRobot"), null, false, false, 0),
RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("isRobot", ColumnType.STRING)
.build()
);
RowSignature expectedSig = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("isRobot", ColumnType.STRING)
.build();
testIngestionQuery()
.sql("INSERT INTO w000\n" +
"SELECT\n" +
" TIME_PARSE(\"timestamp\") AS __time,\n" +
" isRobot\n" +
"FROM TABLE(http(\n" +
" userName => 'bob',\n" +
" password => 'secret',\n" +
" uris => ARRAY['http://example.com/foo.csv', 'http://example.com/bar.csv'],\n" +
" format => 'csv'\n" +
" )\n" +
") EXTEND (\"timestamp\" VARCHAR, isRobot VARCHAR)\n" +
"PARTITIONED BY HOUR")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("w000", expectedSig)
.expectResources(dataSourceWrite("w000"), Externals.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "timestamp_parse(\"timestamp\",null,'UTC')", ColumnType.LONG))
.columns("isRobot", "v0")
.build()
)
.verify();
}
@Test
public void testExplainHttpFn()
{
// Skip vectorization since otherwise the "context" will change for each subtest.
skipVectorize();
final String query =
"EXPLAIN PLAN FOR\n" +
"INSERT INTO dst SELECT x, y, z\n" +
"FROM TABLE(http(userName => 'bob',\n" +
" password => 'secret',\n" +
" uris => ARRAY['http://foo.com/bar.csv'],\n" +
" format => 'csv'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
"PARTITIONED BY ALL TIME";
final String explanation = "[{" +
"\"query\":{\"queryType\":\"scan\"," +
"\"dataSource\":{\"type\":\"external\"," +
"\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"}}," +
"\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]}," +
"\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]}," +
"\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"legacy\":false," +
"\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"," +
"\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\"," +
"\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"}," +
"\"granularity\":{\"type\":\"all\"}}," +
"\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]}]";
final String resources = "[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
testQuery(
PLANNER_CONFIG_NATIVE_QUERY_EXPLAIN,
query,
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(),
ImmutableList.of(
new Object[]{explanation, resources}
)
);
didTest = true;
}
@Test
public void testExplainHttpFnUnauthorized()
{
final String query =
"EXPLAIN PLAN FOR\n" +
"INSERT INTO dst SELECT x, y, z\n" +
"FROM TABLE(http(userName => 'bob',\n" +
" password => 'secret',\n" +
" uris => ARRAY['http://foo.com/bar.csv'],\n" +
" format => 'csv'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
"PARTITIONED BY ALL TIME";
didTest = true; // Else the framework will complain
testBuilder()
.plannerConfig(PLANNER_CONFIG_NATIVE_QUERY_EXPLAIN)
.sql(query)
// Regular user does not have permission on extern or other table functions
.authResult(CalciteTests.REGULAR_USER_AUTH_RESULT)
.expectedException(expected -> {
expected.expect(ForbiddenException.class);
expected.expectMessage(Access.DEFAULT_ERROR_MESSAGE);
})
.run();
}
@Test
public void testHttpFnWithParameters()
{
@ -193,7 +308,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
" EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.parameters(Collections.singletonList(new SqlParameter(SqlType.ARRAY, new String[] {"http:foo.com/bar.csv"})))
.parameters(Collections.singletonList(new SqlParameter(SqlType.ARRAY, new String[] {"http://foo.com/bar.csv"})))
.expectTarget("dst", httpDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), Externals.EXTERNAL_RESOURCE_ACTION)
.expectQuery(

View File

@ -395,7 +395,7 @@ public class QueryTestRunner
expectedQueries.add(BaseCalciteQueryTest.recursivelyClearContext(query, queryJsonMapper));
}
final List<Query> recordedQueries = queryResults.recordedQueries
final List<Query<?>> recordedQueries = queryResults.recordedQueries
.stream()
.map(q -> BaseCalciteQueryTest.recursivelyClearContext(q, queryJsonMapper))
.collect(Collectors.toList());
@ -609,7 +609,6 @@ public class QueryTestRunner
}
}
private final List<QueryTestRunner.QueryRunStep> runSteps = new ArrayList<>();
private final List<QueryTestRunner.QueryVerifyStep> verifySteps = new ArrayList<>();

View File

@ -85,7 +85,7 @@ public class ExternalTableScanRuleTest
ExternalTableScanRule rule = new ExternalTableScanRule(plannerContext);
rule.matches(EasyMock.createMock(RelOptRuleCall.class));
Assert.assertEquals(
"Cannot use 'EXTERN' with SQL engine 'native'.",
"Cannot use [EXTERN] with SQL engine [native].",
plannerContext.getPlanningError()
);
}

View File

@ -1,3 +1,3 @@
LogicalInsert(target=[dst], partitionedBy=[AllGranularity], clusteredBy=[<none>])
LogicalProject(x=[$0], y=[$1], z=[$2])
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http:foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])