Allow broker to use catalog for datasource schemas for SQL queries (#15469)

* Allow broker to use catalog for datasource schemas

* More PR comments

* PR comments
Jonathan Wei 2024-01-08 13:46:08 -06:00 committed by GitHub
parent 141c214b46
commit 5d1e66b8f9
56 changed files with 1510 additions and 373 deletions


@ -48,44 +48,22 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-services</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@ -142,11 +120,6 @@
<artifactId>curator-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
@ -162,11 +135,6 @@
<artifactId>jsr311-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
@ -177,36 +145,6 @@
<artifactId>jersey-server</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-memory</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
@ -214,31 +152,11 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
@ -262,4 +180,34 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<!-- The dependency analyzer gets confused between these two items. -->
<usedDependencies>
<dependency>javax.inject:javax.inject</dependency>
</usedDependencies>
<ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependency>javax.inject:javax.inject</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>jakarta.inject:jakarta.inject-api</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
</configuration>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.guice;
import com.google.inject.Binder;
import org.apache.druid.catalog.http.CatalogListenerResource;
import org.apache.druid.catalog.model.SchemaRegistry;
import org.apache.druid.catalog.model.SchemaRegistryImpl;
import org.apache.druid.catalog.sql.LiveCatalogResolver;
import org.apache.druid.catalog.sync.CachedMetadataCatalog;
import org.apache.druid.catalog.sync.CatalogClient;
import org.apache.druid.catalog.sync.CatalogUpdateListener;
import org.apache.druid.catalog.sync.CatalogUpdateReceiver;
import org.apache.druid.catalog.sync.MetadataCatalog;
import org.apache.druid.catalog.sync.MetadataCatalog.CatalogSource;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
/**
* Configures the metadata catalog on the Broker to use a cache
* and network communications for pull and push updates.
*/
@LoadScope(roles = NodeRole.BROKER_JSON_NAME)
public class CatalogBrokerModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
// The Broker (catalog client) uses a cached metadata catalog.
binder
.bind(CachedMetadataCatalog.class)
.in(LazySingleton.class);
// Broker code accesses the catalog through the
// MetadataCatalog interface.
binder
.bind(MetadataCatalog.class)
.to(CachedMetadataCatalog.class)
.in(LazySingleton.class);
// The cached metadata catalog needs a "pull" source,
// which is the network client.
binder
.bind(CatalogSource.class)
.to(CatalogClient.class)
.in(LazySingleton.class);
// The cached metadata catalog is the listener for "push" events.
binder
.bind(CatalogUpdateListener.class)
.to(CachedMetadataCatalog.class)
.in(LazySingleton.class);
// At present, the set of schemas is fixed.
binder
.bind(SchemaRegistry.class)
.to(SchemaRegistryImpl.class)
.in(LazySingleton.class);
// Lifecycle-managed class to prime the metadata cache
binder
.bind(CatalogUpdateReceiver.class)
.in(ManageLifecycle.class);
LifecycleModule.register(binder, CatalogUpdateReceiver.class);
// Catalog resolver for the planner. This will override the
// base binding.
binder
.bind(CatalogResolver.class)
.to(LiveCatalogResolver.class)
.in(LazySingleton.class);
// The listener resource forwards update events to the
// catalog listener (the cached catalog).
Jerseys.addResource(binder, CatalogListenerResource.class);
}
}
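
The bindings above hinge on one pattern: a single cached catalog instance serves both the read path (MetadataCatalog) and the push path (CatalogUpdateListener). Below is a minimal, dependency-free Guice sketch of that pattern; Catalog, UpdateListener, and CachedCatalog are hypothetical stand-ins, not Druid classes.

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Singleton;

public class BindingSketch
{
  // Hypothetical stand-ins for MetadataCatalog and CatalogUpdateListener.
  interface Catalog { String lookup(String name); }
  interface UpdateListener { void updated(String event); }

  // Hypothetical stand-in for CachedMetadataCatalog: one object plays both roles.
  static class CachedCatalog implements Catalog, UpdateListener
  {
    private String lastEvent = "none";

    @Override
    public String lookup(String name)
    {
      return name + " (last event: " + lastEvent + ")";
    }

    @Override
    public void updated(String event)
    {
      lastEvent = event;
    }
  }

  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(new AbstractModule()
    {
      @Override
      protected void configure()
      {
        // Both interfaces resolve to the same singleton cache, mirroring the
        // MetadataCatalog and CatalogUpdateListener bindings above.
        bind(CachedCatalog.class).in(Singleton.class);
        bind(Catalog.class).to(CachedCatalog.class);
        bind(UpdateListener.class).to(CachedCatalog.class);
      }
    });
    injector.getInstance(UpdateListener.class).updated("table created");
    System.out.println(injector.getInstance(Catalog.class).lookup("foo"));
  }
}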


@ -67,4 +67,22 @@ public class CatalogListenerResource
listener.updated(event);
return Response.status(Response.Status.ACCEPTED).build();
}
@POST
@Path("flush")
@ResourceFilters(ConfigResourceFilter.class)
public Response flush()
{
listener.flush();
return Response.status(Response.Status.ACCEPTED).build();
}
@POST
@Path("resync")
@ResourceFilters(ConfigResourceFilter.class)
public Response resync()
{
listener.resync();
return Response.status(Response.Status.ACCEPTED).build();
}
}
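
For manual testing, the two new endpoints can be driven with any HTTP client. A rough sketch using the JDK HTTP client follows; the host, port, and base path are assumptions (only the relative "flush" and "resync" paths come from the resource above), and because both endpoints sit behind ConfigResourceFilter, the request would normally also need appropriate credentials.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListenerEndpointSketch
{
  public static void main(String[] args) throws Exception
  {
    HttpClient client = HttpClient.newHttpClient();
    // Hypothetical Broker address and listener base path; substitute the real ones.
    String base = "http://localhost:8082/druid/broker/v1/catalog/";
    for (String action : new String[]{"flush", "resync"}) {
      HttpRequest request = HttpRequest.newBuilder(URI.create(base + action))
          .POST(HttpRequest.BodyPublishers.noBody())
          .build();
      HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
      // Both endpoints respond with 202 ACCEPTED on success.
      System.out.println(action + " -> " + response.statusCode());
    }
  }
}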


@ -278,8 +278,7 @@ public class CatalogResource
// Retrieval
/**
* Retrieves the list of all Druid schema names, all table names, or
* all table metadata.
* Retrieves the list of all Druid schema names.
*
* @param format the format of the response. See the code for the
* available formats


@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.sql;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.sync.MetadataCatalog;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.DatasourceTable.EffectiveColumnMetadata;
import org.apache.druid.sql.calcite.table.DatasourceTable.EffectiveMetadata;
import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata;
import org.apache.druid.sql.calcite.table.DruidTable;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* A catalog resolver that uses the catalog stored on the Druid coordinator.
*/
public class LiveCatalogResolver implements CatalogResolver
{
private final MetadataCatalog catalog;
@Inject
public LiveCatalogResolver(final MetadataCatalog catalog)
{
this.catalog = catalog;
}
@Nullable
private DatasourceFacade datasourceSpec(String name)
{
TableId tableId = TableId.datasource(name);
ResolvedTable table = catalog.resolveTable(tableId);
if (table == null) {
return null;
}
if (!DatasourceDefn.isDatasource(table)) {
return null;
}
return new DatasourceFacade(table);
}
/**
* Create a {@link DruidTable} based on the physical segments, catalog entry, or both.
*/
@Override
public DruidTable resolveDatasource(String name, PhysicalDatasourceMetadata dsMetadata)
{
DatasourceFacade dsSpec = datasourceSpec(name);
// No catalog metadata. If there is no physical metadata, then the
// datasource does not exist. Else, if there is physical metadata, the
// datasource is based entirely on the physical information.
if (dsSpec == null) {
return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
}
if (dsMetadata == null) {
// Datasource exists only in the catalog: no physical segments.
return emptyDatasource(name, dsSpec);
} else {
// Datasource exists as both segments and a catalog entry.
return mergeDatasource(dsMetadata, dsSpec);
}
}
private DruidTable emptyDatasource(String name, DatasourceFacade dsSpec)
{
RowSignature.Builder builder = RowSignature.builder();
Map<String, EffectiveColumnMetadata> columns = new HashMap<>();
boolean hasTime = false;
for (ColumnSpec col : dsSpec.columns()) {
EffectiveColumnMetadata colMetadata = columnFromCatalog(col, null);
if (colMetadata.name().equals(Columns.TIME_COLUMN)) {
hasTime = true;
}
builder.add(col.name(), colMetadata.druidType());
columns.put(col.name(), colMetadata);
}
if (!hasTime) {
columns.put(Columns.TIME_COLUMN, new EffectiveColumnMetadata(
Columns.TIME_COLUMN,
ColumnType.LONG
));
builder = RowSignature.builder()
.add(Columns.TIME_COLUMN, ColumnType.LONG)
.addAll(builder.build());
}
final PhysicalDatasourceMetadata mergedMetadata = new PhysicalDatasourceMetadata(
new TableDataSource(name),
builder.build(),
false, // Cannot join to an empty table
false // Cannot broadcast an empty table
);
return new DatasourceTable(
mergedMetadata.getRowSignature(),
mergedMetadata,
new EffectiveMetadata(dsSpec, columns, true)
);
}
private EffectiveColumnMetadata columnFromCatalog(ColumnSpec col, ColumnType physicalType)
{
ColumnType type = Columns.druidType(col);
if (type != null) {
// Use the type that the user provided.
} else if (physicalType == null) {
// Corner case: the user has defined a column in the catalog, has
// not specified a type (meaning the user wants Druid to decide), but
// there is no data at this moment. Guess String as the type for the
// null values. If new segments appear between now and execution, we'll
// convert the values to string, which is always safe.
type = ColumnType.STRING;
} else {
type = physicalType;
}
return new EffectiveColumnMetadata(col.name(), type);
}
private DruidTable mergeDatasource(
final PhysicalDatasourceMetadata dsMetadata,
final DatasourceFacade dsSpec)
{
final RowSignature physicalSchema = dsMetadata.getRowSignature();
Set<String> physicalCols = new HashSet<>(physicalSchema.getColumnNames());
// Merge columns. All catalog-defined columns come first,
// in the order defined in the catalog.
final RowSignature.Builder builder = RowSignature.builder();
Map<String, EffectiveColumnMetadata> columns = new HashMap<>();
for (ColumnSpec col : dsSpec.columns()) {
ColumnType physicalType = null;
if (physicalCols.remove(col.name())) {
physicalType = dsMetadata.getRowSignature().getColumnType(col.name()).get();
}
EffectiveColumnMetadata colMetadata = columnFromCatalog(col, physicalType);
builder.add(col.name(), colMetadata.druidType());
columns.put(col.name(), colMetadata);
}
// Remove any hidden columns from the physical set. Assumes that the
// hidden columns are disjoint from the defined columns.
if (dsSpec.hiddenColumns() != null) {
for (String colName : dsSpec.hiddenColumns()) {
physicalCols.remove(colName);
}
}
// Any remaining columns follow, if not marked as hidden
// in the catalog.
for (int i = 0; i < physicalSchema.size(); i++) {
String colName = physicalSchema.getColumnName(i);
if (!physicalCols.contains(colName)) {
continue;
}
ColumnType physicalType = dsMetadata.getRowSignature().getColumnType(colName).get();
EffectiveColumnMetadata colMetadata = EffectiveColumnMetadata.fromPhysical(colName, physicalType);
columns.put(colName, colMetadata);
builder.add(colName, physicalType);
}
EffectiveMetadata effectiveMetadata = new EffectiveMetadata(dsSpec, columns, false);
return new DatasourceTable(builder.build(), dsMetadata, effectiveMetadata);
}
@Override
public boolean ingestRequiresExistingTable()
{
return false;
}
@Override
public Set<String> getTableNames(Set<String> datasourceNames)
{
Set<String> catalogTableNames = catalog.tableNames(TableId.DRUID_SCHEMA);
if (catalogTableNames.isEmpty()) {
return datasourceNames;
}
return ImmutableSet.<String>builder()
.addAll(datasourceNames)
.addAll(catalogTableNames)
.build();
}
}
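
The merge logic above boils down to a simple ordering rule: catalog-defined columns come first, in catalog order, with the catalog type winning when one is declared; any remaining physical columns follow unless the catalog hides them. A dependency-free sketch of that rule, with made-up column names and types:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MergeOrderSketch
{
  static List<String> merge(
      Map<String, String> catalogCols,   // name -> declared type (null = let Druid decide)
      Map<String, String> physicalCols,  // name -> physical type, in segment order
      Set<String> hidden)
  {
    List<String> merged = new ArrayList<>();
    Map<String, String> remaining = new LinkedHashMap<>(physicalCols);
    for (Map.Entry<String, String> col : catalogCols.entrySet()) {
      String physicalType = remaining.remove(col.getKey());
      // Catalog type wins; fall back to the physical type, then to STRING.
      String type = col.getValue() != null
          ? col.getValue()
          : (physicalType != null ? physicalType : "STRING");
      merged.add(col.getKey() + ":" + type);
    }
    // Physical-only columns follow, unless hidden in the catalog.
    for (Map.Entry<String, String> col : remaining.entrySet()) {
      if (!hidden.contains(col.getKey())) {
        merged.add(col.getKey() + ":" + col.getValue());
      }
    }
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, String> catalog = new LinkedHashMap<>();
    catalog.put("__time", "LONG");
    catalog.put("dsa", null);          // type left to Druid
    catalog.put("newa", "STRING");     // catalog-only column, no segments yet
    Map<String, String> physical = new LinkedHashMap<>();
    physical.put("__time", "LONG");
    physical.put("dsa", "DOUBLE");
    physical.put("dsf", "STRING");     // hidden below
    physical.put("dsh", "DOUBLE");     // physical-only column
    System.out.println(merge(catalog, physical, Set.of("dsf")));
    // [__time:LONG, dsa:DOUBLE, newa:STRING, dsh:DOUBLE]
  }
}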


@ -277,6 +277,22 @@ public class CachedMetadataCatalog implements MetadataCatalog, CatalogUpdateList
});
return tables;
}
/**
* Populate the cache by asking the catalog source for all tables
* in this schema.
*/
public synchronized void resync(CatalogSource source)
{
List<TableMetadata> tables = source.tablesForSchema(schema.name());
cache.clear();
for (TableMetadata table : tables) {
cache.compute(
table.id().name(),
(k, v) -> computeCreate(v, table)
);
}
}
}
private final ConcurrentHashMap<String, SchemaEntry> schemaCache = new ConcurrentHashMap<>();
@ -326,6 +342,12 @@ public class CachedMetadataCatalog implements MetadataCatalog, CatalogUpdateList
}
}
/**
* Get the list of table names <i>in the cache</i>. Does not attempt to
* lazy-load the list since doing so is costly: we don't know when it
* might be out of date. Relies on priming the cache, and on update
* notifications, to keep the list accurate.
*/
@Override
public Set<String> tableNames(String schemaName)
{
@ -333,8 +355,13 @@ public class CachedMetadataCatalog implements MetadataCatalog, CatalogUpdateList
return schemaEntry == null ? Collections.emptySet() : schemaEntry.tableNames();
}
/**
* Clear the cache. Primarily for testing.
*/
@Override
public void flush()
{
LOG.info("Flush requested");
schemaCache.clear();
}
@ -347,4 +374,18 @@ public class CachedMetadataCatalog implements MetadataCatalog, CatalogUpdateList
return schema == null ? null : new SchemaEntry(schema);
});
}
/**
* Discard any existing cached tables and reload directly from the
* catalog source. Resyncs the two schemas which the catalog manages;
* if the catalog were to manage others, add them here as well.
* Called both at Broker startup and on demand for testing.
*/
@Override
public void resync()
{
LOG.info("Resync requested");
entryFor(TableId.DRUID_SCHEMA).resync(base);
entryFor(TableId.EXTERNAL_SCHEMA).resync(base);
}
}
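
The cache's contract after this change is easy to state: flush() empties every schema entry, while resync() rebuilds each managed schema directly from the catalog source. A dependency-free sketch of that contract, with a hypothetical Source interface standing in for CatalogSource:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResyncSketch
{
  // Hypothetical stand-in for CatalogSource.
  interface Source
  {
    List<String> tablesForSchema(String schema);
  }

  static class Cache
  {
    private final Map<String, List<String>> schemas = new ConcurrentHashMap<>();

    void flush()
    {
      schemas.clear();
    }

    void resync(Source source, String... schemaNames)
    {
      // Discard whatever is cached and reload each managed schema from the source.
      flush();
      for (String schema : schemaNames) {
        schemas.put(schema, source.tablesForSchema(schema));
      }
    }

    Map<String, List<String>> contents()
    {
      return schemas;
    }
  }

  public static void main(String[] args)
  {
    Source source = schema -> List.of(schema + ".table1", schema + ".table2");
    Cache cache = new Cache();
    cache.resync(source, "druid", "ext");
    System.out.println(cache.contents()); // two schemas, each with two tables
    cache.flush();
    System.out.println(cache.contents()); // {}
  }
}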


@ -86,7 +86,7 @@ public class CatalogClient implements CatalogSource
@Override
public List<TableMetadata> tablesForSchema(String dbSchema)
{
String url = StringUtils.replace(SCHEMA_SYNC_PATH, "{dbSchema}", dbSchema);
String url = StringUtils.replace(SCHEMA_SYNC_PATH, "{schema}", dbSchema);
List<TableMetadata> results = send(url, LIST_OF_TABLE_METADATA_TYPE);
// A "not found" response for a list is treated as an empty list.
@ -96,7 +96,7 @@ public class CatalogClient implements CatalogSource
@Override
public TableMetadata table(TableId id)
{
String url = StringUtils.replace(TABLE_SYNC_PATH, "{dbSchema}", id.schema());
String url = StringUtils.replace(TABLE_SYNC_PATH, "{schema}", id.schema());
url = StringUtils.replace(url, "{name}", id.name());
return send(url, TABLE_METADATA_TYPE);
}


@ -28,4 +28,6 @@ package org.apache.druid.catalog.sync;
public interface CatalogUpdateListener
{
void updated(UpdateEvent event);
void flush();
void resync();
}


@ -87,14 +87,14 @@ public class CatalogUpdateNotifier implements CatalogUpdateListener
public void start()
{
notifier.start();
LOG.info("Catalog catalog update notifier started");
LOG.info("Catalog update notifier started");
}
@LifecycleStop
public void stop()
{
notifier.stop();
LOG.info("Catalog catalog update notifier stopped");
LOG.info("Catalog update notifier stopped");
}
@Override
@ -102,4 +102,16 @@ public class CatalogUpdateNotifier implements CatalogUpdateListener
{
notifier.send(JacksonUtils.toBytes(smileMapper, event));
}
@Override
public void flush()
{
// Not generated on this path
}
@Override
public void resync()
{
// Not generated on this path
}
}


@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.sync;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.emitter.EmittingLogger;
import javax.inject.Inject;
/**
* Receiver which runs in the Broker to listen for catalog updates from the
* Coordinator. To prevent slowing initial queries, this class loads the
* current catalog contents into the local cache on lifecycle start, which
* avoids the on-demand reads that would otherwise occur. After the first load,
* events from the Coordinator keep the local cache evergreen.
*/
@ManageLifecycle
public class CatalogUpdateReceiver
{
private static final EmittingLogger LOG = new EmittingLogger(CatalogUpdateReceiver.class);
private final CachedMetadataCatalog cachedCatalog;
@Inject
public CatalogUpdateReceiver(
final CachedMetadataCatalog cachedCatalog
)
{
this.cachedCatalog = cachedCatalog;
}
@LifecycleStart
public void start()
{
cachedCatalog.resync();
LOG.info("Catalog update receiver started");
}
}


@ -14,3 +14,4 @@
# limitations under the License.
org.apache.druid.catalog.guice.CatalogCoordinatorModule
org.apache.druid.catalog.guice.CatalogBrokerModule


@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.sql;
import org.apache.druid.catalog.CatalogException;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
import org.apache.druid.catalog.sync.CachedMetadataCatalog;
import org.apache.druid.catalog.sync.MetadataCatalog;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.SqlSchema;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.fail;
public class CatalogQueryTest extends BaseCalciteQueryTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private CatalogTests.DbFixture dbFixture;
private CatalogStorage storage;
@Test
public void testCatalogSchema()
{
SqlSchema schema = SqlSchema.builder()
.column("__time", "TIMESTAMP(3) NOT NULL")
.column("extra1", "VARCHAR")
.column("dim2", "VARCHAR")
.column("dim1", "VARCHAR")
.column("cnt", "BIGINT NOT NULL")
.column("m1", "DOUBLE NOT NULL")
.column("extra2", "BIGINT NOT NULL")
.column("extra3", "VARCHAR")
.column("m2", "DOUBLE NOT NULL")
.build();
testBuilder()
.sql("SELECT * FROM foo ORDER BY __time LIMIT 1")
.expectedResources(Collections.singletonList(dataSourceRead("foo")))
//.expectedSqlSchema(schema)
.run();
}
@After
public void catalogTearDown()
{
CatalogTests.tearDown(dbFixture);
}
@Override
public CatalogResolver createCatalogResolver()
{
dbFixture = new CatalogTests.DbFixture(derbyConnectorRule);
storage = dbFixture.storage;
MetadataCatalog catalog = new CachedMetadataCatalog(
storage,
storage.schemaRegistry(),
storage.jsonMapper()
);
return new LiveCatalogResolver(catalog);
}
@Override
public void finalizeTestFramework(SqlTestFramework sqlTestFramework)
{
super.finalizeTestFramework(sqlTestFramework);
buildFooDatasource();
}
private void createTableMetadata(TableMetadata table)
{
try {
storage.tables().create(table);
}
catch (CatalogException e) {
fail(e.getMessage());
}
}
public void buildFooDatasource()
{
TableMetadata spec = TableBuilder.datasource("foo", "ALL")
.timeColumn()
.column("extra1", null)
.column("dim2", null)
.column("dim1", null)
.column("cnt", null)
.column("m1", Columns.DOUBLE)
.column("extra2", Columns.LONG)
.column("extra3", Columns.STRING)
.hiddenColumns(Arrays.asList("dim3", "unique_dim1"))
.build();
createTableMetadata(spec);
}
}


@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.sql;
import org.apache.druid.catalog.CatalogException.DuplicateKeyException;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
import org.apache.druid.catalog.sync.LocalMetadataCatalog;
import org.apache.druid.catalog.sync.MetadataCatalog;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
/**
* Test for the datasource resolution aspects of the live catalog resolver.
* Insert resolution is too tedious to test in its current state.
*/
public class LiveCatalogTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private CatalogTests.DbFixture dbFixture;
private CatalogStorage storage;
private CatalogResolver resolver;
@Before
public void setUp()
{
dbFixture = new CatalogTests.DbFixture(derbyConnectorRule);
storage = dbFixture.storage;
MetadataCatalog catalog = new LocalMetadataCatalog(storage, storage.schemaRegistry());
resolver = new LiveCatalogResolver(catalog);
}
@After
public void tearDown()
{
CatalogTests.tearDown(dbFixture);
}
private void createTableMetadata(TableMetadata table)
{
try {
storage.tables().create(table);
}
catch (DuplicateKeyException e) {
fail(e.getMessage());
}
}
/**
* Populate the catalog with a few items using the catalog storage layer.
*/
private void populateCatalog(boolean withTimeCol)
{
TableMetadata table = TableBuilder.datasource("trivial", "PT1D")
.build();
createTableMetadata(table);
TableBuilder builder = TableBuilder.datasource("merge", "PT1D");
if (withTimeCol) {
builder.timeColumn();
}
table = builder
.column("dsa", null)
.column("dsb", Columns.STRING)
.column("dsc", Columns.LONG)
.column("dsd", Columns.FLOAT)
.column("dse", Columns.DOUBLE)
.column("newa", null)
.column("newb", Columns.STRING)
.column("newc", Columns.LONG)
.column("newd", Columns.FLOAT)
.column("newe", Columns.DOUBLE)
.hiddenColumns(Arrays.asList("dsf", "dsg"))
.build();
createTableMetadata(table);
}
private PhysicalDatasourceMetadata mockDatasource()
{
RowSignature sig = RowSignature.builder()
.add(Columns.TIME_COLUMN, ColumnType.LONG)
.add("dsa", ColumnType.DOUBLE)
.add("dsb", ColumnType.LONG)
.add("dsc", ColumnType.STRING)
.add("dsd", ColumnType.LONG)
.add("dse", ColumnType.FLOAT)
.add("dsf", ColumnType.STRING)
.add("dsg", ColumnType.LONG)
.add("dsh", ColumnType.DOUBLE)
.build();
return new PhysicalDatasourceMetadata(
new TableDataSource("merge"),
sig,
true,
true
);
}
@Test
public void testUnknownTable()
{
// No catalog, no datasource
assertNull(resolver.resolveDatasource("bogus", null));
// No catalog entry
PhysicalDatasourceMetadata dsMetadata = mockDatasource();
DruidTable table = resolver.resolveDatasource("merge", dsMetadata);
assertSame(dsMetadata.getRowSignature(), table.getRowSignature());
}
@Test
public void testKnownTableNoTime()
{
populateCatalog(false);
// Catalog, no datasource
DruidTable table = resolver.resolveDatasource("merge", null);
assertEquals(11, table.getRowSignature().size());
assertEquals("merge", ((TableDataSource) table.getDataSource()).getName());
// Spot check
assertColumnEquals(table, 0, Columns.TIME_COLUMN, ColumnType.LONG);
assertColumnEquals(table, 1, "dsa", ColumnType.STRING);
assertColumnEquals(table, 2, "dsb", ColumnType.STRING);
assertColumnEquals(table, 3, "dsc", ColumnType.LONG);
// Catalog, with datasource, result is merged
// Catalog has no time column
PhysicalDatasourceMetadata dsMetadata = mockDatasource();
table = resolver.resolveDatasource("merge", dsMetadata);
assertEquals(12, table.getRowSignature().size());
assertSame(dsMetadata.dataSource(), table.getDataSource());
assertEquals(dsMetadata.isBroadcast(), table.isBroadcast());
assertEquals(dsMetadata.isJoinable(), table.isJoinable());
// dsa uses Druid's type, others coerce the type
assertColumnEquals(table, 0, "dsa", ColumnType.DOUBLE);
assertColumnEquals(table, 1, "dsb", ColumnType.STRING);
assertColumnEquals(table, 2, "dsc", ColumnType.LONG);
assertColumnEquals(table, 3, "dsd", ColumnType.FLOAT);
assertColumnEquals(table, 4, "dse", ColumnType.DOUBLE);
assertColumnEquals(table, 5, "newa", ColumnType.STRING);
assertColumnEquals(table, 9, "newe", ColumnType.DOUBLE);
assertColumnEquals(table, 10, Columns.TIME_COLUMN, ColumnType.LONG);
assertColumnEquals(table, 11, "dsh", ColumnType.DOUBLE);
}
@Test
public void testKnownTableWithTime()
{
populateCatalog(true);
// Catalog, no datasource
DruidTable table = resolver.resolveDatasource("merge", null);
assertEquals(11, table.getRowSignature().size());
assertEquals("merge", ((TableDataSource) table.getDataSource()).getName());
// Spot check
assertColumnEquals(table, 0, Columns.TIME_COLUMN, ColumnType.LONG);
assertColumnEquals(table, 1, "dsa", ColumnType.STRING);
assertColumnEquals(table, 2, "dsb", ColumnType.STRING);
assertColumnEquals(table, 3, "dsc", ColumnType.LONG);
// Catalog, with datasource, result is merged
PhysicalDatasourceMetadata dsMetadata = mockDatasource();
table = resolver.resolveDatasource("merge", dsMetadata);
assertEquals(12, table.getRowSignature().size());
assertSame(dsMetadata.dataSource(), table.getDataSource());
assertEquals(dsMetadata.isBroadcast(), table.isBroadcast());
assertEquals(dsMetadata.isJoinable(), table.isJoinable());
assertColumnEquals(table, 0, Columns.TIME_COLUMN, ColumnType.LONG);
// dsa uses Druid's type, others coerce the type
assertColumnEquals(table, 1, "dsa", ColumnType.DOUBLE);
assertColumnEquals(table, 2, "dsb", ColumnType.STRING);
assertColumnEquals(table, 3, "dsc", ColumnType.LONG);
assertColumnEquals(table, 4, "dsd", ColumnType.FLOAT);
assertColumnEquals(table, 5, "dse", ColumnType.DOUBLE);
assertColumnEquals(table, 6, "newa", ColumnType.STRING);
assertColumnEquals(table, 10, "newe", ColumnType.DOUBLE);
assertColumnEquals(table, 11, "dsh", ColumnType.DOUBLE);
}
private void assertColumnEquals(DruidTable table, int i, String name, ColumnType type)
{
RowSignature sig = table.getRowSignature();
assertEquals(name, sig.getColumnName(i));
assertEquals(type, sig.getColumnType(i).get());
}
}


@ -207,8 +207,8 @@ public class TableManagerTest
DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000
);
List<ColumnSpec> cols = Arrays.asList(
new ColumnSpec("a", Columns.VARCHAR, null),
new ColumnSpec("b", Columns.BIGINT, null)
new ColumnSpec("a", Columns.STRING, null),
new ColumnSpec("b", Columns.LONG, null)
);
ColumnSpec colC = new ColumnSpec("c", Columns.DOUBLE, null);


@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.sync;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.catalog.CatalogException.DuplicateKeyException;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
import org.apache.druid.metadata.TestDerbyConnector;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Builds on the generic {@link CatalogSyncTest} to focus on cache-specific
* operations.
*/
public class CatalogCacheTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private CatalogTests.DbFixture dbFixture;
private CatalogStorage storage;
private ObjectMapper jsonMapper;
@Before
public void setUp()
{
dbFixture = new CatalogTests.DbFixture(derbyConnectorRule);
storage = dbFixture.storage;
jsonMapper = new ObjectMapper();
}
@After
public void tearDown()
{
CatalogTests.tearDown(dbFixture);
}
/**
* Test overall cache lifecycle. Detailed checking of contents is done
* in {@link CatalogSyncTest} and is not repeated here.
*/
@Test
public void testLifecycle() throws DuplicateKeyException
{
// Create entries with no listener.
TableMetadata table1 = TableBuilder.datasource("table1", "P1D")
.timeColumn()
.column("a", Columns.STRING)
.build();
storage.validate(table1);
storage.tables().create(table1);
// Create a listener. Starts with the cache empty
CachedMetadataCatalog cache1 = new CachedMetadataCatalog(storage, storage.schemaRegistry(), jsonMapper);
storage.register(cache1);
assertTrue(cache1.tableNames(TableId.DRUID_SCHEMA).isEmpty());
// Load table on demand.
assertNotNull(cache1.getTable(table1.id()));
assertEquals(1, cache1.tableNames(TableId.DRUID_SCHEMA).size());
// Flush to empty the cache.
cache1.flush();
assertTrue(cache1.tableNames(TableId.DRUID_SCHEMA).isEmpty());
// Resync to reload the cache.
cache1.resync();
assertEquals(1, cache1.tableNames(TableId.DRUID_SCHEMA).size());
assertNotNull(cache1.getTable(table1.id()));
// Add a table: cache is updated.
TableMetadata table2 = TableBuilder.datasource("table2", "P1D")
.timeColumn()
.column("dim", Columns.STRING)
.column("measure", Columns.LONG)
.build();
storage.validate(table2);
storage.tables().create(table2);
assertEquals(2, cache1.tableNames(TableId.DRUID_SCHEMA).size());
assertNotNull(cache1.getTable(table2.id()));
// Second listener. Starts with the cache empty.
CachedMetadataCatalog cache2 = new CachedMetadataCatalog(storage, storage.schemaRegistry(), jsonMapper);
storage.register(cache2);
assertTrue(cache2.tableNames(TableId.DRUID_SCHEMA).isEmpty());
// Second listener resyncs.
cache2.resync();
assertEquals(2, cache2.tableNames(TableId.DRUID_SCHEMA).size());
assertNotNull(cache2.getTable(table1.id()));
assertNotNull(cache2.getTable(table2.id()));
// Add a third table: both caches updated.
TableMetadata table3 = TableBuilder.datasource("table3", "PT1H")
.timeColumn()
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
storage.validate(table3);
storage.tables().create(table3);
assertEquals(3, cache1.tableNames(TableId.DRUID_SCHEMA).size());
assertNotNull(cache1.getTable(table3.id()));
assertEquals(3, cache2.tableNames(TableId.DRUID_SCHEMA).size());
assertNotNull(cache2.getTable(table3.id()));
// Another resync puts us back where we were.
cache1.flush();
assertTrue(cache1.tableNames(TableId.DRUID_SCHEMA).isEmpty());
cache1.resync();
assertEquals(3, cache1.tableNames(TableId.DRUID_SCHEMA).size());
assertNotNull(cache1.getTable(table3.id()));
cache2.resync();
assertEquals(3, cache2.tableNames(TableId.DRUID_SCHEMA).size());
assertNotNull(cache2.getTable(table3.id()));
// Third cache, managed by the receiver.
CachedMetadataCatalog cache3 = new CachedMetadataCatalog(storage, storage.schemaRegistry(), jsonMapper);
storage.register(cache3);
CatalogUpdateReceiver receiver = new CatalogUpdateReceiver(cache3);
receiver.start();
assertEquals(3, cache3.tableNames(TableId.DRUID_SCHEMA).size());
assertNotNull(cache3.getTable(table3.id()));
}
}


@ -96,7 +96,7 @@ public class CatalogSyncTest
TableMetadata table = TableBuilder.external("externTable")
.inputSource(toMap(new InlineInputSource("a\nc")))
.inputFormat(BaseExternTableTest.CSV_FORMAT)
.column("a", Columns.VARCHAR)
.column("a", Columns.STRING)
.build();
storage.validate(table);
}
@ -114,7 +114,7 @@ public class CatalogSyncTest
{
TableMetadata table = TableBuilder.external("externTable")
.inputSource(toMap(new InlineInputSource("a\nc")))
.column("a", Columns.VARCHAR)
.column("a", Columns.STRING)
.build();
assertThrows(IAE.class, () -> storage.validate(table));
}
@ -195,15 +195,15 @@ public class CatalogSyncTest
{
TableMetadata table1 = TableBuilder.datasource("table1", "P1D")
.timeColumn()
.column("a", Columns.VARCHAR)
.column("a", Columns.STRING)
.build();
storage.validate(table1);
storage.tables().create(table1);
TableMetadata table2 = TableBuilder.datasource("table2", "P1D")
.timeColumn()
.column("dim", Columns.VARCHAR)
.column("measure", "BIGINT")
.column("dim", Columns.STRING)
.column("measure", Columns.LONG)
.build();
storage.validate(table2);
storage.tables().create(table2);
@ -211,7 +211,7 @@ public class CatalogSyncTest
TableMetadata table3 = TableBuilder.external("table3")
.inputFormat(BaseExternTableTest.CSV_FORMAT)
.inputSource(toMap(new InlineInputSource("a\nc")))
.column("a", Columns.VARCHAR)
.column("a", Columns.STRING)
.build();
storage.validate(table3);
storage.tables().create(table3);
@ -230,9 +230,9 @@ public class CatalogSyncTest
List<ColumnSpec> cols = dsSpec.columns();
assertEquals(2, cols.size());
assertEquals(Columns.TIME_COLUMN, cols.get(0).name());
assertEquals(Columns.TIMESTAMP, cols.get(0).sqlType());
assertEquals(Columns.LONG, cols.get(0).dataType());
assertEquals("a", cols.get(1).name());
assertEquals(Columns.VARCHAR, cols.get(1).sqlType());
assertEquals(Columns.STRING, cols.get(1).dataType());
DatasourceFacade ds = new DatasourceFacade(catalog.resolveTable(id));
assertEquals("P1D", ds.segmentGranularityString());
@ -249,11 +249,11 @@ public class CatalogSyncTest
assertEquals(3, cols.size());
assertEquals("__time", cols.get(0).name());
assertEquals(Columns.TIME_COLUMN, cols.get(0).name());
assertEquals(Columns.TIMESTAMP, cols.get(0).sqlType());
assertEquals(Columns.LONG, cols.get(0).dataType());
assertEquals("dim", cols.get(1).name());
assertEquals(Columns.VARCHAR, cols.get(1).sqlType());
assertEquals(Columns.STRING, cols.get(1).dataType());
assertEquals("measure", cols.get(2).name());
assertEquals("BIGINT", cols.get(2).sqlType());
assertEquals(Columns.LONG, cols.get(2).dataType());
DatasourceFacade ds = new DatasourceFacade(catalog.resolveTable(id));
assertEquals("P1D", ds.segmentGranularityString());
@ -273,7 +273,7 @@ public class CatalogSyncTest
List<ColumnSpec> cols = inputSpec.columns();
assertEquals(1, cols.size());
assertEquals("a", cols.get(0).name());
assertEquals(Columns.VARCHAR, cols.get(0).sqlType());
assertEquals(Columns.STRING, cols.get(0).dataType());
assertNotNull(inputSpec.properties());
}
@ -303,7 +303,7 @@ public class CatalogSyncTest
// Create a table 3
TableMetadata table3 = TableBuilder.datasource("table3", "P1D")
.timeColumn()
.column("x", "FLOAT")
.column("x", Columns.FLOAT)
.build();
storage.tables().create(table3);
}
@ -320,7 +320,7 @@ public class CatalogSyncTest
assertEquals(Columns.TIME_COLUMN, cols.get(0).name());
assertEquals("a", cols.get(1).name());
assertEquals("b", cols.get(2).name());
assertEquals(Columns.DOUBLE, cols.get(2).sqlType());
assertEquals(Columns.DOUBLE, cols.get(2).dataType());
}
{
TableId id = TableId.datasource("table3");


@ -55,4 +55,14 @@ public class MockCatalogSync implements CatalogUpdateListener
{
return catalog;
}
@Override
public void flush()
{
}
@Override
public void resync()
{
}
}


@ -150,9 +150,9 @@ public class CatalogResourceTest
TableSpec inputSpec = TableBuilder.external("inline")
.inputSource(toMap(new InlineInputSource("a,b,1\nc,d,2\n")))
.inputFormat(BaseExternTableTest.CSV_FORMAT)
.column("a", Columns.VARCHAR)
.column("b", Columns.VARCHAR)
.column("c", Columns.BIGINT)
.column("a", Columns.STRING)
.column("b", Columns.STRING)
.column("c", Columns.LONG)
.buildSpec();
resp = resource.postTable(TableId.EXTERNAL_SCHEMA, "inline", inputSpec, 0, false, postBy(CatalogTests.WRITER_USER));
assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());


@ -401,7 +401,7 @@ public class EditorTest
// Add a column
cmd = new UpdateColumns(
Collections.singletonList(
new ColumnSpec("d", Columns.VARCHAR, null)
new ColumnSpec("d", Columns.STRING, null)
)
);
TableMetadata revised = doEdit(tableName, cmd);
@ -411,14 +411,14 @@ public class EditorTest
);
ColumnSpec colD = revised.spec().columns().get(3);
assertEquals("d", colD.name());
assertEquals(Columns.VARCHAR, colD.sqlType());
assertEquals(Columns.STRING, colD.dataType());
// Update a column
cmd = new UpdateColumns(
Collections.singletonList(
new ColumnSpec(
"a",
Columns.BIGINT,
Columns.LONG,
ImmutableMap.of("foo", "bar")
)
)
@ -430,13 +430,13 @@ public class EditorTest
);
ColumnSpec colA = revised.spec().columns().get(0);
assertEquals("a", colA.name());
assertEquals(Columns.BIGINT, colA.sqlType());
assertEquals(Columns.LONG, colA.dataType());
assertEquals(ImmutableMap.of("foo", "bar"), colA.properties());
// Duplicates
UpdateColumns cmd2 = new UpdateColumns(
Arrays.asList(
new ColumnSpec("e", Columns.VARCHAR, null),
new ColumnSpec("e", Columns.STRING, null),
new ColumnSpec("e", null, null)
)
);
@ -445,7 +445,7 @@ public class EditorTest
// Valid time column type
cmd = new UpdateColumns(
Collections.singletonList(
new ColumnSpec(Columns.TIME_COLUMN, Columns.TIMESTAMP, null)
new ColumnSpec(Columns.TIME_COLUMN, Columns.LONG, null)
)
);
revised = doEdit(tableName, cmd);


@ -0,0 +1,3 @@
LogicalInsert(target=[dst], partitionedBy=['ALL TIME'], clusteredBy=[<none>])
LogicalProject(__time=[$0], extra1=[$1], dim2=[$2], dim1=[$3], cnt=[$4], m1=[$5], extra2=[$6], extra3=[$7], m2=[$8])
LogicalTableScan(table=[[druid, foo]])


@ -0,0 +1,4 @@
LogicalInsert(target=[druid.clusterBy], partitionedBy=[<none>], clusteredBy=[<none>])
LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC])
LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)])
LogicalTableScan(table=[[druid, foo]])


@ -0,0 +1,4 @@
LogicalInsert(target=[druid.clusterBy], partitionedBy=[<none>], clusteredBy=[`floor_m1`, `dim1` DESC])
LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC])
LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)])
LogicalTableScan(table=[[druid, foo]])


@ -0,0 +1,4 @@
LogicalInsert(target=[druid.clusterBy], partitionedBy=[<none>], clusteredBy=[<none>])
LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC])
LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)])
LogicalTableScan(table=[[druid, foo]])


@ -0,0 +1,4 @@
LogicalInsert(target=[dst], partitionedBy=[FLOOR(`__time` TO DAY)], clusteredBy=[`floor_m1`, `dim1` DESC])
LogicalSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[DESC])
LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$3], ceil_m2=[CEIL($8)])
LogicalTableScan(table=[[druid, foo]])


@ -529,7 +529,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
new PlannerConfig(),
viewManager,
new NoopDruidSchemaManager(),
CalciteTests.TEST_AUTHORIZER_MAPPER
CalciteTests.TEST_AUTHORIZER_MAPPER,
CatalogResolver.NULL_RESOLVER
);
final SqlEngine engine = new MSQTaskSqlEngine(


@ -68,8 +68,8 @@ import static org.junit.Assert.assertTrue;
public class S3InputSourceDefnTest
{
private static final List<ColumnSpec> COLUMNS = Arrays.asList(
new ColumnSpec("x", Columns.VARCHAR, null),
new ColumnSpec("y", Columns.BIGINT, null)
new ColumnSpec("x", Columns.STRING, null),
new ColumnSpec("y", Columns.LONG, null)
);
/**
@ -110,7 +110,7 @@ public class S3InputSourceDefnTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(ImmutableMap.of("type", S3StorageDruidModule.SCHEME))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -176,7 +176,7 @@ public class S3InputSourceDefnTest
Collections.singletonList("s3://foo/bar/file.csv"), null, null, null);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(s3InputSource))
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -215,7 +215,7 @@ public class S3InputSourceDefnTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -228,7 +228,7 @@ public class S3InputSourceDefnTest
.inputSource(ImmutableMap.of("type", S3StorageDruidModule.SCHEME))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "s3://foo.com")
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -243,7 +243,7 @@ public class S3InputSourceDefnTest
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -262,7 +262,7 @@ public class S3InputSourceDefnTest
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -281,7 +281,7 @@ public class S3InputSourceDefnTest
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -298,7 +298,7 @@ public class S3InputSourceDefnTest
)
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -564,8 +564,8 @@ public class S3InputSourceDefnTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation
@ -602,8 +602,8 @@ public class S3InputSourceDefnTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(s3InputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation
@ -646,8 +646,8 @@ public class S3InputSourceDefnTest
.inputSource(ImmutableMap.of("type", S3StorageDruidModule.SCHEME))
.inputFormat(CSV_FORMAT)
.property(S3InputSourceDefn.BUCKET_PROPERTY, "foo.com")
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation


@ -98,7 +98,7 @@ public class CatalogUtils
return type.cast(value);
}
catch (ClassCastException e) {
throw new IAE("Value [%s] is not valid for property %s, expected type %s",
throw new IAE("Value [%s] is not valid for property [%s], expected type [%s]",
value,
key,
type.getSimpleName()


@ -35,8 +35,7 @@ import java.util.Map;
import java.util.Objects;
/**
* Specification of table columns. Columns have multiple types
* represented via the type field.
* Specification of table columns.
*/
@UnstableApi
public class ColumnSpec
@ -49,14 +48,10 @@ public class ColumnSpec
private final String name;
/**
* The data type of the column expressed as a supported SQL type. The data type here must
* directly match a Druid storage type. So, {@code BIGINT} for {code long}, say.
* This usage does not support Druid's usual "fudging": one cannot use {@code INTEGER}
* to mean {@code long}. The type will likely encode complex and aggregation types
* in the future, though that is not yet supported. The set of valid mappings is
* defined in the {@link Columns} class.
* The data type of the column expressed as a supported Druid type. The data type here must
* directly match a Druid storage type.
*/
private final String sqlType;
private final String dataType;
/**
* Properties for the column. At present, these are all user and application defined.
@ -69,18 +64,18 @@ public class ColumnSpec
@JsonCreator
public ColumnSpec(
@JsonProperty("name")final String name,
@JsonProperty("sqlType") @Nullable final String sqlType,
@JsonProperty("dataType") @Nullable final String dataType,
@JsonProperty("properties") @Nullable final Map<String, Object> properties
)
{
this.name = name;
this.sqlType = sqlType;
this.dataType = dataType;
this.properties = properties == null ? Collections.emptyMap() : properties;
}
public ColumnSpec(ColumnSpec from)
{
this(from.name, from.sqlType, from.properties);
this(from.name, from.dataType, from.properties);
}
@JsonProperty("name")
@ -89,11 +84,11 @@ public class ColumnSpec
return name;
}
@JsonProperty("sqlType")
@JsonProperty("dataType")
@JsonInclude(Include.NON_NULL)
public String sqlType()
public String dataType()
{
return sqlType;
return dataType;
}
@JsonProperty("properties")
@ -108,6 +103,16 @@ public class ColumnSpec
if (Strings.isNullOrEmpty(name)) {
throw new IAE("Column name is required");
}
if (Columns.isTimeColumn(name)) {
if (dataType != null && !Columns.LONG.equalsIgnoreCase(dataType)) {
throw new IAE(
"[%s] column must have type [%s] or no type. Found [%s]",
name,
Columns.LONG,
dataType
);
}
}
// Validate type in the next PR
}
@ -126,7 +131,7 @@ public class ColumnSpec
final ColumnSpec update
)
{
String revisedType = update.sqlType() == null ? sqlType() : update.sqlType();
String revisedType = update.dataType() == null ? dataType() : update.dataType();
Map<String, Object> revisedProps = CatalogUtils.mergeProperties(
columnProperties,
properties(),
@ -152,7 +157,7 @@ public class ColumnSpec
}
ColumnSpec other = (ColumnSpec) o;
return Objects.equals(this.name, other.name)
&& Objects.equals(this.sqlType, other.sqlType)
&& Objects.equals(this.dataType, other.dataType)
&& Objects.equals(this.properties, other.properties);
}
@ -161,7 +166,7 @@ public class ColumnSpec
{
return Objects.hash(
name,
sqlType,
dataType,
properties
);
}


@ -20,94 +20,89 @@
package org.apache.druid.catalog.model;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class Columns
{
public static final String TIME_COLUMN = "__time";
public static final String VARCHAR = "VARCHAR";
public static final String BIGINT = "BIGINT";
public static final String FLOAT = "FLOAT";
public static final String DOUBLE = "DOUBLE";
public static final String VARCHAR_ARRAY = "VARCHAR ARRAY";
public static final String BIGINT_ARRAY = "BIGINT ARRAY";
public static final String FLOAT_ARRAY = "FLOAT ARRAY";
public static final String DOUBLE_ARRAY = "DOUBLE ARRAY";
public static final String TIMESTAMP = "TIMESTAMP";
public static final String STRING = ValueType.STRING.name();
public static final String LONG = ValueType.LONG.name();
public static final String FLOAT = ValueType.FLOAT.name();
public static final String DOUBLE = ValueType.DOUBLE.name();
public static final Set<String> NUMERIC_TYPES =
ImmutableSet.of(BIGINT, FLOAT, DOUBLE);
public static final Set<String> SCALAR_TYPES =
ImmutableSet.of(TIMESTAMP, VARCHAR, BIGINT, FLOAT, DOUBLE);
public static final String SQL_VARCHAR = "VARCHAR";
public static final String SQL_BIGINT = "BIGINT";
public static final String SQL_FLOAT = "FLOAT";
public static final String SQL_DOUBLE = "DOUBLE";
public static final String SQL_VARCHAR_ARRAY = "VARCHAR ARRAY";
public static final String SQL_BIGINT_ARRAY = "BIGINT ARRAY";
public static final String SQL_FLOAT_ARRAY = "FLOAT ARRAY";
public static final String SQL_DOUBLE_ARRAY = "DOUBLE ARRAY";
public static final String SQL_TIMESTAMP = "TIMESTAMP";
public static final Map<String, ColumnType> SQL_TO_DRUID_TYPES =
new ImmutableMap.Builder<String, ColumnType>()
.put(TIMESTAMP, ColumnType.LONG)
.put(BIGINT, ColumnType.LONG)
.put(FLOAT, ColumnType.FLOAT)
.put(DOUBLE, ColumnType.DOUBLE)
.put(VARCHAR, ColumnType.STRING)
.put(VARCHAR_ARRAY, ColumnType.STRING_ARRAY)
.put(BIGINT_ARRAY, ColumnType.LONG_ARRAY)
.put(FLOAT_ARRAY, ColumnType.FLOAT_ARRAY)
.put(DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY)
.put(SQL_TIMESTAMP, ColumnType.LONG)
.put(SQL_BIGINT, ColumnType.LONG)
.put(SQL_FLOAT, ColumnType.FLOAT)
.put(SQL_DOUBLE, ColumnType.DOUBLE)
.put(SQL_VARCHAR, ColumnType.STRING)
.put(SQL_VARCHAR_ARRAY, ColumnType.STRING_ARRAY)
.put(SQL_BIGINT_ARRAY, ColumnType.LONG_ARRAY)
.put(SQL_FLOAT_ARRAY, ColumnType.FLOAT_ARRAY)
.put(SQL_DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY)
.build();
public static final Map<ColumnType, String> DRUID_TO_SQL_TYPES =
new ImmutableMap.Builder<ColumnType, String>()
.put(ColumnType.LONG, BIGINT)
.put(ColumnType.LONG, SQL_BIGINT)
.put(ColumnType.FLOAT, FLOAT)
.put(ColumnType.DOUBLE, DOUBLE)
.put(ColumnType.STRING, VARCHAR)
.put(ColumnType.STRING_ARRAY, VARCHAR_ARRAY)
.put(ColumnType.LONG_ARRAY, BIGINT_ARRAY)
.put(ColumnType.FLOAT_ARRAY, FLOAT_ARRAY)
.put(ColumnType.DOUBLE_ARRAY, DOUBLE_ARRAY)
.put(ColumnType.STRING, SQL_VARCHAR)
.put(ColumnType.STRING_ARRAY, SQL_VARCHAR_ARRAY)
.put(ColumnType.LONG_ARRAY, SQL_BIGINT_ARRAY)
.put(ColumnType.FLOAT_ARRAY, SQL_FLOAT_ARRAY)
.put(ColumnType.DOUBLE_ARRAY, SQL_DOUBLE_ARRAY)
.build();
private Columns()
{
}
public static boolean isTimestamp(String type)
public static ColumnType druidType(ColumnSpec spec)
{
return TIMESTAMP.equalsIgnoreCase(type.trim());
if (isTimeColumn(spec.name())) {
return ColumnType.LONG;
}
public static boolean isScalar(String type)
{
return SCALAR_TYPES.contains(StringUtils.toUpperCase(type.trim()));
}
public static ColumnType druidType(String sqlType)
{
if (sqlType == null) {
String dataType = spec.dataType();
if (dataType == null) {
return null;
}
ColumnType druidType = SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(sqlType));
ColumnType druidType = SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(dataType));
if (druidType != null) {
return druidType;
}
return ColumnType.fromString(sqlType);
return ColumnType.fromString(dataType);
}
public static void validateScalarColumn(String name, String type)
public static String sqlType(ColumnSpec spec)
{
if (type == null) {
return;
if (isTimeColumn(spec.name())) {
return SQL_TIMESTAMP;
}
if (!Columns.isScalar(type)) {
throw new IAE("Not a supported SQL type: " + type);
ColumnType druidType = druidType(spec);
if (druidType == null) {
return null;
}
String sqlType = DRUID_TO_SQL_TYPES.get(druidType);
return sqlType == null ? druidType.asTypeString() : sqlType;
}
public static boolean isTimeColumn(String name)
@ -119,10 +114,7 @@ public class Columns
{
RowSignature.Builder builder = RowSignature.builder();
for (ColumnSpec col : columns) {
ColumnType druidType = null;
if (col.sqlType() != null) {
druidType = Columns.druidType(col.sqlType());
}
ColumnType druidType = druidType(col);
if (druidType == null) {
druidType = ColumnType.STRING;
}
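The reworked Columns helpers above key off a ColumnSpec's Druid data type instead of a SQL type string. The following is an illustrative sketch only (not part of this commit), assuming the ColumnSpec(name, dataType, properties) constructor used in the tests later in this diff:

import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.segment.column.ColumnType;

public class ColumnsSketch
{
  public static void main(String[] args)
  {
    // Catalog columns now carry Druid type names such as Columns.LONG ("LONG").
    ColumnSpec bytesSent = new ColumnSpec("bytesSent", Columns.LONG, null);
    ColumnSpec time = new ColumnSpec(Columns.TIME_COLUMN, null, null);

    // druidType() resolves the declared data type; __time always maps to LONG.
    ColumnType longType = Columns.druidType(bytesSent);  // ColumnType.LONG
    ColumnType timeType = Columns.druidType(time);       // ColumnType.LONG

    // sqlType() maps back to the SQL display name, e.g. BIGINT or TIMESTAMP.
    String sqlName = Columns.sqlType(bytesSent);         // "BIGINT"
    String timeSql = Columns.sqlType(time);              // "TIMESTAMP"

    System.out.println(longType + " " + timeType + " " + sqlName + " " + timeSql);
  }
}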

View File

@ -85,6 +85,11 @@ public class TableId
return StringUtils.format("\"%s\".\"%s\"", schema, name);
}
public String unquoted()
{
return StringUtils.format("%s.%s", schema, name);
}
@Override
public String toString()
{

View File

@ -19,15 +19,21 @@
package org.apache.druid.catalog.model.facade;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.model.CatalogUtils;
import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnType;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Convenience wrapper on top of a resolved table (a table spec and its corresponding
@ -38,10 +44,77 @@ public class DatasourceFacade extends TableFacade
{
private static final Logger LOG = new Logger(DatasourceFacade.class);
public static class ColumnFacade
{
public enum Kind
{
ANY,
TIME,
DIMENSION,
MEASURE
}
private final ColumnSpec spec;
private final String sqlType;
public ColumnFacade(ColumnSpec spec)
{
this.spec = spec;
if (Columns.isTimeColumn(spec.name()) && spec.dataType() == null) {
// For __time only, force a type if type is null.
this.sqlType = Columns.LONG;
} else {
this.sqlType = Columns.sqlType(spec);
}
}
public ColumnSpec spec()
{
return spec;
}
public boolean hasType()
{
return sqlType != null;
}
public boolean isTime()
{
return Columns.isTimeColumn(spec.name());
}
public ColumnType druidType()
{
return Columns.druidType(spec);
}
public String sqlStorageType()
{
return sqlType;
}
@Override
public String toString()
{
return "{spec=" + spec + ", sqlTtype=" + sqlType + "}";
}
}
private final List<ColumnFacade> columns;
private final Map<String, ColumnFacade> columnIndex;
public DatasourceFacade(ResolvedTable resolved)
{
super(resolved);
this.columns = resolved.spec().columns()
.stream()
.map(col -> new ColumnFacade(col))
.collect(Collectors.toList());
ImmutableMap.Builder<String, ColumnFacade> builder = ImmutableMap.builder();
for (ColumnFacade col : columns) {
builder.put(col.spec.name(), col);
}
columnIndex = builder.build();
}
public String segmentGranularityString()
@ -89,4 +162,14 @@ public class DatasourceFacade extends TableFacade
{
return booleanProperty(DatasourceDefn.SEALED_PROPERTY);
}
public List<ColumnFacade> columnFacades()
{
return columns;
}
public ColumnFacade column(String name)
{
return columnIndex.get(name);
}
}
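A rough usage sketch (not part of this commit) of the new ColumnFacade accessors, given a datasource table already resolved against the table registry:

import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;

public class FacadeSketch
{
  public static void describeColumns(ResolvedTable resolved)
  {
    DatasourceFacade facade = new DatasourceFacade(resolved);
    for (ColumnFacade col : facade.columnFacades()) {
      // For __time, a type is reported even when the spec leaves it null.
      System.out.printf(
          "%s: time=%s hasType=%s druid=%s sql=%s%n",
          col.spec().name(),
          col.isTime(),
          col.hasType(),
          col.druidType(),
          col.sqlStorageType()
      );
    }
  }
}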

View File

@ -22,7 +22,6 @@ package org.apache.druid.catalog.model.facade;
import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@ -40,7 +39,7 @@ public class ExternalTableFacade extends TableFacade
List<ColumnSpec> columns = spec().columns();
RowSignature.Builder builder = RowSignature.builder();
for (ColumnSpec col : columns) {
ColumnType druidType = Columns.SQL_TO_DRUID_TYPES.get(StringUtils.toUpperCase(col.sqlType()));
ColumnType druidType = Columns.druidType(col);
if (druidType == null) {
druidType = ColumnType.STRING;
}

View File

@ -61,11 +61,7 @@ public class TableFacade extends ObjectFacade
public static ColumnType druidType(ColumnSpec col)
{
if (Columns.isTimeColumn(col.name())) {
return ColumnType.LONG;
}
final String sqlType = col.sqlType();
return sqlType == null ? null : Columns.druidType(sqlType);
return Columns.druidType(col);
}
public ObjectMapper jsonMapper()

View File

@ -275,7 +275,7 @@ public abstract class BaseInputSourceDefn implements InputSourceDefn
return columns;
} else if (!CollectionUtils.isNullOrEmpty(columns)) {
throw new IAE(
"Catalog definition for the %s input source already contains column definitions",
"Catalog definition for the [%s] input source already contains column definitions",
typeValue()
);
} else {

View File

@ -68,16 +68,6 @@ public class DatasourceDefn extends TableDefn
*/
public static final String HIDDEN_COLUMNS_PROPERTY = "hiddenColumns";
/**
* By default: columns are optional hints. If a datasource has columns defined,
* we'll validate them, but MSQ and other tools are free to create additional columns.
* That is, we assume "auto-discovered" columns by default. However, in some use cases,
* the schema may be carefully designed. This is especially true for ETL use cases in
* which multiple input schemas are mapped into a single datasource schema designed for
* ease of use by end users. In this second use case, we may want to reject an attempt to
* ingest columns other than those in the schema. To do that, set {@code sealed = true}.
* In other words, "sealed" mode works like a traditional RDBMS.
*/
public static final String SEALED_PROPERTY = "sealed";
public static final String TABLE_TYPE = "datasource";
@ -148,7 +138,7 @@ public class DatasourceDefn extends TableDefn
protected void validateColumn(ColumnSpec spec)
{
super.validateColumn(spec);
if (Columns.isTimeColumn(spec.name()) && spec.sqlType() != null) {
if (Columns.isTimeColumn(spec.name()) && spec.dataType() != null) {
// Validate type in next PR
}
}
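As a hedged illustration of the sealed mode described in the javadoc above (not part of this commit; TableBuilder's package and fluent helpers are assumed from the tests later in this diff), a sealed datasource spec might be declared as:

import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableSpec;
import org.apache.druid.catalog.model.table.TableBuilder;

public class SealedSpecSketch
{
  public static TableSpec webMetricsSpec()
  {
    // With sealed = true, ingestion of columns outside the declared schema
    // is rejected, much like a traditional RDBMS table.
    return TableBuilder.datasource("webMetrics", "P1D")
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("host", Columns.STRING)
        .column("bytesSent", Columns.LONG)
        .sealed(true)
        .buildSpec();
  }
}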

View File

@ -205,6 +205,11 @@ public class ExternalTableDefn extends TableDefn
*/
public static final String TABLE_TYPE = "extern";
/**
* Column type for external tables.
*/
public static final String EXTERNAL_COLUMN_TYPE = "extern";
/**
* Property which holds the input source specification as serialized as JSON.
*/

View File

@ -109,7 +109,7 @@ public abstract class FormattedInputSourceDefn extends BaseInputSourceDefn
toAdd.add(prop);
} else if (existing.type() != prop.type()) {
throw new ISE(
"Format %s, property %s of class %s conflicts with another format property of class %s",
"Format [%s], property [%s] of class [%s] conflicts with another format property of class [%s]",
format.typeValue(),
prop.name(),
prop.type().sqlName(),
@ -131,7 +131,7 @@ public abstract class FormattedInputSourceDefn extends BaseInputSourceDefn
final InputFormatDefn formatDefn = formats.get(formatTag);
if (formatDefn == null) {
throw new IAE(
"Format type [%s] for property %s is not valid",
"Format type [%s] for property [%s] is not valid",
formatTag,
InputFormat.TYPE_PROPERTY
);

View File

@ -188,7 +188,7 @@ public class TableBuilder
public TableBuilder timeColumn()
{
return column(Columns.TIME_COLUMN, Columns.TIMESTAMP);
return column(Columns.TIME_COLUMN, Columns.LONG);
}
public TableBuilder column(String name, String sqlType)

View File

@ -39,8 +39,8 @@ public class BaseExternTableTest
{
public static final Map<String, Object> CSV_FORMAT = ImmutableMap.of("type", CsvInputFormat.TYPE_KEY);
protected static final List<ColumnSpec> COLUMNS = Arrays.asList(
new ColumnSpec("x", Columns.VARCHAR, null),
new ColumnSpec("y", Columns.BIGINT, null)
new ColumnSpec("x", Columns.STRING, null),
new ColumnSpec("y", Columns.LONG, null)
);
protected final ObjectMapper mapper = DefaultObjectMapper.INSTANCE;

View File

@ -48,7 +48,7 @@ public class CsvInputFormatTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(CSV_FORMAT)
.column("a", Columns.VARCHAR)
.column("a", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -70,8 +70,8 @@ public class CsvInputFormatTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(formatToMap(format))
.column("a", Columns.VARCHAR)
.column("b", Columns.BIGINT)
.column("a", Columns.STRING)
.column("b", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();

View File

@ -28,12 +28,13 @@ import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableDefn;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.TableSpec;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -49,6 +50,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@ -230,6 +232,8 @@ public class DatasourceTableTest
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
assertTrue(facade.columnFacades().isEmpty());
}
// OK to have no column type
@ -241,72 +245,45 @@ public class DatasourceTableTest
table.validate();
DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
assertNotNull(facade.jsonMapper());
assertEquals(1, facade.properties().size());
assertEquals(1, facade.columnFacades().size());
ColumnFacade col = facade.columnFacades().get(0);
assertSame(spec.columns().get(0), col.spec());
assertFalse(col.isTime());
assertFalse(col.hasType());
assertNull(col.druidType());
}
// Can have a legal scalar type
{
TableSpec spec = builder.copy()
.column("foo", Columns.VARCHAR)
.column("foo", Columns.STRING)
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
assertEquals(1, facade.columnFacades().size());
ColumnFacade col = facade.columnFacades().get(0);
assertSame(spec.columns().get(0), col.spec());
assertFalse(col.isTime());
assertTrue(col.hasType());
assertSame(ColumnType.STRING, col.druidType());
}
// Reject duplicate columns
{
TableSpec spec = builder.copy()
.column("foo", Columns.VARCHAR)
.column("bar", Columns.BIGINT)
.column("foo", Columns.STRING)
.column("bar", Columns.LONG)
.buildSpec();
expectValidationSucceeds(spec);
}
{
TableSpec spec = builder.copy()
.column("foo", Columns.VARCHAR)
.column("foo", Columns.BIGINT)
.column("foo", Columns.STRING)
.column("foo", Columns.LONG)
.buildSpec();
expectValidationFails(spec);
}
{
TableSpec spec = builder.copy()
.column(Columns.TIME_COLUMN, null)
.column("s", Columns.VARCHAR)
.column("bi", Columns.BIGINT)
.column("f", Columns.FLOAT)
.column("d", Columns.DOUBLE)
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
}
}
@Test
public void testRollup()
{
TableMetadata table = TableBuilder.datasource("foo", "P1D")
.column(Columns.TIME_COLUMN, "TIMESTAMP('PT1M')")
.column("a", null)
.column("b", Columns.VARCHAR)
.column("c", "SUM(BIGINT)")
.build();
table.validate();
List<ColumnSpec> columns = table.spec().columns();
assertEquals(4, columns.size());
assertEquals(Columns.TIME_COLUMN, columns.get(0).name());
assertEquals("TIMESTAMP('PT1M')", columns.get(0).sqlType());
assertEquals("a", columns.get(1).name());
assertNull(columns.get(1).sqlType());
assertEquals("b", columns.get(2).name());
assertEquals(Columns.VARCHAR, columns.get(2).sqlType());
assertEquals("c", columns.get(3).name());
assertEquals("SUM(BIGINT)", columns.get(3).sqlType());
}
@Test
@ -321,6 +298,14 @@ public class DatasourceTableTest
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
assertEquals(1, facade.columnFacades().size());
ColumnFacade col = facade.columnFacades().get(0);
assertSame(spec.columns().get(0), col.spec());
assertTrue(col.isTime());
assertTrue(col.hasType());
assertSame(ColumnType.LONG, col.druidType());
}
// Time column can only have TIMESTAMP type
@ -330,14 +315,20 @@ public class DatasourceTableTest
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
DatasourceFacade facade = new DatasourceFacade(registry.resolve(table.spec()));
assertEquals(1, facade.columnFacades().size());
ColumnFacade col = facade.columnFacades().get(0);
assertSame(spec.columns().get(0), col.spec());
assertTrue(col.isTime());
assertTrue(col.hasType());
assertSame(ColumnType.LONG, col.druidType());
}
{
TableSpec spec = builder.copy()
.column(Columns.TIME_COLUMN, "TIMESTAMP('PT5M')")
.column(Columns.TIME_COLUMN, Columns.STRING)
.buildSpec();
ResolvedTable table = registry.resolve(spec);
table.validate();
expectValidationFails(spec);
}
}
@ -365,7 +356,7 @@ public class DatasourceTableTest
.property("tag1", "some value")
.property("tag2", "second value")
.column(new ColumnSpec("a", null, colProps))
.column("b", Columns.VARCHAR)
.column("b", Columns.STRING)
.buildSpec();
// Sanity check
@ -493,7 +484,7 @@ public class DatasourceTableTest
List<ColumnSpec> colUpdates = Collections.singletonList(
new ColumnSpec(
"a",
Columns.BIGINT,
Columns.LONG,
null
)
);
@ -502,7 +493,7 @@ public class DatasourceTableTest
List<ColumnSpec> columns = merged.columns();
assertEquals(1, columns.size());
assertEquals("a", columns.get(0).name());
assertEquals(Columns.BIGINT, columns.get(0).sqlType());
assertEquals(Columns.LONG, columns.get(0).dataType());
}
@Test
@ -521,12 +512,12 @@ public class DatasourceTableTest
List<ColumnSpec> colUpdates = Arrays.asList(
new ColumnSpec(
"a",
Columns.BIGINT,
Columns.LONG,
updatedProps
),
new ColumnSpec(
"c",
Columns.VARCHAR,
Columns.STRING,
null
)
);
@ -537,14 +528,14 @@ public class DatasourceTableTest
List<ColumnSpec> columns = merged.columns();
assertEquals(3, columns.size());
assertEquals("a", columns.get(0).name());
assertEquals(Columns.BIGINT, columns.get(0).sqlType());
assertEquals(Columns.LONG, columns.get(0).dataType());
Map<String, Object> colProps = columns.get(0).properties();
assertEquals(2, colProps.size());
assertEquals("new value", colProps.get("colProp1"));
assertEquals("third value", colProps.get("tag3"));
assertEquals("c", columns.get(2).name());
assertEquals(Columns.VARCHAR, columns.get(2).sqlType());
assertEquals(Columns.STRING, columns.get(2).dataType());
}
/**
@ -560,9 +551,9 @@ public class DatasourceTableTest
.description("Web server performance metrics")
.property(DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000)
.hiddenColumns("foo", "bar")
.column("__time", Columns.TIMESTAMP)
.column("host", Columns.VARCHAR, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "The web server host"))
.column("bytesSent", Columns.BIGINT, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "Number of response bytes sent"))
.column("__time", Columns.LONG)
.column("host", Columns.STRING, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "The web server host"))
.column("bytesSent", Columns.LONG, ImmutableMap.of(TableDefn.DESCRIPTION_PROPERTY, "Number of response bytes sent"))
.clusterColumns(new ClusterKeySpec("a", false), new ClusterKeySpec("b", true))
.sealed(true)
.buildSpec();

View File

@ -55,7 +55,7 @@ public class DelimitedInputFormatTest extends BaseExternTableTest
DelimitedFormatDefn.DELIMITER_FIELD, "|"
)
)
.column("a", Columns.VARCHAR)
.column("a", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -78,8 +78,8 @@ public class DelimitedInputFormatTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(formatToMap(format))
.column("a", Columns.VARCHAR)
.column("b", Columns.BIGINT)
.column("a", Columns.STRING)
.column("b", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();

View File

@ -126,7 +126,7 @@ public class ExternalTableTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(formatToMap(format))
.column("a", Columns.VARCHAR)
.column("a", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -147,16 +147,16 @@ public class ExternalTableTest extends BaseExternTableTest
.inputSource(toMap(inputSource))
.inputFormat(formatToMap(format))
.description("Sample Wikipedia data")
.column("timetamp", Columns.VARCHAR)
.column("page", Columns.VARCHAR)
.column("language", Columns.VARCHAR)
.column("unpatrolled", Columns.VARCHAR)
.column("newPage", Columns.VARCHAR)
.column("robot", Columns.VARCHAR)
.column("added", Columns.VARCHAR)
.column("namespace", Columns.BIGINT)
.column("deleted", Columns.BIGINT)
.column("delta", Columns.BIGINT)
.column("timetamp", Columns.STRING)
.column("page", Columns.STRING)
.column("language", Columns.STRING)
.column("unpatrolled", Columns.STRING)
.column("newPage", Columns.STRING)
.column("robot", Columns.STRING)
.column("added", Columns.STRING)
.column("namespace", Columns.LONG)
.column("deleted", Columns.LONG)
.column("delta", Columns.LONG)
.build();
LOG.info(table.spec().toString());
}
@ -179,9 +179,9 @@ public class ExternalTableTest extends BaseExternTableTest
.inputFormat(CSV_FORMAT)
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "https://example.com/{}")
.description("Example parameterized external table")
.column("timetamp", Columns.VARCHAR)
.column("metric", Columns.VARCHAR)
.column("value", Columns.BIGINT)
.column("timetamp", Columns.STRING)
.column("metric", Columns.STRING)
.column("value", Columns.LONG)
.build();
LOG.info(table.spec().toString());
}

View File

@ -68,8 +68,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -83,8 +83,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
.inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY))
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://example.com/")
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -103,8 +103,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
);
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -221,8 +221,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation
@ -256,8 +256,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
.inputSource(ImmutableMap.of("type", HttpInputSource.TYPE_KEY))
.inputFormat(CSV_FORMAT)
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}")
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation
@ -305,8 +305,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
))
.inputFormat(CSV_FORMAT)
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}")
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
table.validate();
@ -387,8 +387,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation
@ -421,8 +421,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
.inputSource(httpToMap(inputSource))
.inputFormat(CSV_FORMAT)
.property(HttpInputSourceDefn.URI_TEMPLATE_PROPERTY, "http://foo.com/{}")
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation
@ -489,8 +489,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation

View File

@ -51,7 +51,7 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(ImmutableMap.of("type", InlineInputSource.TYPE_KEY))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -63,7 +63,7 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
// No format: not valid. For inline, format must be provided to match data
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -86,7 +86,7 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("x", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -130,8 +130,8 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
args.put(InlineInputSourceDefn.DATA_PROPERTY, Arrays.asList("a,b", "c,d"));
args.put(FormattedInputSourceDefn.FORMAT_PARAMETER, CsvFormatDefn.TYPE_KEY);
final List<ColumnSpec> columns = Arrays.asList(
new ColumnSpec("a", Columns.VARCHAR, null),
new ColumnSpec("b", Columns.VARCHAR, null)
new ColumnSpec("a", Columns.STRING, null),
new ColumnSpec("b", Columns.STRING, null)
);
final TableFunction fn = defn.adHocTableFn();
@ -157,8 +157,8 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a,b\nc,d\n")))
.inputFormat(CSV_FORMAT)
.column("a", Columns.VARCHAR)
.column("b", Columns.VARCHAR)
.column("a", Columns.STRING)
.column("b", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -183,8 +183,8 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
// Cannot supply columns with the function
List<ColumnSpec> columns = Arrays.asList(
new ColumnSpec("a", Columns.VARCHAR, null),
new ColumnSpec("b", Columns.VARCHAR, null)
new ColumnSpec("a", Columns.STRING, null),
new ColumnSpec("b", Columns.STRING, null)
);
assertThrows(IAE.class, () -> fn.apply("x", new HashMap<>(), columns, mapper));
}
@ -198,8 +198,8 @@ public class InlineInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a,b\nc,d")))
.inputFormat(formatToMap(format))
.column("a", Columns.VARCHAR)
.column("b", Columns.VARCHAR)
.column("a", Columns.STRING)
.column("b", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();

View File

@ -48,7 +48,7 @@ public class JsonInputFormatTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(ImmutableMap.of("type", JsonInputFormat.TYPE_KEY))
.column("a", Columns.VARCHAR)
.column("a", Columns.STRING)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -70,8 +70,8 @@ public class JsonInputFormatTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(new InlineInputSource("a\n")))
.inputFormat(formatToMap(format))
.column("a", Columns.VARCHAR)
.column("b", Columns.BIGINT)
.column("a", Columns.STRING)
.column("b", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();

View File

@ -61,8 +61,8 @@ public class LocalInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(ImmutableMap.of("type", LocalInputSource.TYPE_KEY))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -75,8 +75,8 @@ public class LocalInputSourceDefnTest extends BaseExternTableTest
LocalInputSource inputSource = new LocalInputSource(new File("/tmp"), "*");
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -129,8 +129,8 @@ public class LocalInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -150,8 +150,8 @@ public class LocalInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
resolved.validate();
@ -169,8 +169,8 @@ public class LocalInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(source)
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
ResolvedTable resolved = registry.resolve(table.spec());
assertThrows(IAE.class, () -> resolved.validate());
@ -321,8 +321,8 @@ public class LocalInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation
@ -373,8 +373,8 @@ public class LocalInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(toMap(inputSource))
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation
@ -418,8 +418,8 @@ public class LocalInputSourceDefnTest extends BaseExternTableTest
TableMetadata table = TableBuilder.external("foo")
.inputSource(BASE_DIR_ONLY)
.inputFormat(CSV_FORMAT)
.column("x", Columns.VARCHAR)
.column("y", Columns.BIGINT)
.column("x", Columns.STRING)
.column("y", Columns.LONG)
.build();
// Check validation

View File

@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.schema;
import org.apache.calcite.schema.Table;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import javax.inject.Inject;
@ -29,14 +30,17 @@ public class DruidSchema extends AbstractTableSchema
{
private final BrokerSegmentMetadataCache segmentMetadataCache;
private final DruidSchemaManager druidSchemaManager;
private final CatalogResolver catalogResolver;
@Inject
public DruidSchema(
final BrokerSegmentMetadataCache segmentMetadataCache,
final DruidSchemaManager druidSchemaManager
final DruidSchemaManager druidSchemaManager,
final CatalogResolver catalogResolver
)
{
this.segmentMetadataCache = segmentMetadataCache;
this.catalogResolver = catalogResolver;
if (druidSchemaManager != null && !(druidSchemaManager instanceof NoopDruidSchemaManager)) {
this.druidSchemaManager = druidSchemaManager;
} else {
@ -56,7 +60,7 @@ public class DruidSchema extends AbstractTableSchema
return druidSchemaManager.getTable(name);
} else {
DatasourceTable.PhysicalDatasourceMetadata dsMetadata = segmentMetadataCache.getDatasource(name);
return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
return catalogResolver.resolveDatasource(name, dsMetadata);
}
}
@ -66,7 +70,7 @@ public class DruidSchema extends AbstractTableSchema
if (druidSchemaManager != null) {
return druidSchemaManager.getTableNames();
} else {
return segmentMetadataCache.getDatasourceNames();
return catalogResolver.getTableNames(segmentMetadataCache.getDatasourceNames());
}
}
}
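A minimal wiring sketch (illustrative, not part of this commit) showing how a call site keeps the pre-catalog behaviour by passing the no-op resolver, as the updated tests below do:

import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.schema.BrokerSegmentMetadataCache;
import org.apache.druid.sql.calcite.schema.DruidSchema;

public class SchemaWiringSketch
{
  public static DruidSchema brokerSchema(BrokerSegmentMetadataCache cache)
  {
    // NULL_RESOLVER is assumed to leave catalog integration off; a real broker
    // would inject a catalog-backed resolver instead.
    return new DruidSchema(cache, null, CatalogResolver.NULL_RESOLVER);
  }
}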

View File

@ -23,11 +23,15 @@ import com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.DataSourceInformation;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
@ -79,6 +83,24 @@ public class DatasourceTable extends DruidTable
return broadcast;
}
public Map<String, EffectiveColumnMetadata> toEffectiveColumns()
{
Map<String, EffectiveColumnMetadata> columns = new HashMap<>();
for (int i = 0; i < getRowSignature().size(); i++) {
String colName = getRowSignature().getColumnName(i);
ColumnType colType = getRowSignature().getColumnType(i).get();
EffectiveColumnMetadata colMetadata = EffectiveColumnMetadata.fromPhysical(colName, colType);
columns.put(colName, colMetadata);
}
return columns;
}
public EffectiveMetadata toEffectiveMetadata()
{
return new EffectiveMetadata(null, toEffectiveColumns(), false);
}
@Override
public boolean equals(Object o)
{
@ -115,14 +137,107 @@ public class DatasourceTable extends DruidTable
}
}
public static class EffectiveColumnMetadata
{
protected final String name;
protected final ColumnType type;
public EffectiveColumnMetadata(String name, ColumnType type)
{
this.name = name;
this.type = type;
}
public String name()
{
return name;
}
public ColumnType druidType()
{
return type;
}
public static EffectiveColumnMetadata fromPhysical(String name, ColumnType type)
{
return new EffectiveColumnMetadata(name, type);
}
@Override
public String toString()
{
return "Column{" +
"name=" + name +
", type=" + type.asTypeString() +
"}";
}
}
public static class EffectiveMetadata
{
private final DatasourceFacade catalogMetadata;
private final boolean isEmpty;
private final Map<String, EffectiveColumnMetadata> columns;
public EffectiveMetadata(
final DatasourceFacade catalogMetadata,
final Map<String, EffectiveColumnMetadata> columns,
final boolean isEmpty
)
{
this.catalogMetadata = catalogMetadata;
this.isEmpty = isEmpty;
this.columns = columns;
}
public DatasourceFacade catalogMetadata()
{
return catalogMetadata;
}
public EffectiveColumnMetadata column(String name)
{
return columns.get(name);
}
public boolean isEmpty()
{
return isEmpty;
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"empty=" + isEmpty +
", columns=" + columns +
"}";
}
}
private final PhysicalDatasourceMetadata physicalMetadata;
private final EffectiveMetadata effectiveMetadata;
public DatasourceTable(
final PhysicalDatasourceMetadata physicalMetadata
)
{
super(physicalMetadata.getRowSignature());
this(
physicalMetadata.getRowSignature(),
physicalMetadata,
physicalMetadata.toEffectiveMetadata()
);
}
public DatasourceTable(
final RowSignature rowSignature,
final PhysicalDatasourceMetadata physicalMetadata,
final EffectiveMetadata effectiveMetadata
)
{
super(rowSignature);
this.physicalMetadata = physicalMetadata;
this.effectiveMetadata = effectiveMetadata;
}
@Override
@ -143,6 +258,11 @@ public class DatasourceTable extends DruidTable
return physicalMetadata.isBroadcast();
}
public EffectiveMetadata effectiveMetadata()
{
return effectiveMetadata;
}
@Override
public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table)
{
@ -176,9 +296,10 @@ public class DatasourceTable extends DruidTable
public String toString()
{
// Don't include the row signature: it is the same as in
// physicalMetadata.
return "DruidTable{" +
physicalMetadata +
// effectiveMetadata.
return "DruidTable{physicalMetadata=" +
(physicalMetadata == null ? "null" : physicalMetadata.toString()) +
", effectiveMetadata=" + effectiveMetadata +
'}';
}
}
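A speculative sketch (not part of this commit) of how a catalog-aware resolver might wrap physical metadata in the new EffectiveMetadata, using only the constructors and helpers added above; merging catalog-declared columns is elided:

import java.util.Map;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.DatasourceTable.EffectiveColumnMetadata;
import org.apache.druid.sql.calcite.table.DatasourceTable.EffectiveMetadata;
import org.apache.druid.sql.calcite.table.DatasourceTable.PhysicalDatasourceMetadata;

public class EffectiveMetadataSketch
{
  public static DatasourceTable withCatalog(
      PhysicalDatasourceMetadata physical,
      DatasourceFacade catalog
  )
  {
    // Start from the physical columns; a real resolver would also fold in
    // catalog-only columns and catalog-declared types here.
    Map<String, EffectiveColumnMetadata> columns = physical.toEffectiveColumns();
    EffectiveMetadata effective = new EffectiveMetadata(catalog, columns, false);
    return new DatasourceTable(physical.getRowSignature(), physical, effective);
  }
}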

View File

@ -481,9 +481,9 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
buf.append(sig.getColumnName(i)).append(" ");
ColumnType type = sig.getColumnType(i).get();
if (type == ColumnType.STRING) {
buf.append(Columns.VARCHAR);
buf.append(Columns.SQL_VARCHAR);
} else if (type == ColumnType.LONG) {
buf.append(Columns.BIGINT);
buf.append(Columns.SQL_BIGINT);
} else if (type == ColumnType.DOUBLE) {
buf.append(Columns.DOUBLE);
} else if (type == ColumnType.FLOAT) {

View File

@ -117,6 +117,7 @@ public @interface SqlTestFrameworkConfig
private SqlTestFramework createFramework(SqlTestFrameworkConfig config)
{
SqlTestFramework.Builder builder = new SqlTestFramework.Builder(testHost)
.catalogResolver(testHost.createCatalogResolver())
.minTopNThreshold(config.minTopNThreshold())
.mergeBufferCount(config.numMergeBuffers());
return builder.build();

View File

@ -31,6 +31,7 @@ import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestTimelineServerView;
@ -67,7 +68,7 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
cache.start();
cache.awaitInitialization();
final DruidSchema druidSchema = new DruidSchema(cache, null);
final DruidSchema druidSchema = new DruidSchema(cache, null, CatalogResolver.NULL_RESOLVER);
Assert.assertEquals(ImmutableSet.of(), druidSchema.getTableNames());
}

View File

@ -38,6 +38,7 @@ import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.RowSignatures;
@ -72,7 +73,8 @@ public class InformationSchemaTest extends BaseCalciteQueryTest
new PlannerConfig(),
null,
new NoopDruidSchemaManager(),
CalciteTests.TEST_AUTHORIZER_MAPPER
CalciteTests.TEST_AUTHORIZER_MAPPER,
CatalogResolver.NULL_RESOLVER
);
informationSchema = new InformationSchema(

View File

@ -95,6 +95,7 @@ import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
@ -266,7 +267,7 @@ public class SystemSchemaTest extends CalciteTestBase
);
cache.start();
cache.awaitInitialization();
druidSchema = new DruidSchema(cache, null);
druidSchema = new DruidSchema(cache, null, CatalogResolver.NULL_RESOLVER);
metadataView = EasyMock.createMock(MetadataSegmentView.class);
druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class);

View File

@ -50,6 +50,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
@ -131,14 +132,16 @@ public class QueryFrameworkUtils
final PlannerConfig plannerConfig,
@Nullable final ViewManager viewManager,
final DruidSchemaManager druidSchemaManager,
final AuthorizerMapper authorizerMapper
final AuthorizerMapper authorizerMapper,
final CatalogResolver catalogResolver
)
{
DruidSchema druidSchema = createMockSchema(
injector,
conglomerate,
walker,
druidSchemaManager
druidSchemaManager,
catalogResolver
);
SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, authorizerMapper);
@ -193,7 +196,8 @@ public class QueryFrameworkUtils
plannerConfig,
null,
new NoopDruidSchemaManager(),
authorizerMapper
authorizerMapper,
CatalogResolver.NULL_RESOLVER
);
}
@ -201,7 +205,8 @@ public class QueryFrameworkUtils
final Injector injector,
final QueryRunnerFactoryConglomerate conglomerate,
final SpecificSegmentsQuerySegmentWalker walker,
final DruidSchemaManager druidSchemaManager
final DruidSchemaManager druidSchemaManager,
final CatalogResolver catalog
)
{
final BrokerSegmentMetadataCache cache = new BrokerSegmentMetadataCache(
@ -220,7 +225,8 @@ public class QueryFrameworkUtils
{
return ImmutableSet.of(CalciteTests.BROADCAST_DATASOURCE);
}
}),
}
),
null
);
@ -233,7 +239,7 @@ public class QueryFrameworkUtils
}
cache.stop();
return new DruidSchema(cache, druidSchemaManager);
return new DruidSchema(cache, druidSchemaManager, catalog);
}
public static JoinableFactory createDefaultJoinableFactory(Injector injector)

View File

@ -155,6 +155,11 @@ public class SqlTestFramework
Injector injector
);
default CatalogResolver createCatalogResolver()
{
return CatalogResolver.NULL_RESOLVER;
}
/**
* Configure the JSON mapper.
*
@ -419,7 +424,8 @@ public class SqlTestFramework
plannerConfig,
viewManager,
componentSupplier.createSchemaManager(),
framework.authorizerMapper
framework.authorizerMapper,
framework.builder.catalogResolver
);
this.plannerFactory = new PlannerFactory(