Validate target dataSource for INSERT. (#12129)

This commit is contained in:
Gian Merlino 2022-01-18 09:34:23 -08:00 committed by GitHub
parent 7bdb9ebdf1
commit cf7191d2bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 2 deletions

View File

@ -66,6 +66,7 @@ import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException; import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair; import org.apache.calcite.util.Pair;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
@ -663,26 +664,36 @@ public class DruidPlanner implements Closeable
} }
final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable(); final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
final String dataSource;
if (tableIdentifier.names.isEmpty()) { if (tableIdentifier.names.isEmpty()) {
// I don't think this can happen, but include a branch for it just in case. // I don't think this can happen, but include a branch for it just in case.
throw new ValidationException("INSERT requires target table."); throw new ValidationException("INSERT requires target table.");
} else if (tableIdentifier.names.size() == 1) { } else if (tableIdentifier.names.size() == 1) {
// Unqualified name. // Unqualified name.
return Iterables.getOnlyElement(tableIdentifier.names); dataSource = Iterables.getOnlyElement(tableIdentifier.names);
} else { } else {
// Qualified name. // Qualified name.
final String defaultSchemaName = final String defaultSchemaName =
Iterables.getOnlyElement(CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null)); Iterables.getOnlyElement(CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null));
if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) { if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) {
return tableIdentifier.names.get(1); dataSource = tableIdentifier.names.get(1);
} else { } else {
throw new ValidationException( throw new ValidationException(
StringUtils.format("Cannot INSERT into [%s] because it is not a Druid datasource.", tableIdentifier) StringUtils.format("Cannot INSERT into [%s] because it is not a Druid datasource.", tableIdentifier)
); );
} }
} }
try {
IdUtils.validateId("INSERT dataSource", dataSource);
}
catch (IllegalArgumentException e) {
throw new ValidationException(e.getMessage());
}
return dataSource;
} }
private String buildSQLPlanningErrorMessage(Throwable exception) private String buildSQLPlanningErrorMessage(Throwable exception)

View File

@ -181,6 +181,15 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.verify(); .verify();
} }
@Test
public void testInsertIntoInvalidDataSourceName()
{
testInsertQuery()
.sql("INSERT INTO \"in/valid\" SELECT dim1, dim2 FROM foo")
.expectValidationError(SqlPlanningException.class, "INSERT dataSource cannot contain the '/' character.")
.verify();
}
@Test @Test
public void testInsertUsingColumnList() public void testInsertUsingColumnList()
{ {