mirror of https://github.com/apache/druid.git
SQL: Groundwork for views. (#3962)
* SQL: Groundwork for views. They are not actually exposed to users at this point, but enough is there to have some test cases in CalciteQueryTest. * Remove unused imports. * Fix injection problem.
This commit is contained in:
parent
ad477cb454
commit
64248d31b6
|
@ -23,8 +23,10 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.inject.Inject;
|
||||
|
@ -50,7 +52,10 @@ import io.druid.server.coordination.DruidServerMetadata;
|
|||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
import io.druid.sql.calcite.table.DruidTable;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
import io.druid.sql.calcite.view.DruidViewMacro;
|
||||
import io.druid.sql.calcite.view.ViewManager;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.calcite.schema.Function;
|
||||
import org.apache.calcite.schema.Table;
|
||||
import org.apache.calcite.schema.impl.AbstractSchema;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -73,6 +78,7 @@ public class DruidSchema extends AbstractSchema
|
|||
private final QuerySegmentWalker walker;
|
||||
private final TimelineServerView serverView;
|
||||
private final PlannerConfig config;
|
||||
private final ViewManager viewManager;
|
||||
private final ExecutorService cacheExec;
|
||||
private final ConcurrentMap<String, Table> tables;
|
||||
|
||||
|
@ -92,12 +98,14 @@ public class DruidSchema extends AbstractSchema
|
|||
public DruidSchema(
|
||||
final QuerySegmentWalker walker,
|
||||
final TimelineServerView serverView,
|
||||
final PlannerConfig config
|
||||
final PlannerConfig config,
|
||||
final ViewManager viewManager
|
||||
)
|
||||
{
|
||||
this.walker = Preconditions.checkNotNull(walker, "walker");
|
||||
this.serverView = Preconditions.checkNotNull(serverView, "serverView");
|
||||
this.config = Preconditions.checkNotNull(config, "config");
|
||||
this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
|
||||
this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d");
|
||||
this.tables = Maps.newConcurrentMap();
|
||||
}
|
||||
|
@ -274,6 +282,16 @@ public class DruidSchema extends AbstractSchema
|
|||
return ImmutableMap.copyOf(tables);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Multimap<String, Function> getFunctionMultimap()
|
||||
{
|
||||
final ImmutableMultimap.Builder<String, Function> builder = ImmutableMultimap.builder();
|
||||
for (Map.Entry<String, DruidViewMacro> entry : viewManager.getViews().entrySet()) {
|
||||
builder.put(entry);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private DruidTable computeTable(final String dataSource)
|
||||
{
|
||||
final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
|
||||
|
|
|
@ -21,8 +21,11 @@ package io.druid.sql.calcite.schema;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.FluentIterable;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
|
@ -39,16 +42,19 @@ import org.apache.calcite.schema.SchemaPlus;
|
|||
import org.apache.calcite.schema.Statistic;
|
||||
import org.apache.calcite.schema.Statistics;
|
||||
import org.apache.calcite.schema.Table;
|
||||
import org.apache.calcite.schema.TableMacro;
|
||||
import org.apache.calcite.schema.impl.AbstractSchema;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class InformationSchema extends AbstractSchema
|
||||
{
|
||||
public static final String NAME = "INFORMATION_SCHEMA";
|
||||
|
||||
private static final String EMPTY_CATALOG = "";
|
||||
private static final String SCHEMATA_TABLE = "SCHEMATA";
|
||||
private static final String TABLES_TABLE = "TABLES";
|
||||
private static final String COLUMNS_TABLE = "COLUMNS";
|
||||
|
@ -126,7 +132,7 @@ public class InformationSchema extends AbstractSchema
|
|||
{
|
||||
final SchemaPlus subSchema = rootSchema.getSubSchema(schemaName);
|
||||
return new Object[]{
|
||||
"", // CATALOG_NAME
|
||||
EMPTY_CATALOG, // CATALOG_NAME
|
||||
subSchema.getName(), // SCHEMA_NAME
|
||||
null, // SCHEMA_OWNER
|
||||
null, // DEFAULT_CHARACTER_SET_CATALOG
|
||||
|
@ -174,21 +180,44 @@ public class InformationSchema extends AbstractSchema
|
|||
public Iterable<Object[]> apply(final String schemaName)
|
||||
{
|
||||
final SchemaPlus subSchema = rootSchema.getSubSchema(schemaName);
|
||||
final Set<String> tableNames = subSchema.getTableNames();
|
||||
return FluentIterable.from(tableNames).transform(
|
||||
new Function<String, Object[]>()
|
||||
{
|
||||
@Override
|
||||
public Object[] apply(final String tableName)
|
||||
{
|
||||
return new Object[]{
|
||||
null, // TABLE_CATALOG
|
||||
schemaName, // TABLE_SCHEMA
|
||||
tableName, // TABLE_NAME
|
||||
subSchema.getTable(tableName).getJdbcTableType().toString() // TABLE_TYPE
|
||||
};
|
||||
}
|
||||
}
|
||||
return Iterables.filter(
|
||||
Iterables.concat(
|
||||
FluentIterable.from(subSchema.getTableNames()).transform(
|
||||
new Function<String, Object[]>()
|
||||
{
|
||||
@Override
|
||||
public Object[] apply(final String tableName)
|
||||
{
|
||||
return new Object[]{
|
||||
EMPTY_CATALOG, // TABLE_CATALOG
|
||||
schemaName, // TABLE_SCHEMA
|
||||
tableName, // TABLE_NAME
|
||||
subSchema.getTable(tableName).getJdbcTableType().toString() // TABLE_TYPE
|
||||
};
|
||||
}
|
||||
}
|
||||
),
|
||||
FluentIterable.from(subSchema.getFunctionNames()).transform(
|
||||
new Function<String, Object[]>()
|
||||
{
|
||||
@Override
|
||||
public Object[] apply(final String functionName)
|
||||
{
|
||||
if (getView(subSchema, functionName) != null) {
|
||||
return new Object[]{
|
||||
EMPTY_CATALOG, // TABLE_CATALOG
|
||||
schemaName, // TABLE_SCHEMA
|
||||
functionName, // TABLE_NAME
|
||||
"VIEW" // TABLE_TYPE
|
||||
};
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
),
|
||||
Predicates.notNull()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -230,50 +259,49 @@ public class InformationSchema extends AbstractSchema
|
|||
public Iterable<Object[]> apply(final String schemaName)
|
||||
{
|
||||
final SchemaPlus subSchema = rootSchema.getSubSchema(schemaName);
|
||||
final Set<String> tableNames = subSchema.getTableNames();
|
||||
final JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl(TYPE_SYSTEM);
|
||||
return FluentIterable.from(tableNames).transformAndConcat(
|
||||
new Function<String, Iterable<Object[]>>()
|
||||
{
|
||||
@Override
|
||||
public Iterable<Object[]> apply(final String tableName)
|
||||
{
|
||||
return FluentIterable
|
||||
.from(subSchema.getTable(tableName).getRowType(typeFactory).getFieldList())
|
||||
.transform(
|
||||
new Function<RelDataTypeField, Object[]>()
|
||||
|
||||
return Iterables.concat(
|
||||
Iterables.filter(
|
||||
Iterables.concat(
|
||||
FluentIterable.from(subSchema.getTableNames()).transform(
|
||||
new Function<String, Iterable<Object[]>>()
|
||||
{
|
||||
@Override
|
||||
public Object[] apply(final RelDataTypeField field)
|
||||
public Iterable<Object[]> apply(final String tableName)
|
||||
{
|
||||
final RelDataType type = field.getType();
|
||||
boolean isNumeric = SqlTypeName.NUMERIC_TYPES.contains(type.getSqlTypeName());
|
||||
boolean isCharacter = SqlTypeName.CHAR_TYPES.contains(type.getSqlTypeName());
|
||||
boolean isDateTime = SqlTypeName.DATETIME_TYPES.contains(type.getSqlTypeName());
|
||||
return new Object[]{
|
||||
"", // TABLE_CATALOG
|
||||
schemaName, // TABLE_SCHEMA
|
||||
tableName, // TABLE_NAME
|
||||
field.getName(), // COLUMN_NAME
|
||||
String.valueOf(field.getIndex()), // ORDINAL_POSITION
|
||||
"", // COLUMN_DEFAULT
|
||||
type.isNullable() ? "YES" : "NO", // IS_NULLABLE
|
||||
type.getSqlTypeName().toString(), // DATA_TYPE
|
||||
null, // CHARACTER_MAXIMUM_LENGTH
|
||||
null, // CHARACTER_OCTET_LENGTH
|
||||
isNumeric ? String.valueOf(type.getPrecision()) : null, // NUMERIC_PRECISION
|
||||
isNumeric ? "10" : null, // NUMERIC_PRECISION_RADIX
|
||||
isNumeric ? String.valueOf(type.getScale()) : null, // NUMERIC_SCALE
|
||||
isDateTime ? String.valueOf(type.getPrecision()) : null, // DATETIME_PRECISION
|
||||
isCharacter ? type.getCharset().name() : null, // CHARACTER_SET_NAME
|
||||
isCharacter ? type.getCollation().getCollationName() : null, // COLLATION_NAME
|
||||
type.getSqlTypeName().getJdbcOrdinal() // JDBC_TYPE (Druid extension)
|
||||
};
|
||||
return generateColumnMetadata(
|
||||
schemaName,
|
||||
tableName,
|
||||
subSchema.getTable(tableName),
|
||||
typeFactory
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
),
|
||||
FluentIterable.from(subSchema.getFunctionNames()).transform(
|
||||
new Function<String, Iterable<Object[]>>()
|
||||
{
|
||||
@Override
|
||||
public Iterable<Object[]> apply(final String functionName)
|
||||
{
|
||||
final TableMacro viewMacro = getView(subSchema, functionName);
|
||||
if (viewMacro == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return generateColumnMetadata(
|
||||
schemaName,
|
||||
functionName,
|
||||
viewMacro.apply(ImmutableList.of()),
|
||||
typeFactory
|
||||
);
|
||||
}
|
||||
}
|
||||
)
|
||||
),
|
||||
Predicates.notNull()
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -299,5 +327,78 @@ public class InformationSchema extends AbstractSchema
|
|||
{
|
||||
return TableType.SYSTEM_TABLE;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private Iterable<Object[]> generateColumnMetadata(
|
||||
final String schemaName,
|
||||
final String tableName,
|
||||
final Table table,
|
||||
final RelDataTypeFactory typeFactory
|
||||
)
|
||||
{
|
||||
if (table == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return FluentIterable
|
||||
.from(table.getRowType(typeFactory).getFieldList())
|
||||
.transform(
|
||||
new Function<RelDataTypeField, Object[]>()
|
||||
{
|
||||
@Override
|
||||
public Object[] apply(final RelDataTypeField field)
|
||||
{
|
||||
final RelDataType type = field.getType();
|
||||
boolean isNumeric = SqlTypeName.NUMERIC_TYPES.contains(type.getSqlTypeName());
|
||||
boolean isCharacter = SqlTypeName.CHAR_TYPES.contains(type.getSqlTypeName());
|
||||
boolean isDateTime = SqlTypeName.DATETIME_TYPES.contains(type.getSqlTypeName());
|
||||
return new Object[]{
|
||||
EMPTY_CATALOG, // TABLE_CATALOG
|
||||
schemaName, // TABLE_SCHEMA
|
||||
tableName, // TABLE_NAME
|
||||
field.getName(), // COLUMN_NAME
|
||||
String.valueOf(field.getIndex()), // ORDINAL_POSITION
|
||||
"", // COLUMN_DEFAULT
|
||||
type.isNullable() ? "YES" : "NO", // IS_NULLABLE
|
||||
type.getSqlTypeName().toString(), // DATA_TYPE
|
||||
null, // CHARACTER_MAXIMUM_LENGTH
|
||||
null, // CHARACTER_OCTET_LENGTH
|
||||
isNumeric ? String.valueOf(type.getPrecision()) : null, // NUMERIC_PRECISION
|
||||
isNumeric ? "10" : null, // NUMERIC_PRECISION_RADIX
|
||||
isNumeric ? String.valueOf(type.getScale()) : null, // NUMERIC_SCALE
|
||||
isDateTime ? String.valueOf(type.getPrecision()) : null, // DATETIME_PRECISION
|
||||
isCharacter ? type.getCharset().name() : null, // CHARACTER_SET_NAME
|
||||
isCharacter ? type.getCollation().getCollationName() : null, // COLLATION_NAME
|
||||
type.getSqlTypeName().getJdbcOrdinal() // JDBC_TYPE (Druid extension)
|
||||
};
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a view macro that may or may not be defined in a certain schema. If it's not defined, returns null.
|
||||
*
|
||||
* @param schemaPlus schema
|
||||
* @param functionName function name
|
||||
*
|
||||
* @return view, or null
|
||||
*/
|
||||
@Nullable
|
||||
private static TableMacro getView(final SchemaPlus schemaPlus, final String functionName)
|
||||
{
|
||||
// Look for a zero-arg function that is also a TableMacro. The returned value
|
||||
// is never null so we don't need to check for that.
|
||||
final Collection<org.apache.calcite.schema.Function> functions =
|
||||
schemaPlus.getFunctions(functionName);
|
||||
|
||||
for (org.apache.calcite.schema.Function function : functions) {
|
||||
if (function.getParameters().isEmpty() && function instanceof TableMacro) {
|
||||
return (TableMacro) function;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.sql.calcite.view;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.sql.calcite.planner.DruidPlanner;
|
||||
import io.druid.sql.calcite.planner.PlannerFactory;
|
||||
import io.druid.sql.calcite.schema.DruidSchema;
|
||||
import org.apache.calcite.rel.type.RelDataType;
|
||||
import org.apache.calcite.rel.type.RelDataTypeImpl;
|
||||
import org.apache.calcite.schema.FunctionParameter;
|
||||
import org.apache.calcite.schema.TableMacro;
|
||||
import org.apache.calcite.schema.TranslatableTable;
|
||||
import org.apache.calcite.schema.impl.ViewTable;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class DruidViewMacro implements TableMacro
|
||||
{
|
||||
private final PlannerFactory plannerFactory;
|
||||
private final String viewSql;
|
||||
|
||||
public DruidViewMacro(final PlannerFactory plannerFactory, final String viewSql)
|
||||
{
|
||||
this.plannerFactory = plannerFactory;
|
||||
this.viewSql = viewSql;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TranslatableTable apply(final List<Object> arguments)
|
||||
{
|
||||
final RelDataType rowType;
|
||||
try (final DruidPlanner planner = plannerFactory.createPlanner(null)) {
|
||||
rowType = planner.plan(viewSql).rowType();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
return new ViewTable(
|
||||
null,
|
||||
RelDataTypeImpl.proto(rowType),
|
||||
viewSql,
|
||||
ImmutableList.of(DruidSchema.NAME),
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FunctionParameter> getParameters()
|
||||
{
|
||||
return ImmutableList.of();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.sql.calcite.view;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.sql.calcite.planner.PlannerFactory;
|
||||
import org.apache.calcite.schema.TableMacro;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/**
|
||||
* View manager that stores all views in-process. Not meant for serious usage, since views are not saved nor
|
||||
* are they shared across processes.
|
||||
*/
|
||||
public class InProcessViewManager implements ViewManager
|
||||
{
|
||||
private final ConcurrentMap<String, DruidViewMacro> views;
|
||||
|
||||
@Inject
|
||||
public InProcessViewManager()
|
||||
{
|
||||
this.views = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
public void createView(final PlannerFactory plannerFactory, final String viewName, final String viewSql)
|
||||
{
|
||||
final TableMacro oldValue = views.putIfAbsent(viewName, new DruidViewMacro(plannerFactory, viewSql));
|
||||
if (oldValue != null) {
|
||||
throw new ISE("View[%s] already exists", viewName);
|
||||
}
|
||||
}
|
||||
|
||||
public void alterView(final PlannerFactory plannerFactory, final String viewName, final String viewSql)
|
||||
{
|
||||
final TableMacro oldValue = views.replace(viewName, new DruidViewMacro(plannerFactory, viewSql));
|
||||
if (oldValue != null) {
|
||||
throw new ISE("View[%s] does not exist", viewName);
|
||||
}
|
||||
}
|
||||
|
||||
public void dropView(final String viewName)
|
||||
{
|
||||
final TableMacro oldValue = views.remove(viewName);
|
||||
if (oldValue == null) {
|
||||
throw new ISE("View[%s] does not exist", viewName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, DruidViewMacro> getViews()
|
||||
{
|
||||
return views;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.sql.calcite.view;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.sql.calcite.planner.PlannerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* View manager that does not support views.
|
||||
*/
|
||||
public class NoopViewManager implements ViewManager
|
||||
{
|
||||
@Override
|
||||
public void createView(final PlannerFactory plannerFactory, final String viewName, final String viewSql)
|
||||
{
|
||||
throw new UnsupportedOperationException("Noop view manager cannot do anything");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void alterView(final PlannerFactory plannerFactory, final String viewName, final String viewSql)
|
||||
{
|
||||
throw new UnsupportedOperationException("Noop view manager cannot do anything");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dropView(final String viewName)
|
||||
{
|
||||
throw new UnsupportedOperationException("Noop view manager cannot do anything");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, DruidViewMacro> getViews()
|
||||
{
|
||||
return ImmutableMap.of();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.sql.calcite.view;
|
||||
|
||||
import io.druid.sql.calcite.planner.PlannerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* View managers allow {@link io.druid.sql.calcite.schema.DruidSchema} to support views. They must be
|
||||
* thread-safe.
|
||||
*/
|
||||
public interface ViewManager
|
||||
{
|
||||
void createView(final PlannerFactory plannerFactory, final String viewName, final String viewSql);
|
||||
|
||||
void alterView(final PlannerFactory plannerFactory, final String viewName, final String viewSql);
|
||||
|
||||
void dropView(final String viewName);
|
||||
|
||||
Map<String, DruidViewMacro> getViews();
|
||||
}
|
|
@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.inject.Binder;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.Provider;
|
||||
import io.druid.guice.Jerseys;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
|
@ -45,6 +45,8 @@ import io.druid.sql.calcite.expression.SubstringExtractionOperator;
|
|||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
import io.druid.sql.calcite.schema.DruidSchema;
|
||||
import io.druid.sql.calcite.view.NoopViewManager;
|
||||
import io.druid.sql.calcite.view.ViewManager;
|
||||
import io.druid.sql.http.SqlResource;
|
||||
import org.apache.calcite.schema.SchemaPlus;
|
||||
|
||||
|
@ -85,6 +87,8 @@ public class SqlModule implements Module
|
|||
JsonConfigProvider.bind(binder, "druid.sql.planner", PlannerConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.sql.avatica", AvaticaServerConfig.class);
|
||||
LifecycleModule.register(binder, DruidSchema.class);
|
||||
binder.bind(ViewManager.class).to(NoopViewManager.class);
|
||||
binder.bind(SchemaPlus.class).toProvider(SchemaPlusProvider.class);
|
||||
|
||||
for (Class<? extends SqlAggregator> clazz : DEFAULT_AGGREGATOR_CLASSES) {
|
||||
SqlBindings.addAggregator(binder, clazz);
|
||||
|
@ -106,13 +110,15 @@ public class SqlModule implements Module
|
|||
}
|
||||
}
|
||||
|
||||
@Provides
|
||||
public SchemaPlus createRootSchema(final DruidSchema druidSchema)
|
||||
public static class SchemaPlusProvider implements Provider<SchemaPlus>
|
||||
{
|
||||
if (isEnabled()) {
|
||||
@Inject
|
||||
private DruidSchema druidSchema;
|
||||
|
||||
@Override
|
||||
public SchemaPlus get()
|
||||
{
|
||||
return Calcites.createRootSchema(druidSchema);
|
||||
} else {
|
||||
throw new IllegalStateException("Cannot provide SchemaPlus when SQL is disabled.");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -272,13 +272,13 @@ public class DruidAvaticaHandlerTest
|
|||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
ROW(
|
||||
Pair.of("TABLE_CAT", null),
|
||||
Pair.of("TABLE_CAT", ""),
|
||||
Pair.of("TABLE_NAME", "foo"),
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_TYPE", "TABLE")
|
||||
),
|
||||
ROW(
|
||||
Pair.of("TABLE_CAT", null),
|
||||
Pair.of("TABLE_CAT", ""),
|
||||
Pair.of("TABLE_NAME", "foo2"),
|
||||
Pair.of("TABLE_SCHEM", "druid"),
|
||||
Pair.of("TABLE_TYPE", "TABLE")
|
||||
|
|
|
@ -92,6 +92,7 @@ import io.druid.sql.calcite.schema.DruidSchema;
|
|||
import io.druid.sql.calcite.util.CalciteTests;
|
||||
import io.druid.sql.calcite.util.QueryLogHook;
|
||||
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
|
||||
import io.druid.sql.calcite.view.InProcessViewManager;
|
||||
import org.apache.calcite.plan.RelOptPlanner;
|
||||
import org.apache.calcite.schema.SchemaPlus;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -264,11 +265,13 @@ public class CalciteQueryTest
|
|||
testQuery(
|
||||
"SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n"
|
||||
+ "FROM INFORMATION_SCHEMA.TABLES\n"
|
||||
+ "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE')",
|
||||
+ "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
|
||||
ImmutableList.<Query>of(),
|
||||
ImmutableList.of(
|
||||
new Object[]{"druid", "foo", "TABLE"},
|
||||
new Object[]{"druid", "foo2", "TABLE"},
|
||||
new Object[]{"druid", "aview", "VIEW"},
|
||||
new Object[]{"druid", "bview", "VIEW"},
|
||||
new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"},
|
||||
new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"},
|
||||
new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"}
|
||||
|
@ -277,7 +280,7 @@ public class CalciteQueryTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testInformationSchemaColumns() throws Exception
|
||||
public void testInformationSchemaColumnsOnTable() throws Exception
|
||||
{
|
||||
testQuery(
|
||||
"SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE\n"
|
||||
|
@ -295,6 +298,20 @@ public class CalciteQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInformationSchemaColumnsOnView() throws Exception
|
||||
{
|
||||
testQuery(
|
||||
"SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE\n"
|
||||
+ "FROM INFORMATION_SCHEMA.COLUMNS\n"
|
||||
+ "WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = 'aview'",
|
||||
ImmutableList.<Query>of(),
|
||||
ImmutableList.of(
|
||||
new Object[]{"dim1_firstchar", "VARCHAR", "NO"}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplainInformationSchemaColumns() throws Exception
|
||||
{
|
||||
|
@ -970,6 +987,49 @@ public class CalciteQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountStarOnView() throws Exception
|
||||
{
|
||||
testQuery(
|
||||
"SELECT COUNT(*) FROM druid.aview WHERE dim1_firstchar <> 'z'",
|
||||
ImmutableList.<Query>of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(QSS(Filtration.eternity()))
|
||||
.filters(AND(
|
||||
SELECTOR("dim2", "a", null),
|
||||
NOT(SELECTOR("dim1", "z", new SubstringDimExtractionFn(0, 1)))
|
||||
))
|
||||
.granularity(Granularities.ALL)
|
||||
.aggregators(AGGS(new CountAggregatorFactory("a0")))
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{2L}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplainCountStarOnView() throws Exception
|
||||
{
|
||||
testQuery(
|
||||
"EXPLAIN PLAN FOR SELECT COUNT(*) FROM aview WHERE dim1_firstchar <> 'z'",
|
||||
ImmutableList.<Query>of(),
|
||||
ImmutableList.of(
|
||||
new Object[]{
|
||||
"DruidQueryRel(dataSource=[foo], "
|
||||
+ "filter=[(dim2 = a && !substring(0, 1)(dim1) = z)], "
|
||||
+ "dimensions=[[]], "
|
||||
+ "aggregations=[[Aggregation{aggregatorFactories=[CountAggregatorFactory{name='a0'}], "
|
||||
+ "postAggregator=null, "
|
||||
+ "finalizingPostAggregatorFactory=null}]])\n"
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountStarWithLikeFilter() throws Exception
|
||||
{
|
||||
|
@ -2810,6 +2870,50 @@ public class CalciteQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterOnCurrentTimestampOnView() throws Exception
|
||||
{
|
||||
testQuery(
|
||||
"SELECT * FROM bview",
|
||||
ImmutableList.<Query>of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(QSS(new Interval("2000-01-02/2002")))
|
||||
.granularity(Granularities.ALL)
|
||||
.aggregators(AGGS(new CountAggregatorFactory("a0")))
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{5L}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterOnCurrentTimestampLosAngelesOnView() throws Exception
|
||||
{
|
||||
// Tests that query context still applies to view SQL; note the result is different from
|
||||
// "testFilterOnCurrentTimestampOnView" above.
|
||||
|
||||
testQuery(
|
||||
PlannerContext.create(PLANNER_CONFIG_DEFAULT, QUERY_CONTEXT_LOS_ANGELES),
|
||||
"SELECT * FROM bview",
|
||||
ImmutableList.<Query>of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(QSS(new Interval("2000-01-02T08Z/2002-01-01T08Z")))
|
||||
.granularity(Granularities.ALL)
|
||||
.aggregators(AGGS(new CountAggregatorFactory("a0")))
|
||||
.context(TIMESERIES_CONTEXT_LOS_ANGELES)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{4L}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterOnNotTimeFloor() throws Exception
|
||||
{
|
||||
|
@ -3892,10 +3996,26 @@ public class CalciteQueryTest
|
|||
) throws Exception
|
||||
{
|
||||
final PlannerConfig plannerConfig = plannerContext.getPlannerConfig();
|
||||
final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig);
|
||||
final InProcessViewManager viewManager = new InProcessViewManager();
|
||||
final DruidSchema druidSchema = CalciteTests.createMockSchema(walker, plannerConfig, viewManager);
|
||||
final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema);
|
||||
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
|
||||
|
||||
final PlannerFactory plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig);
|
||||
|
||||
viewManager.createView(
|
||||
plannerFactory,
|
||||
"aview",
|
||||
"SELECT SUBSTRING(dim1, 1, 1) AS dim1_firstchar FROM foo WHERE dim2 = 'a'"
|
||||
);
|
||||
|
||||
viewManager.createView(
|
||||
plannerFactory,
|
||||
"bview",
|
||||
"SELECT COUNT(*) FROM druid.foo\n"
|
||||
+ "WHERE __time >= CURRENT_TIMESTAMP + INTERVAL '1' DAY AND __time < TIMESTAMP '2002-01-01 00:00:00'"
|
||||
);
|
||||
|
||||
try (DruidPlanner planner = plannerFactory.createPlanner(plannerContext.getQueryContext())) {
|
||||
final PlannerResult plan = planner.plan(sql);
|
||||
return Sequences.toList(plan.run(), Lists.<Object[]>newArrayList());
|
||||
|
|
|
@ -38,6 +38,7 @@ import io.druid.sql.calcite.table.DruidTable;
|
|||
import io.druid.sql.calcite.util.CalciteTests;
|
||||
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
|
||||
import io.druid.sql.calcite.util.TestServerInventoryView;
|
||||
import io.druid.sql.calcite.view.NoopViewManager;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.LinearShardSpec;
|
||||
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
|
||||
|
@ -148,7 +149,8 @@ public class DruidSchemaTest
|
|||
schema = new DruidSchema(
|
||||
walker,
|
||||
new TestServerInventoryView(walker.getSegments()),
|
||||
PLANNER_CONFIG_DEFAULT
|
||||
PLANNER_CONFIG_DEFAULT,
|
||||
new NoopViewManager()
|
||||
);
|
||||
|
||||
schema.start();
|
||||
|
|
|
@ -80,6 +80,8 @@ import io.druid.sql.calcite.expression.SqlExtractionOperator;
|
|||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
import io.druid.sql.calcite.schema.DruidSchema;
|
||||
import io.druid.sql.calcite.view.NoopViewManager;
|
||||
import io.druid.sql.calcite.view.ViewManager;
|
||||
import io.druid.sql.guice.SqlModule;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.LinearShardSpec;
|
||||
|
@ -360,11 +362,21 @@ public class CalciteTests
|
|||
final SpecificSegmentsQuerySegmentWalker walker,
|
||||
final PlannerConfig plannerConfig
|
||||
)
|
||||
{
|
||||
return createMockSchema(walker, plannerConfig, new NoopViewManager());
|
||||
}
|
||||
|
||||
public static DruidSchema createMockSchema(
|
||||
final SpecificSegmentsQuerySegmentWalker walker,
|
||||
final PlannerConfig plannerConfig,
|
||||
final ViewManager viewManager
|
||||
)
|
||||
{
|
||||
final DruidSchema schema = new DruidSchema(
|
||||
walker,
|
||||
new TestServerInventoryView(walker.getSegments()),
|
||||
plannerConfig
|
||||
plannerConfig,
|
||||
viewManager
|
||||
);
|
||||
|
||||
schema.start();
|
||||
|
|
Loading…
Reference in New Issue