Add ability to turn off Druid Catalog specific validation done on catalog defined tables in Druid (#16465)

* add property to enable / disable catalog validation and add tests

* add integration tests for catalog validation disabled

* add integration tests

* remove debugging logs

* fix forbidden api call
zachjsh 2024-05-23 13:19:51 -04:00 committed by GitHub
parent 204a25d3e6
commit b0cc1ee84b
8 changed files with 387 additions and 2 deletions
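
In short: this commit introduces a new query context parameter, catalogValidationEnabled (default true), which lets a user bypass the catalog-specific validation the SQL planner applies to catalog-defined tables (sealed-schema and type-assignment checks). Below is a minimal sketch of how a caller might set the flag when building a SQL query, modeled on the sqlQueryFromString() helper in the integration test further down; the class name and INSERT statement are illustrative only:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.sql.http.SqlQuery;

public class CatalogValidationExample
{
  // Build a SQL query whose context disables catalog validation for just this query.
  public static SqlQuery withCatalogValidationDisabled(String sql)
  {
    return new SqlQuery(
        sql,                      // e.g. "INSERT INTO sealedTable SELECT ... PARTITIONED BY DAY"
        null, false, false, false,
        ImmutableMap.of(QueryContexts.CATALOG_VALIDATION_ENABLED, false),
        null
    );
  }
}
```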

View File

@@ -20,6 +20,7 @@
package org.apache.druid.testsEx.catalog;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableMetadata;
@@ -29,14 +30,21 @@ import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.cluster.CatalogClient;
import org.apache.druid.testsEx.cluster.DruidClusterClient;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
import static org.junit.Assert.assertTrue;
/**
* Tests that expect successful ingestion of data into catalog defined tables, and that querying
* the data gives expected results.
@@ -445,4 +453,171 @@ public abstract class ITCatalogIngestAndQueryTest
msqHelper.testQueriesFromFile(queryFile, tableName);
}
/**
* Adding a new column during ingestion that is not defined in a sealed table should fail with a
* proper validation error. Disabling catalog validation through the context parameter and issuing
* the ingest query again should succeed.
*/
@Test
public void testInsertNonDefinedColumnIntoSealedCatalogTableWithValidationDisabled() throws Exception
{
String queryFile = "/catalog/sealedWithValidationDisabled_select.sql";
String tableName = "testInsertNonDefinedColumnIntoSealedCatalogTableWithValidationDisabled" + operationName;
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.property(DatasourceDefn.SEALED_PROPERTY, true)
.build();
client.createTable(table, true);
LOG.info("table created:\n%s", client.readTable(table.id()));
String queryInline =
StringUtils.format(dmlPrefixPattern, tableName) + "\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " f AS extra\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
+ "PARTITIONED BY DAY\n";
LOG.info("Running query:\n%s", queryInline);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(
sqlQueryFromString(
queryInline,
ImmutableMap.of()
),
null,
null,
HttpResponseStatus.BAD_REQUEST
);
assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
.getUnderlyingException()
.getMessage()
.equals(
StringUtils.format("Column [extra] is not defined in the target table [druid.%s] strict schema", tableName))
);
// Submit the task and wait for the datasource to get loaded
LOG.info("Running query:\n%s", queryInline);
sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(
queryInline,
ImmutableMap.of(QueryContexts.CATALOG_VALIDATION_ENABLED, false)
);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(tableName);
msqHelper.testQueriesFromFile(queryFile, tableName);
}
/**
* Assigning a column during ingestion to an input type that is not compatible with the defined type of the
* column should result in a proper validation error. Disabling catalog validation through the context
* parameter and issuing the ingest query again should succeed.
*
* In this test we define the table as
* <p>
* __time      LONG
* double_col  DOUBLE
* <p>
* and ingest the following input data:
* <p>
* __time, varchar_col1, bigint_col1, varchar_col2, float_col1, varchar_col3
* 2022-12-26T12:34:56,extra,10,"20",2.0,foo
* <p>
* even though only the following columns are actually written to the table:
* <p>
* 2022-12-26T12:34:56,extra
* <p>
* When querying the table with query: 'SELECT * from ##tableName', the data is returned as:
* <p>
* __time, double_col
* 2022-12-26T12:34:56,0.0
* <p>
* because the broker knows double_col to be a DOUBLE, and so coerces the incompatible VARCHAR value to
* null (0.0) at query time.
*/
@Test
public void testInsertWithIncompatibleTypeAssignmentWithValidationDisabled() throws Exception
{
String tableName = "testInsertWithIncompatibleTypeAssignmentWithValidationDisabled" + operationName;
String queryFile = "/catalog/incompatibleTypeAssignmentWithValidationDisabled_select.sql";
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("double_col", "DOUBLE")
.property(DatasourceDefn.SEALED_PROPERTY, true)
.build();
client.createTable(table, true);
LOG.info("table created:\n%s", client.readTable(table.id()));
String queryInline =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " b AS double_col\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
+ "PARTITIONED BY DAY\n",
tableName
);
LOG.info("Running query:\n%s", queryInline);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(
sqlQueryFromString(
queryInline,
ImmutableMap.of()
),
null,
null,
HttpResponseStatus.BAD_REQUEST
);
assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
.getUnderlyingException()
.getMessage()
.equals(
"Cannot assign to target field 'double_col' of type DOUBLE from source field 'double_col' of type VARCHAR (line [4], column [3])")
);
// Submit the task and wait for the datasource to get loaded
LOG.info("Running query:\n%s", queryInline);
sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(
queryInline,
ImmutableMap.of(QueryContexts.CATALOG_VALIDATION_ENABLED, false)
);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(tableName);
msqHelper.testQueriesFromFile(queryFile, tableName);
}
private static SqlQuery sqlQueryFromString(String queryString, Map<String, Object> context)
{
return new SqlQuery(queryString, null, false, false, false, context, null);
}
}

View File

@@ -0,0 +1,11 @@
[
{
"query": "SELECT * FROM %%DATASOURCE%%",
"expectedResults": [
{
"__time": 1672058096000,
"double_col": 0.0
}
]
}
]

View File

@@ -0,0 +1,11 @@
[
{
"query": "SELECT * FROM %%DATASOURCE%%",
"expectedResults": [
{
"__time": 1672058096000,
"extra": "foo"
}
]
}
]

View File

@@ -621,6 +621,14 @@ public class QueryContext
);
}
public boolean isCatalogValidationEnabled()
{
return getBoolean(
QueryContexts.CATALOG_VALIDATION_ENABLED,
QueryContexts.DEFAULT_CATALOG_VALIDATION_ENABLED
);
}
public QueryResourceId getQueryResourceId()
{
return new QueryResourceId(getString(QueryContexts.QUERY_RESOURCE_ID));
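
For reference, a quick sketch of how the new getter resolves; this mirrors the QueryContextsTest case added below (ImmutableMap assumed in scope):

```java
// Key absent: falls back to DEFAULT_CATALOG_VALIDATION_ENABLED, i.e. true.
boolean byDefault = QueryContext.empty().isCatalogValidationEnabled();

// Key present: the explicit context value wins.
boolean disabled = QueryContext.of(
    ImmutableMap.of(QueryContexts.CATALOG_VALIDATION_ENABLED, false)
).isCatalogValidationEnabled();   // false
```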

View File

@@ -88,6 +88,7 @@ public class QueryContexts
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String WINDOWING_STRICT_VALIDATION = "windowingStrictValidation";
public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled";
// Unique identifier for the query, that is used to map the global shared resources (specifically merge buffers) to the
// query's runtime
public static final String QUERY_RESOURCE_ID = "queryResourceId";
@@ -126,6 +127,7 @@ public class QueryContexts
public static final int DEFAULT_IN_FUNCTION_EXPR_THRESHOLD = 2;
public static final boolean DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING = false;
public static final boolean DEFAULT_WINDOWING_STRICT_VALIDATION = true;
public static final boolean DEFAULT_CATALOG_VALIDATION_ENABLED = true;
@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize

View File

@@ -160,6 +160,24 @@ public class QueryContextsTest
);
}
@Test
public void testCatalogValidationEnabled()
{
Assert.assertEquals(
QueryContexts.DEFAULT_CATALOG_VALIDATION_ENABLED,
QueryContext.empty().isCatalogValidationEnabled()
);
Assert.assertTrue(QueryContext.of(ImmutableMap.of(
QueryContexts.CATALOG_VALIDATION_ENABLED,
true
)).isCatalogValidationEnabled());
Assert.assertFalse(QueryContext.of(ImmutableMap.of(
QueryContexts.CATALOG_VALIDATION_ENABLED,
false
)).isCatalogValidationEnabled());
}
@Test
public void testGetEnableJoinLeftScanDirect()
{

View File

@@ -534,9 +534,12 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
);
}
}
if (tableMetadata == null) {
final boolean isCatalogValidationEnabled = plannerContext.queryContext().isCatalogValidationEnabled();
if (tableMetadata == null || !isCatalogValidationEnabled) {
return sourceType;
}
// disable sealed mode validation if catalog validation is disabled.
final boolean isStrict = tableMetadata.isSealed();
final List<Map.Entry<String, RelDataType>> fields = new ArrayList<>();
for (RelDataTypeField sourceField : sourceFields) {
@@ -592,6 +595,8 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
// matches above.
final RelDataType targetType = typeFactory.createStructType(fields);
final SqlValidatorTable target = insertNs.resolve().getTable();
// disable type checking if catalog validation is disabled.
checkTypeAssignment(scope, target, sourceType, targetType, insert);
return targetType;
}
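
One subtlety in this hunk: because the new guard returns sourceType early, the sealed-schema (strict) check and the checkTypeAssignment call are both skipped by that single early return, so the two inline comments describe the same bypass. A condensed, illustrative restatement (not additional PR code):

```java
// Condensed view of the guard added above:
if (tableMetadata == null || !plannerContext.queryContext().isCatalogValidationEnabled()) {
  // No catalog metadata, or catalog validation disabled: keep the source row type
  // and skip both sealed-schema validation and type-assignment checking.
  return sourceType;
}
```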

View File

@@ -36,6 +36,7 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@@ -56,9 +57,20 @@ import org.junit.jupiter.api.Test;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
@SqlTestFrameworkConfig.ComponentSupplier(CatalogIngestionDmlComponentSupplier.class)
public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest
{
private static final Map<String, Object> CONTEXT_WITH_VALIDATION_DISABLED;
static {
CONTEXT_WITH_VALIDATION_DISABLED = new HashMap<>(DEFAULT_CONTEXT);
CONTEXT_WITH_VALIDATION_DISABLED.put(QueryContexts.CATALOG_VALIDATION_ENABLED, false);
}
private final String operationName;
private final String dmlPrefixPattern;
@@ -919,7 +931,7 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2\n" +
" d AS extra2\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
@@ -933,6 +945,65 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
.verify();
}
/**
* Adding a new column during ingestion that is not defined in a sealed table, when catalog validation is disabled,
* should plan accordingly.
*/
@Test
public void testInsertAddNonDefinedColumnIntoSealedCatalogTableAndValidationDisabled()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.LONG)
.add("extra2", ColumnType.STRING)
.build();
testIngestionQuery()
.context(CONTEXT_WITH_VALIDATION_DISABLED)
.sql(StringUtils.format(dmlPrefixPattern, "fooSealed") + "\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" d AS extra2\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("fooSealed", signature)
.expectResources(dataSourceWrite("fooSealed"), Externals.externalRead("EXTERNAL"))
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
expressionVirtualColumn("v1", "1", ColumnType.LONG)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("b", "c", "d", "v0", "v1")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
/**
* Inserting into a catalog table with a WITH source succeeds
@@ -1104,6 +1175,10 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
.verify();
}
/**
* Assigning a column during ingestion to an input type that is not compatible with the defined type of the
* column should result in a proper validation error.
*/
@Test
public void testInsertIntoExistingWithIncompatibleTypeAssignment()
{
@@ -1120,6 +1195,48 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
.verify();
}
/**
* Assigning a column during ingestion to an input type that is not compatible with the defined type of the
* column, when catalog validation is disabled, should plan accordingly.
*/
@Test
public void testInsertIntoExistingWithIncompatibleTypeAssignmentAndValidationDisabled()
{
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING_ARRAY)
.build();
testIngestionQuery()
.context(CONTEXT_WITH_VALIDATION_DISABLED)
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+ "SELECT\n"
+ " __time AS __time,\n"
+ " ARRAY[dim1] AS dim1\n"
+ "FROM foo\n"
+ "PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "array(\"dim1\")", ColumnType.STRING_ARRAY)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "v0")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
/**
* Assigning a complex type column during ingestion to an input type that is not compatible with the defined
* type of the column should result in a proper validation error.
*/
@Test
public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment()
{
@@ -1135,4 +1252,42 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
"Cannot assign to target field 'unique_dim1' of type COMPLEX<hyperUnique> from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])")
.verify();
}
/**
* Assigning a complex type column during ingestion to an input type that is not compatible with the defined
* type of the column, when catalog validation is disabled, should plan accordingly.
*/
@Test
public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignmentAndValidationDisabled()
{
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("unique_dim1", ColumnType.STRING_ARRAY)
.build();
testIngestionQuery()
.context(CONTEXT_WITH_VALIDATION_DISABLED)
.sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n"
+ "SELECT\n"
+ " __time AS __time,\n"
+ " ARRAY[dim1] AS unique_dim1\n"
+ "FROM foo\n"
+ "PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "array(\"dim1\")", ColumnType.STRING_ARRAY)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "v0")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
}